Heb je ooit willen visualiseren van je netwerkverkeer in real-time? In deze tutorial leer je hoe je een interactief dashboard voor netwerkverkeersanalyse kunt bouwen met Python en Streamlit. Streamlit is een open-source Python-framework dat je kunt gebruiken voor het ontwikkelen van webapplicaties voor data-analyse en data-verwerking.

Aan het einde van deze tutorial weet je hoe je ruwe netwerkpakketten van de NIC (Network Interface Card) van je computer kunt vastleggen, de gegevens kunt verwerken en prachtige visualisaties kunt maken die in real-time worden bijgewerkt.

Inhoudsopgave

Waarom is netwerkverkeersanalyse belangrijk?

Netwerkanalyse is een cruciale vereiste in bedrijven waar netwerken de ruggengraat vormen van bijna elke toepassing en service. In de kern hebben we de analyse van netwerkpakketten die het monitoren van het netwerk, het vastleggen van al het verkeer (inkomend en uitgaand) en het interpreteren van deze pakketten terwijl ze door een netwerk stromen, omvat. U kunt deze techniek gebruiken om beveiligingspatronen te identificeren, afwijkingen te detecteren en de beveiliging en efficiëntie van het netwerk te waarborgen.

Deze proof-of-concept project waar we aan werken in deze zelfstudie is bijzonder nuttig omdat het u helpt om netwerkactiviteit in realtime te visualiseren en analyseren. Dit stelt u in staat om te begrijpen hoe probleemoplossing, prestatie-optimalisaties en beveiligingsanalyse worden uitgevoerd in bedrijfssystemen.

Vereisten

Hoe u uw project instelt

Om te beginnen, maakt u de projectstructuur en installeert u de benodigde tools met Pip met de volgende commando’s:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

We zullen Streamlit gebruiken voor de dashboardvisualisaties, Pandas voor de dataprocessing, Scapy voor het vastleggen en verwerken van netwerkpakketten, en tot slot Plotly voor het plotten van grafieken met onze verzamelde gegevens.

Hoe de kernfunctionaliteiten te bouwen

We zullen alle code in één bestand met de naam dashboard.py plaatsen. Laten we eerst beginnen met het importeren van alle elementen die we zullen gebruiken:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

Laten we nu het loggen configureren door een basis logconfiguratie in te stellen. Dit zal worden gebruikt voor het bijhouden van gebeurtenissen en het uitvoeren van onze toepassing in de debugmodus. We hebben momenteel het logniveau ingesteld op INFO, wat betekent dat gebeurtenissen met niveau INFO of hoger worden weergegeven. Als je niet bekend bent met loggen in Python, raad ik aan om deze documentatie te bekijken die dieper ingaat.

# Loggen configureren
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

Vervolgens zullen we onze pakketverwerker bouwen. We zullen de functionaliteit implementeren om onze vastgelegde pakketten te verwerken in deze klasse.

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # TCP-specifieke informatie toevoegen
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # UDP-specifieke informatie toevoegen
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # Alleen de laatste 10000 pakketten behouden om geheugenproblemen te voorkomen
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

Deze klasse zal onze kernfunctionaliteit opbouwen en heeft verschillende hulpprogrammafuncties die zullen worden gebruikt voor het verwerken van de pakketten.

Netwerkpakketten zijn ingedeeld in twee op transportniveau (TCP en UDP) en het ICMP-protocol op het netwerkniveau. Als je niet bekend bent met de concepten van TCP/IP, raad ik aan om dit artikel op freeCodeCamp News te bekijken.

Onze constructor zal alle gezien pakketten bijhouden die zijn gecategoriseerd in deze TCP/IP-protocoltype emmers die we hebben gedefinieerd. We zullen ook aantekeningen maken van de tijd van vastleggen van het pakket, de vastgelegde gegevens en het aantal vastgelegde pakketten.

We zullen ook gebruik maken van een threadvergrendeling om ervoor te zorgen dat slechts één pakket per keer wordt verwerkt. Dit kan verder worden uitgebreid om het project in staat te stellen parallelle pakketverwerking te hebben.

De helperfunctie get_protocol_name helpt ons het juiste type van het protocol te krijgen op basis van hun protocolnummers. Om hier wat achtergrondinformatie over te geven, kent de Internet Assigned Numbers Authority (IANA) gestandaardiseerde nummers toe om verschillende protocollen in een netwerkpakket te identificeren. Telkens wanneer we deze nummers zien in het geanalyseerde netwerkpakket, zullen we weten welk soort protocol wordt gebruikt in het pakket dat momenteel onderschept wordt. Voor de scope van dit project zullen we alleen TCP, UDP en ICMP (Ping) toewijzen. Als we een ander type pakket tegenkomen, zullen we het categoriseren als ANDERE(<protocol_num>).

De functie process_packet behandelt onze kernfunctionaliteit die deze individuele pakketten zal verwerken. Als het pakket een IP-laag bevat, zal het aantekening maken van de bron- en bestemmings-IP-adressen, het protocoltype, de pakketgrootte en de verstreken tijd sinds het begin van het vastleggen van het pakket.

Voor pakketten met specifieke transportlaagprotocollen (zoals TCP en UDP), zullen we de bron- en bestemmingspoorten vastleggen samen met TCP-vlaggen voor TCP-pakketten. Deze geëxtraheerde details zullen worden opgeslagen in het geheugen in de packet_data-lijst. We zullen ook het packet_count bijhouden wanneer deze pakketten worden verwerkt.

De functie get_dataframe helpt ons om de lijst packet_data om te zetten in een Pandas data-frame die vervolgens zal worden gebruikt voor onze visualisatie.

Hoe maak je de Streamlit visualisaties

