在网络安全中,蜜罐是一个诱饵系统,旨在吸引并检测试图攻击系统的潜在攻击者。就像一锅放在外面的蜂蜜会吸引苍蝇一样。

将这些蜜罐视为你系统的监控摄像头。正如监控摄像头帮助我们了解谁试图闯入建筑以及他们是如何做到的,这些蜜罐将帮助你了解谁试图攻击你的系统以及他们使用了什么技术。

通过本教程的最后,你将能够用Python编写一个演示蜜罐,并理解蜜罐是如何工作的。

目录

理解蜜罐的类型

在我们开始设计自己的蜜罐之前,让我们快速了解它们的不同类型:

  1. 生产蜜罐:这些蜜罐被放置在实际的生产环境中,用于检测实际的安全攻击。它们通常设计简单,易于维护和部署,并提供有限的交互以降低风险。

  2. 研究蜜罐:这些是由安全研究人员建立的更复杂的系统,用于研究攻击模式,对这些模式进行实证分析,收集恶意软件样本,并理解以前未发现的新攻击技术。它们通常模拟整个操作系统或网络,而不是像生产环境中的应用程序那样运行。

在本教程中,我们将建立一个中等交互性的蜜罐,用于记录连接尝试和基本的攻击者行为。

如何设置您的开发环境

让我们开始在 Python 中设置您的开发环境。运行以下命令:

import socket
import sys
import datetime
import json
import threading
from pathlib import Path

# 配置日志目录
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)

我们将坚持使用内置库,因此不需要安装任何外部依赖项。我们将把日志存储在 honeypot_logs 目录中。

如何构建核心蜜罐

我们的基本蜜罐将由三个组件组成:

  1. 一个接受连接的网络监听器

  2. 一个记录活动的日志系统

  3. 一个与攻击者交互的基本仿真服务

现在让我们开始初始化核心蜜罐类:

class Honeypot:
    def __init__(self, bind_ip="0.0.0.0", ports=None):
        self.bind_ip = bind_ip
        self.ports = ports or [21, 22, 80, 443]  # 要监视的默认端口
        self.active_connections = {}
        self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"

    def log_activity(self, port, remote_ip, data):
        """Log suspicious activity with timestamp and details"""
        activity = {
            "timestamp": datetime.datetime.now().isoformat(),
            "remote_ip": remote_ip,
            "port": port,
            "data": data.decode('utf-8', errors='ignore')
        }

        with open(self.log_file, 'a') as f:
            json.dump(activity, f)
            f.write('\n')

    def handle_connection(self, client_socket, remote_ip, port):
        """Handle individual connections and emulate services"""
        service_banners = {
            21: "220 FTP server ready\r\n",
            22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
            80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
            443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
        }

        try:
            # 为服务发送适当的横幅
            if port in service_banners:
                client_socket.send(service_banners[port].encode())

            # 从攻击者接收数据
            while True:
                data = client_socket.recv(1024)
                if not data:
                    break

                self.log_activity(port, remote_ip, data)

                # 发送虚假响应
                client_socket.send(b"Command not recognized.\r\n")

        except Exception as e:
            print(f"Error handling connection: {e}")
        finally:
            client_socket.close()

这个类中包含了许多重要的信息,所以让我们逐个研究每个函数。

__init__函数记录我们将在其上托管蜜罐的IP和端口号,以及日志文件的路径/文件名。我们还将记录我们对蜜罐的总活动连接数。

log_activity函数将接收有关IP、数据和IP尝试连接的端口的信息。然后,我们将将这些信息附加到我们的JSON格式日志文件中。

handle_connection函数将模仿这些将在我们拥有的不同端口上运行的服务。我们将在端口21、22、80和443上运行蜜罐。这些服务分别用于FTP、SSH、HTTP和HTTPS协议。因此,任何试图与蜜罐交互的攻击者都应该在这些端口上期望到这些服务。

为了模仿这些服务的行为,我们将使用它们在现实中使用的服务横幅。当攻击者连接时,此函数将首先发送适当的横幅,然后接收数据并记录下来。蜜罐还将向攻击者发送一个虚假响应“命令不被识别”。

实现网络监听器

