בביטחון סייבר, "חבית דבש" היא מערכת מְזַיֶּה שנועדה למשוך ולאחר מכן לזהות תוקפים פוטנציאליים המנסים לפגוע במערכת. כמו חבית דבש פתוחה שהייתה מושכת זבובים.

חשוב על חביות הדבש הללו כעל מצלמות אבטחה עבור המערכת שלך. בדיוק כפי שמצלמת אבטחה עוזרת לנו להבין מי מנסה לפרוץ לבניין ואיך הם עושים זאת, חביות הדבש הללו יעזרו לך להבין מי מנסה לתקוף את המערכת שלך ואילו טכניקות הם משתמשים.

בתום המדריך הזה, תוכל לכתוב חבית דבש לדוגמה בפייתון ולהבין איך חביות דבש פועלות.

תוכן העניינים

הבנת סוגי דבורי הדבש

לפני שנתחיל לתכנן את דבור הדבש שלנו, נבין במהירות את סוגיהם השונים:

  1. דבורי דבש ייצור: סוגים אלו של דבורי דבש ממוקמים בסביבה ייצור אמיתית ומשמשים לזיהוי התקפות אבטחה אמיתיות. הם בדרך כלל פשוטים בעיצוב, קלים לתחזוקה ופריסה, ומציעים אינטראקציה מוגבלת כדי להפחית סיכון.

  2. דבורי דבש מחקר: אלו הם מערכות מורכבות יותר המוקמות על ידי חוקרי אבטחה כדי לחקור דפוסי תקיפה, לבצע ניתוח אמפירי על דפוסים אלו, לאסוף דוגמאות של תוכנות זדוניות, ולהבין טכניקות תקיפה חדשות שלא התגלו קודם. לעיתים קרובות הם מחקים מערכות הפעלה או רשתות שלמות במקום להתנהג כמו אפליקציה בסביבת הייצור.

במדריך זה, נבנה מכשול עם אינטראקציה בינונית שמתעד ניסי חיבור והתנהגות בסיסית של תוקפים.

כיצד להקים את סביבת הפיתוח שלך

נחלה להתחיל בהקמת סביבת הפיתוח שלך ב-Python. הרץ את הפקודות הבאות:

import socket
import sys
import datetime
import json
import threading
from pathlib import Path

# הגדר את תיקיית הלוגים
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)

נשמור על הספריות המובנות כך שלא נצטרך להתקין תלויות חיצוניות. אנו נשמור את הלוגים שלנו בתיקיית honeypot_logs.

כיצד לבנות את מכשול הליבה

המכשול הבסיסי שלנו יתבסס על שלושה מרכיבים:

  1. מקשיב רשת שמקבל חיבורים

  2. מערכת לוגים כדי לתעד פעילויות

  3. שירות דמוי בסיסי כדי לתקשר עם תוקפים

עכשיו נתחיל באתחול מחלקת המכשול הליבה:

class Honeypot:
    def __init__(self, bind_ip="0.0.0.0", ports=None):
        self.bind_ip = bind_ip
        self.ports = ports or [21, 22, 80, 443]  # יציאות ברירת מחדל למעקב
        self.active_connections = {}
        self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"

    def log_activity(self, port, remote_ip, data):
        """Log suspicious activity with timestamp and details"""
        activity = {
            "timestamp": datetime.datetime.now().isoformat(),
            "remote_ip": remote_ip,
            "port": port,
            "data": data.decode('utf-8', errors='ignore')
        }

        with open(self.log_file, 'a') as f:
            json.dump(activity, f)
            f.write('\n')

    def handle_connection(self, client_socket, remote_ip, port):
        """Handle individual connections and emulate services"""
        service_banners = {
            21: "220 FTP server ready\r\n",
            22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
            80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
            443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
        }

        try:
            # שלח באנר מתאים לשירות
            if port in service_banners:
                client_socket.send(service_banners[port].encode())

            # קבל נתונים מהתוקף
            while True:
                data = client_socket.recv(1024)
                if not data:
                    break

                self.log_activity(port, remote_ip, data)

                # שלח תגובה מזויפת
                client_socket.send(b"Command not recognized.\r\n")

        except Exception as e:
            print(f"Error handling connection: {e}")
        finally:
            client_socket.close()

