在網絡安全中,蜜罐是一種誘餌系統,旨在吸引並檢測試圖入侵系統的潛在攻擊者。就像放在外面的蜂蜜罐會吸引蒼蠅一樣。

將這些蜜罐視為系統的安全攝像頭。正如安全攝像頭幫助我們了解誰試圖闖入一棟建築以及他們是如何做到的,這些蜜罐將幫助您了解誰正在試圖攻擊您的系統以及他們使用的技術。

在本教程結束時,您將能夠用 Python 寫一個演示蜜罐並了解蜜罐的工作原理。

目錄

了解蜜罐的種類

在我們開始設計自己的蜜罐之前,讓我們快速了解它們的不同類型:

  1. 生產蜜罐:這些類型的蜜罐被放置在實際的生產環境中,用於檢測實際的安全攻擊。它們通常設計簡單,易於維護和部署,並提供有限的互動以降低風險。

  2. 研究蜜罐:這些是由安全研究人員設置的更複雜的系統,用於研究攻擊模式,對這些模式進行實證分析,收集惡意軟體樣本,並了解未被發現的新攻擊技術。它們通常模擬整個操作系統或網絡,而不是像生產環境中的應用程序那樣運作。

在本教程中,我們將建立一個中等互動的蜜罐,記錄連接嘗試和基本的攻擊者行為。

如何設置您的開發環境

讓我們開始在 Python 中設置您的開發環境。運行以下命令:

import socket
import sys
import datetime
import json
import threading
from pathlib import Path

# 配置日誌目錄
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)

我們將堅持使用內建庫,因此不需要安裝任何外部依賴。我們將把日誌存儲在 honeypot_logs 目錄中。

如何構建核心蜜罐

我們的基本蜜罐將由三個組件組成:

  1. 一個接受連接的網絡監聽器

  2. 一個用於記錄活動的日誌系統

  3. 一個基本模擬服務,用於與攻擊者互動

現在讓我們開始初始化核心蜜罐類:

class Honeypot:
    def __init__(self, bind_ip="0.0.0.0", ports=None):
        self.bind_ip = bind_ip
        self.ports = ports or [21, 22, 80, 443]  # 預設監控的端口
        self.active_connections = {}
        self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"

    def log_activity(self, port, remote_ip, data):
        """Log suspicious activity with timestamp and details"""
        activity = {
            "timestamp": datetime.datetime.now().isoformat(),
            "remote_ip": remote_ip,
            "port": port,
            "data": data.decode('utf-8', errors='ignore')
        }

        with open(self.log_file, 'a') as f:
            json.dump(activity, f)
            f.write('\n')

    def handle_connection(self, client_socket, remote_ip, port):
        """Handle individual connections and emulate services"""
        service_banners = {
            21: "220 FTP server ready\r\n",
            22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
            80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
            443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
        }

        try:
            # 發送服務的適當橫幅
            if port in service_banners:
                client_socket.send(service_banners[port].encode())

            # 接收來自攻擊者的數據
            while True:
                data = client_socket.recv(1024)
                if not data:
                    break

                self.log_activity(port, remote_ip, data)

                # 發送假回應
                client_socket.send(b"Command not recognized.\r\n")

        except Exception as e:
            print(f"Error handling connection: {e}")
        finally:
            client_socket.close()

這個類別包含了很多重要的信息,因此我們將逐一介紹每個函數。

這個 __init__ 函數記錄了我們將要承載蜜罐的 IP 和端口號碼,以及日誌文件的路徑/文件名。我們還將維護一個我們與蜜罐的活動連接總數的記錄。

log_activity 函數將接收有關 IP、數據和該 IP 嘗試連接的端口的信息。然後我們將把這些信息附加到我們的 JSON 格式日誌文件中。

handle_connection 函數將模擬我們在不同端口上運行的這些服務。我們將在端口 21、22、80 和 443 上運行蜜罐。這些服務分別是 FTP、SSH、HTTP 和 HTTPS 協議。因此,任何試圖與蜜罐互動的攻擊者應該預期在這些端口上有這些服務。

為了模擬這些服務的行為,我們將使用它們在現實中使用的服務橫幅。當攻擊者連接時,這個函數將首先發送適當的橫幅,然後接收數據並記錄下來。蜜罐還會向攻擊者發送一個假回應 “命令未被識別”。