现在让我们实现网络监听器,以处理传入的连接。为此,我们将使用简单的套接字编程。如果您不清楚套接字编程是如何工作的,可以查看这篇文章,其中解释了一些相关概念。

def start_listener(self, port):
    """Start a listener on specified port"""
    try:
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind((self.bind_ip, port))
        server.listen(5)

        print(f"[*] Listening on {self.bind_ip}:{port}")

        while True:
            client, addr = server.accept()
            print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")

            # 在单独的线程中处理连接
            client_handler = threading.Thread(
                target=self.handle_connection,
                args=(client, addr[0], port)
            )
            client_handler.start()

    except Exception as e:
        print(f"Error starting listener on port {port}: {e}")

函数start_listener将启动服务器并监听提供的端口。对我们而言,bind_ip将是0.0.0.0,这表示服务器将在所有网络接口上监听。

现在,我们将在单独的线程中处理每个新连接,因为可能会出现多个攻击者尝试与蜜罐交互,或者攻击脚本或工具正在扫描蜜罐的情况。如果您不清楚线程是如何工作的,您可以查看这篇文章,它解释了Python中的线程和并发。

另外,请确保将此函数放入核心Honeypot类中。

运行蜜罐

现在让我们创建main函数,以启动我们的蜜罐。

def main():
    honeypot = Honeypot()

    # 在单独的线程中为每个端口启动监听器
    for port in honeypot.ports:
        listener_thread = threading.Thread(
            target=honeypot.start_listener,
            args=(port,)
        )
        listener_thread.daemon = True
        listener_thread.start()

    try:
        # 保持主线程存活
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n[*] Shutting down honeypot...")
        sys.exit(0)

if __name__ == "__main__":
    main()

此功能实例化了Honeypot类,并为我们定义的每个端口(21,22,80,443)启动侦听器,作为单独的线程。现在,我们将通过将主线程放入无限循环中,使其保持运行。将这一切组合成一个脚本并运行它。

编写蜜罐攻击模拟器

现在让我们尝试模拟一些攻击场景,并针对我们的蜜罐,以便我们可以在JSON日志文件中收集一些数据。

这个模拟器将帮助我们展示关于蜜罐的几个重要方面:

  1. 真实的攻击模式:模拟器将模拟常见的攻击模式,如端口扫描、暴力破解尝试和特定服务的利用。

  2. 可变强度:模拟器将调整模拟的强度,以测试您的蜜罐如何处理不同的负载。

  3. 多种攻击类型:它将展示真实攻击者可能尝试的不同类型攻击,帮助您了解您的蜜罐如何对每种攻击做出反应。

  4. 并发连接:模拟器将使用线程来测试您的蜜罐如何处理多个同时连接。

# honeypot_simulator.py

import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse

