Na cibersegurança, um honeypot é um sistema de isca projetado para atrair e detectar possíveis atacantes tentando comprometer o sistema. Assim como um pote de mel deixado aberto atrairia moscas.
Pense nesses honeypots como câmeras de segurança para o seu sistema. Assim como uma câmera de segurança nos ajuda a entender quem está tentando invadir um prédio e como estão fazendo isso, esses honeypots ajudarão você a entender quem está tentando atacar seu sistema e que técnicas estão usando.
Até o final deste tutorial, você será capaz de escrever um honeypot de demonstração em Python e entender como os honeypots funcionam.
Sumário
Entendendo os Tipos de Honeypots
Antes de começarmos a projetar nosso próprio honeypot, vamos rapidamente entender seus diferentes tipos:
-
Honeypots de Produção: Esses tipos de honeypots são colocados em um ambiente de produção real e são usados para detectar ataques de segurança reais. Eles são tipicamente simples em design, fáceis de manter e implantar, e oferecem interação limitada para reduzir riscos.
-
Honeypots de Pesquisa: Esses são sistemas mais complexos configurados por pesquisadores de segurança para estudar padrões de ataque, realizar análises empíricas sobre esses padrões, coletar amostras de malware e entender novas técnicas de ataque que não foram descobertas anteriormente. Eles frequentemente emulam sistemas operacionais ou redes inteiras em vez de se comportarem como uma aplicação no ambiente de produção.
Para este tutorial, estaremos construindo um honeypot de interação média que registra tentativas de conexão e comportamentos básicos dos atacantes.
Como Configurar Seu Ambiente de Desenvolvimento
Vamos começar configurando seu ambiente de desenvolvimento em Python. Execute os seguintes comandos:
import socket
import sys
import datetime
import json
import threading
from pathlib import Path
# Configurar diretório de logs
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)
Vamos nos manter nas bibliotecas integradas, portanto, não será necessário instalar dependências externas. Armazenaremos nossos logs no diretório honeypot_logs
.
Como Construir o Core Honeypot
Nosso honeypot básico será composto por três componentes:
-
Um listener de rede que aceita conexões
-
Um sistema de registro para gravar atividades
-
Um serviço de emulação básico para interagir com os atacantes
Agora vamos começar inicializando a classe core Honeypot:
class Honeypot:
def __init__(self, bind_ip="0.0.0.0", ports=None):
self.bind_ip = bind_ip
self.ports = ports or [21, 22, 80, 443] # Portas padrão a serem monitoradas
self.active_connections = {}
self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"
def log_activity(self, port, remote_ip, data):
"""Log suspicious activity with timestamp and details"""
activity = {
"timestamp": datetime.datetime.now().isoformat(),
"remote_ip": remote_ip,
"port": port,
"data": data.decode('utf-8', errors='ignore')
}
with open(self.log_file, 'a') as f:
json.dump(activity, f)
f.write('\n')
def handle_connection(self, client_socket, remote_ip, port):
"""Handle individual connections and emulate services"""
service_banners = {
21: "220 FTP server ready\r\n",
22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
}
try:
# Enviar banner apropriado para o serviço
if port in service_banners:
client_socket.send(service_banners[port].encode())
# Receber dados do atacante
while True:
data = client_socket.recv(1024)
if not data:
break
self.log_activity(port, remote_ip, data)
# Enviar resposta falsa
client_socket.send(b"Command not recognized.\r\n")
except Exception as e:
print(f"Error handling connection: {e}")
finally:
client_socket.close()
Esta classe contém muitas informações importantes, então vamos analisar cada função uma por uma.
A função __init__
registra os números de IP e porta nos quais hospedaremos o honeypot, bem como o caminho/nome do arquivo de log. Também manteremos um registro do número total de conexões ativas que temos com o honeypot.
A função log_activity
vai receber as informações sobre o IP, os dados e a porta para a qual o IP tentou uma conexão. Em seguida, anexaremos essas informações ao nosso arquivo de log formatado em JSON.
A função handle_connection
vai imitar esses serviços que estarão rodando nas diferentes portas que temos. Teremos o honeypot rodando nas portas 21, 22, 80 e 443. Esses serviços são para FTP, SSH, HTTP e o protocolo HTTPS, respectivamente. Portanto, qualquer atacante que tentar interagir com o honeypot deve esperar esses serviços nessas portas.
Para imitar o comportamento desses serviços, usaremos os banners de serviço que eles usam na realidade. Esta função enviará primeiro o banner apropriado quando o atacante se conectar e, em seguida, receberá os dados e os registrará. O honeypot também enviará uma resposta falsa “Comando não reconhecido” de volta para o atacante.
Implemente os Escutadores de Rede
Agora vamos implementar os ouvintes de rede que irão lidar com as conexões de entrada. Para isso, estaremos usando programação de soquetes simples. Se você não sabe como a programação de soquetes funciona, veja este artigo que explica alguns conceitos relacionados.
def start_listener(self, port):
"""Start a listener on specified port"""
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.bind_ip, port))
server.listen(5)
print(f"[*] Listening on {self.bind_ip}:{port}")
while True:
client, addr = server.accept()
print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
# Lidar com conexão em uma thread separada
client_handler = threading.Thread(
target=self.handle_connection,
args=(client, addr[0], port)
)
client_handler.start()
except Exception as e:
print(f"Error starting listener on port {port}: {e}")
A função start_listener
iniciará o servidor e ouvirá na porta fornecida. O bind_ip
para nós será 0.0.0.0
, o que indica que o servidor estará ouvindo em todas as interfaces de rede.
Agora, lidaremos com cada nova conexão em uma thread separada, uma vez que pode haver situações em que múltiplos atacantes tentam interagir com o honeypot ou um script ou ferramenta de ataque está escaneando o honeypot. Se você não sabe como threading funciona, você pode ver este artigo que explica threading e concorrência em Python.
Além disso, certifique-se de colocar esta função na classe principal Honeypot
.
Executar o Honeypot
Agora vamos criar a função main
que iniciará nosso honeypot.
def main():
honeypot = Honeypot()
# Iniciar ouvintes para cada porta em threads separadas
for port in honeypot.ports:
listener_thread = threading.Thread(
target=honeypot.start_listener,
args=(port,)
)
listener_thread.daemon = True
listener_thread.start()
try:
# Manter a thread principal viva
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[*] Shutting down honeypot...")
sys.exit(0)
if __name__ == "__main__":
main()
Esta função instancia a classe Honeypot
e inicia os ouvintes para cada uma de nossas portas definidas (21, 22, 80, 443) como uma thread separada. Agora, manteremos nossa thread principal que está executando nosso programa real viva, colocando-a em um loop infinito. Coloque tudo isso junto em um script e execute-o.
Escreva o Simulador de Ataques Honeypot
Agora vamos tentar simular alguns cenários de ataque e direcionar nosso honeypot para que possamos coletar alguns dados em nosso arquivo de log JSON.
Este simulador nos ajudará a demonstrar alguns aspectos importantes sobre honeypots:
-
Padrões de ataque realistas: O simulador irá simular padrões de ataque comuns, como varredura de portas, tentativas de força bruta e explorações específicas de serviços.
-
Intensidade variável: O simulador ajustará a intensidade da simulação para testar como seu honeypot lida com diferentes cargas.
-
Vários tipos de ataque: Ele demonstrará diferentes tipos de ataques que atacantes reais podem tentar, ajudando você a entender como seu honeypot responde a cada um.
-
Conexões simultâneas: O simulador usará threads para testar como seu honeypot lida com várias conexões simultâneas.
# honeypot_simulator.py
import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse
class HoneypotSimulator:
"""
A class to simulate different types of connections and attacks against our honeypot.
This helps in testing the honeypot's logging and response capabilities.
"""
def __init__(self, target_ip="127.0.0.1", intensity="medium"):
# Configuração para o simulador
self.target_ip = target_ip
self.intensity = intensity
# Portas comuns que os atacantes costumam explorar
self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]
# Dicionário de comandos comuns usados por atacantes para diferentes serviços
self.attack_patterns = {
21: [ # Comandos FTP
"USER admin\r\n",
"PASS admin123\r\n",
"LIST\r\n",
"STOR malware.exe\r\n"
],
22: [ # Tentativas de SSH
"SSH-2.0-OpenSSH_7.9\r\n",
"admin:password123\n",
"root:toor\n"
],
80: [ # Requisições HTTP
"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
"POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
"GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
]
}
# Configurações de intensidade afetam a frequência e o volume de ataques simulados
self.intensity_settings = {
"low": {"max_threads": 2, "delay_range": (1, 3)},
"medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
"high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
}
def simulate_connection(self, port):
"""
Simulates a connection attempt to a specific port with realistic attack patterns
"""
try:
# Criar uma nova conexão de socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
print(f"[*] Attempting connection to {self.target_ip}:{port}")
sock.connect((self.target_ip, port))
# Obter banner, se houver
banner = sock.recv(1024)
print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")
# Enviar padrões de ataque com base na porta
if port in self.attack_patterns:
for command in self.attack_patterns[port]:
print(f"[*] Sending command to port {port}: {command.strip()}")
sock.send(command.encode())
# Aguardar resposta
try:
response = sock.recv(1024)
print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
except socket.timeout:
print(f"[-] No response received from port {port}")
# Adicionar atraso realista entre comandos
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
sock.close()
except ConnectionRefusedError:
print(f"[-] Connection refused on port {port}")
except socket.timeout:
print(f"[-] Connection timeout on port {port}")
except Exception as e:
print(f"[-] Error connecting to port {port}: {e}")
def simulate_port_scan(self):
"""
Simulates a basic port scan across common ports
"""
print(f"\n[*] Starting port scan simulation against {self.target_ip}")
for port in self.target_ports:
self.simulate_connection(port)
time.sleep(random.uniform(0.1, 0.3))
def simulate_brute_force(self, port):
"""
Simulates a brute force attack against a specific service
"""
common_usernames = ["admin", "root", "user", "test"]
common_passwords = ["password123", "admin123", "123456", "root"]
print(f"\n[*] Starting brute force simulation against port {port}")
for username in common_usernames:
for password in common_passwords:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((self.target_ip, port))
if port == 21: # FTP
sock.send(f"USER {username}\r\n".encode())
sock.recv(1024)
sock.send(f"PASS {password}\r\n".encode())
elif port == 22: # SSH
sock.send(f"{username}:{password}\n".encode())
sock.close()
time.sleep(random.uniform(0.1, 0.3))
except Exception as e:
print(f"[-] Error in brute force attempt: {e}")
def run_continuous_simulation(self, duration=300):
"""
Runs a continuous simulation for a specified duration
"""
print(f"\n[*] Starting continuous simulation for {duration} seconds")
print(f"[*] Intensity level: {self.intensity}")
end_time = time.time() + duration
with ThreadPoolExecutor(
max_workers=self.intensity_settings[self.intensity]["max_threads"]
) as executor:
while time.time() < end_time:
# Mistura de diferentes padrões de ataque
simulation_choices = [
lambda: self.simulate_port_scan(),
lambda: self.simulate_brute_force(21),
lambda: self.simulate_brute_force(22),
lambda: self.simulate_connection(80)
]
# Escolher aleatoriamente e executar um padrão de ataque
executor.submit(random.choice(simulation_choices))
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
def main():
"""
Main function to run the honeypot simulator with command-line arguments
"""
parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
parser.add_argument(
"--intensity",
choices=["low", "medium", "high"],
default="medium",
help="Simulation intensity level"
)
parser.add_argument(
"--duration",
type=int,
default=300,
help="Simulation duration in seconds"
)
args = parser.parse_args()
simulator = HoneypotSimulator(args.target, args.intensity)
try:
simulator.run_continuous_simulation(args.duration)
except KeyboardInterrupt:
print("\n[*] Simulation interrupted by user")
except Exception as e:
print(f"[-] Simulation error: {e}")
finally:
print("\n[*] Simulation complete")
if __name__ == "__main__":
main()
Temos muito acontecendo neste script de simulação, então vamos dividi-lo um por um. Também adicionei comentários para cada função e operação para tornar isso um pouco mais legível no código.
Primeiro, temos nossa classe utilitária chamada HoneypotSimulator
. Nesta classe, temos a função __init__
que configura a configuração básica para nosso simulador. Ela recebe dois parâmetros: um endereço IP de destino (com valor padrão localhost) e um nível de intensidade (com valor padrão “médio”).
Também definimos três componentes importantes: as portas de destino a serem sondadas (serviços comuns como FTP, SSH, HTTP), padrões de ataque específicos para cada serviço (como tentativas de login e comandos), e configurações de intensidade que controlam quão agressiva nossa simulação será através da contagem de threads e atrasos de tempo.
A função simulate_connection
lida com tentativas de conexão individuais a uma porta específica. Ela cria uma conexão de socket, tenta obter quaisquer banners de serviço (como informações da versão do SSH) e, em seguida, envia comandos de ataque apropriados com base no tipo de serviço. Adicionamos tratamento de erros para problemas comuns de rede e também adicionamos atrasos realistas entre os comandos para simular a interação humana.
Nossa função simulate_port_scan
atua como uma ferramenta de reconhecimento, que verificará sistematicamente cada porta em nossa lista de destinos. É semelhante a como ferramentas como nmap
funcionam – passando pelas portas uma a uma para ver quais serviços estão disponíveis. Para cada porta, ela chama a função simulate_connection
e adiciona pequenos atrasos aleatórios para tornar o padrão de varredura mais natural.
A função simulate_brute_force
mantém listas de nomes de usuários e senhas comuns, tentando diferentes combinações contra serviços como FTP e SSH. Para cada tentativa, ela cria uma nova conexão, envia as credenciais de login no formato correto para aquele serviço e, em seguida, fecha a conexão. Isso nos ajuda a testar quão bem o honeypot detecta e registra ataques de credential stuffing.
A função run_continuous_simulation
executa por uma duração especificada, escolhendo aleatoriamente entre diferentes tipos de ataque, como varredura de portas, força bruta ou ataques a serviços específicos. Ela usa o ThreadPoolExecutor
do Python para executar múltiplos ataques simultaneamente com base no nível de intensidade especificado.
Finalmente, temos a função main
que fornece a interface de linha de comando para o simulador. Ela utiliza argparse
para lidar com argumentos de linha de comando, permitindo que os usuários especifiquem o IP alvo, nível de intensidade e duração da simulação. Ela cria uma instância da classe HoneypotSimulator
e gerencia a execução geral, incluindo o manejo adequado de interrupções e erros do usuário.
Depois de colocar o código do simulador em um script separado, execute-o com o seguinte comando:
# Execute com configurações padrão (intensidade média, localhost, 5 minutos)
python honeypot_simulator.py
# Execute com configurações personalizadas
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600
Uma vez que estamos executando o honeypot e o simulador na mesma máquina localmente, o alvo será localhost
. Mas pode ser outra coisa em um cenário real ou se você estiver executando o honeypot em uma VM ou em uma máquina diferente – então certifique-se de confirmar o IP antes de executar o simulador.
Como Analisar Dados do Honeypot
Vamos rapidamente escrever uma função auxiliar que nos permitirá analisar todos os dados coletados pelo Honeypot. Uma vez que armazenamos isso em um arquivo de log JSON, podemos convenientemente analisá-lo usando o pacote JSON integrado.
import datetime
import json
def analyze_logs(log_file):
"""Enhanced honeypot log analysis with temporal and behavioral patterns"""
ip_analysis = {}
port_analysis = {}
hourly_attacks = {}
data_patterns = {}
# Rastrear padrões de sessão
ip_sessions = {}
attack_timeline = []
with open(log_file, 'r') as f:
for line in f:
try:
activity = json.loads(line)
timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
ip = activity['remote_ip']
port = activity['port']
data = activity['data']
# Inicializar rastreamento de IP se for novo
if ip not in ip_analysis:
ip_analysis[ip] = {
'total_attempts': 0,
'first_seen': timestamp,
'last_seen': timestamp,
'targeted_ports': set(),
'unique_payloads': set(),
'session_count': 0
}
# Atualizar estatísticas de IP
ip_analysis[ip]['total_attempts'] += 1
ip_analysis[ip]['last_seen'] = timestamp
ip_analysis[ip]['targeted_ports'].add(port)
ip_analysis[ip]['unique_payloads'].add(data.strip())
# Rastrear padrões por hora
hour = timestamp.hour
hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1
# Analisar padrões de direcionamento de porta
if port not in port_analysis:
port_analysis[port] = {
'total_attempts': 0,
'unique_ips': set(),
'unique_payloads': set()
}
port_analysis[port]['total_attempts'] += 1
port_analysis[port]['unique_ips'].add(ip)
port_analysis[port]['unique_payloads'].add(data.strip())
# Rastrear padrões de carga útil
if data.strip():
data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1
# Rastrear linha do tempo do ataque
attack_timeline.append({
'timestamp': timestamp,
'ip': ip,
'port': port
})
except (json.JSONDecodeError, KeyError) as e:
continue
# Geração de Relatório de Análise
print("\n=== Honeypot Analysis Report ===")
# 1. Análise Baseada em IP
print("\nTop 10 Most Active IPs:")
sorted_ips = sorted(ip_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)[:10]
for ip, stats in sorted_ips:
duration = stats['last_seen'] - stats['first_seen']
print(f"\nIP: {ip}")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Active Duration: {duration}")
print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 2. Análise de Porta
print("\nPort Targeting Analysis:")
sorted_ports = sorted(port_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)
for port, stats in sorted_ports:
print(f"\nPort {port}:")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Unique Attackers: {len(stats['unique_ips'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 3. Análise Temporal
print("\nHourly Attack Distribution:")
for hour in sorted(hourly_attacks.keys()):
print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")
# 4. Análise de Sofisticação do Ataque
print("\nAttacker Sophistication Analysis:")
for ip, stats in sorted_ips:
sophistication_score = (
len(stats['targeted_ports']) * 0.4 + # Diversidade de Portas
len(stats['unique_payloads']) * 0.6 # Diversidade de Carga Útil
)
print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")
# 5. Padrões Comuns de Carga Útil
print("\nTop 10 Most Common Payloads:")
sorted_payloads = sorted(data_patterns.items(),
key=lambda x: x[1],
reverse=True)[:10]
for payload, count in sorted_payloads:
if len(payload) > 50: # Truncar cargas úteis longas
payload = payload[:50] + "..."
print(f"Count {count}: {payload}")
Você pode colocar isso em um arquivo de script separado e chamar a função nos logs JSON. Essa função nos fornecerá insights abrangentes do arquivo JSON com base nos dados coletados.
Nossa análise começa agrupando os dados em várias categorias, como estatísticas baseadas em IP, padrões de direcionamento de portas, distribuições de ataques por hora e características de carga. Para cada IP, estamos rastreando tentativas totais, horários de primeira e última visualização, portas-alvo e cargas únicas. Isso nos ajudará a construir perfis únicos para os atacantes.
Também examinamos padrões de ataque baseados em portas que monitoram as portas mais frequentemente visadas e por quantos atacantes únicos. Também realizamos uma análise de sofisticação de ataques que nos ajuda a identificar atacantes direcionados, considerando fatores como portas visadas e cargas únicas usadas. Essa análise é usada para separar atividades simples de escaneamento e ataques sofisticados.
A análise temporal nos ajuda a identificar padrões em tentativas de ataque por hora, revelando padrões no tempo de ataque e potenciais campanhas de direcionamento automatizado. Por fim, publicamos cargas comumente vistas para identificar strings ou comandos de ataque frequentemente vistos.
Considerações de Segurança
Ao implantar esta armadilha, certifique-se de considerar as seguintes medidas de segurança:
-
Execute sua armadilha em um ambiente isolado. Tipicamente dentro de uma VM, ou em sua máquina local que está atrás de um NAT e um firewall.
-
Execute a armadilha com privilégios mínimos do sistema (tipicamente não como root) para reduzir o risco em caso de comprometimento.
-
Tenha cautela com os dados coletados se você planeja usá-los algum dia como um honeypot de produção ou de pesquisa, pois eles podem conter malware ou informações sensíveis.
-
Implemente mecanismos de monitoramento robustos para detectar tentativas de escapar do ambiente do honeypot.
Conclusão
Com isso, construímos nosso honeypot, escrevemos um simulador para simular ataques ao nosso honeypot e analisamos os dados dos logs do nosso honeypot para fazer algumas inferências simples. É uma excelente maneira de entender tanto conceitos de segurança ofensiva quanto defensiva. Você pode considerar construir sobre isso para criar sistemas de detecção mais complexos e pensar em adicionar recursos como:
-
Emulação de serviço dinâmica baseada no comportamento do ataque
-
Integração com sistemas de inteligência de ameaças que realizarão uma análise de inferência melhor desses logs coletados do honeypot
-
Coletar logs ainda mais abrangentes além dos dados de IP, porta e rede através de mecanismos avançados de registro
-
Adicionar capacidades de aprendizado de máquina para detectar padrões de ataque
Lembre-se de que, embora os honeypots sejam ferramentas de segurança poderosas, eles devem fazer parte de uma estratégia de segurança defensiva abrangente, e não a única linha de defesa.
Espero que você tenha aprendido como os honeypots funcionam, qual é o seu propósito, além de um pouco de programação em Python também!
Source:
https://www.freecodecamp.org/news/build-a-honeypot-with-python/