هل سبق لك أن رغبت في تصور حركة مرور شبكتك في الوقت الحقيقي؟ في هذا البرنامج التعليمي، ستتعلم كيفية بناء لوحة تحليل حركة المرور على الشبكة تفاعلية باستخدام Python وStreamlit
. Streamlit
هو إطار عمل Python مفتوح المصدر يمكنك استخدامه لتطوير تطبيقات الويب لتحليل البيانات ومعالجتها.
بحلول نهاية هذا البرنامج التعليمي، ستعرف كيفية التقاط حزم الشبكة الخام من بطاقة الشبكة (Network Interface Card) على جهاز الكمبيوتر الخاص بك، معالجة البيانات، وإنشاء تصورات بصرية جميلة ستتحدث في الوقت الحقيقي.
جدول المحتويات
لماذا يعتبر تحليل حركة الشبكة مهمًا؟
تحليل حركة المرور في الشبكة هو متطلب حرج في الشركات حيث تشكل الشبكات العمود الفقري لمعظم التطبيقات والخدمات تقريبًا. في جوهره، لدينا تحليل لحزم الشبكة الذي ينطوي على مراقبة الشبكة، والتقاط كل حركة المرور (الواردة والصادرة)، وتفسير هذه الحزم أثناء تدفقها عبر الشبكة. يمكنك استخدام هذه التقنية لتحديد أنماط الأمان، واكتشاف الشواذ، وضمان أمان وكفاءة الشبكة.
هذا المشروع النموذجي الذي سنعمل عليه في هذا البرنامج التعليمي مفيد بشكل خاص لأنه يساعدك على تصور وتحليل نشاط الشبكة في الوقت الحقيقي. وسيسمح لك هذا بفهم كيفية حل مشاكل الأداء وتحسين الأداء وتحليل الأمان في أنظمة المؤسسات.
المتطلبات المسبقة
-
تثبيت Python 3.8 أو أحدث نسخة على نظامك.
-
فهم أساسي لمفاهيم شبكات الحاسوب.
-
المعرفة بلغة البرمجة Python ومكتباتها المستخدمة على نطاق واسع.
-
المعرفة الأساسية بتقنيات ومكتبات تصور البيانات.
كيفية إعداد مشروعك
للبدء، قم بإنشاء هيكل المشروع وتثبيت الأدوات اللازمة باستخدام Pip بالأوامر التالية:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
سنستخدم Streamlit
لتصوري لوحة التحكم، Pandas
لمعالجة البيانات، Scapy
لالتقاط حزم الشبكة ومعالجتها، وأخيراً Plotly
لرسم الرسوم البيانية باستخدام البيانات التي تم جمعها.
كيفية بناء الوظائف الأساسية
سنقوم بوضع جميع الشفرات في ملف واحد يسمى dashboard.py
. أولاً، دعنا نبدأ بإستيراد جميع العناصر التي سنستخدمها:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
الآن دعنا نقوم بتكوين تسجيل عن طريق إعداد تكوين تسجيل أساسي. سيتم استخدام هذا لتتبع الأحداث وتشغيل تطبيقنا في وضع التصحيح. لقد ضبطنا مستوى التسجيل حاليًا ليكون INFO
، مما يعني أن الأحداث ذات المستوى INFO
أو أعلى ستعرض. إذا كنت غير ملم بالتسجيل في Python، فأوصي بالتحقق من this الوثيقة التي تعمق في الموضوع.
# تكوين التسجيل
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
بعد ذلك، سنقوم ببناء معالج الحزم. سننفذ وظيفة معالجة الحزم التي تم التقاطها في هذه الفئة.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# إضافة معلومات خاصة ببروتوكول TCP
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# إضافة معلومات خاصة ببروتوكول UDP
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# الاحتفاظ بآخر 10000 حزمة فقط لمنع مشكلات الذاكرة
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
سيقوم هذا الفئة ببناء وظيفتنا الأساسية ولديها عدة وظائف توضعية ستستخدم لمعالجة الحزم.
تُصنف حزم الشبكة إلى قسمين على مستوى النقل (TCP و UDP) وبروتوكول ICMP على مستوى الشبكة. إذا كنت غير ملم بمفاهيم TCP/IP، فأوصي بالتحقق من this المقالة على موقع freeCodeCamp News.
سيقوم مُنشئنا بتتبع جميع الحزم التي تم رؤيتها والتي تصنف في هذه الفئات من أنواع بروتوكول TCP/IP التي قمنا بتعريفها. كما سنلاحظ وقت التقاط الحزمة، والبيانات الملتقطة، وعدد الحزم الملتقطة.
سنستفيد أيضًا من قفل الموضوع لضمان معالجة حزمة واحدة في كل مرة. يمكن مد هذا لتمكين المشروع من معالجة الحزم بشكل موازي.
تساعد وظيفة المساعد get_protocol_name
في الحصول على نوع البروتوكول الصحيح بناءً على أرقام البروتوكول الخاصة بهم. لإعطاء بعض الخلفية على هذا، تُسند السلطة المخصصة للأرقام مهام موحدة لتحديد البروتوكولات المختلفة في حزمة شبكة. عندما نرى هذه الأرقام في حزمة الشبكة المحللة، سنعرف نوع البروتوكول الذي يتم استخدامه في الحزمة التي تم التقاطها حاليًا. من أجل نطاق هذا المشروع، سنقوم بتعيينها إلى TCP، UDP و ICMP (Ping) فقط. إذا واجهنا أي نوع آخر من الحزم، سنصنفها على أنها OTHER(<protocol_num>)
.
تتولى وظيفة process_packet
وظيفتنا الأساسية التي ستعالج هذه الحزم الفردية. إذا احتوت الحزمة على طبقة IP، ستلاحظ عناوين IP المصدر والوجهة، نوع البروتوكول، حجم الحزمة، والزمن المنقضي منذ بدء التقاط الحزمة.
بالنسبة للحزم ذات البروتوكولات النقل المحددة (مثل TCP و UDP)، سنلتقط منافذ المصدر والوجهة مع علامات TCP لحزم TCP. سيتم تخزين التفاصيل المستخرجة هذه في الذاكرة في قائمة packet_data
. سنواصل أيضًا تتبع packet_count
عندما يتم معالجة هذه الحزم.
تساعد وظيفة get_dataframe
في تحويل قائمة packet_data
إلى إطار بيانات Pandas
الذي سيتم استخدامه للتصور البياني الخاص بنا.
كيفية إنشاء تصورات Streamlit
الآن حان الوقت لبناء لوحة تحكم Streamlit تفاعلية. سنقوم بتعريف وظيفة تسمى create_visualization
في سيناريو dashboard.py
(خارج فئة معالجة الحزمة لدينا).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# توزيع البروتوكول
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# الجدول الزمني للحزم
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# أفضل عناوين IP المصدر
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
ستأخذ هذه الوظيفة الإطار البياناتي كمدخل وستساعدنا في رسم ثلاثة رسوم بيانية:
-
مخطط توزيع البروتوكول: سيعرض هذا المخطط نسبة البروتوكولات المختلفة (على سبيل المثال، TCP، UDP، ICMP) في حركة حزم البيانات الملتقطة.
-
مخطط الجدول الزمني للحزم: سيظهر هذا المخطط عدد الحزم التي تم معالجتها في الثانية على مدى فترة زمنية.
-
مخطط أعلى عناوين IP المصدر: سيسلط هذا المخطط الضوء على أعلى 10 عناوين IP التي أرسلت أكبر عدد من الحزم في حركة البيانات الملتقطة.
يعتبر مخطط توزيع البروتوكول ببساطة مخطط دائري لعدد البروتوكولات للأنواع الثلاثة المختلفة (بالإضافة إلى “أخرى”). نحن نستخدم أدوات Python Streamlit
و Plotly
لرسم هذه الرسوم البيانية. نظرًا لأننا لاحظنا الطابع الزمني منذ بدء تسجيل الحزمة، سنستخدم هذه البيانات لرسم اتجاه الحزم المسجلة مع مرور الوقت.
بالنسبة للرسم البياني الثاني، سنقوم بعملية groupby
على البيانات ونحصل على عدد الحزم المسجلة في كل ثانية (تمثل S
الثواني)، ثم أخيرًا سنقوم برسم الرسم البياني.
وأخيرًا، بالنسبة للرسم البياني الثالث، سنقوم بعد الاحتساب بعدد عناوين الـ IP المراقبة ورسم رسم بياني لعدد العناوين IP لعرض أفضل 10 عناوين IP.
كيفية التقاط حزم الشبكة
الآن، دعونا نبني الوظيفة للسماح لنا بالتقاط بيانات حزم الشبكة.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
هذه وظيفة بسيطة تقوم بإنشاء فئة PacketProcessor
ثم تستخدم وظيفة sniff
في وحدة scapy
لبدء التقاط الحزم.
نستخدم التقسيم هنا للسماح لنا بالتقاط الحزم بشكل مستقل عن تدفق البرنامج الرئيسي. يضمن هذا أن عملية التقاط الحزم لا تحجب عمليات أخرى مثل تحديث لوحة التحكم في الوقت الحقيقي. كما نقوم بإرجاع نسخة الـ PacketProcessor
التي تم إنشاؤها حتى يمكن استخدامها في برنامجنا الرئيسي.
وضع كل شيء معًا
الآن دعنا نجمع كل هذه القطع مع وظيفتنا main
التي ستعمل كوظيفة تشغيل برنامجنا.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# تهيئة معالج الحزم في حالة الجلسة
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# إنشاء تخطيط لوحة القيادة
col1, col2 = st.columns(2)
# الحصول على البيانات الحالية
df = st.session_state.processor.get_dataframe()
# عرض المقاييس
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# عرض التصويرات البصرية
create_visualizations(df)
# عرض الحزم الأخيرة
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# إضافة زر التحديث
if st.button('Refresh Data'):
st.rerun()
# التحديث التلقائي
time.sleep(2)
st.rerun()
سيقوم هذا الدالة أيضًا بتهيئة لوحة القيادة Streamlit
، ودمج جميع مكوناتنا معًا. نقوم أولاً بتعيين عنوان الصفحة لوحة القيادة Streamlit
الخاصة بنا ثم نقوم بتهيئة PacketProcessor
الخاص بنا. نستخدم حالة الجلسة في Streamlit
لضمان أنه تم إنشاء مثيل واحد فقط من التقاط الحزمة وأن حالته محتفظة.
الآن، سنحصل ديناميكيًا على إطار البيانات من حالة الجلسة في كل مرة يتم فيها معالجة البيانات ونبدأ في عرض المقاييس والتصويرات البصرية. سنعرض أيضًا الحزم التي تم التقاطها مؤخرًا مع معلومات مثل الطابع الزمني، عنوان IP المصدر والوجهة، البروتوكول، وحجم الحزمة. سنضيف أيضًا القدرة على تحديث البيانات يدويًا من لوحة القيادة بينما نقوم أيضًا بتحديثها تلقائيًا كل ثانيتين.
دعونا في النهاية نشغل البرنامج بالأمر التالي:
sudo streamlit run dashboard.py
يرجى ملاحظة أنه يجب تشغيل البرنامج باستخدام sudo
حيث تتطلب قدرات اصطياد الحزم امتيازات إدارية. إذا كنت تستخدم نظام Windows، فافتح الطرفية الخاصة بك كمسؤول ثم قم بتشغيل البرنامج بدون بادئة sudo
.
امنح البرنامج لحظة لبدء اصطياد الحزم. إذا كان كل شيء على ما يرام، يجب أن ترى شيئًا مشابهًا لهذا:
هذه كل التصورات التي قمنا بتنفيذها في برنامج لوحة تحكم Streamlit
الخاص بنا.
تعزيزات مستقبلية
بهذا، إليك بعض أفكار تعزيزات المستقبل التي يمكنك استخدامها لتوسيع وظائف اللوحة الخاصة بك:
-
إضافة إمكانيات التعلم الآلي للكشف عن الشذوذ
-
تنفيذ رسم خرائط الآي بي الجغرافية
-
إنشاء تنبيهات مخصصة استنادًا إلى أنماط تحليل حركة المرور
-
إضافة خيارات تحليل حمولة الحزم
الاستنتاج
تهانينا! لقد قمت الآن بنجاح ببناء لوحة تحكم لتحليل حركة الشبكة في الوقت الفعلي باستخدام Python و Streamlit
. سيوفر هذا البرنامج رؤى قيمة حول سلوك الشبكة ويمكن توسيعه لمختلف حالات الاستخدام، من مراقبة الأمان إلى تحسين الشبكة.
مع ذلك، آمل أن تكون قد تعلمت بعض الأساسيات حول تحليل حركة الشبكة بالإضافة إلى قليلاً من برمجة بايثون. شكرًا لقراءتك!