Haben Sie jemals gewünscht, Ihren Netzwerkverkehr in Echtzeit zu visualisieren? In diesem Tutorial lernen Sie, wie Sie ein interaktives Dashboard zur Analyse des Netzwerkverkehrs mit Python und Streamlit erstellen. Streamlit ist ein Open-Source-Python-Framework, das Sie verwenden können, um Webanwendungen für Datenanalysen und Datenverarbeitung zu entwickeln.

Am Ende dieses Tutorials wissen Sie, wie Sie rohe Netzwerkpakete von der NIC (Netzwerk-Interface-Karte) Ihres Computers erfassen, die Daten verarbeiten und schöne Visualisierungen erstellen, die in Echtzeit aktualisiert werden.

Inhaltsverzeichnis

Warum ist die Analyse des Netzwerkverkehrs wichtig?

Die Analyse des Netzwerkverkehrs ist eine entscheidende Anforderung in Unternehmen, in denen Netzwerke das Rückgrat nahezu jeder Anwendung und jedes Dienstes bilden. Im Kern steht die Analyse von Netzwerkpaketen, die das Überwachen des Netzwerks, das Erfassen des gesamten Datenverkehrs (Ein- und Ausgang) und das Interpretieren dieser Pakete während ihres Durchlaufs durch ein Netzwerk umfasst. Mit dieser Technik können Sicherheitsmuster identifiziert, Anomalien erkannt und die Sicherheit und Effizienz des Netzwerks gewährleistet werden.

Dieses Proof-of-Concept-Projekt, an dem wir in diesem Tutorial arbeiten werden, ist besonders nützlich, da es Ihnen ermöglicht, die Netzwerkaktivität in Echtzeit zu visualisieren und zu analysieren. Dies wird es Ihnen ermöglichen zu verstehen, wie Fehlerbehebung, Leistungsoptimierungen und Sicherheitsanalysen in Unternehmenssystemen durchgeführt werden.

Voraussetzungen

So richten Sie Ihr Projekt ein

Um loszulegen, erstellen Sie die Projektstruktur und installieren Sie die erforderlichen Tools mit Pip mit den folgenden Befehlen:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

Wir werden Streamlit für die Dashboard-Visualisierungen, Pandas für die Datenverarbeitung, Scapy für die Erfassung und Verarbeitung von Netzwerkpaketen und schließlich Plotly für das Erstellen von Diagrammen mit unseren gesammelten Daten verwenden.

Wie man die Kernfunktionalitäten erstellt

Wir werden den gesamten Code in einer Datei mit dem Namen dashboard.py platzieren. Lassen Sie uns zunächst damit beginnen, alle Elemente zu importieren, die wir verwenden werden:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

Lassen Sie uns nun das Protokollieren konfigurieren, indem wir eine grundlegende Protokollierungskonfiguration einrichten. Diese wird für das Verfolgen von Ereignissen und das Ausführen unserer Anwendung im Debug-Modus verwendet. Wir haben derzeit den Protokollierungsgrad auf INFO gesetzt, was bedeutet, dass Ereignisse mit dem Level INFO oder höher angezeigt werden. Wenn Sie mit dem Protokollieren in Python nicht vertraut sind, empfehle ich, diesen ausführlichen Dokumentationsartikel zu überprüfen.

# Protokollierung konfigurieren
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

Als nächstes werden wir unseren Paketprozessor erstellen. Wir werden die Funktionalität zur Verarbeitung unserer erfassten Pakete in dieser Klasse implementieren.

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # TCP-spezifische Informationen hinzufügen
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # UDP-spezifische Informationen hinzufügen
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # Nur die letzten 10000 Pakete behalten, um Speicherprobleme zu vermeiden
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

Diese Klasse wird unsere Kernfunktionalität aufbauen und verfügt über mehrere Hilfsfunktionen, die zur Verarbeitung der Pakete verwendet werden.

Netzwerkpakete werden auf Transportebene in zwei Kategorien unterteilt (TCP und UDP) und das ICMP-Protokoll auf Netzwerkebene. Wenn Ihnen die Konzepte von TCP/IP nicht vertraut sind, empfehle ich, diesen Artikel auf freeCodeCamp News zu lesen.

