您是否曾经想要实时可视化您的网络流量?在本教程中,您将学习如何使用Python和Streamlit构建交互式网络流量分析仪表板。Streamlit是一个开源的Python框架,您可以用它来开发用于数据分析和数据处理的Web应用程序。

在本教程结束时,您将知道如何捕获计算机网卡(Network Interface Card,NIC)的原始网络数据包,处理数据,并创建美丽的可视化图表,这些图表将实时更新。

目录

为什么网络流量分析很重要?

网络流量分析是企业中的一个关键要求,在这些企业中,网络几乎是每个应用程序和服务的基础。在其核心,我们有网络数据包的分析,它涉及监视网络、捕获所有流量(入站和出站),并解释这些数据包在网络中流动的过程。您可以使用这种技术来识别安全模式、检测异常,并确保网络的安全性和效率。

这个教程中我们将要进行的概念验证项目特别有用,因为它可以帮助您实时可视化和分析网络活动。这将使您能够了解在企业系统中如何进行故障排除、性能优化和安全分析。

先决条件

如何设置你的项目

要开始,创建项目结构并使用以下命令通过Pip安装必要的工具:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

我们将使用Streamlit进行仪表板可视化,使用Pandas进行数据处理,使用Scapy进行网络数据包捕获和处理,最后使用Plotly根据我们收集的数据绘制图表。

如何构建核心功能

我们将把所有代码放在一个名为dashboard.py的单一文件中。首先,让我们开始导入我们将使用的所有元素:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

现在让我们通过设置基本的日志配置来配置日志记录。这将用于跟踪事件并在调试模式下运行我们的应用程序。我们目前已将日志级别设置为INFO,这意味着将显示级别为INFO或更高的事件。如果您对Python中的日志记录不熟悉,我建议查看this深入探讨的文档。

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

接下来,我们将构建我们的数据包处理器。我们将在这个类中实现处理我们捕获的数据包的功能。

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # 添加特定于TCP的信息
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # 添加特定于UDP的信息
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # 仅保留最后10000个数据包以防止内存问题
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

这个类将构建我们的核心功能,并有几个实用函数,将用于处理数据包。

在传输层,网络数据包分为两类(TCP和UDP),在网络层有ICMP协议。如果您对TCP/IP的概念不熟悉,我建议查看this在freeCodeCamp News上的文章。

我们的构造函数将跟踪所有已分类到我们定义的这些TCP/IP协议类型桶中的数据包。我们还将记录数据包捕获时间、捕获的数据以及捕获的数据包数量。

我们还将利用线程锁来确保一次只处理一个数据包。这可以进一步扩展以实现项目具有并行数据包处理的功能。

辅助函数get_protocol_name帮助我们根据协议号获取协议的正确类型。给出一些背景,互联网数字分配机构(IANA)分配标准化数字来识别网络数据包中的不同协议。每当我们在解析的网络数据包中看到这些数字时,我们将知道当前拦截的数据包中正在使用的是何种协议。在此项目范围内,我们将仅映射到TCP、UDP和ICMP(Ping)。如果遇到其他类型的数据包,我们将将其分类为OTHER(<protocol_num>)

函数process_packet处理将处理这些单独数据包的核心功能。如果数据包包含IP层,它将注意源IP地址、目标IP地址、协议类型、数据包大小以及自开始捕获数据包以来经过的时间。

对于具有特定传输层协议(如TCP和UDP)的数据包,我们将捕获源端口和目标端口以及TCP数据包的TCP标志。这些提取的详细信息将存储在packet_data列表中的内存中。我们还将跟踪packet_count,随着这些数据包被处理。

get_dataframe函数帮助我们将packet_data列表转换为一个Pandas数据帧,然后将用于我们的可视化。

如何创建 Streamlit 可视化

现在是时候建立我们的交互式 Streamlit 仪表板了。我们将在dashboard.py脚本中定义一个名为create_visualization的函数(在数据包处理类之外)。

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # 协议分布
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # 数据包时间线
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # 前十大源 IP
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

此函数将以数据帧作为输入,并帮助我们绘制三个图表:

  1. 协议分布图:此图表将显示捕获的数据包流量中不同协议(例如,TCP、UDP、ICMP)的比例。

  2. 数据包时间线图:此图表将显示每秒处理的数据包数量,时间跨度为一定时期。

  3. 前十大源 IP 地址图:此图表将突出显示在捕获的流量中发送最多数据包的前十个 IP 地址。

协议分布图仅是三种不同类型(以及其他)协议计数的饼图。我们使用StreamlitPlotly Python工具绘制这些图表。由于我们还记录了数据包捕获开始时的时间戳,我们将使用这些数据来绘制随时间变化的数据包捕获趋势。

对于第二个图表,我们将在数据上执行groupby操作,并获取每秒捕获的数据包数量(S代表秒),然后最终绘制图表。

最后,对于第三个图表,我们将统计观察到的不同源IP,并绘制IP计数图表以显示前10个IP。

如何捕获网络数据包

现在,让我们构建功能,以允许我们捕获网络数据包。

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

这是一个简单的函数,实例化PacketProcessor类,然后使用scapy模块中的sniff函数开始捕获数据包。

我们在这里使用线程,使我们能够独立于主程序流捕获数据包。这确保数据包捕获操作不会阻塞其他操作,如实时更新仪表板。我们还返回创建的PacketProcessor实例,以便在我们的主程序中使用。

将所有内容整合在一起

现在让我们用main函数将所有这些部分结合在一起,这将作为我们程序的驱动函数。

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # 初始化会话状态下的数据包处理器
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # 创建仪表板布局
    col1, col2 = st.columns(2)

    # 获取当前数据
    df = st.session_state.processor.get_dataframe()

    # 显示指标
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # 显示可视化
    create_visualizations(df)

    # 显示最近捕获的数据包
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # 添加刷新按钮
    if st.button('Refresh Data'):
        st.rerun()

    # 自动刷新
    time.sleep(2)
    st.rerun()

这个函数还会实例化Streamlit仪表板,并将所有组件整合在一起。我们首先设置Streamlit仪表板的页面标题,然后初始化我们的PacketProcessor。我们在Streamlit中使用会话状态,以确保只创建一个数据包捕获实例,并保持其状态。

现在,我们将动态从会话状态中获取数据框,每次处理数据时开始显示指标和可视化。我们还将显示最近捕获的数据包,以及时间戳、源和目标IP、协议和数据包大小等信息。我们还将为用户添加手动刷新仪表板数据的功能,同时每两秒自动刷新数据。

最后,让我们通过以下命令运行程序:

sudo streamlit run dashboard.py

请注意,由于数据包捕获功能需要管理权限,您将不得不使用sudo运行程序。如果您使用的是Windows系统,请以管理员身份打开终端,然后运行程序,不需要sudo前缀。

请给程序一些时间来开始捕获数据包。如果一切顺利,您应该会看到类似这样的内容:

这些都是我们在Streamlit仪表板程序中刚刚实现的所有可视化效果。

未来的增强功能

因此,这里有一些未来的增强功能想法,您可以用来扩展仪表板的功能:

  1. 为异常检测添加机器学习功能

  2. 实现地理IP映射

  3. 根据流量分析模式创建自定义警报

  4. 添加数据包载荷分析选项

结论

恭喜!您已成功使用Python和Streamlit构建了实时网络流量分析仪表板。这个程序将为您提供有关网络行为的宝贵见解,并可根据需要扩展至各种用例,从安全监控到网络优化。

希望你通过这些内容学到了一些关于网络流量分析以及一点Python编程的基础知识。谢谢阅读!