Na cibersegurança, um honeypot é um sistema de enganação projetado para atrair e, em seguida, detectar potenciais atacantes que tentam comprometer o sistema. Assim como um pote de mel deixado à vista atrairia moscas.
Considere esses honeypots como câmeras de segurança para o seu sistema. Assim como uma câmera de segurança nos ajuda a entender quem está tentando invadir um prédio e como estão fazendo isso, esses honeypots ajudarão você a entender quem está tentando atacar seu sistema e quais técnicas estão usando.
No final deste tutorial, você será capaz de escrever um honeypot de demonstração em Python e entender como os honeypots funcionam.
Sumário
Compreendendo os Tipos de Honeypots
Antes de começarmos a projetar nosso próprio honeypot, vamos rapidamente entender seus diferentes tipos:
-
Honeypots de Produção: Esses tipos de honeypots são colocados em um ambiente de produção real e são usados para detectar ataques de segurança reais. Eles são tipicamente simples em design, fáceis de manter e implantar, e oferecem interação limitada para reduzir riscos.
-
Honeypots de Pesquisa: Esses são sistemas mais complexos configurados por pesquisadores de segurança para estudar padrões de ataque, realizar análises empíricas sobre esses padrões, coletar amostras de malware e entender novas técnicas de ataque que não foram descobertas anteriormente. Eles frequentemente emulam sistemas operacionais ou redes inteiras em vez de se comportarem como uma aplicação no ambiente de produção.
Para este tutorial, construiremos um honeypot de média interação que registra tentativas de conexão e comportamentos básicos de atacantes.
Como Configurar Seu Ambiente de Desenvolvimento
Vamos começar configurando seu ambiente de desenvolvimento em Python. Execute os seguintes comandos:
import socket
import sys
import datetime
import json
import threading
from pathlib import Path
# Configurar diretório de logs
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)
Manteremos as bibliotecas internas, portanto não será necessário instalar nenhuma dependência externa. Armazenaremos nossos logs no diretório honeypot_logs
.
Como Construir o Core do Honeypot
Nosso honeypot básico será composto por três componentes:
-
Um ouvinte de rede que aceita conexões
-
Um sistema de registro para gravar atividades
-
Um serviço de emulação básico para interagir com atacantes
Agora vamos começar inicializando a classe principal do Honeypot:
class Honeypot:
def __init__(self, bind_ip="0.0.0.0", ports=None):
self.bind_ip = bind_ip
self.ports = ports or [21, 22, 80, 443] # Portos padrão a serem monitorados
self.active_connections = {}
self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"
def log_activity(self, port, remote_ip, data):
"""Log suspicious activity with timestamp and details"""
activity = {
"timestamp": datetime.datetime.now().isoformat(),
"remote_ip": remote_ip,
"port": port,
"data": data.decode('utf-8', errors='ignore')
}
with open(self.log_file, 'a') as f:
json.dump(activity, f)
f.write('\n')
def handle_connection(self, client_socket, remote_ip, port):
"""Handle individual connections and emulate services"""
service_banners = {
21: "220 FTP server ready\r\n",
22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
}
try:
# Enviar banner apropriado para o serviço
if port in service_banners:
client_socket.send(service_banners[port].encode())
# Receber dados do atacante
while True:
data = client_socket.recv(1024)
if not data:
break
self.log_activity(port, remote_ip, data)
# Enviar resposta falsa
client_socket.send(b"Command not recognized.\r\n")
except Exception as e:
print(f"Error handling connection: {e}")
finally:
client_socket.close()
Esta classe contém muitas informações importantes, então vamos analisar cada função uma por uma.
A função __init__
registra os números de IP e porta nos quais iremos hospedar o honeypot, assim como o caminho / nome do arquivo de log. Também manteremos um registro do número total de conexões ativas que temos com o honeypot.
A função log_activity
vai receber as informações sobre o IP, os dados e a porta à qual o IP tentou se conectar. Em seguida, vamos anexar essas informações ao nosso arquivo de log em formato JSON.
A função handle_connection
vai imitar esses serviços que estarão rodando nas diferentes portas que temos. Teremos o honeypot rodando nas portas 21, 22, 80 e 443. Esses serviços são para FTP, SSH, HTTP e o protocolo HTTPS, respectivamente. Portanto, qualquer atacante que tentar interagir com o honeypot deve esperar esses serviços nessas portas.
Para imitar o comportamento desses serviços, usaremos os banners de serviço que eles usam na realidade. Esta função primeiro enviará o banner apropriado quando o atacante se conectar e, em seguida, receberá os dados e os registrará. O honeypot também enviará uma resposta falsa “Comando não reconhecido” de volta ao atacante.
Implemente os Escutadores de Rede
Agora vamos implementar os ouvintes de rede que irão gerenciar as conexões recebidas. Para isso, usaremos uma programação de socket simples. Se você não sabe como a programação de socket funciona, confira este artigo que explica alguns conceitos relacionados a isso.
def start_listener(self, port):
"""Start a listener on specified port"""
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.bind_ip, port))
server.listen(5)
print(f"[*] Listening on {self.bind_ip}:{port}")
while True:
client, addr = server.accept()
print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
# Gerenciar conexão em uma thread separada
client_handler = threading.Thread(
target=self.handle_connection,
args=(client, addr[0], port)
)
client_handler.start()
except Exception as e:
print(f"Error starting listener on port {port}: {e}")
A função start_listener
iniciará o servidor e ouvirá na porta fornecida. O bind_ip
para nós será 0.0.0.0
que indica que o servidor estará ouvindo em todas as interfaces de rede.
Agora, gerenciaremos cada nova conexão em uma thread separada, já que pode haver casos em que múltiplos atacantes tentam interagir com o honeypot ou um script ou ferramenta de ataque está escaneando o honeypot. Se você não sabe como funciona a programação em threads, você pode confira este artigo que explica threading e concorrência em Python.
Além disso, certifique-se de colocar essa função na classe principal Honeypot
.
Executar o Honeypot
Agora vamos criar a função main
que iniciará nosso honeypot.
def main():
honeypot = Honeypot()
# Iniciar ouvintes para cada porta em threads separadas
for port in honeypot.ports:
listener_thread = threading.Thread(
target=honeypot.start_listener,
args=(port,)
)
listener_thread.daemon = True
listener_thread.start()
try:
# Manter a thread principal ativa
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[*] Shutting down honeypot...")
sys.exit(0)
if __name__ == "__main__":
main()
Esta função instancia a classe Honeypot
e inicia os ouvintes para cada uma de nossas portas definidas (21, 22, 80, 443) como uma thread separada. Agora, manteremos nossa thread principal que está executando nosso programa real ativa colocando-a em um loop infinito. Junte tudo isso em um script e execute-o.
Escreva o Simulador de Ataque Honeypot
Agora vamos tentar simular alguns cenários de ataque e direcionar nosso honeypot para que possamos coletar alguns dados em nosso arquivo de log JSON.
Este simulador nos ajudará a demonstrar alguns aspectos importantes sobre honeypots:
-
Padrões de ataque realistas: O simulador irá simular padrões comuns de ataque, como varredura de portas, tentativas de força bruta e exploits específicos de serviços.
-
Intensidade variável: O simulador ajustará a intensidade da simulação para testar como seu honeypot lida com diferentes cargas.
-
Vários tipos de ataque: Ele demonstrará diferentes tipos de ataques que atacantes reais podem tentar, ajudando você a entender como seu honeypot responde a cada um.
-
Conexões simultâneas: O simulador usará threading para testar como seu honeypot lida com múltiplas conexões simultâneas.
# honeypot_simulator.py
import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse
class HoneypotSimulator:
"""
A class to simulate different types of connections and attacks against our honeypot.
This helps in testing the honeypot's logging and response capabilities.
"""
def __init__(self, target_ip="127.0.0.1", intensity="medium"):
# Configuração para o simulador
self.target_ip = target_ip
self.intensity = intensity
# Portas comuns que atacantes costumam explorar
self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]
# Dicionário de comandos comuns usados por atacantes para diferentes serviços
self.attack_patterns = {
21: [ # Comandos FTP
"USER admin\r\n",
"PASS admin123\r\n",
"LIST\r\n",
"STOR malware.exe\r\n"
],
22: [ # Tentativas de SSH
"SSH-2.0-OpenSSH_7.9\r\n",
"admin:password123\n",
"root:toor\n"
],
80: [ # Requisições HTTP
"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
"POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
"GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
]
}
# Configurações de intensidade afetam a frequência e o volume dos ataques simulados
self.intensity_settings = {
"low": {"max_threads": 2, "delay_range": (1, 3)},
"medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
"high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
}
def simulate_connection(self, port):
"""
Simulates a connection attempt to a specific port with realistic attack patterns
"""
try:
# Criar uma nova conexão de socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
print(f"[*] Attempting connection to {self.target_ip}:{port}")
sock.connect((self.target_ip, port))
# Obter banner, se houver
banner = sock.recv(1024)
print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")
# Enviar padrões de ataque com base na porta
if port in self.attack_patterns:
for command in self.attack_patterns[port]:
print(f"[*] Sending command to port {port}: {command.strip()}")
sock.send(command.encode())
# Aguardar resposta
try:
response = sock.recv(1024)
print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
except socket.timeout:
print(f"[-] No response received from port {port}")
# Adicionar atraso realista entre os comandos
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
sock.close()
except ConnectionRefusedError:
print(f"[-] Connection refused on port {port}")
except socket.timeout:
print(f"[-] Connection timeout on port {port}")
except Exception as e:
print(f"[-] Error connecting to port {port}: {e}")
def simulate_port_scan(self):
"""
Simulates a basic port scan across common ports
"""
print(f"\n[*] Starting port scan simulation against {self.target_ip}")
for port in self.target_ports:
self.simulate_connection(port)
time.sleep(random.uniform(0.1, 0.3))
def simulate_brute_force(self, port):
"""
Simulates a brute force attack against a specific service
"""
common_usernames = ["admin", "root", "user", "test"]
common_passwords = ["password123", "admin123", "123456", "root"]
print(f"\n[*] Starting brute force simulation against port {port}")
for username in common_usernames:
for password in common_passwords:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((self.target_ip, port))
if port == 21: # FTP
sock.send(f"USER {username}\r\n".encode())
sock.recv(1024)
sock.send(f"PASS {password}\r\n".encode())
elif port == 22: # SSH
sock.send(f"{username}:{password}\n".encode())
sock.close()
time.sleep(random.uniform(0.1, 0.3))
except Exception as e:
print(f"[-] Error in brute force attempt: {e}")
def run_continuous_simulation(self, duration=300):
"""
Runs a continuous simulation for a specified duration
"""
print(f"\n[*] Starting continuous simulation for {duration} seconds")
print(f"[*] Intensity level: {self.intensity}")
end_time = time.time() + duration
with ThreadPoolExecutor(
max_workers=self.intensity_settings[self.intensity]["max_threads"]
) as executor:
while time.time() < end_time:
# Mistura de diferentes padrões de ataque
simulation_choices = [
lambda: self.simulate_port_scan(),
lambda: self.simulate_brute_force(21),
lambda: self.simulate_brute_force(22),
lambda: self.simulate_connection(80)
]
# Escolher aleatoriamente e executar um padrão de ataque
executor.submit(random.choice(simulation_choices))
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
def main():
"""
Main function to run the honeypot simulator with command-line arguments
"""
parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
parser.add_argument(
"--intensity",
choices=["low", "medium", "high"],
default="medium",
help="Simulation intensity level"
)
parser.add_argument(
"--duration",
type=int,
default=300,
help="Simulation duration in seconds"
)
args = parser.parse_args()
simulator = HoneypotSimulator(args.target, args.intensity)
try:
simulator.run_continuous_simulation(args.duration)
except KeyboardInterrupt:
print("\n[*] Simulation interrupted by user")
except Exception as e:
print(f"[-] Simulation error: {e}")
finally:
print("\n[*] Simulation complete")
if __name__ == "__main__":
main()
Temos muito acontecendo neste script de simulação, então vamos dividir isso um por um. Também adicionei comentários para cada função e operação para tornar isso um pouco mais legível no código.
Primeiro, temos nossa classe utilitária chamada HoneypotSimulator
. Nesta classe, temos a função __init__
que configura a configuração básica para nosso simulador. Ela recebe dois parâmetros: um endereço IP de destino (padrão localhost) e um nível de intensidade (padrão “médio”).
Também definimos três componentes importantes: as portas de destino a serem sondadas (serviços comuns como FTP, SSH, HTTP), padrões de ataque específicos para cada serviço (como tentativas de login e comandos) e configurações de intensidade que controlam quão agressiva será nossa simulação através da contagem de threads e atrasos de tempo.
A função simulate_connection
lida com tentativas de conexão individuais a uma porta específica. Ela cria uma conexão de socket, tenta obter quaisquer banners de serviço (como informações da versão SSH) e, em seguida, envia comandos de ataque apropriados com base no tipo de serviço. Adicionamos tratamento de erros para problemas comuns de rede e também adicionamos atrasos realistas entre os comandos para imitar a interação humana.
Nossa função simulate_port_scan
atua como uma ferramenta de reconhecimento, que verificará sistematicamente cada porta em nossa lista de destino. É semelhante a como ferramentas como nmap
funcionam – passando pelas portas uma por uma para ver quais serviços estão disponíveis. Para cada porta, ela chama a função simulate_connection
e adiciona pequenos atrasos aleatórios para que o padrão de varredura pareça mais natural.
A função simulate_brute_force
mantém listas de nomes de usuários e senhas comuns, tentando diferentes combinações contra serviços como FTP e SSH. Para cada tentativa, ela cria uma nova conexão, envia as credenciais de login no formato correto para aquele serviço e, em seguida, fecha a conexão. Isso nos ajuda a testar quão bem o honeypot detecta e registra ataques de preenchimento de credenciais.
A função run_continuous_simulation
é executada por um período especificado, escolhendo aleatoriamente entre diferentes tipos de ataque, como varredura de portas, força bruta ou ataques a serviços específicos. Ela utiliza o ThreadPoolExecutor
do Python para realizar múltiplos ataques simultaneamente com base no nível de intensidade especificado.
Finalmente, temos a função main
que fornece a interface de linha de comando para o simulador. Ela utiliza argparse
para lidar com argumentos da linha de comando, permitindo que os usuários especifiquem o IP de destino, o nível de intensidade e a duração da simulação. Ela cria uma instância da classe HoneypotSimulator
e gerencia a execução geral, incluindo o tratamento adequado de interrupções e erros do usuário.
Após colocar o código do simulador em um script separado, execute-o com o seguinte comando:
# Execute com configurações padrão (intensidade média, localhost, 5 minutos)
python honeypot_simulator.py
# Execute com configurações personalizadas
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600
Como estamos executando o honeypot e o simulador na mesma máquina localmente, o alvo será localhost
. Mas pode ser algo diferente em um cenário real ou se você estiver executando o honeypot em uma VM ou em uma máquina diferente – então, certifique-se de confirmar o IP antes de executar o simulador.
Como Analisar os Dados do Honeypot
Vamos rapidamente escrever uma função auxiliar que nos permitirá analisar todos os dados coletados pelo Honeypot. Como armazenamos isso em um arquivo de log JSON, podemos convenientemente analisá-lo usando o pacote JSON embutido.
import datetime
import json
def analyze_logs(log_file):
"""Enhanced honeypot log analysis with temporal and behavioral patterns"""
ip_analysis = {}
port_analysis = {}
hourly_attacks = {}
data_patterns = {}
# Rastrear padrões de sessão
ip_sessions = {}
attack_timeline = []
with open(log_file, 'r') as f:
for line in f:
try:
activity = json.loads(line)
timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
ip = activity['remote_ip']
port = activity['port']
data = activity['data']
# Inicializar rastreamento de IP se novo
if ip not in ip_analysis:
ip_analysis[ip] = {
'total_attempts': 0,
'first_seen': timestamp,
'last_seen': timestamp,
'targeted_ports': set(),
'unique_payloads': set(),
'session_count': 0
}
# Atualizar estatísticas de IP
ip_analysis[ip]['total_attempts'] += 1
ip_analysis[ip]['last_seen'] = timestamp
ip_analysis[ip]['targeted_ports'].add(port)
ip_analysis[ip]['unique_payloads'].add(data.strip())
# Rastrear padrões horários
hour = timestamp.hour
hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1
# Analisar padrões de direcionamento de porta
if port not in port_analysis:
port_analysis[port] = {
'total_attempts': 0,
'unique_ips': set(),
'unique_payloads': set()
}
port_analysis[port]['total_attempts'] += 1
port_analysis[port]['unique_ips'].add(ip)
port_analysis[port]['unique_payloads'].add(data.strip())
# Rastrear padrões de carga útil
if data.strip():
data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1
# Rastrear linha do tempo de ataques
attack_timeline.append({
'timestamp': timestamp,
'ip': ip,
'port': port
})
except (json.JSONDecodeError, KeyError) as e:
continue
# Geração de Relatório de Análise
print("\n=== Honeypot Analysis Report ===")
# 1. Análise baseada em IP
print("\nTop 10 Most Active IPs:")
sorted_ips = sorted(ip_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)[:10]
for ip, stats in sorted_ips:
duration = stats['last_seen'] - stats['first_seen']
print(f"\nIP: {ip}")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Active Duration: {duration}")
print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 2. Análise de Porta
print("\nPort Targeting Analysis:")
sorted_ports = sorted(port_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)
for port, stats in sorted_ports:
print(f"\nPort {port}:")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Unique Attackers: {len(stats['unique_ips'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 3. Análise Temporal
print("\nHourly Attack Distribution:")
for hour in sorted(hourly_attacks.keys()):
print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")
# 4. Análise de Sofisticação de Ataques
print("\nAttacker Sophistication Analysis:")
for ip, stats in sorted_ips:
sophistication_score = (
len(stats['targeted_ports']) * 0.4 + # Diversidade de Portas
len(stats['unique_payloads']) * 0.6 # Diversidade de Cargas Úteis
)
print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")
# 5. Padrões Comuns de Carga Útil
print("\nTop 10 Most Common Payloads:")
sorted_payloads = sorted(data_patterns.items(),
key=lambda x: x[1],
reverse=True)[:10]
for payload, count in sorted_payloads:
if len(payload) > 50: # Truncar cargas úteis longas
payload = payload[:50] + "..."
print(f"Count {count}: {payload}")
Você pode colocar isso em um arquivo de script separado e chamar a função nos logs JSON. Esta função nos fornecerá informações abrangentes do arquivo JSON com base nos dados coletados.
Nossa análise começa agrupando os dados em várias categorias, como estatísticas baseadas em IP, padrões de direcionamento de portas, distribuições de ataques por hora e características de carga útil. Para cada IP, estamos acompanhando o total de tentativas, os horários do primeiro e último acesso, as portas alvo e as cargas úteis únicas. Isso nos ajudará a construir perfis únicos para os atacantes.
Também examinamos aqui os padrões de ataque baseados em portas que monitoram as portas mais frequentemente atacadas e quantos atacantes únicos. Também realizamos uma análise de sofisticação de ataques que nos ajuda a identificar atacantes direcionados, considerando fatores como portas atacadas e cargas úteis únicas utilizadas. Essa análise é utilizada para separar atividades de varredura simples de ataques sofisticados.
A análise temporal nos ajuda a identificar padrões nas tentativas de ataque por hora, revelando padrões no tempo dos ataques e possíveis campanhas de direcionamento automatizado. Finalmente, publicamos cargas úteis comumente observadas para identificar strings ou comandos de ataque frequentemente vistos.
Considerações de Segurança
Ao implantar este honeypot, certifique-se de considerar as seguintes medidas de segurança:
-
Execute seu honeypot em um ambiente isolado. Normalmente dentro de uma VM ou em sua máquina local que está atrás de um NAT e um firewall.
-
Execute o honeypot com privilégios mínimos do sistema (normalmente não como root) para reduzir o risco em caso de comprometimento.
-
Tenha cuidado com os dados coletados se você planeja usá-los como um honeypot de produção ou para pesquisa, pois podem conter malware ou informações sensíveis.
-
Implemente mecanismos de monitoramento robustos para detectar tentativas de escapar do ambiente do honeypot.
Conclusão
Com isso, construímos nosso honeypot, escrevemos um simulador para simular ataques ao nosso honeypot e analisamos os dados dos logs do nosso honeypot para fazer algumas inferências simples. É uma excelente maneira de entender tanto os conceitos de segurança ofensiva quanto defensiva. Você pode considerar construir sobre isso para criar sistemas de detecção mais complexos e pensar em adicionar recursos como:
-
Emulação de serviço dinâmica baseada no comportamento do ataque
-
Integração com sistemas de inteligência de ameaças que realizarão uma análise de inferência melhor desses logs coletados do honeypot
-
Coletar logs ainda mais abrangentes além dos dados de IP, porta e rede por meio de mecanismos de registro avançados
-
Adicionar capacidades de aprendizado de máquina para detectar padrões de ataque
Lembre-se de que, embora os honeypots sejam ferramentas de segurança poderosas, eles devem fazer parte de uma estratégia de segurança defensiva abrangente, e não serem a única linha de defesa.
Espero que você tenha aprendido como os honeypots funcionam, qual é seu propósito e um pouco de programação em Python também!
Source:
https://www.freecodecamp.org/news/build-a-honeypot-with-python/