Unser Konstruktor wird alle gesehenen Pakete verfolgen, die in die von uns definierten TCP/IP-Protokolltyp-Kategorien eingeordnet sind. Wir werden auch die Zeit der Paketaufnahme, die erfassten Daten und die Anzahl der erfassten Pakete notieren.

Wir werden außerdem einen Thread-Lock nutzen, um sicherzustellen, dass jeweils nur ein Paket verarbeitet wird. Dies kann weiter ausgebaut werden, um dem Projekt parallele Paketverarbeitung zu ermöglichen.

Die get_protocol_name-Hilfsfunktion hilft uns, den richtigen Typ des Protokolls basierend auf deren Protokollnummern zu erhalten. Um etwas Hintergrund dazu zu geben: Die Internet Assigned Numbers Authority (IANA) weist standardisierte Nummern zu, um verschiedene Protokolle in einem Netzwerkpaket zu identifizieren. Wenn wir diese Nummern im analysierten Netzwerkpaket sehen, wissen wir, welches Protokoll derzeit im abgefangenen Paket verwendet wird. Im Rahmen dieses Projekts werden wir nur TCP, UDP und ICMP (Ping) zuordnen. Wenn wir auf einen anderen Pakettyp stoßen, werden wir ihn als OTHER(<protocol_num>) kategorisieren.

Die process_packet-Funktion behandelt unsere Kernfunktionalität, die diese einzelnen Pakete verarbeitet. Wenn das Paket eine IP-Schicht enthält, wird es die Quell- und Ziel-IP-Adressen, den Protokolltyp, die Paketgröße und die seit Beginn der Paketaufnahme vergangene Zeit notieren.

Für Pakete mit spezifischen Transportprotokollen (wie TCP und UDP) erfassen wir die Quell- und Zielports sowie die TCP-Flags für TCP-Pakete. Diese extrahierten Details werden im Speicher in der packet_data-Liste gespeichert. Wir werden auch die packet_count verfolgen, während diese Pakete verarbeitet werden.

Die get_dataframe-Funktion hilft uns, die packet_data-Liste in ein Pandas-DataFrame umzuwandeln, das dann für unsere Visualisierung verwendet wird.

Wie man die Streamlit-Visualisierungen erstellt

Jetzt ist es an der Zeit, unser interaktives Streamlit-Dashboard zu erstellen. Wir werden eine Funktion namens create_visualization im Skript dashboard.py definieren (außerhalb unserer Paketverarbeitungsklasse).

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # Protokollverteilung
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # Pakete-Zeitleiste
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # Top-Quell-IP-Adressen
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

Diese Funktion wird das DataFrame als Eingabe nehmen und uns helfen, drei Diagramme / Grafiken zu erstellen:

  1. Protokollverteilungsdiagramm: Dieses Diagramm zeigt den Anteil der verschiedenen Protokolle (zum Beispiel TCP, UDP, ICMP) im erfassten Paketverkehr.

  2. Pakete-Zeitleistendiagramm: Dieses Diagramm zeigt die Anzahl der pro Sekunde verarbeiteten Pakete über einen Zeitraum.

  3. Top-Quell-IP-Adressen-Diagramm: Dieses Diagramm hebt die 10 IP-Adressen hervor, die die meisten Pakete im erfassten Verkehr gesendet haben.

Die Protokollverteilungstabelle ist einfach ein Tortendiagramm der Protokollzählungen für die drei verschiedenen Typen (zusammen mit ANDEREN). Wir verwenden die Python-Tools Streamlit und Plotly, um diese Diagramme zu erstellen. Da wir auch den Zeitstempel seit dem Start des Paketerfassungsprozesses festgehalten haben, werden wir diese Daten verwenden, um den Trend der erfassten Pakete im Laufe der Zeit darzustellen.

Für das zweite Diagramm führen wir eine groupby-Operation auf den Daten durch, um die Anzahl der erfassten Pakete in jeder Sekunde zu erhalten (S steht für Sekunden), und schließlich werden wir den Graphen erstellen.

Zuletzt werden wir für das dritte Diagramm die Anzahl der beobachteten verschiedenen Quell-IPs zählen und ein Diagramm der IP-Zählungen erstellen, um die Top 10 IPs anzuzeigen.

So erfassen Sie Netzwerk-Pakete

Jetzt bauen wir die Funktionalität auf, um uns das Erfassen von Netzwerk-Paketdaten zu ermöglichen.

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

