Avez-vous déjà voulu visualiser votre trafic réseau en temps réel ? Dans ce tutoriel, vous apprendrez comment construire un tableau de bord interactif d’analyse du trafic réseau avec Python et Streamlit
. Streamlit
est un framework Python open-source que vous pouvez utiliser pour développer des applications web pour l’analyse de données et le traitement des données.
À la fin de ce tutoriel, vous saurez comment capturer les paquets de données brutes du NIC (Network Interface Card) de votre ordinateur, traiter les données et créer de belles visualisations qui se mettront à jour en temps réel.
Table des matières
Pourquoi l’analyse du trafic réseau est-elle importante ?
L’analyse du trafic réseau est une exigence critique dans les entreprises où les réseaux constituent l’épine dorsale de presque toutes les applications et services. Au cœur de cela, nous avons l’analyse des paquets réseau qui implique la surveillance du réseau, la capture de tout le trafic (entrant et sortant), et l’interprétation de ces paquets au fur et à mesure qu’ils circulent à travers un réseau. Vous pouvez utiliser cette technique pour identifier des schémas de sécurité, détecter des anomalies, et assurer la sécurité et l’efficacité du réseau.
Ce projet de preuve de concept sur lequel nous travaillerons dans ce tutoriel est particulièrement utile car il vous aide à visualiser et analyser l’activité du réseau en temps réel. Et cela vous permettra de comprendre comment résoudre les problèmes, optimiser les performances, et effectuer des analyses de sécurité dans les systèmes d’entreprise.
Prérequis
-
Python 3.8 ou une version plus récente installée sur votre système.
-
Une compréhension de base des concepts de réseaux informatiques.
-
Une familiarité avec le langage de programmation Python et ses bibliothèques largement utilisées.
-
Connaissance de base des techniques de visualisation des données et des bibliothèques.
Comment configurer votre projet
Pour commencer, créez la structure du projet et installez les outils nécessaires avec Pip en utilisant les commandes suivantes:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
Nous utiliserons Streamlit
pour les visualisations du tableau de bord, Pandas
pour le traitement des données, Scapy
pour la capture et le traitement des paquets réseau, et enfin Plotly
pour tracer des graphiques avec nos données collectées.
Comment construire les fonctionnalités principales
Nous allons mettre tout le code dans un seul fichier nommé dashboard.py
. Tout d’abord, commençons par importer tous les éléments que nous allons utiliser:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
Maintenant, configurons la journalisation en mettant en place une configuration de base pour les journaux. Cela sera utilisé pour suivre les événements et exécuter notre application en mode débogage. Nous avons actuellement défini le niveau de journalisation comme étant INFO
, ce qui signifie que les événements de niveau INFO
ou supérieur seront affichés. Si vous n’êtes pas familier avec la journalisation en Python, je vous recommande de consulter ceci document approfondi.
# Configurer la journalisation
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Ensuite, nous allons construire notre processeur de paquets. Nous allons implémenter la fonctionnalité de traitement de nos paquets capturés dans cette classe.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Ajouter des informations spécifiques à TCP
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Ajouter des informations spécifiques à UDP
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Conserver uniquement les 10000 derniers paquets pour éviter les problèmes de mémoire
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
Cette classe va construire notre fonctionnalité principale et comporte plusieurs fonctions utilitaires qui seront utilisées pour traiter les paquets.
Les paquets réseau sont catégorisés en deux au niveau du transport (TCP et UDP) et le protocole ICMP au niveau du réseau. Si vous n’êtes pas familier avec les concepts de TCP/IP, je vous recommande de consulter ce article sur les actualités de freeCodeCamp.
Notre constructeur suivra tous les paquets vus qui sont catégorisés dans ces seaux de types de protocoles TCP/IP que nous avons définis. Nous prendrons également note de l’heure de capture du paquet, des données capturées et du nombre de paquets capturés.
Nous utiliserons également un verrou de thread pour garantir qu’un seul paquet est traité à la fois. Cela peut être étendu pour permettre au projet d’avoir un traitement de paquets parallèle.
La fonction d’aide get_protocol_name
nous aide à obtenir le bon type de protocole en fonction de leurs numéros de protocole. Pour donner un peu de contexte à cela, l’Autorité des numéros attribués sur Internet (IANA) attribue des numéros normalisés pour identifier différents protocoles dans un paquet réseau. À mesure que nous voyons ces numéros dans le paquet réseau analysé, nous saurons quel type de protocole est utilisé dans le paquet intercepté actuellement. Pour la portée de ce projet, nous allons mapper uniquement les protocoles TCP, UDP et ICMP (Ping). Si nous rencontrons un autre type de paquet, nous le catégoriserons comme AUTRE(<numéro_protocole>)
.
La fonction process_packet
gère notre fonctionnalité principale qui traitera ces paquets individuels. Si le paquet contient une couche IP, il prendra note des adresses IP source et de destination, du type de protocole, de la taille du paquet et du temps écoulé depuis le début de la capture du paquet.
Pour les paquets avec des protocoles de couche de transport spécifiques (comme TCP et UDP), nous capturerons les ports source et destination ainsi que les indicateurs TCP pour les paquets TCP. Ces détails extraits seront stockés en mémoire dans la liste packet_data
. Nous suivrons également le packet_count
au fur et à mesure que ces paquets sont traités.
La fonction get_dataframe
nous aide à convertir la liste packet_data
en un data-frame Pandas
qui sera ensuite utilisé pour notre visualisation.
Comment créer les visualisations Streamlit
Il est maintenant temps de construire notre tableau de bord interactif Streamlit. Nous définirons une fonction appelée create_visualization
dans le script dashboard.py
(en dehors de notre classe de traitement de paquets).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Distribution des protocoles
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Chronologie des paquets
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Principales adresses IP sources
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
Cette fonction prendra le data frame comme entrée et nous aidera à tracer trois graphiques :
-
Graphique de distribution des protocoles : Ce graphique affichera la proportion de différents protocoles (par exemple, TCP, UDP, ICMP) dans le trafic de paquets capturé.
-
Graphique de chronologie des paquets : Ce graphique montrera le nombre de paquets traités par seconde sur une période donnée.
-
Graphique des principales adresses IP sources : Ce graphique mettra en évidence les 10 principales adresses IP qui ont envoyé le plus de paquets dans le trafic capturé.
Le graphique de distribution des protocoles est simplement un diagramme circulaire des comptes de protocoles pour les trois types différents (avec AUTRE). Nous utilisons les outils Python Streamlit
et Plotly
pour tracer ces graphiques. Étant donné que nous avons également noté l’horodatage depuis le début de la capture de paquets, nous utiliserons ces données pour tracer la tendance des paquets capturés au fil du temps.
Pour le deuxième graphique, nous effectuerons une opération groupby
sur les données et obtiendrons le nombre de paquets capturés chaque seconde (S
signifie secondes), puis nous tracerons finalement le graphique.
Enfin, pour le troisième graphique, nous compterons les adresses IP sources distinctes observées et tracerons un graphique des comptes d’IP pour montrer les 10 meilleures IPs.
Comment capturer les paquets réseau
Maintenant, construisons la fonctionnalité qui nous permettra de capturer les données des paquets réseau.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
C’est une fonction simple qui instancie la classe PacketProcessor
et utilise ensuite la fonction sniff
dans le module scapy
pour commencer à capturer les paquets.
Nous utilisons le multithreading ici pour nous permettre de capturer des paquets indépendamment du flux principal du programme. Cela garantit que l’opération de capture de paquets ne bloque pas d’autres opérations comme la mise à jour du tableau de bord en temps réel. Nous retournons également l’instance de PacketProcessor
créée afin qu’elle puisse être utilisée dans notre programme principal.
Mise en place de tout ensemble
Maintenant, rassemblons tous ces éléments avec notre fonction main
qui agira comme la fonction pilote de notre programme.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Initialiser le processeur de paquets en état de session
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Créer la mise en page du tableau de bord
col1, col2 = st.columns(2)
# Obtenir les données actuelles
df = st.session_state.processor.get_dataframe()
# Afficher les indicateurs
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Afficher les visualisations
create_visualizations(df)
# Afficher les paquets récents
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Ajouter le bouton de rafraîchissement
if st.button('Refresh Data'):
st.rerun()
# Rafraîchissement automatique
time.sleep(2)
st.rerun()
Cette fonction instanciera également le tableau de bord Streamlit
et intégrera tous nos composants ensemble. Nous définissons d’abord le titre de la page de notre tableau de bord Streamlit
puis initialisons notre PacketProcessor
. Nous utilisons l’état de session dans Streamlit
pour garantir qu’une seule instance de capture de paquets est créée et que son état est conservé.
Maintenant, nous obtiendrons dynamiquement le dataframe à partir de l’état de session chaque fois que les données sont traitées et commencerons à afficher les indicateurs et les visualisations. Nous afficherons également les paquets récemment capturés avec des informations telles que l’horodatage, les adresses IP source et de destination, le protocole et la taille du paquet. Nous ajouterons également la possibilité pour l’utilisateur de rafraîchir manuellement les données depuis le tableau de bord tout en le rafraîchissant automatiquement toutes les deux secondes.
Enfin, exécutons le programme avec la commande suivante :
sudo streamlit run dashboard.py
Notez que vous devrez exécuter le programme avec sudo
car les capacités de capture de paquets nécessitent des privilèges administratifs. Si vous êtes sur Windows, ouvrez votre terminal en tant qu’administrateur, puis exécutez le programme sans le préfixe sudo
.
Donnez un moment au programme pour commencer à capturer des paquets. Si tout se passe bien, vous devriez voir quelque chose comme ceci :
Voici toutes les visualisations que nous venons de mettre en œuvre dans notre programme de tableau de bord Streamlit
.
Améliorations futures
Avec cela, voici quelques idées d’amélioration future que vous pouvez utiliser pour étendre les fonctionnalités du tableau de bord :
-
Ajouter des capacités d’apprentissage automatique pour la détection d’anomalies
-
Implémenter la cartographie IP géographique
-
Créer des alertes personnalisées basées sur les modèles d’analyse du trafic
-
Ajouter des options d’analyse de la charge utile des paquets
Conclusion
Félicitations ! Vous avez maintenant réussi à créer un tableau de bord d’analyse du trafic réseau en temps réel avec Python et Streamlit
. Ce programme fournira des informations précieuses sur le comportement du réseau et peut être étendu pour divers cas d’utilisation, de la surveillance de la sécurité à l’optimisation du réseau.
Avec cela, j’espère que vous avez appris quelques notions de base sur l’analyse du trafic réseau ainsi qu’un peu de programmation Python. Merci de votre lecture!