En ciberseguridad, un honeypot es un sistema trampa diseñado para atraer y luego detectar posibles atacantes que intentan comprometer el sistema. Al igual que un tarro de miel que se deja al aire libre atraería a las moscas.
Piensa en estos honeypots como cámaras de seguridad para tu sistema. Así como una cámara de seguridad nos ayuda a entender quién intenta entrar en un edificio y cómo lo está haciendo, estos honeypots te ayudarán a comprender quién intenta atacar tu sistema y qué técnicas están utilizando.
Al final de este tutorial, podrás escribir un honeypot de demostración en Python y entender cómo funcionan los honeypots.
Índice
Entendiendo los Tipos de Honeypots
Antes de comenzar a diseñar nuestro propio honeypot, entendamos rápidamente sus diferentes tipos:
-
Honeypots de Producción: Estos tipos de honeypots se colocan en un entorno de producción real y se utilizan para detectar ataques de seguridad reales. Suelen ser simples en diseño, fáciles de mantener y desplegar, y ofrecen interacciones limitadas para reducir el riesgo.
-
Honeypots de Investigación: Estos son sistemas más complejos establecidos por investigadores de seguridad para estudiar patrones de ataque, realizar análisis empíricos sobre estos patrones, recolectar muestras de malware y entender nuevas técnicas de ataque que no se han descubierto previamente. A menudo emulan sistemas operativos o redes enteras en lugar de comportarse como una aplicación en el entorno de producción.
Para este tutorial, construiremos un honeypot de interacción media que registre intentos de conexión y el comportamiento básico de los atacantes.
Cómo Configurar Su Entorno de Desarrollo
Comencemos configurando su entorno de desarrollo en Python. Ejecute los siguientes comandos:
import socket
import sys
import datetime
import json
import threading
from pathlib import Path
# Configurar el directorio de registro
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)
Nos ceñiremos a las bibliotecas integradas, por lo que no necesitaremos instalar dependencias externas. Almacenaremos nuestros registros en el directorio honeypot_logs
.
Cómo Construir el Honeypot Principal
Nuestro honeypot básico estará compuesto por tres componentes:
-
Un escucha de red que acepta conexiones
-
Un sistema de registro para registrar actividades
-
Un servicio de emulación básico para interactuar con los atacantes
Ahora comencemos inicializando la clase principal del Honeypot:
class Honeypot:
def __init__(self, bind_ip="0.0.0.0", ports=None):
self.bind_ip = bind_ip
self.ports = ports or [21, 22, 80, 443] # Puertos predeterminados para monitorear
self.active_connections = {}
self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"
def log_activity(self, port, remote_ip, data):
"""Log suspicious activity with timestamp and details"""
activity = {
"timestamp": datetime.datetime.now().isoformat(),
"remote_ip": remote_ip,
"port": port,
"data": data.decode('utf-8', errors='ignore')
}
with open(self.log_file, 'a') as f:
json.dump(activity, f)
f.write('\n')
def handle_connection(self, client_socket, remote_ip, port):
"""Handle individual connections and emulate services"""
service_banners = {
21: "220 FTP server ready\r\n",
22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
}
try:
# Enviar banner apropiado para el servicio
if port in service_banners:
client_socket.send(service_banners[port].encode())
# Recibir datos del atacante
while True:
data = client_socket.recv(1024)
if not data:
break
self.log_activity(port, remote_ip, data)
# Enviar respuesta falsa
client_socket.send(b"Command not recognized.\r\n")
except Exception as e:
print(f"Error handling connection: {e}")
finally:
client_socket.close()
Esta clase contiene mucha información importante, así que vamos a repasar cada función una por una.
La función __init__
registra las direcciones IP y los números de puerto en los que alojaremos el honeypot, así como la ruta / nombre de archivo del archivo de registro. También mantendremos un registro del número total de conexiones activas que tenemos al honeypot.
La función log_activity
va a recibir la información sobre la IP, los datos y el puerto al que la IP intentó conectarse. Luego, agregaremos esta información a nuestro archivo de registro en formato JSON.
La función handle_connection
va a imitar estos servicios que estarán funcionando en los diferentes puertos que tenemos. Tendremos el honeypot funcionando en los puertos 21, 22, 80 y 443. Estos servicios son para FTP, SSH, HTTP y el protocolo HTTPS, respectivamente. Así que cualquier atacante que intente interactuar con el honeypot debería esperar estos servicios en estos puertos.
Para imitar el comportamiento de estos servicios, utilizaremos los banners de servicio que ellos usan en la realidad. Esta función primero enviará el banner apropiado cuando el atacante se conecte, y luego recibirá los datos y los registrará. El honeypot también enviará una respuesta falsa “Comando no reconocido” de vuelta al atacante.
Implementar los Listeners de Red
Ahora implementemos los escuchadores de red que manejarán las conexiones entrantes. Para esto, utilizaremos programación de sockets simple. Si no sabes cómo funciona la programación de sockets, consulta este artículo que explica algunos conceptos relacionados con ello.
def start_listener(self, port):
"""Start a listener on specified port"""
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.bind_ip, port))
server.listen(5)
print(f"[*] Listening on {self.bind_ip}:{port}")
while True:
client, addr = server.accept()
print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
# Manejar la conexión en un hilo separado
client_handler = threading.Thread(
target=self.handle_connection,
args=(client, addr[0], port)
)
client_handler.start()
except Exception as e:
print(f"Error starting listener on port {port}: {e}")
La función start_listener
iniciará el servidor y escuchará en el puerto proporcionado. El bind_ip
para nosotros será 0.0.0.0
lo que indica que el servidor escuchará en todas las interfaces de red.
Ahora, manejaremos cada nueva conexión en un hilo separado, ya que podría haber instancias en las que múltiples atacantes intenten interactuar con el honeypot o un script o herramienta de ataque esté escaneando el honeypot. Si no sabes cómo funciona el threading, puedes consultar este artículo que explica el threading y la concurrencia en Python.
Además, asegúrate de poner esta función en la clase central Honeypot
.
Ejecutar el Honeypot
Ahora creemos la función main
que iniciará nuestro honeypot.
def main():
honeypot = Honeypot()
# Iniciar escuchadores para cada puerto en hilos separados
for port in honeypot.ports:
listener_thread = threading.Thread(
target=honeypot.start_listener,
args=(port,)
)
listener_thread.daemon = True
listener_thread.start()
try:
# Mantener el hilo principal activo
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[*] Shutting down honeypot...")
sys.exit(0)
if __name__ == "__main__":
main()
Esta función instancia la clase Honeypot
y comienza los escuchadores para cada uno de nuestros puertos definidos (21, 22, 80, 443) como un hilo separado. Ahora, mantendremos nuestro hilo principal que está ejecutando nuestro programa real activo al ponerlo en un bucle infinito. Junte todo esto en un script y ejecútelo.
Escriba el Simulador de Ataques de Honeypot
Ahora intentemos simular algunos escenarios de ataque y apuntar a nuestro honeypot para que podamos recopilar algunos datos en nuestro archivo de registro JSON.
Este simulador nos ayudará a demostrar algunos aspectos importantes sobre los honeypots:
-
Patrones de ataque realistas: El simulador simulará patrones de ataque comunes como el escaneo de puertos, intentos de fuerza bruta y exploits específicos de servicios.
-
Intensidad variable: El simulador ajustará la intensidad de la simulación para probar cómo su honeypot maneja diferentes cargas.
-
Varios tipos de ataques: Demostrará diferentes tipos de ataques que los atacantes reales podrían intentar, ayudándole a entender cómo su honeypot responde a cada uno.
-
Conexiones simultáneas: El simulador utilizará subprocesos para probar cómo tu honeypot maneja múltiples conexiones simultáneas.
# honeypot_simulator.py
import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse
class HoneypotSimulator:
"""
A class to simulate different types of connections and attacks against our honeypot.
This helps in testing the honeypot's logging and response capabilities.
"""
def __init__(self, target_ip="127.0.0.1", intensity="medium"):
# Configuración para el simulador
self.target_ip = target_ip
self.intensity = intensity
# Puertos comunes que los atacantes suelen explorar
self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]
# Diccionario de comandos comunes utilizados por atacantes para diferentes servicios
self.attack_patterns = {
21: [ # Comandos FTP
"USER admin\r\n",
"PASS admin123\r\n",
"LIST\r\n",
"STOR malware.exe\r\n"
],
22: [ # Intentos SSH
"SSH-2.0-OpenSSH_7.9\r\n",
"admin:password123\n",
"root:toor\n"
],
80: [ # Solicitudes HTTP
"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
"POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
"GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
]
}
# Configuraciones de intensidad que afectan la frecuencia y el volumen de los ataques simulados
self.intensity_settings = {
"low": {"max_threads": 2, "delay_range": (1, 3)},
"medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
"high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
}
def simulate_connection(self, port):
"""
Simulates a connection attempt to a specific port with realistic attack patterns
"""
try:
# Crear una nueva conexión de socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
print(f"[*] Attempting connection to {self.target_ip}:{port}")
sock.connect((self.target_ip, port))
# Obtener banner si hay alguno
banner = sock.recv(1024)
print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")
# Enviar patrones de ataque basados en el puerto
if port in self.attack_patterns:
for command in self.attack_patterns[port]:
print(f"[*] Sending command to port {port}: {command.strip()}")
sock.send(command.encode())
# Esperar respuesta
try:
response = sock.recv(1024)
print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
except socket.timeout:
print(f"[-] No response received from port {port}")
# Agregar un retraso realista entre comandos
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
sock.close()
except ConnectionRefusedError:
print(f"[-] Connection refused on port {port}")
except socket.timeout:
print(f"[-] Connection timeout on port {port}")
except Exception as e:
print(f"[-] Error connecting to port {port}: {e}")
def simulate_port_scan(self):
"""
Simulates a basic port scan across common ports
"""
print(f"\n[*] Starting port scan simulation against {self.target_ip}")
for port in self.target_ports:
self.simulate_connection(port)
time.sleep(random.uniform(0.1, 0.3))
def simulate_brute_force(self, port):
"""
Simulates a brute force attack against a specific service
"""
common_usernames = ["admin", "root", "user", "test"]
common_passwords = ["password123", "admin123", "123456", "root"]
print(f"\n[*] Starting brute force simulation against port {port}")
for username in common_usernames:
for password in common_passwords:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((self.target_ip, port))
if port == 21: # FTP
sock.send(f"USER {username}\r\n".encode())
sock.recv(1024)
sock.send(f"PASS {password}\r\n".encode())
elif port == 22: # SSH
sock.send(f"{username}:{password}\n".encode())
sock.close()
time.sleep(random.uniform(0.1, 0.3))
except Exception as e:
print(f"[-] Error in brute force attempt: {e}")
def run_continuous_simulation(self, duration=300):
"""
Runs a continuous simulation for a specified duration
"""
print(f"\n[*] Starting continuous simulation for {duration} seconds")
print(f"[*] Intensity level: {self.intensity}")
end_time = time.time() + duration
with ThreadPoolExecutor(
max_workers=self.intensity_settings[self.intensity]["max_threads"]
) as executor:
while time.time() < end_time:
# Mezcla de diferentes patrones de ataque
simulation_choices = [
lambda: self.simulate_port_scan(),
lambda: self.simulate_brute_force(21),
lambda: self.simulate_brute_force(22),
lambda: self.simulate_connection(80)
]
# Elegir y ejecutar aleatoriamente un patrón de ataque
executor.submit(random.choice(simulation_choices))
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
def main():
"""
Main function to run the honeypot simulator with command-line arguments
"""
parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
parser.add_argument(
"--intensity",
choices=["low", "medium", "high"],
default="medium",
help="Simulation intensity level"
)
parser.add_argument(
"--duration",
type=int,
default=300,
help="Simulation duration in seconds"
)
args = parser.parse_args()
simulator = HoneypotSimulator(args.target, args.intensity)
try:
simulator.run_continuous_simulation(args.duration)
except KeyboardInterrupt:
print("\n[*] Simulation interrupted by user")
except Exception as e:
print(f"[-] Simulation error: {e}")
finally:
print("\n[*] Simulation complete")
if __name__ == "__main__":
main()
Tenemos mucho en este script de simulación, así que desglosémoslo uno por uno. También he añadido comentarios para cada función y operación para que sea un poco más legible en el código.
Primero tenemos nuestra clase de utilidad llamada HoneypotSimulator
. En esta clase, tenemos la función __init__
que configura la configuración básica para nuestro simulador. Toma dos parámetros: una dirección IP objetivo (que por defecto es localhost) y un nivel de intensidad (que por defecto es “medio”).
También definimos tres componentes importantes: los puertos objetivo a sondear (servicios comunes como FTP, SSH, HTTP), patrones de ataque específicos para cada servicio (como intentos de inicio de sesión y comandos), y configuraciones de intensidad que controlan cuán agresiva será nuestra simulación a través de recuentos de hilos y retrasos de tiempo.
La función simulate_connection
maneja los intentos de conexión individuales a un puerto específico. Crea una conexión de socket, intenta obtener cualquier banner de servicio (como información de versión de SSH), y luego envía comandos de ataque apropiados según el tipo de servicio. Hemos agregado manejo de errores para problemas comunes de red y también hemos añadido retrasos realistas entre comandos para imitar la interacción humana.
Nuestra función simulate_port_scan
actúa como una herramienta de reconocimiento, que verificará sistemáticamente cada puerto en nuestra lista de objetivos. Es similar a cómo funcionan herramientas como nmap
– revisando los puertos uno por uno para ver qué servicios están disponibles. Para cada puerto, llama a la función simulate_connection
y agrega pequeños retrasos aleatorios para que el patrón de escaneo parezca más natural.
La función simulate_brute_force
mantiene listas de nombres de usuario y contraseñas comunes, intentando diferentes combinaciones contra servicios como FTP y SSH. Para cada intento, crea una nueva conexión, envía las credenciales de inicio de sesión en el formato correcto para ese servicio y luego cierra la conexión. Esto nos ayuda a probar qué tan bien el honeypot detecta y registra ataques de relleno de credenciales.
La función run_continuous_simulation
se ejecuta durante una duración especificada, eligiendo aleatoriamente entre diferentes tipos de ataque como escaneo de puertos, fuerza bruta o ataques a servicios específicos. Utiliza ThreadPoolExecutor
de Python para ejecutar múltiples ataques simultáneamente según el nivel de intensidad especificado.
Finalmente, tenemos la función main
que proporciona la interfaz de línea de comandos para el simulador. Utiliza argparse
para manejar los argumentos de línea de comandos, permitiendo a los usuarios especificar la IP de destino, el nivel de intensidad y la duración de la simulación. Crea una instancia de la clase HoneypotSimulator
y gestiona la ejecución general, incluyendo el manejo adecuado de interrupciones de usuario y errores.
Después de poner el código del simulador en un script separado, ejecútalo con el siguiente comando:
# Ejecutar con configuraciones predeterminadas (intensidad media, localhost, 5 minutos)
python honeypot_simulator.py
# Ejecutar con configuraciones personalizadas
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600
Dado que estamos ejecutando el honeypot así como el simulador en la misma máquina localmente, el objetivo será localhost
. Pero puede ser algo diferente en un escenario real o si estás ejecutando el honeypot en una máquina virtual o en otra máquina, así que asegúrate de confirmar la dirección IP antes de ejecutar el simulador.
Cómo Analizar Datos del Honeypot
Escribamos rápidamente una función auxiliar que nos permita analizar todos los datos recopilados por el Honeypot. Dado que hemos almacenado esto en un archivo de registro JSON, podemos analizarlo convenientemente utilizando el paquete JSON incorporado.
import datetime
import json
def analyze_logs(log_file):
"""Enhanced honeypot log analysis with temporal and behavioral patterns"""
ip_analysis = {}
port_analysis = {}
hourly_attacks = {}
data_patterns = {}
# Rastrear patrones de sesión
ip_sessions = {}
attack_timeline = []
with open(log_file, 'r') as f:
for line in f:
try:
activity = json.loads(line)
timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
ip = activity['remote_ip']
port = activity['port']
data = activity['data']
# Inicializar el seguimiento de IP si es nuevo
if ip not in ip_analysis:
ip_analysis[ip] = {
'total_attempts': 0,
'first_seen': timestamp,
'last_seen': timestamp,
'targeted_ports': set(),
'unique_payloads': set(),
'session_count': 0
}
# Actualizar estadísticas de IP
ip_analysis[ip]['total_attempts'] += 1
ip_analysis[ip]['last_seen'] = timestamp
ip_analysis[ip]['targeted_ports'].add(port)
ip_analysis[ip]['unique_payloads'].add(data.strip())
# Rastrear patrones horarios
hour = timestamp.hour
hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1
# Analizar patrones de targeting de puertos
if port not in port_analysis:
port_analysis[port] = {
'total_attempts': 0,
'unique_ips': set(),
'unique_payloads': set()
}
port_analysis[port]['total_attempts'] += 1
port_analysis[port]['unique_ips'].add(ip)
port_analysis[port]['unique_payloads'].add(data.strip())
# Rastrear patrones de carga útil
if data.strip():
data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1
# Rastrear la línea de tiempo de ataques
attack_timeline.append({
'timestamp': timestamp,
'ip': ip,
'port': port
})
except (json.JSONDecodeError, KeyError) as e:
continue
# Generación de Informe de Análisis
print("\n=== Honeypot Analysis Report ===")
# 1. Análisis basado en IP
print("\nTop 10 Most Active IPs:")
sorted_ips = sorted(ip_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)[:10]
for ip, stats in sorted_ips:
duration = stats['last_seen'] - stats['first_seen']
print(f"\nIP: {ip}")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Active Duration: {duration}")
print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 2. Análisis de Puertos
print("\nPort Targeting Analysis:")
sorted_ports = sorted(port_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)
for port, stats in sorted_ports:
print(f"\nPort {port}:")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Unique Attackers: {len(stats['unique_ips'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 3. Análisis Temporal
print("\nHourly Attack Distribution:")
for hour in sorted(hourly_attacks.keys()):
print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")
# 4. Análisis de Sofisticación de Ataques
print("\nAttacker Sophistication Analysis:")
for ip, stats in sorted_ips:
sophistication_score = (
len(stats['targeted_ports']) * 0.4 + # Diversidad de Puertos
len(stats['unique_payloads']) * 0.6 # Diversidad de Cargas Útiles
)
print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")
# 5. Patrones Comunes de Carga Útil
print("\nTop 10 Most Common Payloads:")
sorted_payloads = sorted(data_patterns.items(),
key=lambda x: x[1],
reverse=True)[:10]
for payload, count in sorted_payloads:
if len(payload) > 50: # Truncar cargas útiles largas
payload = payload[:50] + "..."
print(f"Count {count}: {payload}")
Puede colocar esto en un archivo de script separado y llamar a la función en los registros JSON. Esta función nos proporcionará información integral del archivo JSON basada en los datos recopilados.
Nuestro análisis comienza agrupando los datos en varias categorías como estadísticas basadas en IP, patrones de ataque por puerto, distribuciones de ataques por hora y características de carga útil. Para cada IP, estamos rastreando intentos totales, tiempos de primera y última vista, puertos objetivo y cargas útiles únicas. Esto nos ayudará a construir perfiles únicos para los atacantes.
También examinamos aquí los patrones de ataque basados en puertos que monitorean los puertos más frecuentemente atacados y por cuántos atacantes únicos. También realizamos un análisis de sofisticación de ataques que nos ayuda a identificar a los atacantes objetivo, considerando factores como los puertos atacados y las cargas útiles únicas utilizadas. Este análisis se utiliza para separar actividades de escaneo simples y ataques sofisticados.
El análisis temporal nos ayuda a identificar patrones en los intentos de ataque por hora, revelando patrones en el tiempo de los ataques y posibles campañas de targeting automatizadas. Finalmente, publicamos cargas útiles comúnmente vistas para identificar cadenas o comandos de ataque comúnmente observados.
Consideraciones de Seguridad
Al desplegar este honeypot, asegúrate de considerar las siguientes medidas de seguridad:
-
Ejecuta tu honeypot en un entorno aislado. Típicamente dentro de una VM, o en tu máquina local que está detrás de un NAT y un firewall.
-
Ejecuta el honeypot con privilegios mínimos del sistema (típicamente no como root) para reducir el riesgo en caso de compromiso.
-
Tenga cuidado con los datos recopilados si planea implementarlos alguna vez como un honeypot de grado de producción o de investigación, ya que pueden contener malware o información sensible.
-
Implemente mecanismos de monitoreo robustos para detectar intentos de salir del entorno del honeypot.
Conclusión
Con esto hemos construido nuestro honeypot, escrito un simulador para simular ataques a nuestro honeypot y analizado los datos de los registros de nuestro honeypot para hacer algunas inferencias simples. Es una excelente manera de entender tanto conceptos de seguridad ofensiva como defensiva. Puede considerar construir sobre esto para crear sistemas de detección más complejos y pensar en agregar características como:
-
Emulación dinámica de servicios basada en el comportamiento del ataque
-
Integración con sistemas de inteligencia de amenazas que realizarán un mejor análisis de inferencias de estos registros de honeypot recopilados
-
Recopilar incluso registros más completos más allá de la IP, el puerto y los datos de la red a través de mecanismos avanzados de registro
-
Añadir capacidades de aprendizaje automático para detectar patrones de ataque
Recuerda que, aunque los honeypots son herramientas de seguridad poderosas, deben ser parte de una estrategia de seguridad defensiva integral, no la única línea de defensa.
¡Espero que hayas aprendido sobre cómo funcionan los honeypots, cuál es su propósito y un poco de programación en Python también!
Source:
https://www.freecodecamp.org/news/build-a-honeypot-with-python/