實現網絡監聽器

現在讓我們實現將處理進來連接的網路監聽器。為此,我們將使用簡單的套接字編程。如果您不知道套接字編程是如何運作的,請查看這篇文章,它解釋了一些相關概念。

def start_listener(self, port):
    """Start a listener on specified port"""
    try:
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind((self.bind_ip, port))
        server.listen(5)

        print(f"[*] Listening on {self.bind_ip}:{port}")

        while True:
            client, addr = server.accept()
            print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")

            # 在單獨的線程中處理連接
            client_handler = threading.Thread(
                target=self.handle_connection,
                args=(client, addr[0], port)
            )
            client_handler.start()

    except Exception as e:
        print(f"Error starting listener on port {port}: {e}")

start_listener 函數將啟動伺服器並在提供的端口上進行監聽。我們的 bind_ip 將是 0.0.0.0,這表示伺服器將在所有網路介面上進行監聽。

現在,我們將在單獨的線程中處理每個新連接,因為可能會有多個攻擊者嘗試與蜜罐互動,或者攻擊腳本或工具正在掃描蜜罐。如果您不知道線程是如何運作的,您可以 查看這篇文章,它解釋了 Python 中的線程和併發。

同時,確保將此函數放在核心 Honeypot 類中。

運行蜜罐

現在讓我們創建 main 函數,這將啟動我們的蜜罐。

def main():
    honeypot = Honeypot()

    # 在單獨的線程中為每個端口啟動監聽器
    for port in honeypot.ports:
        listener_thread = threading.Thread(
            target=honeypot.start_listener,
            args=(port,)
        )
        listener_thread.daemon = True
        listener_thread.start()

    try:
        # 使主線程保持活動
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n[*] Shutting down honeypot...")
        sys.exit(0)

if __name__ == "__main__":
    main()

此功能實例化了Honeypot類別,並為我們定義的每個端口(21、22、80、443)啟動監聽器,作為單獨的線程。現在,我們將通過將主線程放入無限循環來保持正在運行的實際程序的存活。將這一切整合到一個腳本中並執行。

編寫Honeypot攻擊模擬器

現在讓我們嘗試模擬一些攻擊場景並針對我們的蜜罐,以便收集一些JSON日誌文件中的數據。

此模擬器將幫助我們展示一些有關蜜罐的重要方面:

  1. 現實的攻擊模式:模擬器將模擬常見的攻擊模式,如端口掃描、暴力破解嘗試和特定服務的漏洞利用。

  2. 可變強度:模擬器將調整模擬的強度,以測試您的蜜罐如何處理不同的負載。

  3. 多種攻擊類型:它將演示真實攻擊者可能嘗試的不同類型的攻擊,幫助您了解您的蜜罐如何對每種攻擊作出反應。

  4. 同時連接:模擬器將使用線程來測試您的蜜罐如何處理多個同時連接。

# honeypot_simulator.py

import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse

