Você já quis visualizar seu tráfego de rede em tempo real? Neste tutorial, você aprenderá como construir um painel interativo de análise de tráfego de rede com Python e Streamlit
. Streamlit
é um framework Python de código aberto que você pode usar para desenvolver aplicações web para análise e processamento de dados.
Ao final deste tutorial, você saberá como capturar pacotes de rede brutos do NIC (Placa de Interface de Rede) do seu computador, processar os dados e criar belas visualizações que se atualizarão em tempo real.
Sumário
Por que a Análise de Tráfego de Rede é Importante?
A análise de tráfego de rede é um requisito crítico em empresas onde as redes formam a espinha dorsal de quase todos os aplicativos e serviços. No cerne disso, temos a análise de pacotes de rede que envolve monitorar a rede, capturar todo o tráfego (entrada e saída) e interpretar esses pacotes enquanto fluem por uma rede. Você pode usar essa técnica para identificar padrões de segurança, detectar anomalias e garantir a segurança e eficiência da rede.
Este projeto de prova de conceito no qual trabalharemos neste tutorial é particularmente útil, pois ajuda você a visualizar e analisar a atividade de rede em tempo real. E isso permitirá que você entenda como a resolução de problemas, as otimizações de desempenho e a análise de segurança são feitas em sistemas empresariais.
Pré-requisitos
-
Python 3.8 ou uma versão mais recente instalada em seu sistema.
-
Uma compreensão básica de conceitos de redes de computadores.
-
Familiaridade com a linguagem de programação Python e suas bibliotecas amplamente utilizadas.
-
Conhecimento básico de técnicas e bibliotecas de visualização de dados.
Como Configurar seu Projeto
Para começar, crie a estrutura do projeto e instale as ferramentas necessárias com o Pip usando os seguintes comandos:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
Vamos utilizar o Streamlit
para as visualizações do painel, o Pandas
para o processamento de dados, o Scapy
para a captura e processamento de pacotes de rede, e finalmente o Plotly
para plotar gráficos com nossos dados coletados.
Como Construir as Funcionalidades Principais
Todas as linhas de código serão colocadas em um único arquivo chamado dashboard.py
. Primeiramente, vamos começar importando todos os elementos que iremos utilizar:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
Agora vamos configurar o registro definindo uma configuração básica de log. Isso será usado para rastrear eventos e executar nossa aplicação no modo de depuração. Atualmente, definimos o nível de log como INFO
, o que significa que eventos com nível INFO
ou superior serão exibidos. Se você não está familiarizado com o registro em Python, recomendo dar uma olhada neste documento que explora em detalhes.
# Configurar o registro
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Em seguida, vamos construir nosso processador de pacotes. Implementaremos a funcionalidade de processar nossos pacotes capturados nesta classe.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Adicionar informações específicas de TCP
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Adicionar informações específicas de UDP
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Manter apenas os últimos 10000 pacotes para evitar problemas de memória
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
Esta classe construirá nossa funcionalidade principal e possui várias funções de utilidade que serão usadas para processar os pacotes.
Os pacotes de rede são categorizados em dois níveis de transporte (TCP e UDP) e o protocolo ICMP no nível de rede. Se você não está familiarizado com os conceitos de TCP/IP, recomendo verificar este artigo no freeCodeCamp News.
O nosso construtor irá manter o registo de todos os pacotes vistos que estão categorizados nestes baldes de tipo de protocolo TCP/IP que definimos. Também iremos tomar nota do tempo de captura do pacote, dos dados capturados e do número de pacotes capturados.
Também iremos utilizar um bloqueio de thread para garantir que apenas um pacote seja processado de cada vez. Isto pode ser ainda mais estendido para permitir que o projeto tenha processamento de pacotes paralelo.
A função auxiliar get_protocol_name
ajuda-nos a obter o tipo correto do protocolo com base nos seus números de protocolo. Para dar algum contexto sobre isso, a Autoridade de Números Designados para a Internet (IANA) atribui números padronizados para identificar diferentes protocolos num pacote de rede. Quando vemos estes números no pacote de rede analisado, saberemos que tipo de protocolo está a ser usado no pacote atualmente interceptado. Para o âmbito deste projeto, iremos mapear apenas para TCP, UDP e ICMP (Ping). Se encontrarmos outro tipo de pacote, iremos categorizá-lo como OUTRO(<número_do_protocolo>)
.
A função process_packet
trata da nossa funcionalidade principal que irá processar estes pacotes individuais. Se o pacote contiver uma camada IP, irá tomar nota dos endereços IP de origem e destino, tipo de protocolo, tamanho do pacote e tempo decorrido desde o início da captura do pacote.
Para pacotes com protocolos específicos da camada de transporte (como TCP e UDP), iremos capturar as portas de origem e destino juntamente com as flags TCP para pacotes TCP. Estes detalhes extraídos serão armazenados na memória na lista packet_data
. Também iremos manter o registo da packet_count
à medida que estes pacotes são processados.
A função get_dataframe
ajuda-nos a converter a lista packet_data
num data-frame Pandas
que será posteriormente usado para a nossa visualização.
Como Criar as Visualizações no Streamlit
Agora é hora de construirmos o nosso painel interativo no Streamlit. Vamos definir uma função chamada create_visualization
no script dashboard.py
(fora da nossa classe de processamento de pacotes).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Distribuição de Protocolo
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Linha do Tempo de Pacotes
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Principais IPs de Origem
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
Esta função irá receber o data frame como entrada e nos ajudará a plotar três gráficos:
-
Gráfico de Distribuição de Protocolo: Este gráfico irá exibir a proporção de diferentes protocolos (por exemplo, TCP, UDP, ICMP) no tráfego de pacotes capturado.
-
Gráfico de Linha do Tempo de Pacotes: Este gráfico irá mostrar o número de pacotes processados por segundo ao longo de um período de tempo.
-
Gráfico de Principais Endereços IP de Origem: Este gráfico irá destacar os 10 principais endereços IP que enviaram a maioria dos pacotes no tráfego capturado.
O gráfico de distribuição de protocolos é simplesmente um gráfico de pizza das contagens de protocolos para os três tipos diferentes (juntamente com OUTROS). Usamos as ferramentas Python Streamlit
e Plotly
para plotar esses gráficos. Como também observamos o carimbo de data/hora desde o início da captura de pacotes, usaremos esses dados para plotar a tendência de pacotes capturados ao longo do tempo.
Para o segundo gráfico, faremos uma operação de groupby
nos dados e obteremos o número de pacotes capturados a cada segundo (S
significa segundos), e então finalmente plotaremos o gráfico.
Por fim, para o terceiro gráfico, contaremos os IPs de origem distintos observados e plotaremos um gráfico das contagens de IPs para mostrar os 10 principais IPs.
Como Capturar os Pacotes de Rede
Agora, vamos construir a funcionalidade para nos permitir capturar dados de pacotes de rede.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
Esta é uma função simples que instancia a classe PacketProcessor
e então utiliza a função sniff
no módulo scapy
para começar a capturar os pacotes.
Usamos threading aqui para nos permitir capturar pacotes independentemente do fluxo principal do programa. Isso garante que a operação de captura de pacotes não bloqueie outras operações como a atualização do painel em tempo real. Também retornamos a instância criada de PacketProcessor
para que ela possa ser usada em nosso programa principal.
Juntando Tudo
Agora vamos unir todas essas peças com nossa função main
que atuará como a função principal para nosso programa.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Inicializar processador de pacotes no estado da sessão
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Criar layout do painel
col1, col2 = st.columns(2)
# Obter dados atuais
df = st.session_state.processor.get_dataframe()
# Exibir métricas
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Exibir visualizações
create_visualizations(df)
# Exibir pacotes recentes
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Adicionar botão de atualização
if st.button('Refresh Data'):
st.rerun()
# Atualização automática
time.sleep(2)
st.rerun()
Esta função também irá instanciar o painel Streamlit
e integrar todos os nossos componentes juntos. Primeiro definimos o título da página do nosso painel Streamlit
e então inicializamos nosso PacketProcessor
. Utilizamos o estado da sessão no Streamlit
para garantir que apenas uma instância de captura de pacotes seja criada e que seu estado seja mantido.
Agora, iremos obter dinamicamente o dataframe do estado da sessão sempre que os dados forem processados e começar a exibir as métricas e as visualizações. Também iremos exibir os pacotes capturados recentemente juntamente com informações como o timestamp, IPs de origem e destino, protocolo e tamanho do pacote. Também adicionaremos a capacidade do usuário de atualizar manualmente os dados no painel, enquanto também os atualizamos automaticamente a cada dois segundos.
Finalmente, vamos executar o programa com o seguinte comando:
sudo streamlit run dashboard.py
Note que você terá que executar o programa com sudo
pois as capacidades de captura de pacotes requerem privilégios administrativos. Se estiver no Windows, abra seu terminal como Administrador e então execute o programa sem o prefixo sudo
.
Dê um momento para o programa começar a capturar pacotes. Se tudo correr bem, você deverá ver algo como isto:
Estas são todas as visualizações que acabamos de implementar em nosso programa de painel Streamlit
.
Aprimoramentos Futuros
Com isso, aqui estão algumas ideias de aprimoramento futuro que você pode usar para estender as funcionalidades do painel:
-
Adicionar capacidades de aprendizado de máquina para detecção de anomalias
-
Implementar mapeamento de IP geográfico
-
Criar alertas personalizados com base em padrões de análise de tráfego
-
Adicionar opções de análise de carga útil de pacotes
Conclusão
Parabéns! Você agora construiu com sucesso um painel de análise de tráfego de rede em tempo real com Python e Streamlit
. Este programa fornecerá insights valiosos sobre o comportamento da rede e pode ser estendido para vários casos de uso, desde monitoramento de segurança até otimização de rede.
Com isso, espero que tenha aprendido alguns conceitos básicos sobre análise de tráfego de rede, bem como um pouco de programação em Python. Obrigado por ler!