האם אי פעם רצית לחזות את תעבורת הרשת שלך בזמן אמת? במדריך זה, תלמד כיצד לבנות לוח בקרה אינטראקטיבי לניתוח תעבורת רשת עם Python ו־Streamlit. Streamlit הוא מסגרת פייתון קוד פתוח שניתן להשתמש בה לפיתוח אפליקציות אינטרנט לניתוח נתונים ועיבוד נתונים.

עד סיום המדריך, תדע כיצד לכוד חבילות רשת גולמיות מכרטיס הרשת (NIC) של המחשב שלך, לעבד את הנתונים, וליצור ויזואליזציות יפות שיעדכנו בזמן אמת.

תוכן עניינים

למה ניתוח תעבורת הרשת חשוב?

ניתוח תעבורת רשת הוא דרישה עיקרית בעסקים שבהם רשתות מהוות את היתרון הכולל של כמעט כל יישום ושירות. בלב הנושא יש ניתוח של חבילות רשת המשלב מעקב אחר הרשת, כיבוד כל התעבורה (כניסה ויציאה), ופרשנות של חבילות אלו בזמן שהן זורמות דרך הרשת. ניתן להשתמש בטכניקה זו כדי לזהות דפוסי אבטחה, לזהות אנומליות, ולהבטיח את האבטחה והיעילות של הרשת. מיזמים קודמים הדרושים לפרוייקט הזה שנעבוד עליו במדריך זה

עשויים להיות מיוחדים במיוחד מאחר שהם יעזרו לך להמחיש ולנתח פעילות רשת בזמן אמת. וזה יאפשר לך להבין כיצד נעשה פתרון בעיות, אופטימיזציה של ביצועים, וניתוח אבטחה במערכות עסקיות.

דרישות מוקדמות

איך להתקין את הפרויקט שלך

כדי להתחיל, צריך ליצור את מבנה הפרויקט ולהתקין את הכלים הדרושים עם Pip באמצעות הפקודות הבאות:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

נשתמש ב־Streamlit להצגת הלוחות, Pandas לעיבוד הנתונים, Scapy לתפיסת חבילות רשת ועיבוד החבילות, ולבסוף Plotly לציור תרשימים עם הנתונים שנאספו.

איך לבנות את התכונות הליביות

נשים את כל הקוד בקובץ אחד בשם dashboard.py. נתחיל ראשית בייבוא כל הרכיבים שנשתמש בהם:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

כעת נגדיר יומן אירועים על ידי הגדרת תצורת יומן בסיסית. יומן זה ישמש למעקב אחר אירועים והרצת האפליקציה שלנו במצב דיבאג. כרגע הגדרנו את רמת הלוגינג להיות INFO, שפירושה שאירועים ברמת INFO ומעלה יוצגו. אם אינך מוכר עם לוגינג בפייתון, אני ממליץ לבדוק כאן את קטע המסמך שנכנס לעומק.

# הגדרת לוגינג
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

לבארץ, נבנה את מעבד החבילות שלנו. נממש את הפונקציונליות לעיבוד החבילות שנלכדו במחלקה זו.

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # הוספת מידע ספציפי ל־TCP
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # הוספת מידע ספציפי ל־UDP
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # שמירה רק על אחרונות 10000 חבילות כדי למנוע בעיות זיכרון
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

מחלקה זו תבנה את הפונקציונליות היסודית שלנו ותכיל מספר פונקציות עזר שיעשו שימוש בעיבוד החבילות.

חבילות רשת מסווגות לשניים ברמת העברת מידע (TCP ו־UDP) ופרוטוקול ICMP ברמת הרשת. אם אינך מוכר עם מושגי TCP/IP, אני ממליץ לבדוק כאן מאמר זה בפורום freeCodeCamp News.

הבונה שלנו יעקוב אחר כל החבילות שנראו שמסווגות לאמצעי תקשורת TCP/IP שהגדרנו. נשים לב גם לזמן תפיסת החבילה, לנתונים שנתפסו, ולמספר החבילות שנתפסו.