class HoneypotSimulator:
    """
    A class to simulate different types of connections and attacks against our honeypot.
    This helps in testing the honeypot's logging and response capabilities.
    """

    def __init__(self, target_ip="127.0.0.1", intensity="medium"):
        # 模擬器的配置
        self.target_ip = target_ip
        self.intensity = intensity

        # 攻擊者經常探測的常見端口
        self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]

        # 攻擊者用於不同服務的常見命令字典
        self.attack_patterns = {
            21: [  # FTP 命令
                "USER admin\r\n",
                "PASS admin123\r\n",
                "LIST\r\n",
                "STOR malware.exe\r\n"
            ],
            22: [  # SSH 嘗試
                "SSH-2.0-OpenSSH_7.9\r\n",
                "admin:password123\n",
                "root:toor\n"
            ],
            80: [  # HTTP 請求
                "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
                "POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
                "GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
            ]
        }

        # 強度設置會影響模擬攻擊的頻率和量
        self.intensity_settings = {
            "low": {"max_threads": 2, "delay_range": (1, 3)},
            "medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
            "high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
        }

    def simulate_connection(self, port):
        """
        Simulates a connection attempt to a specific port with realistic attack patterns
        """
        try:
            # 創建一個新的套接字連接
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(3)

            print(f"[*] Attempting connection to {self.target_ip}:{port}")
            sock.connect((self.target_ip, port))

            # 獲取橫幅(如果有)
            banner = sock.recv(1024)
            print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")

            # 根據端口發送攻擊模式
            if port in self.attack_patterns:
                for command in self.attack_patterns[port]:
                    print(f"[*] Sending command to port {port}: {command.strip()}")
                    sock.send(command.encode())

                    # 等待回應
                    try:
                        response = sock.recv(1024)
                        print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
                    except socket.timeout:
                        print(f"[-] No response received from port {port}")

                    # 在命令之間添加現實延遲
                    time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))

            sock.close()

        except ConnectionRefusedError:
            print(f"[-] Connection refused on port {port}")
        except socket.timeout:
            print(f"[-] Connection timeout on port {port}")
        except Exception as e:
            print(f"[-] Error connecting to port {port}: {e}")

    def simulate_port_scan(self):
        """
        Simulates a basic port scan across common ports
        """
        print(f"\n[*] Starting port scan simulation against {self.target_ip}")
        for port in self.target_ports:
            self.simulate_connection(port)
            time.sleep(random.uniform(0.1, 0.3))

    def simulate_brute_force(self, port):
        """
        Simulates a brute force attack against a specific service
        """
        common_usernames = ["admin", "root", "user", "test"]
        common_passwords = ["password123", "admin123", "123456", "root"]

        print(f"\n[*] Starting brute force simulation against port {port}")

        for username in common_usernames:
            for password in common_passwords:
                try:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.settimeout(2)
                    sock.connect((self.target_ip, port))

                    if port == 21:  # FTP
                        sock.send(f"USER {username}\r\n".encode())
                        sock.recv(1024)
                        sock.send(f"PASS {password}\r\n".encode())
                    elif port == 22:  # SSH
                        sock.send(f"{username}:{password}\n".encode())

                    sock.close()
                    time.sleep(random.uniform(0.1, 0.3))

                except Exception as e:
                    print(f"[-] Error in brute force attempt: {e}")

    def run_continuous_simulation(self, duration=300):
        """
        Runs a continuous simulation for a specified duration
        """
        print(f"\n[*] Starting continuous simulation for {duration} seconds")
        print(f"[*] Intensity level: {self.intensity}")

        end_time = time.time() + duration

        with ThreadPoolExecutor(
            max_workers=self.intensity_settings[self.intensity]["max_threads"]
        ) as executor:
            while time.time() < end_time:
                # 混合不同的攻擊模式
                simulation_choices = [
                    lambda: self.simulate_port_scan(),
                    lambda: self.simulate_brute_force(21),
                    lambda: self.simulate_brute_force(22),
                    lambda: self.simulate_connection(80)
                ]

                # 隨機選擇並執行一個攻擊模式
                executor.submit(random.choice(simulation_choices))
                time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))

def main():
    """
    Main function to run the honeypot simulator with command-line arguments
    """
    parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
    parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
    parser.add_argument(
        "--intensity",
        choices=["low", "medium", "high"],
        default="medium",
        help="Simulation intensity level"
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=300,
        help="Simulation duration in seconds"
    )

    args = parser.parse_args()

    simulator = HoneypotSimulator(args.target, args.intensity)

    try:
        simulator.run_continuous_simulation(args.duration)
    except KeyboardInterrupt:
        print("\n[*] Simulation interrupted by user")
    except Exception as e:
        print(f"[-] Simulation error: {e}")
    finally:
        print("\n[*] Simulation complete")

if __name__ == "__main__":
    main()

這個模擬腳本中有很多內容,所以讓我們一個一個地來解析。我還為每個函數和操作添加了註釋,以使代碼更易於閱讀。

我們首先有一個名為 HoneypotSimulator 的工具類。在這個類中,我們有 __init__ 函數,用於設置模擬器的基本配置。它接受兩個參數:一個目標 IP 地址(默認為 localhost)和一個強度級別(默認為「中等」)。

我們還定義了三個重要組件:要探測的目標端口(如 FTP、SSH、HTTP 等常見服務)、特定於每個服務的攻擊模式(如登錄嘗試和命令),以及控制模擬攻擊強度的設置,包括線程數和時間延遲。

