リアルタイムでネットワークトラフィックを可視化したことはありますか?このチュートリアルでは、PythonとStreamlitを使用してインタラクティブなネットワークトラフィック分析ダッシュボードを構築する方法を学びます。Streamlitは、データ分析やデータ処理のためのWebアプリケーションを開発するために使用できるオープンソースのPythonフレームワークです。

このチュートリアルの最後までに、コンピュータのNIC(ネットワークインターフェースカード)から生のネットワークパケットをキャプチャし、データを処理し、リアルタイムで更新される美しい可視化を作成する方法がわかります。

目次

ネットワークトラフィック分析の重要性はなぜですか?

ネットワークトラフィック分析は、ネットワークがほぼすべてのアプリケーションとサービスのバックボーンを形成する企業において、重要な要件です。その中心には、ネットワークを監視し、すべてのトラフィック(進入と出力)をキャプチャし、これらのパケットがネットワークを通過する際に解釈するネットワークパケットの分析があります。この技術を使用して、セキュリティパターンを特定し、異常を検出し、ネットワークのセキュリティと効率を確保できます。

このチュートリアルで取り組むこの概念実証プロジェクトは、特にリアルタイムでネットワーク活動を視覚化および分析するのに役立つため、非常に便利です。これにより、企業システムにおける問題のトラブルシューティング、パフォーマンスの最適化、セキュリティ分析がどのように行われているかを理解することができます。

前提条件

プロジェクトのセットアップ方法

始めるには、プロジェクト構造を作成し、次のコマンドでPipを使用して必要なツールをインストールします:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

ダッシュボードの可視化にはStreamlit、データ処理にはPandas、ネットワークパケットのキャプチャと処理にはScapy、最後に収集したデータを使ってチャートを描画するためにPlotlyを使用します。

コア機能の構築方法

すべてのコードをdashboard.pyという名前の単一ファイルにまとめます。まず、使用するすべての要素をインポートすることから始めましょう:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

今度は、基本的なログ設定を行うことで、ログの構成を行いましょう。これはイベントの追跡およびデバッグモードでのアプリケーションの実行に使用されます。現在、ログレベルをINFOに設定しており、INFOレベル以上のイベントが表示されます。Pythonのログに慣れていない場合は、詳細について説明しているthisドキュメントをチェックすることをお勧めします。

# ログの構成
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

次に、パケット処理プロセッサを構築します。このクラスで、キャプチャしたパケットを処理する機能を実装します。

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # TCP固有情報の追加
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # UDP固有情報の追加
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # メモリの問題を防ぐため、最後の10000パケットのみ保持
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

このクラスは、コア機能を構築し、パケットの処理に使用されるいくつかのユーティリティ関数が含まれています。

ネットワークパケットは、トランスポートレベル(TCPおよびUDP)で2つに分類されます。そして、ネットワークレベルではICMPプロトコルがあります。TCP/IPの概念に馴染みがない場合は、freeCodeCamp Newsのthis記事をチェックすることをお勧めします。

私たちのコンストラクタは、定義したTCP/IPプロトコルタイプのバケットに分類されるすべてのパケットを追跡します。また、パケットのキャプチャ時刻、キャプチャされたデータ、およびキャプチャされたパケットの数にも注目します。

また、スレッドロックを活用して、1度に1つのパケットのみが処理されるようにします。これはさらに拡張して、プロジェクトが並列パケット処理を行えるようにすることができます。

get_protocol_nameヘルパー関数は、プロトコル番号に基づいてプロトコルの正しいタイプを取得するのに役立ちます。これについての背景情報として、インターネット割り当て番号機関(IANA)は、ネットワークパケット内の異なるプロトコルを識別するための標準化された番号を割り当てています。解析されたネットワークパケットでこれらの番号を見るとき、現在インターセプトされているパケットでどの種類のプロトコルが使用されているかがわかります。このプロジェクトの範囲では、TCP、UDP、およびICMP(Ping)にのみマッピングされます。他の種類のパケットに遭遇した場合、OTHER(<protocol_num>)として分類されます。

process_packet関数は、これらの個々のパケットを処理するコア機能を処理します。パケットにIPレイヤーが含まれている場合、ソースおよび宛先IPアドレス、プロトコルタイプ、パケットサイズ、およびパケットキャプチャの開始から経過した時間をメモします。

TCPやUDPなど特定のトランスポート層プロトコルを持つパケットの場合、TCPパケットの場合はソースおよび宛先ポート、TCPパケットの場合はTCPフラグをキャプチャします。これらの抽出された詳細は、packet_dataリストにメモリに保存されます。これらのパケットが処理されるたびに、packet_countも追跡します。

get_dataframe関数は、packet_dataリストをPandasデータフレームに変換するのに役立ち、そのデータフレームは後で可視化に使用されます。

Streamlitビジュアライゼーションの作成方法

