네트워크 트래픽을 실시간으로 시각화해 보고 싶은 적이 있었나요? 이 자습서에서는 Python과 Streamlit
을 사용하여 대화형 네트워크 트래픽 분석 대시보드를 구축하는 방법을 배우게 될 것입니다. Streamlit
은 데이터 분석 및 데이터 처리를 위한 웹 애플리케이션을 개발할 수 있는 오픈소스 Python 프레임워크입니다.
이 자습서를 마치면 컴퓨터의 NIC(네트워크 인터페이스 카드)에서 원시 네트워크 패킷을 캡처하고 데이터를 처리하며 실시간으로 업데이트되는 아름다운 시각화를 생성하는 방법을 알게 될 것입니다.
목차
네트워크 트래픽 분석의 중요성은 무엇인가요?
엔터프라이즈에서 네트워크가 거의 모든 애플리케이션과 서비스의 중추를 이루고 있는 곳에서 네트워크 트래픽 분석은 중요한 요구 사항입니다. 이에 있어 핵심은 네트워크 패킷의 분석에 있어 네트워크를 모니터링하고 모든 트래픽(입방향 및 출방향)을 캡처하며, 이러한 패킷들이 네트워크를 통해 흐를 때 이를 해석하는 것입니다. 이 기술을 사용하여 보안 패턴을 식별하고 이상 현상을 감지하며, 네트워크의 보안과 효율성을 보장할 수 있습니다.
본 자습서에서 작업할 이 프로젝트는 실시간으로 네트워크 활동을 시각화하고 분석하여 매우 유용합니다. 이를 통해 엔터프라이즈 시스템에서 문제 해결, 성능 최적화 및 보안 분석이 어떻게 이루어지는지 이해할 수 있습니다.
필수 준비물
-
시스템에 설치된 Python 3.8 이상의 새로운 버전.
-
컴퓨터 네트워킹 개념에 대한 기본적인 이해.
-
Python 프로그래밍 언어와 널리 사용되는 라이브러리에 대한 숙련도.
-
데이터 시각화 기법과 라이브러리에 대한 기본 지식.
프로젝트 설정 방법
시작하려면 다음 명령을 사용하여 프로젝트 구조를 생성하고 필요한 도구를 Pip로 설치하십시오:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
대시보드 시각화에는 Streamlit
을 사용하고 데이터 처리에는 Pandas
, 네트워크 패킷 캡처 및 처리에는 Scapy
, 마지막으로 수집한 데이터로 차트를 그리기 위해 Plotly
를 사용할 것입니다.
핵심 기능 구축 방법
모든 코드를 dashboard.py
라는 단일 파일에 넣을 것입니다. 먼저 사용할 모든 요소를 가져오는 것으로 시작합시다:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
이제 기본 로깅 구성을 설정하여 로깅을 구성해 보겠습니다. 이는 이벤트 추적 및 응용 프로그램을 디버그 모드로 실행하는 데 사용될 것입니다. 현재 로깅 레벨을 INFO
로 설정했으며, 이는 레벨이 INFO
또는 그 이상인 이벤트를 표시하게 됨을 의미합니다. Python의 로깅에 익숙하지 않다면, 깊이 있는 내용을 다룬 이 문서를 확인하는 것을 권장합니다.
# 로깅 구성
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
다음으로, 우리의 패킷 프로세서를 구축할 것입니다. 이 클래스에서 캡처한 패킷을 처리하는 기능을 구현할 것입니다.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# TCP에 특화된 정보 추가
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# UDP에 특화된 정보 추가
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# 메모리 문제를 방지하기 위해 마지막 10000개 패킷만 유지
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
이 클래스는 핵심 기능을 구축하며, 패킷 처리에 사용될 여러 유틸리티 함수를 포함합니다.
네트워크 패킷은 전송 수준에서 두 가지로 구분됩니다 (TCP 및 UDP) 및 네트워크 수준에서 ICMP 프로토콜이 있습니다. TCP/IP 개념에 익숙하지 않다면, freeCodeCamp News의 이 기사를 확인하는 것을 추천합니다.
저희 생성자는 정의된 TCP/IP 프로토콜 유형 버킷으로 분류된 모든 패킷을 추적할 것입니다. 또한 패킷 캡처 시간, 캡처된 데이터, 그리고 캡처된 패킷 수를 기록할 것입니다.
또한 한 번에 하나의 패킷만 처리되도록 스레드 락을 활용할 것입니다. 이는 프로젝트가 병렬 패킷 처리를 할 수 있도록 더 확장될 수 있습니다.
get_protocol_name
도우미 함수는 프로토콜 번호에 기반하여 올바른 프로토콜 유형을 얻는 데 도움을 줍니다. 이를 위한 배경 설명으로, 인터넷 할당 번호 기관(IANA)은 네트워크 패킷에서 다른 프로토콜을 식별하기 위해 표준화된 번호를 할당합니다. 파싱된 네트워크 패킷에서 이러한 번호를 볼 때마다, 현재 가로채인 패킷에서 어떤 종류의 프로토콜이 사용되고 있는지 알 수 있습니다. 이 프로젝트의 범위에서는 TCP, UDP 및 ICMP(Ping)에만 매핑할 것입니다. 다른 유형의 패킷을 만나면 OTHER(<프로토콜_번호>)
로 분류할 것입니다.
process_packet
함수는 이러한 개별 패킷을 처리할 핵심 기능을 담당합니다. 패킷에 IP 레이어가 포함되어 있는 경우, 출발지 및 목적지 IP 주소, 프로토콜 유형, 패킷 크기, 그리고 패킷 캡처 시작부터 경과된 시간을 기록할 것입니다.
특정 전송 계층 프로토콜(예: TCP 및 UDP)을 갖는 패킷의 경우, TCP 패킷의 경우 출발지 및 목적지 포트 및 TCP 플래그를 캡처할 것입니다. 이 추출된 세부 정보는 packet_data
목록에 메모리에 저장될 것입니다. 또한 이러한 패킷이 처리될 때마다 packet_count
를 추적할 것입니다.
get_dataframe
함수는 packet_data
목록을 Pandas
데이터프레임으로 변환하는 데 도움을 줍니다. 이 데이터프레임은 시각화에 사용될 것입니다.
Streamlit 시각화 생성 방법
이제 대화형 Streamlit 대시보드를 만들 차례입니다. 우리는 dashboard.py
스크립트에 packet
처리 클래스 외부에 create_visualization
함수를 정의할 것입니다.
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# 프로토콜 분포
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# 패킷 타임라인
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# 최상위 출발지 IP
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
이 함수는 데이터프레임을 입력으로 받아 세 가지 차트 / 그래프를 그리는 데 도움을 줄 것입니다:
-
프로토콜 분포 차트: 이 차트는 캡처된 패킷 트래픽에서 다른 프로토콜(TCP, UDP, ICMP 등)의 비율을 표시합니다.
-
패킷 타임라인 차트: 이 차트는 시간 단위로 처리된 패킷 수를 보여줍니다.
-
상위 출발지 IP 주소 차트: 이 차트는 캡처된 트래픽에서 가장 많은 패킷을 보낸 상위 10개 IP 주소를 강조합니다.
프로토콜 분포 차트는 다른 세 가지 유형의 프로토콜 카운트에 대한 파이 차트입니다(기타도 포함). 이러한 차트를 그리기 위해 Streamlit
및 Plotly
Python 도구를 사용합니다. 또한 패킷 캡처가 시작된 시각을 기록했으므로이 데이터를 사용하여 시간이 지남에 따른 캡처된 패킷 트렌드를 그릴 것입니다.
두 번째 차트에서는 데이터에 groupby
작업을 수행하여 각 초마다 캡처된 패킷 수를 얻은 다음 그래프를 그릴 것입니다.
마지막으로, 세 번째 차트에서 관찰된 고유한 출발지 IP 수를 계산하고 상위 10개 IP를 보여주기 위한 IP 카운트 차트를 그릴 것입니다.
네트워크 패킷 캡처하는 방법
이제 네트워크 패킷 데이터를 캡처할 수 있는 기능을 구축해 봅시다.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
이것은 PacketProcessor
클래스를 인스턴스화하고 scapy
모듈의 sniff
함수를 사용하여 패킷 캡처를 시작하는 간단한 함수입니다.
이곳에서는 쓰레딩을 사용하여 패킷을 메인 프로그램 흐름과 독립적으로 캡처할 수 있도록 합니다. 이렇게 함으로써 패킷 캡처 작업이 대시보드를 실시간으로 업데이트하는 것과 같은 다른 작업을 차단하지 않도록 합니다. 또한 생성된 PacketProcessor
인스턴스를 반환하여 주 프로그램에서 사용할 수 있도록 합니다.
모든 것을 함께 묶기
이제 이러한 모든 요소를 우리 프로그램의 드라이버 기능으로 작용할 main
함수와 함께 모두 묶어 봅시다.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# 세션 상태에서 패킷 프로세서 초기화
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# 대시보드 레이아웃 생성
col1, col2 = st.columns(2)
# 현재 데이터 가져오기
df = st.session_state.processor.get_dataframe()
# 메트릭스 표시
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# 시각화 표시
create_visualizations(df)
# 최근 패킷 표시
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# 새로고침 버튼 추가
if st.button('Refresh Data'):
st.rerun()
# 자동 새로고침
time.sleep(2)
st.rerun()
이 함수는 또한 Streamlit
대시보드를 인스턴스화하고 모든 구성 요소를 통합합니다. 먼저 Streamlit
대시보드의 페이지 제목을 설정한 다음 PacketProcessor
를 초기화합니다. Streamlit
의 세션 상태를 사용하여 패킷 캡처의 인스턴스가 하나만 생성되고 해당 상태가 유지되도록 합니다.
이제 데이터가 처리될 때마다 세션 상태에서 데이터프레임을 동적으로 가져와 메트릭스와 시각화를 표시하기 시작합니다. 또한 타임스탬프, 출발지 및 목적지 IP, 프로토콜, 패킷 크기와 같은 정보와 함께 최근에 캡처된 패킷을 표시합니다. 또한 대시보드에서 사용자가 데이터를 수동으로 새로 고칠 수 있도록하고 2초마다 자동으로 새로 곱니다.
마지막으로 다음 명령을 사용하여 프로그램을 실행합시다:
sudo streamlit run dashboard.py
프로그램을 실행할 때 sudo
를 사용해야 합니다. 패킷 캡처 기능을 사용하려면 관리자 권한이 필요합니다. 윈도우 사용자는 터미널을 관리자 모드로 열고 sudo
접두사 없이 프로그램을 실행하십시오.
프로그램이 패킷을 캡처하기 시작하도록 잠시 기다리세요. 모든 것이 정상적으로 진행되면 다음과 같은 내용을 볼 수 있어야 합니다:
이것들은 방금 Streamlit 대시보드 프로그램에서 구현한 모든 시각화입니다.
향후 개선 사항
이제 대시보드 기능을 확장할 수 있는 몇 가지 향후 개선 아이디어를 제시합니다:
-
이상 징후 탐지를 위한 기계 학습 기능 추가
-
지리적 IP 매핑 구현
-
트래픽 분석 패턴을 기반으로 사용자 정의 경보 생성
-
패킷 페이로드 분석 옵션 추가
결론
축하합니다! 이제 Python과 Streamlit
으로 실시간 네트워크 트래픽 분석 대시보드를 성공적으로 구축했습니다. 이 프로그램은 네트워크 동작에 대한 가치 있는 통찰력을 제공하며 보안 모니터링부터 네트워크 최적화까지 다양한 용도로 확장할 수 있습니다.
그렇게 함으로써, 네트워크 트래픽 분석에 대한 기본 사항과 파이썬 프로그래밍에 대한 약간을 배웠기를 희망합니다. 읽어 주셔서 감사합니다!