class HoneypotSimulator:
    """
    A class to simulate different types of connections and attacks against our honeypot.
    This helps in testing the honeypot's logging and response capabilities.
    """

    def __init__(self, target_ip="127.0.0.1", intensity="medium"):
        # 模拟器的配置
        self.target_ip = target_ip
        self.intensity = intensity

        # 攻击者常常探测的常用端口
        self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]

        # 攻击者针对不同服务使用的常用命令字典
        self.attack_patterns = {
            21: [  # FTP 命令
                "USER admin\r\n",
                "PASS admin123\r\n",
                "LIST\r\n",
                "STOR malware.exe\r\n"
            ],
            22: [  # SSH 尝试
                "SSH-2.0-OpenSSH_7.9\r\n",
                "admin:password123\n",
                "root:toor\n"
            ],
            80: [  # HTTP 请求
                "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
                "POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
                "GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
            ]
        }

        # 强度设置影响模拟攻击的频率和数量
        self.intensity_settings = {
            "low": {"max_threads": 2, "delay_range": (1, 3)},
            "medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
            "high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
        }

    def simulate_connection(self, port):
        """
        Simulates a connection attempt to a specific port with realistic attack patterns
        """
        try:
            # 创建一个新的套接字连接
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(3)

            print(f"[*] Attempting connection to {self.target_ip}:{port}")
            sock.connect((self.target_ip, port))

            # 如果有的话获取横幅
            banner = sock.recv(1024)
            print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")

            # 根据端口发送攻击模式
            if port in self.attack_patterns:
                for command in self.attack_patterns[port]:
                    print(f"[*] Sending command to port {port}: {command.strip()}")
                    sock.send(command.encode())

                    # 等待响应
                    try:
                        response = sock.recv(1024)
                        print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
                    except socket.timeout:
                        print(f"[-] No response received from port {port}")

                    # 在命令之间添加真实的延迟
                    time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))

            sock.close()

        except ConnectionRefusedError:
            print(f"[-] Connection refused on port {port}")
        except socket.timeout:
            print(f"[-] Connection timeout on port {port}")
        except Exception as e:
            print(f"[-] Error connecting to port {port}: {e}")

    def simulate_port_scan(self):
        """
        Simulates a basic port scan across common ports
        """
        print(f"\n[*] Starting port scan simulation against {self.target_ip}")
        for port in self.target_ports:
            self.simulate_connection(port)
            time.sleep(random.uniform(0.1, 0.3))

    def simulate_brute_force(self, port):
        """
        Simulates a brute force attack against a specific service
        """
        common_usernames = ["admin", "root", "user", "test"]
        common_passwords = ["password123", "admin123", "123456", "root"]

        print(f"\n[*] Starting brute force simulation against port {port}")

        for username in common_usernames:
            for password in common_passwords:
                try:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.settimeout(2)
                    sock.connect((self.target_ip, port))

                    if port == 21:  # FTP
                        sock.send(f"USER {username}\r\n".encode())
                        sock.recv(1024)
                        sock.send(f"PASS {password}\r\n".encode())
                    elif port == 22:  # SSH
                        sock.send(f"{username}:{password}\n".encode())

                    sock.close()
                    time.sleep(random.uniform(0.1, 0.3))

                except Exception as e:
                    print(f"[-] Error in brute force attempt: {e}")

    def run_continuous_simulation(self, duration=300):
        """
        Runs a continuous simulation for a specified duration
        """
        print(f"\n[*] Starting continuous simulation for {duration} seconds")
        print(f"[*] Intensity level: {self.intensity}")

        end_time = time.time() + duration

        with ThreadPoolExecutor(
            max_workers=self.intensity_settings[self.intensity]["max_threads"]
        ) as executor:
            while time.time() < end_time:
                # 不同攻击模式的混合
                simulation_choices = [
                    lambda: self.simulate_port_scan(),
                    lambda: self.simulate_brute_force(21),
                    lambda: self.simulate_brute_force(22),
                    lambda: self.simulate_connection(80)
                ]

                # 随机选择并执行一个攻击模式
                executor.submit(random.choice(simulation_choices))
                time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))

def main():
    """
    Main function to run the honeypot simulator with command-line arguments
    """
    parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
    parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
    parser.add_argument(
        "--intensity",
        choices=["low", "medium", "high"],
        default="medium",
        help="Simulation intensity level"
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=300,
        help="Simulation duration in seconds"
    )

    args = parser.parse_args()

    simulator = HoneypotSimulator(args.target, args.intensity)

    try:
        simulator.run_continuous_simulation(args.duration)
    except KeyboardInterrupt:
        print("\n[*] Simulation interrupted by user")
    except Exception as e:
        print(f"[-] Simulation error: {e}")
    finally:
        print("\n[*] Simulation complete")

if __name__ == "__main__":
    main()

在这个模拟脚本中有很多内容,所以让我们逐一分解。我还为每个函数和操作添加了注释,以使代码更具可读性。

我们首先有一个名为 HoneypotSimulator 的工具类。在这个类中,我们有一个 __init__ 函数,用于设置我们模拟器的基本配置。它接受两个参数:一个目标 IP 地址(默认为 localhost)和一个强度级别(默认为“中等”)。

我们还定义了三个重要组件:要探测的目标端口(常见服务如 FTP、SSH、HTTP)、特定于每个服务的攻击模式(如登录尝试和命令),以及通过线程计数和时间延迟控制我们模拟的攻击强度的设置。