כיתה זו מכילה הרבה מידע חשוב, אז נסקור כל פונקציה אחת אחת.

הפונקציה __init__ רושמת את מספרי ה-IP והפורט שבהם נארח את ה-honeypot, כמו גם את הנתיב / שם הקובץ של קובץ היומן. כמו כן, נשמור על רשומה של מספר החיבורים הפעילים שיש לנו ל-honeypot.

הפונקציה log_activity תקבל את המידע על ה-IP, הנתונים, והפורט שאליו ה-IP ניסה להתחבר. לאחר מכן, נוסיף מידע זה לקובץ היומן שלנו בפורמט JSON.

הפונקציה handle_connection תחקה את השירותים שיפעלו על היציאות השונות שיש לנו. ה-honeypot שלנו יפעל על יציאות 21, 22, 80 ו-443. שירותים אלו מיועדים ל-FTP, SSH, HTTP ופרוטוקול HTTPS, בהתאמה. כך שכל תוקף המנסה לתקשר עם ה-honeypot צריך לצפות לשירותים הללו על היציאות הללו.

כדי לחקות את ההתנהגות של שירותים אלה, נשתמש בבאנרים שהשירותים הללו משתמשים בהם במציאות. הפונקציה הזו תשלח קודם את הבאנר המתאים כאשר התוקף מתחבר, ולאחר מכן תקבל את הנתונים ותשמור אותם. ה-honeypot גם ישלח תגובה מזויפת “פקודה לא מוכרת” חזרה לתוקף.

ממש את מאזיני הרשת

עכשיו ניישם את מאזיני הרשת שיטפלו בחיבורים הנכנסים. לשם כך, נשתמש בתכנות סוקטים פשוט. אם אינכם מודעים כיצד עובד תכנות סוקטים, עיינו במאמר הזה שמסביר כמה מושגים הקשורים לכך.

def start_listener(self, port):
    """Start a listener on specified port"""
    try:
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind((self.bind_ip, port))
        server.listen(5)

        print(f"[*] Listening on {self.bind_ip}:{port}")

        while True:
            client, addr = server.accept()
            print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")

            # טיפול בחיבור ב-thread נפרד
            client_handler = threading.Thread(
                target=self.handle_connection,
                args=(client, addr[0], port)
            )
            client_handler.start()

    except Exception as e:
        print(f"Error starting listener on port {port}: {e}")

הפונקציה start_listener תתחיל את השרת ותאזין על הפורט שניתן. הbind_ip שלנו יהיה 0.0.0.0 המצביע על כך שהשרת יאזין על כל ממשקי הרשת.

עכשיו, נטפל בכל חיבור חדש ב-thread נפרד, מכיוון שעשויות להיות דוגמאות שבהן מספר תוקפים מנסים לתקשר עם ה-honeypot או שסקריפט או כלי תוקף סורק את ה-honeypot. אם אינכם מודעים כיצד עובד תהליך ה-threading, אתם יכולים לעיין במאמר הזה שמסביר על threading ועל ביצועים מקבילים ב-Python.

בנוסף, ודאו לשים פונקציה זו ב-class הHoneypot המרכזי.

הריצו את ה-Honeypot

עכשיו ניצור את הפונקציה main שתתחיל את ה-honeypot שלנו.

def main():
    honeypot = Honeypot()

    # התחלת מאזינים לכל פורט ב-threads נפרדים
    for port in honeypot.ports:
        listener_thread = threading.Thread(
            target=honeypot.start_listener,
            args=(port,)
        )
        listener_thread.daemon = True
        listener_thread.start()

    try:
        # שמירה על ה-thread הראשי חי
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n[*] Shutting down honeypot...")
        sys.exit(0)