נשתמש גם בנעילת תהליך כדי לוודא שרק חבילה אחת נעודף בכל פעם. ניתן להרחיב זאת כדי לאפשר עיבוד מקביל של חבילות בפרויקט.

הפונקציה העוזרת get_protocol_name עוזרת לנו לקבל את סוג הפרוטוקול הנכון על סמך מספרי הפרוטוקול שלהם. לתת רקע לכך, רשות המספרים המוקצית ברשת (IANA) מקצה מספרים תקניים לזיהוי של פרוטוקולים שונים בחבילת רשת. כאשר אנו רואים את המספרים האלה בחבילת הרשת שנפרקה, נדע איזה סוג של פרוטוקול משמש כרגע בחבילה שהופצה. למען הפרויקט הזה, נמפה רק לTCP, UDP ו-ICMP (Ping). אם נתקל בסוג אחר של חבילה, נסווג אותה כ-OTHER(<protocol_num>).

הפונקציה process_packet מטפלת בתפקוד העיקרי שלנו שיעבד את החבילות האישיות הללו. אם החבילה מכילה שכבת IP, תשים לב לכתובות ה-IP של המקור והיעד, סוג הפרוטוקול, גודל החבילה, והזמן שחלף מאז תחילת תפיסת החבילה.

לחבילות עם פרוטוקולים מסוימים בשכבת ההעברה (כמו TCP ו-UDP), נתפוס את כתובות היציאה והיעד יחד עם דגלי TCP לחבילות TCP. הפרטים שיוצאים משורטגים יאוחסנו בזיכרון ברשימת packet_data. נשמור גם על סמיכות ה-packet_count בהעברת החבילות הללו.

הפונקציה get_dataframe עוזרת לנו להמיר את רשימת packet_data למסגרת נתונים של Pandas שתשמש לאחר מכן עבור הוויזואליזציה שלנו.

איך ליצור את הוויזואליזציות של Streamlit

עכשיו הגיע הזמן לבנות את לוח המחוונים האינטראקטיבי שלנו ב-Streamlit. נגדיר פונקציה בשם create_visualization בסקריפט dashboard.py (מחוץ למחלקת עיבוד החבילות שלנו).

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # חלוקת פרוטוקולים
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # ציר הזמן של החבילות
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # כתובות ה-IP המקורות המובילות
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

פונקציה זו תקבל את מסגרת הנתונים כהזנה ותעזור לנו לשרטט שלושה תרשימים / גרפים:

  1. תרשים חלוקת פרוטוקולים: תרשים זה יציג את הפרופורציה של פרוטוקולים שונים (למשל, TCP, UDP, ICMP) בתעבורת החבילות שנתפסה.

  2. תרשים ציר הזמן של החבילות: תרשים זה יראה את מספר החבילות המעובדות בשנייה במשך פרק זמן.

  3. תרשים כתובות ה-IP המקורות המובילות: תרשים זה ידגיש את 10 כתובות ה-IP המובילות ששלחו את מספר החבילות הרב ביותר בתעבורה הנתפסת.

תרשים ההפצה של הפרוטוקול הוא פשוט תרשים עוגה של ספירת הפרוטוקולים לשלושת סוגי הפרוטוקולים השונים (יחד עם OTHER). אנו משתמשים בכלים הפייתוניים Streamlit ו־Plotly כדי לעצב את התרשימים הללו. מאחר שרשמנו גם את הזמן המדויק מרגע תחילת תפיסת החבילות, נשתמש בנתונים אלו כדי לצייר את מגמת תפיסת החבילות לאורך הזמן.

לתרשים השני, נבצע פעולת groupby על הנתונים ונקבל את מספר החבילות שנתפסו בכל שנייה (S מייצג שניות), ולבסוף נצייר את הגרף.

לתרשים השלישי, נספור את כמות כתובות ה־IP מקור ששצפו ונצייר תרשים של ספירת ה־IP כדי להציג את עשרת ה־IP העליונות.

כיצד לתפוס את חבילות הרשת

כעת, נבנה את הפונקציונליות שתאפשר לנו לתפוס נתוני חבילות הרשת.

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

