Вы когда-нибудь хотели визуализировать свой сетевой трафик в реальном времени? В этом уроке вы научитесь создавать интерактивную панель анализа сетевого трафика с помощью Python и Streamlit
. Streamlit
— это фреймворк Python с открытым исходным кодом, который вы можете использовать для разработки веб-приложений для анализа и обработки данных.
К концу этого урока вы узнаете, как захватывать необработанные сетевые пакеты с сетевой карты (NIC) вашего компьютера, обрабатывать данные и создавать красивые визуализации, которые будут обновляться в реальном времени.
Содержание
Почему анализ сетевого трафика важен?
Анализ сетевого трафика является критическим требованием в предприятиях, где сети являются основой практически каждого приложения и сервиса. В центре этого находится анализ сетевых пакетов, который включает в себя мониторинг сети, захват всего трафика (входящего и исходящего) и интерпретацию этих пакетов по мере их прохождения через сеть. Вы можете использовать эту технику для выявления шаблонов безопасности, обнаружения аномалий и обеспечения безопасности и эффективности сети.
Этот проект концепции, над которым мы будем работать в этом учебнике, особенно полезен, поскольку позволяет визуализировать и анализировать сетевую активность в реальном времени. Это позволит вам понять, как выполняется устранение проблем, оптимизация производительности и анализ безопасности в корпоративных системах.
Предварительные требования
-
Python 3.8 или более новая версия, установленная на вашей системе.
-
Базовое понимание концепций компьютерных сетей.
-
Знакомство с языком программирования Python и его широко используемыми библиотеками.
-
Базовые знания методов и библиотек визуализации данных.
Как настроить ваш проект
Для начала создайте структуру проекта и установите необходимые инструменты с помощью Pip с помощью следующих команд:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
Мы будем использовать Streamlit
для визуализации панели управления, Pandas
для обработки данных, Scapy
для захвата сетевых пакетов и их обработки, а также Plotly
для построения графиков с нашими собранными данными.
Как построить основные функции
Весь код будет помещен в один файл с именем dashboard.py
. Во-первых, начнем с импорта всех элементов, которые мы будем использовать:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
Теперь давайте настроим ведение журналов, установив базовую конфигурацию логирования. Это будет использоваться для отслеживания событий и запуска нашего приложения в режиме отладки. В настоящее время мы установили уровень логирования на INFO
, что означает, что будут отображаться события с уровнем INFO
или выше. Если вы не знакомы с логированием в Python, я рекомендую ознакомиться с этой документацией, которая подробно объясняет тему.
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Далее мы создадим наш процессор пакетов. Мы реализуем функциональность обработки захваченных пакетов в этом классе.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Добавить информацию, специфичную для TCP
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Добавить информацию, специфичную для UDP
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Хранить только последние 10000 пакетов, чтобы избежать проблем с памятью
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
Этот класс будет строить нашу основную функциональность и включает несколько утилитных функций, которые будут использоваться для обработки пакетов.
Сетевые пакеты классифицируются на два типа на транспортном уровне (TCP и UDP) и протокол ICMP на сетевом уровне. Если вы не знакомы с концепциями TCP/IP, я рекомендую ознакомиться с этой статьей на freeCodeCamp News.
Наш конструктор будет отслеживать все полученные пакеты, которые классифицируются по типам протоколов TCP/IP, которые мы определили. Мы также учтем время захвата пакета, захваченные данные и количество захваченных пакетов.
Мы также будем использовать блокировку потока для обеспечения обработки только одного пакета за раз. Это может быть дополнительно расширено для обеспечения параллельной обработки пакетов в проекте.
Вспомогательная функция get_protocol_name
помогает нам получить правильный тип протокола на основе их номеров протоколов. Для того чтобы дать некоторое представление об этом, Управление назначенных номеров Интернета (IANA) назначает стандартизированные номера для идентификации различных протоколов в сетевом пакете. Как только мы увидим эти номера в разобранном сетевом пакете, мы узнаем, какой протокол используется в текущем перехваченном пакете. Для целей этого проекта мы будем сопоставлять только TCP, UDP и ICMP (Ping). Если мы столкнемся с любым другим типом пакета, мы классифицируем его как ДРУГОЕ(<номер_протокола>)
.
Функция process_packet
обрабатывает нашу основную функциональность, которая будет обрабатывать эти отдельные пакеты. Если пакет содержит уровень IP, она будет учитывать исходный и конечный IP-адреса, тип протокола, размер пакета и время, прошедшее с начала захвата пакета.
Для пакетов с конкретными протоколами транспортного уровня (например, TCP и UDP) мы будем захватывать исходные и конечные порты вместе с флагами TCP для пакетов TCP. Эти извлеченные детали будут храниться в памяти в списке packet_data
. Мы также будем отслеживать packet_count
по мере обработки этих пакетов.
Функция get_dataframe
помогает нам преобразовать список packet_data
в объект данных Pandas
, который затем будет использован для визуализации.
Как создать визуализации в Streamlit
Теперь пришло время создать интерактивную панель управления Streamlit. Мы определим функцию с именем create_visualization
в скрипте dashboard.py
(вне класса обработки пакетов).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Распределение протоколов
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Временная шкала пакетов
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Топ исходных IP-адресов
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
Эта функция будет принимать фрейм данных в качестве входных данных и поможет нам построить три графика:
-
График распределения протоколов: Этот график отобразит долю различных протоколов (например, TCP, UDP, ICMP) в захваченном трафике пакетов.
-
График временной шкалы пакетов: Этот график покажет количество обработанных пакетов в секунду за определенный период времени.
-
График топовых исходных IP-адресов: Этот график выделит топ-10 IP-адресов, отправивших наибольшее количество пакетов в захваченном трафике.
График распределения протоколов — это просто круговая диаграмма количества протоколов для трех различных типов (вместе с ДРУГИМИ). Мы используем инструменты Python Streamlit
и Plotly
для построения этих графиков. Поскольку мы также отметили временную метку с момента начала захвата пакетов, мы будем использовать эти данные для построения тренда захваченных пакетов с течением времени.
Для второго графика мы проведем операцию groupby
над данными и получим количество захваченных пакетов за каждую секунду (S
обозначает секунды), а затем, наконец, построим график.
Наконец, для третьего графика мы сосчитаем уникальные наблюдаемые IP-адреса источников и построим график количества IP-адресов, чтобы показать топ-10 IP.
Как захватить сетевые пакеты
Теперь давайте создадим функциональность, которая позволит нам захватывать данные сетевых пакетов.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
Это простая функция, которая создает экземпляр класса PacketProcessor
, а затем использует функцию sniff
из модуля scapy
, чтобы начать захват пакетов.
Мы используем потоки для того, чтобы захватывать пакеты независимо от основного потока программы. Это гарантирует, что операция захвата пакетов не блокирует другие операции, такие как обновление панели управления в реальном времени. Мы также возвращаем созданный экземпляр PacketProcessor
, чтобы его можно было использовать в нашей основной программе.
Собираем все вместе
Теперь давайте соединим все эти части с нашей функцией main
, которая будет выступать в качестве управляющей функции для нашей программы.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Инициализировать процессор пакетов в состоянии сессии
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Создать макет панели инструментов
col1, col2 = st.columns(2)
# Получить текущие данные
df = st.session_state.processor.get_dataframe()
# Отобразить метрики
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Отобразить визуализации
create_visualizations(df)
# Отобразить последние пакеты
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Добавить кнопку обновления
if st.button('Refresh Data'):
st.rerun()
# Автообновление
time.sleep(2)
st.rerun()
Эта функция также создаст панель инструментов Streamlit
и интегрирует все наши компоненты вместе. Сначала мы устанавливаем заголовок страницы нашей панели инструментов Streamlit
, а затем инициализируем наш PacketProcessor
. Мы используем состояние сессии в Streamlit
, чтобы гарантировать, что создается только один экземпляр захвата пакетов, и его состояние сохраняется.
Теперь мы будем динамически получать фрейм данных из состояния сессии каждый раз, когда обрабатываются данные, и начнем отображать метрики и визуализации. Мы также отобразим недавно захваченные пакеты вместе с информацией, такой как временная метка, IP-адреса источника и назначения, протокол и размер пакета. Мы также добавим возможность пользователю вручную обновлять данные с панели инструментов, в то время как мы также будем автоматически обновлять их каждые две секунды.
Давайте, наконец, запустим программу с помощью следующей команды:
sudo streamlit run dashboard.py
Обратите внимание, что вам придется запускать программу с sudo
, так как возможности захвата пакетов требуют административных привилегий. Если вы на Windows, откройте терминал от имени администратора и затем запустите программу без префикса sudo
.
Подождите немного, пока программа начнет захватывать пакеты. Если все пройдет успешно, вы должны увидеть что-то вроде этого:
Это все визуализации, которые мы только что реализовали в нашей программе панели управления Streamlit
.
Будущие улучшения
Итак, вот несколько идей для будущих улучшений, которые вы можете использовать для расширения функциональности панели:
-
Добавьте возможности машинного обучения для обнаружения аномалий
-
Реализуйте географическое картирование IP
-
Создайте пользовательские оповещения на основе анализа трафика
-
Добавьте опции анализа полезной нагрузки пакетов
Заключение
Поздравляем! Вы успешно создали панель управления для анализа сетевого трафика в реальном времени с помощью Python и Streamlit
. Эта программа предоставит ценные сведения о поведении сети и может быть расширена для различных случаев использования, от мониторинга безопасности до оптимизации сети.
С этим я надеюсь, что вы узнали основные вещи о анализе сетевого трафика, а также немного о программировании на Python. Спасибо за чтение!