if __name__ == "__main__":
    main()

פונקציה זו יוצרת מופע של מחלקת Honeypot ומתחילה את המאזינים עבור כל אחד מהנמלים שהגדרנו (21,22,80,443) כ线程 נפרד. עכשיו, נשמור את ה线程 הראשי שלנו שמפעיל את התוכנית שלנו פעילה על ידי הכנסת אותה בלולאה אינסופית. שימו את כל זה יחד בסקריפט והריצו אותו.

כתבו את סימולטור התקפת ה-Honeypot

עכשיו ננסה לדמות כמה תסריטי התקפה ולמקד את ה-honeypot שלנו כדי שנוכל לאסוף נתונים בקובץ ה-json שלנו.

הסימולטור הזה יעזור לנו להדגים כמה אספקטים חשובים על honeypots:

  1. תבניות התקפה ריאליסטיות: הסימולטור ידמה תבניות התקפה נפוצות כמו סריקות נמלים, ניסי כוח גס, וניצול ספציפי לשירות.

  2. עוצמה משתנה: הסימולטור יתאים את עוצמת הסימולציה כדי לבדוק כיצד ה-honeypot שלך מתמודד עם עומסים שונים.

  3. מספר סוגי התקפות: הוא ידגים סוגים שונים של התקפות שהתקפות אמיתיות עשויות לנסות, ועוזר לך להבין כיצד ה-honeypot שלך מגיב לכל אחת.

  4. חיבורים מקבילים: הסימולטור ישתמש בניהול חוטים כדי לבדוק כיצד ה"הוניפוט" שלך מתמודד עם חיבורים סימולטניים מרובים.

# honeypot_simulator.py

import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse

class HoneypotSimulator:
    """
    A class to simulate different types of connections and attacks against our honeypot.
    This helps in testing the honeypot's logging and response capabilities.
    """

    def __init__(self, target_ip="127.0.0.1", intensity="medium"):
        # קונפיגורציה עבור הסימולטור
        self.target_ip = target_ip
        self.intensity = intensity

        # פורטים נפוצים שפורצים לרוב בודקים
        self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]

        # מילון של פקודות נפוצות שמשתמשים פורצים עבור שירותים שונים
        self.attack_patterns = {
            21: [  # פקודות FTP
                "USER admin\r\n",
                "PASS admin123\r\n",
                "LIST\r\n",
                "STOR malware.exe\r\n"
            ],
            22: [  # ניסיונות SSH
                "SSH-2.0-OpenSSH_7.9\r\n",
                "admin:password123\n",
                "root:toor\n"
            ],
            80: [  # בקשות HTTP
                "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
                "POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
                "GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
            ]
        }

        # הגדרות אינטנסיביות משפיעות על התדירות והכמות של התקפות מדומות
        self.intensity_settings = {
            "low": {"max_threads": 2, "delay_range": (1, 3)},
            "medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
            "high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
        }

    def simulate_connection(self, port):
        """
        Simulates a connection attempt to a specific port with realistic attack patterns
        """
        try:
            # צור חיבור סוקט חדש
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(3)

            print(f"[*] Attempting connection to {self.target_ip}:{port}")
            sock.connect((self.target_ip, port))

            # קבל בנר אם יש
            banner = sock.recv(1024)
            print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")

            # שלח דפוסי התקפה בהתבסס על הפורט
            if port in self.attack_patterns:
                for command in self.attack_patterns[port]:
                    print(f"[*] Sending command to port {port}: {command.strip()}")
                    sock.send(command.encode())

                    # המתן לתגובה
                    try:
                        response = sock.recv(1024)
                        print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
                    except socket.timeout:
                        print(f"[-] No response received from port {port}")

                    # הוסף עיכוב ריאלי בין פקודות
                    time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))

            sock.close()

        except ConnectionRefusedError:
            print(f"[-] Connection refused on port {port}")
        except socket.timeout:
            print(f"[-] Connection timeout on port {port}")
        except Exception as e:
            print(f"[-] Error connecting to port {port}: {e}")

    def simulate_port_scan(self):
        """
        Simulates a basic port scan across common ports
        """
        print(f"\n[*] Starting port scan simulation against {self.target_ip}")
        for port in self.target_ports:
            self.simulate_connection(port)
            time.sleep(random.uniform(0.1, 0.3))

    def simulate_brute_force(self, port):
        """
        Simulates a brute force attack against a specific service
        """
        common_usernames = ["admin", "root", "user", "test"]
        common_passwords = ["password123", "admin123", "123456", "root"]

        print(f"\n[*] Starting brute force simulation against port {port}")

        for username in common_usernames:
            for password in common_passwords:
                try:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.settimeout(2)
                    sock.connect((self.target_ip, port))

                    if port == 21:  # FTP
                        sock.send(f"USER {username}\r\n".encode())
                        sock.recv(1024)
                        sock.send(f"PASS {password}\r\n".encode())
                    elif port == 22:  # SSH
                        sock.send(f"{username}:{password}\n".encode())

                    sock.close()
                    time.sleep(random.uniform(0.1, 0.3))

                except Exception as e:
                    print(f"[-] Error in brute force attempt: {e}")

    def run_continuous_simulation(self, duration=300):
        """
        Runs a continuous simulation for a specified duration
        """
        print(f"\n[*] Starting continuous simulation for {duration} seconds")
        print(f"[*] Intensity level: {self.intensity}")

        end_time = time.time() + duration

        with ThreadPoolExecutor(
            max_workers=self.intensity_settings[self.intensity]["max_threads"]
        ) as executor:
            while time.time() < end_time:
                # תערובת של דפוסי התקפה שונים
                simulation_choices = [
                    lambda: self.simulate_port_scan(),
                    lambda: self.simulate_brute_force(21),
                    lambda: self.simulate_brute_force(22),
                    lambda: self.simulate_connection(80)
                ]

                # בחר באקראי ובצע דפוס התקפה
                executor.submit(random.choice(simulation_choices))
                time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))

def main():
    """
    Main function to run the honeypot simulator with command-line arguments
    """
    parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
    parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
    parser.add_argument(
        "--intensity",
        choices=["low", "medium", "high"],
        default="medium",
        help="Simulation intensity level"
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=300,
        help="Simulation duration in seconds"
    )

    args = parser.parse_args()

    simulator = HoneypotSimulator(args.target, args.intensity)

    try:
        simulator.run_continuous_simulation(args.duration)
    except KeyboardInterrupt:
        print("\n[*] Simulation interrupted by user")
    except Exception as e:
        print(f"[-] Simulation error: {e}")
    finally:
        print("\n[*] Simulation complete")

if __name__ == "__main__":
    main()

יש לנו הרבה קורה בסקריפט הסימולציה הזה, אז בואו נפרק את זה אחד אחד. גם הוספתי הערות לכל פונקציה ופעולה כדי להפוך את הקוד הזה לקצת יותר קריא.

ראשית יש לנו את מחלקת השירות שלנו שנקראת HoneypotSimulator. במחלקה זו, יש לנו את הפונקציה __init__ שמקימה את התצורה הבסיסית עבור הסימולטור שלנו. היא מקבלת שני פרמטרים: כתובת IP יעד (ברירת מחדל היא localhost) ורמת אינטנסיביות (ברירת מחדל היא "בינונית").

בנוסף, אנו מגדירים שלושה מרכיבים חשובים: הנמלים היעד שצריך לבדוק (שירותים נפוצים כמו FTP, SSH, HTTP), דפוסי התקפה ספציפיים לכל שירות (כמו ניסי התחברות ופקודות), והגדרות אינטנסיביות שיכולות לשלוט עד כמה אגרסיבית תהיה הסימולציה שלנו דרך מספרי חוטים ועיכובי זמן.