Het is nu tijd voor ons om onze interactieve Streamlit Dashboard te bouwen. We zullen een functie genaamd create_visualization definiëren in het script dashboard.py (buiten onze packet processing class).

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # Protocol distributie
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # Pakketten tijdlijn
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # Top source IP's
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

Deze functie zal het data frame als input nemen en zal ons helpen bij het plotten van drie grafieken:

  1. Protocol Distributie Grafiek: Deze grafiek zal de verhouding van verschillende protocollen (bijvoorbeeld, TCP, UDP, ICMP) in het vastgelegde pakketverkeer weergeven.

  2. Pakketten Tijdlijn Grafiek: Deze grafiek zal het aantal verwerkte pakketten per seconde over een bepaalde periode weergeven.

  3. Top Source IP Adressen Grafiek: Deze grafiek zal de top 10 IP-adressen markeren die de meeste pakketten hebben verzonden in het vastgelegde verkeer.

De protocolverdelingsgrafiek is eenvoudigweg een cirkeldiagram van de protocol tellingen voor de drie verschillende types (samen met ANDERE). We gebruiken de Streamlit en Plotly Python tools om deze grafieken te maken. Aangezien we ook de tijdstempel hebben genoteerd sinds het begin van het pakket vastleggen, zullen we deze gegevens gebruiken om de trend van de vastgelegde pakketten in de loop van de tijd te plotten.

Voor de tweede grafiek zullen we een groupby operatie op de gegevens uitvoeren en het aantal vastgelegde pakketten in elke seconde verkrijgen (S staat voor seconden), en vervolgens zullen we de grafiek plotten.

Tenslotte zullen we voor de derde grafiek de verschillende waargenomen bron-IP’s tellen en een grafiek van de IP tellingen maken om de top 10 IP’s te tonen.

Hoe pakketten in het netwerk vastleggen

Nu gaan we de functionaliteit opbouwen om ons in staat te stellen netwerkpakketgegevens vast te leggen.

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

Dit is een eenvoudige functie die een instantie van de PacketProcessor klasse instantieert en vervolgens de sniff functie in de scapy module gebruikt om de pakketten vast te leggen.

We gebruiken hier threading om ons in staat te stellen pakketten onafhankelijk van de hoofdprogrammastroom vast te leggen. Dit zorgt ervoor dat de pakketvastlegoperatie andere bewerkingen zoals het bijwerken van het dashboard in realtime niet blokkeert. We geven ook de aangemaakte PacketProcessor instantie terug zodat deze in ons hoofdprogramma kan worden gebruikt.

Alles samenvoegen

Nu zullen we al deze stukken samenvoegen met onze main functie die zal fungeren als de driverfunctie voor ons programma.

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # Initialiseer packetprocessor in sessietoestand
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # Maak dashboard lay-out
    col1, col2 = st.columns(2)

    # Haal huidige gegevens op
    df = st.session_state.processor.get_dataframe()

    # Toon metingen
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # Toon visualisaties
    create_visualizations(df)

    # Toon recente pakketten
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # Voeg vernieuwingsknop toe
    if st.button('Refresh Data'):
        st.rerun()

    # Automatisch vernieuwen
    time.sleep(2)
    st.rerun()

Deze functie zal ook het Streamlit dashboard instantiëren en al onze componenten integreren. We stellen eerst de paginatitel van ons Streamlit dashboard in en initialiseren vervolgens onze PacketProcessor. We gebruiken de sessietoestand in Streamlit om ervoor te zorgen dat er slechts één instantie van pakketcapturing wordt gemaakt en dat de status ervan behouden blijft.

Nu zullen we dynamisch het dataframe uit de sessietoestand halen telkens wanneer de gegevens worden verwerkt en beginnen met het tonen van de metingen en de visualisaties. We zullen ook de recentelijk vastgelegde pakketten weergeven samen met informatie zoals de tijdstempel, bron- en doel-IP’s, protocol en grootte van het pakket. We zullen ook de mogelijkheid toevoegen voor de gebruiker om handmatig de gegevens van het dashboard te vernieuwen terwijl we deze ook automatisch elke twee seconden vernieuwen.

Laten we tot slot het programma uitvoeren met de volgende opdracht:

sudo streamlit run dashboard.py

Let erop dat je het programma moet uitvoeren met sudo omdat de pakketopnamefuncties administratieve rechten vereisen. Als je Windows gebruikt, open dan je terminal als Administrator en voer vervolgens het programma uit zonder het sudo voorvoegsel.

Geef het programma even de tijd om te beginnen met het vastleggen van pakketten. Als alles goed gaat, zou je iets moeten zien zoals dit:

Dit zijn alle visualisaties die we zojuist hebben geïmplementeerd in ons Streamlit dashboard programma.

Toekomstige verbeteringen

Met dat, hier zijn enkele ideeën voor toekomstige verbeteringen die je kunt gebruiken om de functionaliteiten van het dashboard uit te breiden:

  1. Voeg machine learning mogelijkheden toe voor anomaliedetectie

  2. Implementeer geografische IP-mapping

  3. Maak aangepaste waarschuwingen op basis van verkeersanalysepatronen

  4. Voeg opties toe voor het analyseren van pakketpayload

Conclusie

Gefeliciteerd! Je hebt nu succesvol een real-time netwerkverkeersanalyse dashboard gebouwd met Python en Streamlit. Dit programma zal waardevolle inzichten bieden in het netwerkgedrag en kan worden uitgebreid voor diverse toepassingen, van beveiligingsmonitoring tot netwerkoptimalisatie.

Daarmee hoop ik dat je wat basiskennis hebt opgedaan over netwerkverkeersanalyse en een beetje Python-programmering. Bedankt voor het lezen!