リアルタイムでネットワークトラフィックを可視化したことはありますか?このチュートリアルでは、PythonとStreamlit
を使用してインタラクティブなネットワークトラフィック分析ダッシュボードを構築する方法を学びます。Streamlit
は、データ分析やデータ処理のためのWebアプリケーションを開発するために使用できるオープンソースのPythonフレームワークです。
このチュートリアルの最後までに、コンピュータのNIC(ネットワークインターフェースカード)から生のネットワークパケットをキャプチャし、データを処理し、リアルタイムで更新される美しい可視化を作成する方法がわかります。
目次
ネットワークトラフィック分析の重要性はなぜですか?
ネットワークトラフィック分析は、ネットワークがほぼすべてのアプリケーションとサービスのバックボーンを形成する企業において、重要な要件です。その中心には、ネットワークを監視し、すべてのトラフィック(進入と出力)をキャプチャし、これらのパケットがネットワークを通過する際に解釈するネットワークパケットの分析があります。この技術を使用して、セキュリティパターンを特定し、異常を検出し、ネットワークのセキュリティと効率を確保できます。
このチュートリアルで取り組むこの概念実証プロジェクトは、特にリアルタイムでネットワーク活動を視覚化および分析するのに役立つため、非常に便利です。これにより、企業システムにおける問題のトラブルシューティング、パフォーマンスの最適化、セキュリティ分析がどのように行われているかを理解することができます。
前提条件
-
システムにPython 3.8またはそれ以降のバージョンがインストールされていること。
-
コンピュータネットワーキングの基本的な概念についての基本的な理解。
-
Pythonプログラミング言語とその広く使用されているライブラリに対する親しみ。
-
データ可視化技術とライブラリに関する基本的な知識。
プロジェクトのセットアップ方法
始めるには、プロジェクト構造を作成し、次のコマンドでPipを使用して必要なツールをインストールします:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
ダッシュボードの可視化にはStreamlit
、データ処理にはPandas
、ネットワークパケットのキャプチャと処理にはScapy
、最後に収集したデータを使ってチャートを描画するためにPlotly
を使用します。
コア機能の構築方法
すべてのコードをdashboard.py
という名前の単一ファイルにまとめます。まず、使用するすべての要素をインポートすることから始めましょう:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
今度は、基本的なログ設定を行うことで、ログの構成を行いましょう。これはイベントの追跡およびデバッグモードでのアプリケーションの実行に使用されます。現在、ログレベルをINFO
に設定しており、INFO
レベル以上のイベントが表示されます。Pythonのログに慣れていない場合は、詳細について説明しているthisドキュメントをチェックすることをお勧めします。
# ログの構成
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
次に、パケット処理プロセッサを構築します。このクラスで、キャプチャしたパケットを処理する機能を実装します。
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# TCP固有情報の追加
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# UDP固有情報の追加
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# メモリの問題を防ぐため、最後の10000パケットのみ保持
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
このクラスは、コア機能を構築し、パケットの処理に使用されるいくつかのユーティリティ関数が含まれています。
ネットワークパケットは、トランスポートレベル(TCPおよびUDP)で2つに分類されます。そして、ネットワークレベルではICMPプロトコルがあります。TCP/IPの概念に馴染みがない場合は、freeCodeCamp Newsのthis記事をチェックすることをお勧めします。
私たちのコンストラクタは、定義したTCP/IPプロトコルタイプのバケットに分類されるすべてのパケットを追跡します。また、パケットのキャプチャ時刻、キャプチャされたデータ、およびキャプチャされたパケットの数にも注目します。
また、スレッドロックを活用して、1度に1つのパケットのみが処理されるようにします。これはさらに拡張して、プロジェクトが並列パケット処理を行えるようにすることができます。
get_protocol_name
ヘルパー関数は、プロトコル番号に基づいてプロトコルの正しいタイプを取得するのに役立ちます。これについての背景情報として、インターネット割り当て番号機関(IANA)は、ネットワークパケット内の異なるプロトコルを識別するための標準化された番号を割り当てています。解析されたネットワークパケットでこれらの番号を見るとき、現在インターセプトされているパケットでどの種類のプロトコルが使用されているかがわかります。このプロジェクトの範囲では、TCP、UDP、およびICMP(Ping)にのみマッピングされます。他の種類のパケットに遭遇した場合、OTHER(<protocol_num>)
として分類されます。
process_packet
関数は、これらの個々のパケットを処理するコア機能を処理します。パケットにIPレイヤーが含まれている場合、ソースおよび宛先IPアドレス、プロトコルタイプ、パケットサイズ、およびパケットキャプチャの開始から経過した時間をメモします。
TCPやUDPなど特定のトランスポート層プロトコルを持つパケットの場合、TCPパケットの場合はソースおよび宛先ポート、TCPパケットの場合はTCPフラグをキャプチャします。これらの抽出された詳細は、packet_data
リストにメモリに保存されます。これらのパケットが処理されるたびに、packet_count
も追跡します。
get_dataframe
関数は、packet_data
リストをPandas
データフレームに変換するのに役立ち、そのデータフレームは後で可視化に使用されます。
Streamlitビジュアライゼーションの作成方法
これで、インタラクティブなStreamlitダッシュボードを構築する準備が整いました。パケット処理クラスの外部にcreate_visualization
関数をdashboard.py
スクリプトに定義します。
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# プロトコル分布
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# パケットタイムライン
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# トップソースIP
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
この関数はデータフレームを入力として受け取り、3つのチャート/グラフをプロットするのに役立ちます:
-
プロトコル分布チャート: このチャートは、キャプチャされたパケットトラフィックにおける異なるプロトコル(例: TCP、UDP、ICMP)の割合を表示します。
-
パケットタイムラインチャート: このチャートは、時間の経過とともに1秒あたり処理されたパケット数を表示します。
-
トップソースIPアドレスチャート: このチャートは、キャプチャされたトラフィックで最も多くのパケットを送信した上位10のIPアドレスを強調表示します。
プロトコル分布チャートは、3つの異なるタイプ(その他を含む)のプロトコルカウントの円グラフです。これらのチャートをプロットするために、Streamlit
とPlotly
Pythonツールを使用します。パケットキャプチャを開始してからのタイムスタンプも記録しているため、このデータを使用して時間経過に伴うキャプチャされたパケットのトレンドをプロットします。
2番目のチャートでは、データにgroupby
操作を実行し、各秒にキャプチャされたパケットの数を取得し、最終的にグラフをプロットします。
最後に、3番目のチャートでは、観測された異なるソースIPの数をカウントし、トップ10のIPを示すIPカウントのグラフをプロットします。
ネットワークパケットのキャプチャ方法
さて、ネットワークパケットデータをキャプチャする機能を構築しましょう。
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
これは、PacketProcessor
クラスをインスタンス化し、scapy
モジュール内のsniff
関数を使用してパケットのキャプチャを開始する単純な関数です。
ここではスレッディングを使用して、パケットのキャプチャをメインプログラムのフローから独立してキャプチャできるようにします。これにより、パケットのキャプチャ操作がダッシュボードのリアルタイム更新など他の操作をブロックしないようにします。また、作成したPacketProcessor
インスタンスを返して、メインプログラムで使用できるようにします。
すべてを組み合わせる
これで、これらすべての要素をmain
関数で繋ぎ合わせ、プログラムのドライバー関数として機能させる準備が整いました。
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# セッション状態でパケット処理機を初期化
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# ダッシュボードレイアウトを作成
col1, col2 = st.columns(2)
# 現在のデータを取得
df = st.session_state.processor.get_dataframe()
# メトリクスを表示
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# ビジュアライゼーションを表示
create_visualizations(df)
# 最近キャプチャしたパケットを表示
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# リフレッシュボタンを追加
if st.button('Refresh Data'):
st.rerun()
# 自動リフレッシュ
time.sleep(2)
st.rerun()
この関数は、Streamlit
ダッシュボードをインスタンス化し、すべてのコンポーネントを統合します。ます最初にStreamlit
ダッシュボードのページタイトルを設定し、次にPacketProcessor
を初期化します。 Streamlit
のセッション状態を使用して、パケットのキャプチャのインスタンスが1つだけ作成され、その状態が保持されるようにします。
さて、データが処理されるたびにセッション状態からデータフレームを動的に取得し、メトリクスとビジュアライゼーションを表示し始めます。また、タイムスタンプ、ソースおよび宛先IP、プロトコル、パケットのサイズなどの情報とともに、最近キャプチャしたパケットを表示します。また、ユーザーがダッシュボードからデータを手動でリフレッシュできる機能を追加し、同時に2秒ごとに自動的にリフレッシュします。
最後に、次のコマンドでプログラムを実行しましょう:
sudo streamlit run dashboard.py
プログラムを実行する際には、パケットのキャプチャ機能が管理者権限が必要なため、sudo
で実行する必要があります。Windowsをご利用の場合は、管理者としてターミナルを開き、sudo
プレフィックスなしでプログラムを実行してください。
プログラムがパケットのキャプチャを開始するのを待ちます。すべてがうまくいくと、次のようなものが表示されるはずです:
これらは、私たちのStreamlit
ダッシュボードプログラムに実装したすべての可視化です。
将来の拡張
それでは、ダッシュボードの機能を拡張するために使用できる将来の拡張アイデアをいくつか紹介します:
-
異常検知のための機械学習機能を追加
-
地理的IPマッピングを実装
-
トラフィック解析パターンに基づいてカスタムアラートを作成
-
パケットペイロード解析オプションを追加
結論
おめでとうございます!PythonとStreamlit
を使用してリアルタイムのネットワークトラフィック解析ダッシュボードを正常に構築しました。このプログラムはネットワークの動作に関する貴重な洞察を提供し、セキュリティモニタリングからネットワークの最適化まで、さまざまなユースケースに拡張できます。
そのようにして、ネットワークトラフィック解析の基本とPythonプログラミングの基礎を学んでもらえたことを願っています。読んでいただき、ありがとうございました!