הפונקציה simulate_connection מטפלת בניסי חיבור פרטיים לנמל ספציפי. היא יוצרת חיבור סוקט, מנסה לקבל כל דגל שירות (כמו מידע על גרסת SSH), ואז שולחת פקודות התקפה מתאימות בהתאם לסוג השירות. הוספנו טיפול שגיאות עבור בעיות רשת נפוצות והוספנו גם עיכובים ריאליסטיים בין הפקודות כדי לחקות אינטראקציה אנושית.

הפונקציה simulate_port_scan פועלת כמו כלי ריגול, שבודק באופן שיטתי כל נמל ברשימת היעד שלנו. זה דומה לאופן שבו פועלים כלים כמו nmap – עובר דרך נמלים אחד אחד כדי לראות אילו שירותים זמינים. עבור כל נמל, היא קוראת לפונקציה simulate_connection ומוסיפה עיכובים אקראיים קטנים כדי שהדפוס של הסריקה ייראה יותר טבעי.

הפונקציה simulate_brute_force שומרת רשימות של שמות משתמש וסיסמאות נפוצים, מנסה קומבינציות שונות נגד שירותים כמו FTP ו-SSH. עבור כל ניסיון, היא יוצרת חיבור חדש, שולחת את פרטי הכניסה בפורמט הנכון עבור השירות הזה, ואז סוגרת את החיבור. זה עוזר לנו לבדוק עד כמה היטב המערכת מזהה ומודיעה על התקפות של הזנת Credentials.

הפונקציה run_continuous_simulation פועלת למשך פרק זמן מוגדר, בוחרת באקראי בין סוגי התקפות שונים כמו סריקות פורטים, התקפות בכוח גס או התקפות שירות ספציפיות. היא משתמשת ב-ThreadPoolExecutor של פייתון כדי להריץ מספר התקפות בו זמנית בהתאם לרמת העוצמה המוגדרת.

לבסוף, יש לנו את הפונקציה main המספקת את ממשק השורה הפקודה עבור הסימולטור. היא משתמשת ב-argparse כדי לנהל את ארגומנטי השורה הפקודה, מאפשרת למשתמשים לציין את ה-IP המטרה, רמת העוצמה, ומשך הזמן של הסימולציה. היא יוצרת מופע של מחלקת HoneypotSimulator ומנהלת את הביצוע הכללי, כולל טיפול נכון בהפרעות ושגיאות של המשתמש.

לאחר שהנחת את קוד הסימולטור בסקריפט נפרד, הרץ אותו עם הפקודה הבאה:

# רץ עם הגדרות ברירת מחדל (עוצמה בינונית, localhost, 5 דקות)
python honeypot_simulator.py

# רץ עם הגדרות מותאמות אישית
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600

מאחר שאנו מפעילים את הוניפוט כמו גם את הסימולטור באותה מכונה מקומית, היעד יהיה localhost. אבל זה יכול להיות משהו אחר בתרחיש אמיתי או אם אתה מפעיל את הוניפוט במכונה וירטואלית או מכונה שונה – אז ודא שאתה מאמת את ה-IP לפני הפעלת הסימולטור.

איך לנתח נתוני הוניפוט

בואו נכתוב במהירות פונקציית עזר שתאפשר לנו לנתח את כל הנתונים שנאספו על ידי הוניפוט. מאחר ששמרנו את זה בקובץ יומן JSON, נוכל בנוחות לנתח אותו באמצעות חבילת JSON המובנית.

import datetime
import json