זוהי פונקציה פשוטה שמייצרת את מחלקת ה־PacketProcessor ואז משתמשת בפונקציית ה־sniff במודול ה־scapy כדי להתחיל לתפוס את החבילות.

אנו משתמשים בעיבוד מקבילי כאן כדי לאפשר לנו לתפוס חבילות באופן עצמאי מזרם התוכנית הראשי. זהו מבטיח כי פעולת תפיסת החבילות אינה חוסמת פעולות אחרות כמו עדכון הלוח בזמן אמת. נחזיר גם את מופע ה־PacketProcessor שנוצר כדי שיוכל לשמש בתוכנית הראשית שלנו.

הכל יחד

עכשיו נצרף את כל החלקים הללו בעזרת הפונקציה main שתפקידה לשמש כפונקציית הנהג לתוכנית שלנו.

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # הפעלת מעבד חבילות במצב הסשן
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # יצירת פריסת לוח בקרה
    col1, col2 = st.columns(2)

    # קבלת נתונים נוכחיים
    df = st.session_state.processor.get_dataframe()

    # הצגת מדדים
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # הצגת ויזואליזציות
    create_visualizations(df)

    # הצגת חבילות אחרונות
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # הוספת כפתור רענון
    if st.button('Refresh Data'):
        st.rerun()

    # רענון אוטומטי
    time.sleep(2)
    st.rerun()

פונקציה זו תפעיל את לוח הבקרה של Streamlit, ותשלב את כל הרכיבים שלנו יחד. נקבע תחילה את כותרת העמוד של לוח הבקרה שלנו ואז נאתחל את PacketProcessor שלנו. אנו משתמשים במצב הסשן ב־Streamlit כדי לוודא שנוצר רק מופע אחד של תפיסת חבילות ושהמצב שלו נשמר.

עכשיו, נקבל את מסגרת הנתונים ממצב הסשן בכל פעם שהנתונים מעובדים ונתחיל להציג את המדדים והוויזואליזציות. נציג גם את החבילות שתפסנו לאחרונה יחד עם מידע כמו הזמן, כתובות ה־IP ממקור ומיעד, פרוטוקול וגודל החבילה. נוסיף גם את היכולת לרענן באופן ידני את הנתונים מלוח הבקרה בזמן שנרענן אותם באופן אוטומטי כל שתי שניות.

ואז נפעיל את התוכנית עם הפקודה הבאה:

sudo streamlit run dashboard.py

שימו לב שעליכם להריץ את התוכנית עם sudo מכיוון שיכולות לכידת החבילות דורשות הרשאות מנהל. אם אתם משתמשים ב-Windows, פתחו את המסוף שלכם כמנהל ולאחר מכן הריצו את התוכנית ללא הקידומת sudo.

תנו לתוכנית רגע כדי להתחיל ללכוד חבילות. אם הכל מתנהל כראוי, אתם אמורים לראות משהו כזה:

אלו כל הוויזואליזציות שהטמענו בתוכנית לוח הבקרה של Streamlit.

שיפורים עתידיים

עם זאת, הנה כמה רעיונות לשיפורים עתידיים שאתם יכולים להשתמש בהם כדי להרחיב את הפונקציות של לוח הבקרה:

  1. הוסיפו יכולות למידת מכונה לזיהוי אנומליות

  2. יישמו מיפוי IP גיאוגרפי

  3. צרו התראות מותאמות אישית בהתבסס על דפוסי ניתוח תעבורה

  4. הוסיפו אפשרויות ניתוח תוכן חבילות

סיכום

מזל טוב! עכשיו בניתם בהצלחה לוח בקרה לניתוח תעבורת רשת בזמן אמת עם Python ו-Streamlit. תוכנית זו תספק תובנות יקרות ערך לגבי התנהגות הרשת וניתן להרחיב אותה למגוון שימושים, החל ממעקב אבטחה ועד אופטימיזציה של רשת.

עם זאת, אני מקווה שלמדת קצת יסודות על ניתוח תעבורת רשת וגם קצת תכנות ב-Python. תודה על הקריאה!