simulate_connection 函数处理对特定端口的单个连接尝试。它创建一个套接字连接,尝试获取任何服务横幅(如 SSH 版本信息),然后根据服务类型发送适当的攻击命令。我们为常见网络问题添加了错误处理,并在命令之间添加了现实的延迟,以模拟人类交互。

我们的 simulate_port_scan 函数像一个侦察工具,会系统地检查我们目标列表中的每个端口。它类似于 nmap 等工具的工作方式——逐个查看端口以了解可用服务。对于每个端口,它调用 simulate_connection 函数,并添加小的随机延迟,使扫描模式看起来更自然。

simulate_brute_force 函数维护了常见用户名和密码的列表,尝试不同的组合来攻击像 FTP 和 SSH 的服务。每次尝试时,它都会创建一个新的连接,以该服务的正确格式发送登录凭据,然后关闭连接。这帮助我们测试蜜罐检测和记录凭证填充攻击的效果。

run_continuous_simulation 函数在指定的时间内运行,随机选择不同的攻击类型,如端口扫描、暴力破解或特定服务攻击。它使用 Python 的 ThreadPoolExecutor 同时运行多个攻击,基于指定的强度级别。

最后,我们有 main 函数,为模拟器提供命令行接口。它使用 argparse 处理命令行参数,让用户指定目标 IP、强度级别和模拟持续时间。它创建了 HoneypotSimulator 类的一个实例,并管理整体执行,包括正确处理用户中断和错误。

将模拟器代码放入单独的脚本后,使用以下命令运行它:

# 使用默认设置运行(中等强度、本地主机、5分钟)
python honeypot_simulator.py

# 使用自定义设置运行
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600

由于我们在同一台机器上本地运行蜜罐和模拟器,目标将是localhost。但在实际场景中,或者如果你在虚拟机或不同的机器上运行蜜罐,这可能会有所不同——因此请确保在运行模拟器之前确认IP。

如何分析蜜罐数据

让我们快速编写一个辅助函数,以便分析蜜罐收集的所有数据。由于我们将其存储在一个JSON日志文件中,可以方便地使用内置的JSON包进行解析。

import datetime
import json

def analyze_logs(log_file):
    """Enhanced honeypot log analysis with temporal and behavioral patterns"""
    ip_analysis = {}
    port_analysis = {}
    hourly_attacks = {}
    data_patterns = {}

    # 跟踪会话模式
    ip_sessions = {}
    attack_timeline = []

    with open(log_file, 'r') as f:
        for line in f:
            try:
                activity = json.loads(line)
                timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
                ip = activity['remote_ip']
                port = activity['port']
                data = activity['data']

                # 如果是新IP,初始化IP跟踪
                if ip not in ip_analysis:
                    ip_analysis[ip] = {
                        'total_attempts': 0,
                        'first_seen': timestamp,
                        'last_seen': timestamp,
                        'targeted_ports': set(),
                        'unique_payloads': set(),
                        'session_count': 0
                    }

                # 更新IP统计信息
                ip_analysis[ip]['total_attempts'] += 1
                ip_analysis[ip]['last_seen'] = timestamp
                ip_analysis[ip]['targeted_ports'].add(port)
                ip_analysis[ip]['unique_payloads'].add(data.strip())

                # 跟踪每小时模式
                hour = timestamp.hour
                hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1

                # 分析端口目标模式
                if port not in port_analysis:
                    port_analysis[port] = {
                        'total_attempts': 0,
                        'unique_ips': set(),
                        'unique_payloads': set()
                    }
                port_analysis[port]['total_attempts'] += 1
                port_analysis[port]['unique_ips'].add(ip)
                port_analysis[port]['unique_payloads'].add(data.strip())

                # 跟踪有效载荷模式
                if data.strip():
                    data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1

                # 跟踪攻击时间线
                attack_timeline.append({
                    'timestamp': timestamp,
                    'ip': ip,
                    'port': port
                })

            except (json.JSONDecodeError, KeyError) as e:
                continue

    # 分析报告生成
    print("\n=== Honeypot Analysis Report ===")

    # 1. 基于IP的分析
    print("\nTop 10 Most Active IPs:")
    sorted_ips = sorted(ip_analysis.items(), 
                       key=lambda x: x[1]['total_attempts'], 
                       reverse=True)[:10]
    for ip, stats in sorted_ips:
        duration = stats['last_seen'] - stats['first_seen']
        print(f"\nIP: {ip}")
        print(f"Total Attempts: {stats['total_attempts']}")
        print(f"Active Duration: {duration}")
        print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
        print(f"Unique Payloads: {len(stats['unique_payloads'])}")

    # 2. 端口分析
    print("\nPort Targeting Analysis:")
    sorted_ports = sorted(port_analysis.items(),
                         key=lambda x: x[1]['total_attempts'],
                         reverse=True)
    for port, stats in sorted_ports:
        print(f"\nPort {port}:")
        print(f"Total Attempts: {stats['total_attempts']}")
        print(f"Unique Attackers: {len(stats['unique_ips'])}")
        print(f"Unique Payloads: {len(stats['unique_payloads'])}")

    # 3. 时间分析
    print("\nHourly Attack Distribution:")
    for hour in sorted(hourly_attacks.keys()):
        print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")

    # 4. 攻击复杂性分析
    print("\nAttacker Sophistication Analysis:")
    for ip, stats in sorted_ips:
        sophistication_score = (
            len(stats['targeted_ports']) * 0.4 +  # 端口多样性
            len(stats['unique_payloads']) * 0.6   # 有效载荷多样性
        )
        print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")

    # 5. 常见有效载荷模式
    print("\nTop 10 Most Common Payloads:")
    sorted_payloads = sorted(data_patterns.items(),
                            key=lambda x: x[1],
                            reverse=True)[:10]
    for payload, count in sorted_payloads:
        if len(payload) > 50:  # 截断长有效载荷
            payload = payload[:50] + "..."
        print(f"Count {count}: {payload}")