def analyze_logs(log_file):
    """Enhanced honeypot log analysis with temporal and behavioral patterns"""
    ip_analysis = {}
    port_analysis = {}
    hourly_attacks = {}
    data_patterns = {}

    # מעקב אחר דפוסי מושבים
    ip_sessions = {}
    attack_timeline = []

    with open(log_file, 'r') as f:
        for line in f:
            try:
                activity = json.loads(line)
                timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
                ip = activity['remote_ip']
                port = activity['port']
                data = activity['data']

                # אתחול מעקב IP אם חדש
                if ip not in ip_analysis:
                    ip_analysis[ip] = {
                        'total_attempts': 0,
                        'first_seen': timestamp,
                        'last_seen': timestamp,
                        'targeted_ports': set(),
                        'unique_payloads': set(),
                        'session_count': 0
                    }

                # עדכון סטטיסטיקות IP
                ip_analysis[ip]['total_attempts'] += 1
                ip_analysis[ip]['last_seen'] = timestamp
                ip_analysis[ip]['targeted_ports'].add(port)
                ip_analysis[ip]['unique_payloads'].add(data.strip())

                # מעקב אחר דפוסי שעות
                hour = timestamp.hour
                hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1

                # ניתוח דפוסי מיקוד פורטים
                if port not in port_analysis:
                    port_analysis[port] = {
                        'total_attempts': 0,
                        'unique_ips': set(),
                        'unique_payloads': set()
                    }
                port_analysis[port]['total_attempts'] += 1
                port_analysis[port]['unique_ips'].add(ip)
                port_analysis[port]['unique_payloads'].add(data.strip())

                # מעקב אחר דפוסי מטען
                if data.strip():
                    data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1

                # מעקב אחר ציר זמן של התקפות
                attack_timeline.append({
                    'timestamp': timestamp,
                    'ip': ip,
                    'port': port
                })

            except (json.JSONDecodeError, KeyError) as e:
                continue

    # יצירת דוח ניתוח
    print("\n=== Honeypot Analysis Report ===")

    # 1. ניתוח מבוסס IP
    print("\nTop 10 Most Active IPs:")
    sorted_ips = sorted(ip_analysis.items(), 
                       key=lambda x: x[1]['total_attempts'], 
                       reverse=True)[:10]
    for ip, stats in sorted_ips:
        duration = stats['last_seen'] - stats['first_seen']
        print(f"\nIP: {ip}")
        print(f"Total Attempts: {stats['total_attempts']}")
        print(f"Active Duration: {duration}")
        print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
        print(f"Unique Payloads: {len(stats['unique_payloads'])}")

    # 2. ניתוח פורטים
    print("\nPort Targeting Analysis:")
    sorted_ports = sorted(port_analysis.items(),
                         key=lambda x: x[1]['total_attempts'],
                         reverse=True)
    for port, stats in sorted_ports:
        print(f"\nPort {port}:")
        print(f"Total Attempts: {stats['total_attempts']}")
        print(f"Unique Attackers: {len(stats['unique_ips'])}")
        print(f"Unique Payloads: {len(stats['unique_payloads'])}")

    # 3. ניתוח זמני
    print("\nHourly Attack Distribution:")
    for hour in sorted(hourly_attacks.keys()):
        print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")

    # 4. ניתוח מתוחכמות התקפות
    print("\nAttacker Sophistication Analysis:")
    for ip, stats in sorted_ips:
        sophistication_score = (
            len(stats['targeted_ports']) * 0.4 +  # מגוון פורטים
            len(stats['unique_payloads']) * 0.6   # מגוון מטענים
        )
        print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")

    # 5. דפוסי מטען נפוצים
    print("\nTop 10 Most Common Payloads:")
    sorted_payloads = sorted(data_patterns.items(),
                            key=lambda x: x[1],
                            reverse=True)[:10]
    for payload, count in sorted_payloads:
        if len(payload) > 50:  # קיצור מטענים ארוכים
            payload = payload[:50] + "..."
        print(f"Count {count}: {payload}")