これで、インタラクティブなStreamlitダッシュボードを構築する準備が整いました。パケット処理クラスの外部にcreate_visualization関数をdashboard.pyスクリプトに定義します。

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # プロトコル分布
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # パケットタイムライン
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # トップソースIP
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

この関数はデータフレームを入力として受け取り、3つのチャート/グラフをプロットするのに役立ちます:

  1. プロトコル分布チャート: このチャートは、キャプチャされたパケットトラフィックにおける異なるプロトコル(例: TCP、UDP、ICMP)の割合を表示します。

  2. パケットタイムラインチャート: このチャートは、時間の経過とともに1秒あたり処理されたパケット数を表示します。

  3. トップソースIPアドレスチャート: このチャートは、キャプチャされたトラフィックで最も多くのパケットを送信した上位10のIPアドレスを強調表示します。

プロトコル分布チャートは、3つの異なるタイプ(その他を含む)のプロトコルカウントの円グラフです。これらのチャートをプロットするために、StreamlitPlotly Pythonツールを使用します。パケットキャプチャを開始してからのタイムスタンプも記録しているため、このデータを使用して時間経過に伴うキャプチャされたパケットのトレンドをプロットします。

2番目のチャートでは、データにgroupby操作を実行し、各秒にキャプチャされたパケットの数を取得し、最終的にグラフをプロットします。

最後に、3番目のチャートでは、観測された異なるソースIPの数をカウントし、トップ10のIPを示すIPカウントのグラフをプロットします。

ネットワークパケットのキャプチャ方法

さて、ネットワークパケットデータをキャプチャする機能を構築しましょう。

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

これは、PacketProcessorクラスをインスタンス化し、scapyモジュール内のsniff関数を使用してパケットのキャプチャを開始する単純な関数です。

ここではスレッディングを使用して、パケットのキャプチャをメインプログラムのフローから独立してキャプチャできるようにします。これにより、パケットのキャプチャ操作がダッシュボードのリアルタイム更新など他の操作をブロックしないようにします。また、作成したPacketProcessorインスタンスを返して、メインプログラムで使用できるようにします。

すべてを組み合わせる

これで、これらすべての要素をmain関数で繋ぎ合わせ、プログラムのドライバー関数として機能させる準備が整いました。

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # セッション状態でパケット処理機を初期化
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # ダッシュボードレイアウトを作成
    col1, col2 = st.columns(2)

    # 現在のデータを取得
    df = st.session_state.processor.get_dataframe()

    # メトリクスを表示
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # ビジュアライゼーションを表示
    create_visualizations(df)

    # 最近キャプチャしたパケットを表示
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # リフレッシュボタンを追加
    if st.button('Refresh Data'):
        st.rerun()

    # 自動リフレッシュ
    time.sleep(2)
    st.rerun()

この関数は、Streamlitダッシュボードをインスタンス化し、すべてのコンポーネントを統合します。ます最初にStreamlitダッシュボードのページタイトルを設定し、次にPacketProcessorを初期化します。 Streamlitのセッション状態を使用して、パケットのキャプチャのインスタンスが1つだけ作成され、その状態が保持されるようにします。

さて、データが処理されるたびにセッション状態からデータフレームを動的に取得し、メトリクスとビジュアライゼーションを表示し始めます。また、タイムスタンプ、ソースおよび宛先IP、プロトコル、パケットのサイズなどの情報とともに、最近キャプチャしたパケットを表示します。また、ユーザーがダッシュボードからデータを手動でリフレッシュできる機能を追加し、同時に2秒ごとに自動的にリフレッシュします。

最後に、次のコマンドでプログラムを実行しましょう:

sudo streamlit run dashboard.py

プログラムを実行する際には、パケットのキャプチャ機能が管理者権限が必要なため、sudoで実行する必要があります。Windowsをご利用の場合は、管理者としてターミナルを開き、sudoプレフィックスなしでプログラムを実行してください。

プログラムがパケットのキャプチャを開始するのを待ちます。すべてがうまくいくと、次のようなものが表示されるはずです:

これらは、私たちのStreamlitダッシュボードプログラムに実装したすべての可視化です。

将来の拡張

それでは、ダッシュボードの機能を拡張するために使用できる将来の拡張アイデアをいくつか紹介します:

  1. 異常検知のための機械学習機能を追加

  2. 地理的IPマッピングを実装

  3. トラフィック解析パターンに基づいてカスタムアラートを作成

  4. パケットペイロード解析オプションを追加

結論

おめでとうございます!PythonとStreamlitを使用してリアルタイムのネットワークトラフィック解析ダッシュボードを正常に構築しました。このプログラムはネットワークの動作に関する貴重な洞察を提供し、セキュリティモニタリングからネットワークの最適化まで、さまざまなユースケースに拡張できます。

そのようにして、ネットワークトラフィック解析の基本とPythonプログラミングの基礎を学んでもらえたことを願っています。読んでいただき、ありがとうございました!