您可以将其放在单独的脚本文件中,并在JSON日志上调用该函数。此函数将根据收集的数据为我们提供JSON文件的全面洞察。

我们的分析首先将数据分为几个类别,例如基于IP的统计、端口目标模式、每小时攻击分布和有效载荷特征。对于每个IP,我们跟踪总尝试次数、首次和最后一次见到的时间、目标端口和独特有效载荷。这将帮助我们为攻击者建立独特的档案。

我们还在这里检查基于端口的攻击模式,监控最常被攻击的端口,以及有多少个独特的攻击者。我们还进行攻击复杂性分析,帮助我们识别目标攻击者,考虑到目标端口和使用的独特有效载荷等因素。此分析用于区分简单的扫描活动和复杂的攻击。

时间分析帮助我们识别每小时攻击尝试中的模式,揭示攻击时机和潜在的自动化目标活动的模式。最后,我们发布常见的有效载荷,以识别常见的攻击字符串或命令。

安全考虑事项

在部署此蜜罐时,请确保考虑以下安全措施:

  1. 在隔离环境中运行蜜罐。通常是在虚拟机内,或在一个位于NAT和防火墙后面的本地计算机上。

  2. 以最小的系统权限运行蜜罐(通常不是以root身份),以降低被攻陷时的风险。

  3. 如果你计划将收集的数据作为生产级或研究蜜罐进行部署,请谨慎处理这些数据,因为它可能包含恶意软件或敏感信息。

  4. 实施强大的监控机制,以检测试图突破蜜罐环境的行为。

结论

通过这些,我们建立了蜜罐,编写了模拟器来模拟对蜜罐的攻击,并分析了蜜罐日志中的数据以得出一些简单的推论。这是理解攻击和防御安全概念的绝佳方式。你可以考虑在此基础上构建更复杂的检测系统,并考虑添加以下功能:

  1. 基于攻击行为的动态服务仿真

  2. 与威胁情报系统集成,以对这些收集的蜜罐日志进行更好的推理分析

  3. 通过先进的日志记录机制收集更全面的日志,超越IP、端口和网络数据

  4. 添加机器学习功能以检测攻击模式

请记住,尽管蜜罐是强大的安全工具,但它们应该是综合防御安全策略的一部分,而不是唯一的防线。

我希望你了解蜜罐是如何工作的,它们的目的是什么,以及一些Python编程的知识!