אתה יכול לשים את זה בקובץ סקריפט נפרד ולקרוא לפונקציה על יומני JSON. פונקציה זו תספק לנו תובנות מקיפות מהקובץ JSON על בסיס הנתונים שנאספו.

הניתוח שלנו מתחיל על ידי קבוצת הנתונים למספר קטגוריות כמו סטטיסטיקות מבוססות IP, דפוסי התקפה ממוקדים, הפצות התקפות לפי שעות ותכונות של payload. עבור כל IP, אנו עוקבים אחרי סך הניסיונות, זמני ההופעה הראשונים והאחרונים, הפורטים הממוקדים וה-payloads הייחודיים. זה יעזור לנו לבנות פרופילים ייחודיים עבור התוקפים.

אנו גם בודקים כאן דפוסי התקפה מבוססי פורט שמנטרים את הפורטים הממוקדים ביותר, וכמה תוקפים ייחודיים יש. אנו גם מבצעים ניתוח של רמת הסיבוכיות של ההתקפות שעוזר לנו לזהות תוקפים ממוקדים, תוך התייחסות לגורמים כמו פורטים ממוקדים ו-payloads ייחודיים בשימוש. ניתוח זה משמש להפרדת פעילויות סריקה פשוטות מהתקפות מתקדמות.

הניתוח הזמני עוזר לנו לזהות דפוסים בניסיונות התקפה לפי שעות, חושף דפוסים בזמן ההתקפה וקמפיינים פוטנציאליים ממומנים אוטומטית. לבסוף, אנו מפרסמים payloads הנפוצים כדי לזהות מיתרי התקפה או פקודות שנמצאות לעיתים קרובות.

שיקולי אבטחה

בעת פריסת ה-honeypot הזה, ודא שאתה מתחשב באמצעי האבטחה הבאים:

  1. הרץ את ה-honeypot שלך בסביבה מבודדת. בדרך כלל בתוך VM, או במחשב המקומי שלך שמאחורי NAT וחומת אש.

  2. הרץ את ה-honeypot עם הרשאות מערכת מינימליות (בדרך כלל לא כ-root) כדי להפחית סיכון במקרה של פשרה.

  3. היזהר עם נתונים שנאספו אם אתה מתכנן אי פעם לפרוס אותם כ"מוקש" ברמה מסחרית או למחקר, מכיוון שהם עשויים להכיל תוכנות זדוניות או מידע רגיש.

  4. יישם מנגנוני ניטור חזקים כדי לגלות ניסיונות לפרוץ מסביבת המוקש.

סיכום

עם זאת, בנינו את המוקש שלנו, כתבנו סימולטור כדי לדמות התקפות על המוקש שלנו וניתחנו את הנתונים מהיומנים של המוקש שלנו כדי לבצע כמה מסקנות פשוטות. זו דרך מצוינת להבין הן את המושגים ההתקפיים והן את המושגים ההגנתיים של אבטחת מידע. אתה יכול לשקול לבנות על כך כדי ליצור מערכות גילוי מורכבות יותר ולחשוב על הוספת תכונות כמו:

  1. הדמיית שירות דינמית על בסיס התנהגות התקפה

  2. שילוב עם מערכות מודיעין איומים שיבצעו ניתוח מסקנות טוב יותר של היומנים שנאספו מהמוקש

  3. אסוף גם יומנים מקיפים מעבר ל-IP, פורט ונתוני רשת באמצעות מנגנוני רישום מתקדמים

  4. יהיה צורך להוסיף יכולות למידת מכונה כדי לזהות דפוסי תקיפה

זכרו שאף על פי שהוניפוטים הם כלים חזקים לאבטחה, הם צריכים להיות חלק מאסטרטגיית אבטחה הגנתית מקיפה, ולא הקו היחיד של הגנה.

אני מקווה שלמדתם איך הוניפוטים פועלים, מה המטרה שלהם וכמה תכנות בפייתון גם כן!