האם אי פעם רצית לחזות את תעבורת הרשת שלך בזמן אמת? במדריך זה, תלמד כיצד לבנות לוח בקרה אינטראקטיבי לניתוח תעבורת רשת עם Python ו־Streamlit
. Streamlit
הוא מסגרת פייתון קוד פתוח שניתן להשתמש בה לפיתוח אפליקציות אינטרנט לניתוח נתונים ועיבוד נתונים.
עד סיום המדריך, תדע כיצד לכוד חבילות רשת גולמיות מכרטיס הרשת (NIC) של המחשב שלך, לעבד את הנתונים, וליצור ויזואליזציות יפות שיעדכנו בזמן אמת.
תוכן עניינים
למה ניתוח תעבורת הרשת חשוב?
ניתוח תעבורת רשת הוא דרישה עיקרית בעסקים שבהם רשתות מהוות את היתרון הכולל של כמעט כל יישום ושירות. בלב הנושא יש ניתוח של חבילות רשת המשלב מעקב אחר הרשת, כיבוד כל התעבורה (כניסה ויציאה), ופרשנות של חבילות אלו בזמן שהן זורמות דרך הרשת. ניתן להשתמש בטכניקה זו כדי לזהות דפוסי אבטחה, לזהות אנומליות, ולהבטיח את האבטחה והיעילות של הרשת. מיזמים קודמים הדרושים לפרוייקט הזה שנעבוד עליו במדריך זה
עשויים להיות מיוחדים במיוחד מאחר שהם יעזרו לך להמחיש ולנתח פעילות רשת בזמן אמת. וזה יאפשר לך להבין כיצד נעשה פתרון בעיות, אופטימיזציה של ביצועים, וניתוח אבטחה במערכות עסקיות.
דרישות מוקדמות
-
Python 3.8 או גרסה חדשה יותר מותקנת במערכת שלך.
-
הבנה בסיסית של מושגי רשתות המחשב.
-
ידע בעבר עם שפת התכנות Python וספריותיה הנפוצות.
-
ידע בסיסי בטכניקות ובספריות ל הצגת נתונים.
איך להתקין את הפרויקט שלך
כדי להתחיל, צריך ליצור את מבנה הפרויקט ולהתקין את הכלים הדרושים עם Pip באמצעות הפקודות הבאות:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
נשתמש ב־Streamlit
להצגת הלוחות, Pandas
לעיבוד הנתונים, Scapy
לתפיסת חבילות רשת ועיבוד החבילות, ולבסוף Plotly
לציור תרשימים עם הנתונים שנאספו.
איך לבנות את התכונות הליביות
נשים את כל הקוד בקובץ אחד בשם dashboard.py
. נתחיל ראשית בייבוא כל הרכיבים שנשתמש בהם:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
כעת נגדיר יומן אירועים על ידי הגדרת תצורת יומן בסיסית. יומן זה ישמש למעקב אחר אירועים והרצת האפליקציה שלנו במצב דיבאג. כרגע הגדרנו את רמת הלוגינג להיות INFO
, שפירושה שאירועים ברמת INFO
ומעלה יוצגו. אם אינך מוכר עם לוגינג בפייתון, אני ממליץ לבדוק כאן את קטע המסמך שנכנס לעומק.
# הגדרת לוגינג
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
לבארץ, נבנה את מעבד החבילות שלנו. נממש את הפונקציונליות לעיבוד החבילות שנלכדו במחלקה זו.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# הוספת מידע ספציפי ל־TCP
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# הוספת מידע ספציפי ל־UDP
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# שמירה רק על אחרונות 10000 חבילות כדי למנוע בעיות זיכרון
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
מחלקה זו תבנה את הפונקציונליות היסודית שלנו ותכיל מספר פונקציות עזר שיעשו שימוש בעיבוד החבילות.
חבילות רשת מסווגות לשניים ברמת העברת מידע (TCP ו־UDP) ופרוטוקול ICMP ברמת הרשת. אם אינך מוכר עם מושגי TCP/IP, אני ממליץ לבדוק כאן מאמר זה בפורום freeCodeCamp News.
הבונה שלנו יעקוב אחר כל החבילות שנראו שמסווגות לאמצעי תקשורת TCP/IP שהגדרנו. נשים לב גם לזמן תפיסת החבילה, לנתונים שנתפסו, ולמספר החבילות שנתפסו.
נשתמש גם בנעילת תהליך כדי לוודא שרק חבילה אחת נעודף בכל פעם. ניתן להרחיב זאת כדי לאפשר עיבוד מקביל של חבילות בפרויקט.
הפונקציה העוזרת get_protocol_name
עוזרת לנו לקבל את סוג הפרוטוקול הנכון על סמך מספרי הפרוטוקול שלהם. לתת רקע לכך, רשות המספרים המוקצית ברשת (IANA) מקצה מספרים תקניים לזיהוי של פרוטוקולים שונים בחבילת רשת. כאשר אנו רואים את המספרים האלה בחבילת הרשת שנפרקה, נדע איזה סוג של פרוטוקול משמש כרגע בחבילה שהופצה. למען הפרויקט הזה, נמפה רק לTCP, UDP ו-ICMP (Ping). אם נתקל בסוג אחר של חבילה, נסווג אותה כ-OTHER(<protocol_num>)
.
הפונקציה process_packet
מטפלת בתפקוד העיקרי שלנו שיעבד את החבילות האישיות הללו. אם החבילה מכילה שכבת IP, תשים לב לכתובות ה-IP של המקור והיעד, סוג הפרוטוקול, גודל החבילה, והזמן שחלף מאז תחילת תפיסת החבילה.
לחבילות עם פרוטוקולים מסוימים בשכבת ההעברה (כמו TCP ו-UDP), נתפוס את כתובות היציאה והיעד יחד עם דגלי TCP לחבילות TCP. הפרטים שיוצאים משורטגים יאוחסנו בזיכרון ברשימת packet_data
. נשמור גם על סמיכות ה-packet_count
בהעברת החבילות הללו.
הפונקציה get_dataframe
עוזרת לנו להמיר את רשימת packet_data
למסגרת נתונים של Pandas
שתשמש לאחר מכן עבור הוויזואליזציה שלנו.
איך ליצור את הוויזואליזציות של Streamlit
עכשיו הגיע הזמן לבנות את לוח המחוונים האינטראקטיבי שלנו ב-Streamlit. נגדיר פונקציה בשם create_visualization
בסקריפט dashboard.py
(מחוץ למחלקת עיבוד החבילות שלנו).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# חלוקת פרוטוקולים
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# ציר הזמן של החבילות
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# כתובות ה-IP המקורות המובילות
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
פונקציה זו תקבל את מסגרת הנתונים כהזנה ותעזור לנו לשרטט שלושה תרשימים / גרפים:
-
תרשים חלוקת פרוטוקולים: תרשים זה יציג את הפרופורציה של פרוטוקולים שונים (למשל, TCP, UDP, ICMP) בתעבורת החבילות שנתפסה.
-
תרשים ציר הזמן של החבילות: תרשים זה יראה את מספר החבילות המעובדות בשנייה במשך פרק זמן.
-
תרשים כתובות ה-IP המקורות המובילות: תרשים זה ידגיש את 10 כתובות ה-IP המובילות ששלחו את מספר החבילות הרב ביותר בתעבורה הנתפסת.
תרשים ההפצה של הפרוטוקול הוא פשוט תרשים עוגה של ספירת הפרוטוקולים לשלושת סוגי הפרוטוקולים השונים (יחד עם OTHER). אנו משתמשים בכלים הפייתוניים Streamlit
ו־Plotly
כדי לעצב את התרשימים הללו. מאחר שרשמנו גם את הזמן המדויק מרגע תחילת תפיסת החבילות, נשתמש בנתונים אלו כדי לצייר את מגמת תפיסת החבילות לאורך הזמן.
לתרשים השני, נבצע פעולת groupby
על הנתונים ונקבל את מספר החבילות שנתפסו בכל שנייה (S
מייצג שניות), ולבסוף נצייר את הגרף.
לתרשים השלישי, נספור את כמות כתובות ה־IP מקור ששצפו ונצייר תרשים של ספירת ה־IP כדי להציג את עשרת ה־IP העליונות.
כיצד לתפוס את חבילות הרשת
כעת, נבנה את הפונקציונליות שתאפשר לנו לתפוס נתוני חבילות הרשת.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
זוהי פונקציה פשוטה שמייצרת את מחלקת ה־PacketProcessor
ואז משתמשת בפונקציית ה־sniff
במודול ה־scapy
כדי להתחיל לתפוס את החבילות.
אנו משתמשים בעיבוד מקבילי כאן כדי לאפשר לנו לתפוס חבילות באופן עצמאי מזרם התוכנית הראשי. זהו מבטיח כי פעולת תפיסת החבילות אינה חוסמת פעולות אחרות כמו עדכון הלוח בזמן אמת. נחזיר גם את מופע ה־PacketProcessor
שנוצר כדי שיוכל לשמש בתוכנית הראשית שלנו.
הכל יחד
עכשיו נצרף את כל החלקים הללו בעזרת הפונקציה main
שתפקידה לשמש כפונקציית הנהג לתוכנית שלנו.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# הפעלת מעבד חבילות במצב הסשן
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# יצירת פריסת לוח בקרה
col1, col2 = st.columns(2)
# קבלת נתונים נוכחיים
df = st.session_state.processor.get_dataframe()
# הצגת מדדים
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# הצגת ויזואליזציות
create_visualizations(df)
# הצגת חבילות אחרונות
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# הוספת כפתור רענון
if st.button('Refresh Data'):
st.rerun()
# רענון אוטומטי
time.sleep(2)
st.rerun()
פונקציה זו תפעיל את לוח הבקרה של Streamlit
, ותשלב את כל הרכיבים שלנו יחד. נקבע תחילה את כותרת העמוד של לוח הבקרה שלנו ואז נאתחל את PacketProcessor
שלנו. אנו משתמשים במצב הסשן ב־Streamlit
כדי לוודא שנוצר רק מופע אחד של תפיסת חבילות ושהמצב שלו נשמר.
עכשיו, נקבל את מסגרת הנתונים ממצב הסשן בכל פעם שהנתונים מעובדים ונתחיל להציג את המדדים והוויזואליזציות. נציג גם את החבילות שתפסנו לאחרונה יחד עם מידע כמו הזמן, כתובות ה־IP ממקור ומיעד, פרוטוקול וגודל החבילה. נוסיף גם את היכולת לרענן באופן ידני את הנתונים מלוח הבקרה בזמן שנרענן אותם באופן אוטומטי כל שתי שניות.
ואז נפעיל את התוכנית עם הפקודה הבאה:
sudo streamlit run dashboard.py
שימו לב שעליכם להריץ את התוכנית עם sudo
מכיוון שיכולות לכידת החבילות דורשות הרשאות מנהל. אם אתם משתמשים ב-Windows, פתחו את המסוף שלכם כמנהל ולאחר מכן הריצו את התוכנית ללא הקידומת sudo
.
תנו לתוכנית רגע כדי להתחיל ללכוד חבילות. אם הכל מתנהל כראוי, אתם אמורים לראות משהו כזה:
אלו כל הוויזואליזציות שהטמענו בתוכנית לוח הבקרה של Streamlit
.
שיפורים עתידיים
עם זאת, הנה כמה רעיונות לשיפורים עתידיים שאתם יכולים להשתמש בהם כדי להרחיב את הפונקציות של לוח הבקרה:
-
הוסיפו יכולות למידת מכונה לזיהוי אנומליות
-
יישמו מיפוי IP גיאוגרפי
-
צרו התראות מותאמות אישית בהתבסס על דפוסי ניתוח תעבורה
-
הוסיפו אפשרויות ניתוח תוכן חבילות
סיכום
מזל טוב! עכשיו בניתם בהצלחה לוח בקרה לניתוח תעבורת רשת בזמן אמת עם Python ו-Streamlit
. תוכנית זו תספק תובנות יקרות ערך לגבי התנהגות הרשת וניתן להרחיב אותה למגוון שימושים, החל ממעקב אבטחה ועד אופטימיזציה של רשת.
עם זאת, אני מקווה שלמדת קצת יסודות על ניתוח תעבורת רשת וגם קצת תכנות ב-Python. תודה על הקריאה!