Dies ist eine einfache Funktion, die die Klasse PacketProcessor instanziiert und dann die Funktion sniff im Modul scapy verwendet, um mit dem Erfassen der Pakete zu beginnen.

Wir verwenden hier das Threading, um uns das Erfassen von Paketen unabhängig vom Hauptprogrammfluss zu ermöglichen. Dies stellt sicher, dass der Paketerfassungsvorgang andere Operationen wie das Aktualisieren des Dashboards in Echtzeit nicht blockiert. Wir geben auch die erstellte Instanz von PacketProcessor zurück, damit sie in unserem Hauptprogramm verwendet werden kann.

Alles Zusammenfügen

Jetzt fügen wir all diese Teile mit unserer main-Funktion zusammen, die als Treiberfunktion für unser Programm fungieren wird.

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # Initialisiere Paketprozessor im Sitzungszustand
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # Erstelle Dashboard-Layout
    col1, col2 = st.columns(2)

    # Hole aktuelle Daten
    df = st.session_state.processor.get_dataframe()

    # Zeige Metriken an
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # Zeige Visualisierungen an
    create_visualizations(df)

    # Zeige aktuelle Pakete an
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # Füge Aktualisierungs-Button hinzu
    if st.button('Refresh Data'):
        st.rerun()

    # Automatische Aktualisierung
    time.sleep(2)
    st.rerun()

Diese Funktion wird auch das Streamlit Dashboard instanziieren und alle unsere Komponenten zusammen integrieren. Zuerst setzen wir den Seitentitel unseres Streamlit Dashboards und initialisieren dann unseren PacketProcessor. Wir verwenden den Sitzungszustand in Streamlit, um sicherzustellen, dass nur eine Instanz der Paketerfassung erstellt wird und der Zustand davon beibehalten wird.

Jetzt werden wir dynamisch den DataFrame aus dem Sitzungszustand abrufen, jedes Mal wenn die Daten verarbeitet werden, und beginnen, die Metriken und die Visualisierungen anzuzeigen. Wir werden auch die kürzlich erfassten Pakete zusammen mit Informationen wie Zeitstempel, Quell- und Ziel-IP-Adressen, Protokoll und Größe des Pakets anzeigen. Zudem fügen wir die Möglichkeit hinzu, dass der Benutzer die Daten manuell vom Dashboard aktualisieren kann, während wir sie alle zwei Sekunden automatisch aktualisieren.

Lass uns schließlich das Programm mit dem folgenden Befehl ausführen:

sudo streamlit run dashboard.py

Beachten Sie, dass Sie das Programm mit sudo ausführen müssen, da die Paketerfassung administrative Rechte erfordert. Wenn Sie Windows verwenden, öffnen Sie Ihr Terminal als Administrator und führen Sie das Programm ohne das sudo Präfix aus.

Geben Sie dem Programm einen Moment Zeit, um mit der Erfassung der Pakete zu beginnen. Wenn alles richtig läuft, sollten Sie etwas Ähnliches wie dieses sehen:

Das sind alle Visualisierungen, die wir gerade in unserem Streamlit Dashboard-Programm implementiert haben.

Zukünftige Erweiterungen

Mit diesen sind einige zukünftige Verbesserungsideen, die Sie nutzen können, um die Funktionalitäten des Dashboards zu erweitern:

  1. Fügen Sie maschinelles Lernen für die Anomalieerkennung hinzu

  2. Implementieren Sie geografisches IP-Mapping

  3. Erstellen Sie benutzerdefinierte Warnmeldungen basierend auf Verkehrsanalysemustern

  4. Fügen Sie Optionen für die Analyse des Paketinhalts hinzu

Fazit

Herzlichen Glückwunsch! Sie haben erfolgreich ein Echtzeit-Netzwerkverkehrsanalyse-Dashboard mit Python und Streamlit erstellt. Dieses Programm wird wertvolle Einblicke in das Netzwerkverhalten bieten und kann für verschiedene Anwendungsfälle erweitert werden, von der Sicherheitsüberwachung bis zur Netzwerkoptimierung.

Damit hoffe ich, dass du einige Grundlagen zur Analyse von Netzwerkverkehr sowie ein wenig Python-Programmierung gelernt hast. Danke fürs Lesen!