simulate_connection 函數處理對特定端口的單個連接嘗試。它創建一個套接字連接,嘗試獲取任何服務橫幅(如 SSH 版本信息),然後根據服務類型發送適當的攻擊命令。我們為常見的網絡問題添加了錯誤處理,並在命令之間添加了現實的延遲,以模擬人類互動。

我們的 simulate_port_scan 函數像是一個偵察工具,將系統地檢查我們目標列表中的每個端口。這類似於 nmap 等工具的工作原理——一個一個地檢查端口,以查看可用的服務。對於每個端口,它調用 simulate_connection 函數,並添加小的隨機延遲,使掃描模式看起來更自然。

simulate_brute_force 函數維護了常見的用戶名和密碼列表,嘗試不同的組合來針對 FTP 和 SSH 等服務。在每次嘗試中,它會創建一個新的連接,以該服務的正確格式發送登錄憑據,然後關閉連接。這有助於我們測試蜜罐如何檢測和記錄憑據填充攻擊。

run_continuous_simulation 函數在指定的持續時間內運行,隨機選擇不同的攻擊類型,如端口掃描、暴力攻擊或特定服務攻擊。它使用 Python 的 ThreadPoolExecutor 來根據指定的強度級別同時運行多個攻擊。

最後,我們有 main 函數,為模擬器提供命令行界面。它使用 argparse 來處理命令行參數,讓用戶指定目標 IP、強度級別和模擬的持續時間。它創建了一個 HoneypotSimulator 類的實例,並管理整體執行,包括正確處理用戶中斷和錯誤。

在將模擬器代碼放入單獨的腳本後,使用以下命令運行它:

# 使用默認設置運行(中強度、本地主機、5 分鐘)
python honeypot_simulator.py

# 使用自定義設置運行
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600

由於我們在同一台本地機器上運行蜜罐和模擬器,目標將是 localhost。但在實際情況下,或者如果您在虛擬機或其他機器上運行蜜罐,目標可能會有所不同 – 因此在運行模擬器之前,請確保確認 IP。

如何分析蜜罐數據

讓我們快速編寫一個輔助函數,以便分析蜜罐收集的所有數據。由於我們將其存儲在 JSON 日誌文件中,因此可以方便地使用內建的 JSON 套件進行解析。

import datetime
import json

def analyze_logs(log_file):
    """Enhanced honeypot log analysis with temporal and behavioral patterns"""
    ip_analysis = {}
    port_analysis = {}
    hourly_attacks = {}
    data_patterns = {}

    # 跟踪會話模式
    ip_sessions = {}
    attack_timeline = []

    with open(log_file, 'r') as f:
        for line in f:
            try:
                activity = json.loads(line)
                timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
                ip = activity['remote_ip']
                port = activity['port']
                data = activity['data']

                # 如果是新IP,則初始化IP跟踪
                if ip not in ip_analysis:
                    ip_analysis[ip] = {
                        'total_attempts': 0,
                        'first_seen': timestamp,
                        'last_seen': timestamp,
                        'targeted_ports': set(),
                        'unique_payloads': set(),
                        'session_count': 0
                    }

                # 更新IP統計數據
                ip_analysis[ip]['total_attempts'] += 1
                ip_analysis[ip]['last_seen'] = timestamp
                ip_analysis[ip]['targeted_ports'].add(port)
                ip_analysis[ip]['unique_payloads'].add(data.strip())

                # 跟踪每小時模式
                hour = timestamp.hour
                hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1

                # 分析端口目標模式
                if port not in port_analysis:
                    port_analysis[port] = {
                        'total_attempts': 0,
                        'unique_ips': set(),
                        'unique_payloads': set()
                    }
                port_analysis[port]['total_attempts'] += 1
                port_analysis[port]['unique_ips'].add(ip)
                port_analysis[port]['unique_payloads'].add(data.strip())

                # 跟踪有效載荷模式
                if data.strip():
                    data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1

                # 跟踪攻擊時間線
                attack_timeline.append({
                    'timestamp': timestamp,
                    'ip': ip,
                    'port': port
                })

            except (json.JSONDecodeError, KeyError) as e:
                continue

    # 分析報告生成
    print("\n=== Honeypot Analysis Report ===")

    # 1. 基於IP的分析
    print("\nTop 10 Most Active IPs:")
    sorted_ips = sorted(ip_analysis.items(), 
                       key=lambda x: x[1]['total_attempts'], 
                       reverse=True)[:10]
    for ip, stats in sorted_ips:
        duration = stats['last_seen'] - stats['first_seen']
        print(f"\nIP: {ip}")
        print(f"Total Attempts: {stats['total_attempts']}")
        print(f"Active Duration: {duration}")
        print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
        print(f"Unique Payloads: {len(stats['unique_payloads'])}")

    # 2. 端口分析
    print("\nPort Targeting Analysis:")
    sorted_ports = sorted(port_analysis.items(),
                         key=lambda x: x[1]['total_attempts'],
                         reverse=True)
    for port, stats in sorted_ports:
        print(f"\nPort {port}:")
        print(f"Total Attempts: {stats['total_attempts']}")
        print(f"Unique Attackers: {len(stats['unique_ips'])}")
        print(f"Unique Payloads: {len(stats['unique_payloads'])}")

    # 3. 時間分析
    print("\nHourly Attack Distribution:")
    for hour in sorted(hourly_attacks.keys()):
        print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")

    # 4. 攻擊複雜性分析
    print("\nAttacker Sophistication Analysis:")
    for ip, stats in sorted_ips:
        sophistication_score = (
            len(stats['targeted_ports']) * 0.4 +  # 端口多樣性
            len(stats['unique_payloads']) * 0.6   # 有效載荷多樣性
        )
        print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")

    # 5. 常見有效載荷模式
    print("\nTop 10 Most Common Payloads:")
    sorted_payloads = sorted(data_patterns.items(),
                            key=lambda x: x[1],
                            reverse=True)[:10]
    for payload, count in sorted_payloads:
        if len(payload) > 50:  # 截斷長有效載荷
            payload = payload[:50] + "..."
        print(f"Count {count}: {payload}")

