Você já quis visualizar seu tráfego de rede em tempo real? Neste tutorial, você aprenderá a construir um painel interativo de análise de tráfego de rede com Python e Streamlit
. Streamlit
é uma estrutura de código aberto em Python que você pode usar para desenvolver aplicações web para análise e processamento de dados.
Ao final deste tutorial, você saberá como capturar pacotes de rede brutos da NIC (Placa de Interface de Rede) do seu computador, processar os dados e criar visualizações bonitas que serão atualizadas em tempo real.
Sumário
Por que a Análise do Tráfego de Rede é Importante?
A análise de tráfego de rede é um requisito crítico em empresas onde as redes são a espinha dorsal de quase todas as aplicações e serviços. No cerne disso, temos a análise de pacotes de rede que envolve monitorar a rede, capturar todo o tráfego (entrada e saída) e interpretar esses pacotes conforme eles fluem por uma rede. Você pode usar essa técnica para identificar padrões de segurança, detectar anomalias e garantir a segurança e eficiência da rede.
Este projeto de prova de conceito em que trabalharemos neste tutorial é particularmente útil, pois ajuda a visualizar e analisar a atividade da rede em tempo real. E isso permitirá que você entenda como são feitas a solução de problemas, otimizações de desempenho e análises de segurança em sistemas empresariais.
Pré-requisitos
-
O Python 3.8 ou uma versão mais recente instalada em seu sistema.
-
Um entendimento básico de conceitos de redes de computadores.
-
Familiaridade com a linguagem de programação Python e suas bibliotecas amplamente utilizadas.
-
Conhecimento básico de técnicas e bibliotecas de visualização de dados.
Como Configurar seu Projeto
Para começar, crie a estrutura do projeto e instale as ferramentas necessárias com o Pip com os seguintes comandos:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
Usaremos Streamlit
para as visualizações do painel, Pandas
para o processamento de dados, Scapy
para captura de pacotes de rede e processamento de pacotes, e finalmente Plotly
para plotar gráficos com nossos dados coletados.
Como Construir as Funcionalidades Principais
Colocaremos todo o código em um único arquivo chamado dashboard.py
. Primeiro, vamos começar importando todos os elementos que usaremos:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
Agora vamos configurar o registro de logs configurando uma configuração básica de logging. Isso será usado para rastrear eventos e executar nossa aplicação em modo de depuração. Atualmente, definimos o nível de logging como INFO
, o que significa que eventos com nível INFO
ou superior serão exibidos. Se você não está familiarizado com logging em Python, eu recomendo conferir este documento que explora o assunto em profundidade.
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Em seguida, vamos construir nosso processador de pacotes. Vamos implementar a funcionalidade de processar nossos pacotes capturados nesta classe.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Adicionar informações específicas do TCP
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Adicionar informações específicas do UDP
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Manter apenas os últimos 10000 pacotes para evitar problemas de memória
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
Esta classe construirá nossa funcionalidade principal e possui várias funções utilitárias que serão usadas para processar os pacotes.
Pacotes de rede são categorizados em dois níveis de transporte (TCP e UDP) e o protocolo ICMP no nível de rede. Se você não está familiarizado com os conceitos de TCP/IP, eu recomendo conferir este artigo no freeCodeCamp News.
Nosso construtor manterá o controle de todos os pacotes vistos que são categorizados em esses buckets de tipos de protocolo TCP/IP que definimos. Também iremos tomar nota do tempo de captura do pacote, dos dados capturados e do número de pacotes capturados.
Também estaremos utilizando um bloqueio de thread para garantir que apenas um pacote seja processado por vez. Isso pode ser estendido ainda mais para permitir que o projeto tenha processamento de pacotes paralelo.
A função auxiliar get_protocol_name
nos ajuda a obter o tipo correto do protocolo com base em seus números de protocolo. Para dar um pouco de contexto sobre isso, a Autoridade de Números Designados para Internet (IANA) atribui números padronizados para identificar diferentes protocolos em um pacote de rede. Assim que vemos esses números no pacote de rede analisado, saberemos que tipo de protocolo está sendo usado no pacote atual interceptado. Para o escopo deste projeto, faremos o mapeamento apenas para TCP, UDP e ICMP (Ping). Se encontrarmos outro tipo de pacote, o categorizaremos como OUTRO(<número_do_protocolo>)
.
A função process_packet
lidará com nossa funcionalidade principal que processará esses pacotes individuais. Se o pacote contiver uma camada IP, ele tomará nota dos endereços IP de origem e destino, tipo de protocolo, tamanho do pacote e tempo decorrido desde o início da captura do pacote.
Para pacotes com protocolos específicos da camada de transporte (como TCP e UDP), capturaremos as portas de origem e destino juntamente com as flags TCP para pacotes TCP. Esses detalhes extraídos serão armazenados na memória na lista packet_data
. Também manteremos o controle da packet_count
conforme esses pacotes forem processados.
A função get_dataframe
ajuda-nos a converter a lista packet_data
em um data-frame Pandas
que será utilizado para nossa visualização.
Como Criar as Visualizações no Streamlit
Agora é hora de construir nosso Painel Interativo no Streamlit. Vamos definir uma função chamada create_visualization
no script dashboard.py
(fora de nossa classe de processamento de pacotes).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Distribuição de Protocolo
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Linha do tempo de Pacotes
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Principais IPs de Origem
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
Esta função receberá o data frame como entrada e nos ajudará a plotar três gráficos:
-
Gráfico de Distribuição de Protocolo: Este gráfico exibirá a proporção de diferentes protocolos (por exemplo, TCP, UDP, ICMP) no tráfego de pacotes capturado.
-
Gráfico da Linha do Tempo de Pacotes: Este gráfico mostrará o número de pacotes processados por segundo ao longo de um período de tempo.
-
Gráfico dos Principais Endereços IP de Origem: Este gráfico destacará os 10 principais endereços IP que enviaram a maior quantidade de pacotes no tráfego capturado.
O gráfico de distribuição de protocolos é simplesmente um gráfico de pizza das contagens de protocolos para os três tipos diferentes (junto com OUTROS). Usamos as ferramentas Python Streamlit
e Plotly
para traçar esses gráficos. Como também anotamos a marca de tempo desde o início da captura de pacotes, usaremos esses dados para traçar a tendência de pacotes capturados ao longo do tempo.
Para o segundo gráfico, faremos uma operação groupby
nos dados e obteremos o número de pacotes capturados em cada segundo (S
representa segundos), e então finalmente traçaremos o gráfico.
Finalmente, para o terceiro gráfico, contaremos os IPs de origem distintos observados e traçaremos um gráfico das contagens de IPs para mostrar os 10 principais IPs.
Como Capturar os Pacotes de Rede
Agora, vamos construir a funcionalidade para nos permitir capturar os dados dos pacotes de rede.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
Esta é uma função simples que instancia a classe PacketProcessor
e então usa a função sniff
no módulo scapy
para começar a capturar os pacotes.
Usamos threading aqui para nos permitir capturar pacotes independentemente do fluxo principal do programa. Isso garante que a operação de captura de pacotes não bloqueie outras operações, como atualizar o painel em tempo real. Também retornamos a instância criada de PacketProcessor
para que possa ser usada em nosso programa principal.
Colocando Tudo Junto
Agora vamos juntar todas essas peças com nossa função main
que atuará como a função principal do nosso programa.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Inicializar processador de pacotes no estado da sessão
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Criar layout do painel
col1, col2 = st.columns(2)
# Obter dados atuais
df = st.session_state.processor.get_dataframe()
# Exibir métricas
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Exibir visualizações
create_visualizations(df)
# Exibir pacotes recentes
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Adicionar botão de atualização
if st.button('Refresh Data'):
st.rerun()
# Atualização automática
time.sleep(2)
st.rerun()
Esta função também irá instanciar o painel Streamlit
e integrar todos os nossos componentes juntos. Primeiro, definimos o título da página do nosso painel Streamlit
e então inicializamos nosso PacketProcessor
. Usamos o estado da sessão no Streamlit
para garantir que apenas uma instância da captura de pacotes seja criada e que seu estado seja mantido.
Agora, obteremos dinamicamente o dataframe do estado da sessão toda vez que os dados forem processados e começaremos a exibir as métricas e as visualizações. Também exibiremos os pacotes capturados recentemente, juntamente com informações como timestamp, IPs de origem e destino, protocolo e tamanho do pacote. Adicionaremos também a capacidade do usuário de atualizar manualmente os dados do painel, enquanto também os atualizamos automaticamente a cada dois segundos.
Vamos finalmente executar o programa com o seguinte comando:
sudo streamlit run dashboard.py
Observe que você terá que executar o programa com sudo
pois as capacidades de captura de pacotes requerem privilégios de administrador. Se você estiver no Windows, abra seu terminal como Administrador e então execute o programa sem o prefixo sudo
.
Dê um momento para o programa começar a capturar os pacotes. Se tudo correr bem, você deverá ver algo assim:
Essas são todas as visualizações que acabamos de implementar em nosso programa de painel Streamlit
.
Próximas melhorias
Com isso, aqui estão algumas ideias de melhorias futuras que você pode usar para estender as funcionalidades do painel:
-
Adicionar capacidades de aprendizado de máquina para detecção de anomalias
-
Implementar mapeamento de IP geográfico
-
Criar alertas personalizados com base em padrões de análise de tráfego
-
Adicionar opções de análise de carga útil de pacotes
Conclusão
Parabéns! Você agora construiu com sucesso um painel de análise de tráfego de rede em tempo real com Python e Streamlit
. Este programa fornecerá insights valiosos sobre o comportamento da rede e pode ser estendido para diversos casos de uso, desde monitoramento de segurança até otimização de rede.
Com isso, espero que você tenha aprendido alguns conceitos básicos sobre análise de tráfego de rede, bem como um pouco de programação em Python. Obrigado por ler!