您可以將這放在單獨的腳本文件中,並對JSON日誌調用該函數。此函數將根據收集到的數據,為我們提供來自JSON文件的全面見解。

我們的分析首先將數據分組為幾個類別,例如基於IP的統計、端口目標模式、每小時攻擊分佈和有效載荷特徵。對於每個IP,我們追蹤總嘗試次數、首次和最後一次出現的時間、目標端口和唯一有效載荷。這將幫助我們為攻擊者建立獨特的檔案。

我們還在這裡檢查基於端口的攻擊模式,以監控最常見的目標端口,以及有多少個獨特的攻擊者。我們還進行攻擊複雜度分析,幫助我們識別針對的攻擊者,考慮如目標端口和使用的唯一有效載荷等因素。這項分析用於區分簡單的掃描活動和複雜的攻擊。

時間分析幫助我們識別每小時攻擊嘗試中的模式,揭示攻擊時機的模式和潛在的自動化目標攻擊活動。最後,我們發布常見的有效載荷,以識別常見的攻擊字符串或命令。

安全考量

在部署此蜜罐時,請確保考慮以下安全措施:

  1. 在隔離環境中運行蜜罐。通常在虛擬機中,或在您的本地機器上,該機器位於NAT和防火牆後面。

  2. 以最小的系統權限運行蜜罐(通常不是以root身份),以減少被攻擊時的風險。

  3. 如果您計劃將收集到的數據用作生產級或研究蜜罐,請小心處理這些數據,因為它可能包含惡意軟件或敏感信息。

  4. 實施穩健的監控機制,以檢測試圖突破蜜罐環境的行為。

結論

通過這些步驟,我們已經建立了蜜罐,編寫了一個模擬器來模擬對我們的蜜罐的攻擊,並分析了我們的蜜罐日誌中的數據,以做出一些簡單的推斷。這是一個了解攻擊和防禦安全概念的絕佳方法。您可以考慮在此基礎上構建更複雜的檢測系統,並考慮添加以下功能:

  1. 根據攻擊行為進行動態服務模擬

  2. 與威脅情報系統集成,以對這些收集的蜜罐日誌進行更好的推斷分析

  3. 通過先進的日誌機制收集超出IP、端口和網絡數據的更全面的日誌

  4. 新增機器學習功能以檢測攻擊模式

請記住,儘管蜜罐是強大的安全工具,但它們應該是全面防禦安全策略的一部分,而不是唯一的防線。

我希望你了解蜜罐的運作方式、它們的目的,以及一些Python編程的知識!