サイバーセキュリティにおいて、ハニーポットは、システムを妨害しようとする潜在的な攻撃者を引き寄せ、検出するために設計された偽装システムです。まるで、外に置かれたハチミツの壺がハエを引き寄せるように。
これらのハニーポットをあなたのシステムのセキュリティカメラのように考えてください。セキュリティカメラが誰が建物に侵入しようとしているのか、どのようにしているのかを理解するのを助けるのと同様に、これらのハニーポットは誰があなたのシステムを攻撃しようとしているのか、どの技術を使っているのかを理解する手助けをします。
このチュートリアルの終わりまでに、Pythonでデモハニーポットを書くことができ、ハニーポットがどのように機能するのかを理解できるようになります。
目次
ハニーポットの種類の理解
自分のハニーポットを設計する前に、まずはその種類を簡単に理解しましょう:
-
プロダクションハニーポット:この種類のハニーポットは、実際の生産環境に配置され、実際のセキュリティ攻撃を検出するために使用されます。通常は設計がシンプルで、維持管理と展開が容易であり、リスクを減らすために限られたインタラクションを提供します。
-
リサーチハニーポット:これらは、セキュリティ研究者によって設定されるより複雑なシステムで、攻撃パターンを研究し、これらのパターンに関する経験的分析を行い、マルウェアサンプルを収集し、以前に発見されていない新しい攻撃手法を理解するために使用されます。これらはしばしば、実際の生産環境のアプリケーションとして振る舞うのではなく、全体のオペレーティングシステムやネットワークをエミュレートします。
このチュートリアルでは、接続試行と基本的な攻撃者の行動を記録する中間インタラクションハニーポットを構築します。
開発環境の設定方法
Pythonで開発環境を設定することから始めましょう。次のコマンドを実行します:
import socket
import sys
import datetime
import json
import threading
from pathlib import Path
# ロギングディレクトリを設定
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)
組み込みライブラリに従って進めるため、外部依存関係をインストールする必要はありません。ログはhoneypot_logs
ディレクトリに保存されます。
コアハニーポットの構築方法
基本的なハニーポットは、次の3つのコンポーネントで構成されます:
-
接続を受け入れるネットワークリスナー
-
アクティビティを記録するロギングシステム
-
攻撃者と対話する基本的なエミュレーションサービス
さて、コアハニーポットクラスの初期化を開始しましょう:
class Honeypot:
def __init__(self, bind_ip="0.0.0.0", ports=None):
self.bind_ip = bind_ip
self.ports = ports or [21, 22, 80, 443] # 監視するデフォルトポート
self.active_connections = {}
self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"
def log_activity(self, port, remote_ip, data):
"""Log suspicious activity with timestamp and details"""
activity = {
"timestamp": datetime.datetime.now().isoformat(),
"remote_ip": remote_ip,
"port": port,
"data": data.decode('utf-8', errors='ignore')
}
with open(self.log_file, 'a') as f:
json.dump(activity, f)
f.write('\n')
def handle_connection(self, client_socket, remote_ip, port):
"""Handle individual connections and emulate services"""
service_banners = {
21: "220 FTP server ready\r\n",
22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
}
try:
# サービスに適したバナーを送信する
if port in service_banners:
client_socket.send(service_banners[port].encode())
# 攻撃者からデータを受信する
while True:
data = client_socket.recv(1024)
if not data:
break
self.log_activity(port, remote_ip, data)
# 偽の応答を送信する
client_socket.send(b"Command not recognized.\r\n")
except Exception as e:
print(f"Error handling connection: {e}")
finally:
client_socket.close()
このクラスには多くの重要な情報が含まれているので、各関数を一つずつ確認していきましょう。
__init__
関数は、ハニーポットをホストするIPアドレスとポート番号、およびログファイルのパス/ファイル名を記録します。また、ハニーポットへのアクティブな接続の総数を記録しておきます。
log_activity
関数は、IP、データ、およびIPが接続を試みたポートに関する情報を受信します。そして、この情報をJSON形式のログファイルに追加します。
handle_connection
関数は、私たちが持っている異なるポートで実行されるこれらのサービスを模倣します。ハニーポットはポート21、22、80、443で実行されます。これらのサービスは、それぞれFTP、SSH、HTTP、HTTPSプロトコルに対応しています。したがって、ハニーポットとインタラクトしようとする攻撃者は、これらのポートでこれらのサービスを期待する必要があります。
これらのサービスの動作を模倣するために、実際に使用されるサービスバナーを利用します。この関数は、攻撃者が接続した際に適切なバナーを最初に送信し、その後データを受信してログに記録します。ハニーポットは、攻撃者に対して「コマンドは認識されません」という偽の応答も送信します。
ネットワークリスナーを実装する
今、受信接続を処理するネットワークリスナーを実装しましょう。そのために、シンプルなソケットプログラミングを使用します。ソケットプログラミングの仕組みについて知らない場合は、この記事をチェックしてください。この中で関連するいくつかの概念が説明されています。
def start_listener(self, port):
"""Start a listener on specified port"""
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.bind_ip, port))
server.listen(5)
print(f"[*] Listening on {self.bind_ip}:{port}")
while True:
client, addr = server.accept()
print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
# 別スレッドで接続を処理する
client_handler = threading.Thread(
target=self.handle_connection,
args=(client, addr[0], port)
)
client_handler.start()
except Exception as e:
print(f"Error starting listener on port {port}: {e}")
start_listener
関数はサーバーを起動し、指定されたポートでリスニングを開始します。私たちのためのbind_ip
は0.0.0.0
となり、これはサーバーがすべてのネットワークインターフェースでリスニングすることを示します。
今、新しい接続ごとに別のスレッドで処理します。なぜなら、複数の攻撃者がハニーポットと相互作用しようとしたり、攻撃スクリプトやツールがハニーポットをスキャンする場合があるからです。スレッドがどのように機能するか知らない場合は、この記事をチェックしてください。これはPythonにおけるスレッディングと同時実行性を説明しています。
また、この関数をコアHoneypot
クラスに入れることを確認してください。
ハニーポットを実行する
さて、私たちのハニーポットを開始するmain
関数を作成しましょう。
def main():
honeypot = Honeypot()
# 別スレッドで各ポートのリスナーを起動する
for port in honeypot.ports:
listener_thread = threading.Thread(
target=honeypot.start_listener,
args=(port,)
)
listener_thread.daemon = True
listener_thread.start()
try:
# メインスレッドを生かしておく
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[*] Shutting down honeypot...")
sys.exit(0)
if __name__ == "__main__":
main()
この関数は、Honeypot
クラスをインスタンス化し、定義された各ポート(21、22、80、443)に対してリスナーを個別のスレッドとして開始します。次に、実際のプログラムを実行しているメインスレッドを無限ループに入れて生かしておきます。これらをすべてスクリプトにまとめて実行します。
ハニーポット攻撃シミュレーターを作成します。
次に、いくつかの攻撃シナリオをシミュレートしてハニーポットをターゲットにし、JSONログファイルにデータを収集できるようにしましょう。
このシミュレーターは、ハニーポットに関するいくつかの重要な側面を示すのに役立ちます:
-
現実的な攻撃パターン:シミュレーターは、ポートスキャン、ブルートフォース攻撃、サービス固有の脆弱性を含む一般的な攻撃パターンをシミュレートします。
-
変動する強度:シミュレーターは、シミュレーションの強度を調整して、ハニーポットが異なる負荷に対処する方法をテストします。
-
複数の攻撃タイプ:実際の攻撃者が試みる可能性のあるさまざまな攻撃タイプを示し、ハニーポットがそれぞれにどのように反応するかを理解するのに役立ちます。
-
同時接続:シミュレーターはスレッドを使用して、あなたのハニーポットが複数の同時接続をどのように処理するかをテストします。
# honeypot_simulator.py
import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse
class HoneypotSimulator:
"""
A class to simulate different types of connections and attacks against our honeypot.
This helps in testing the honeypot's logging and response capabilities.
"""
def __init__(self, target_ip="127.0.0.1", intensity="medium"):
# シミュレーターの設定
self.target_ip = target_ip
self.intensity = intensity
# 攻撃者がよくスキャンする一般的なポート
self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]
# 異なるサービスに対して攻撃者が使用する一般的なコマンドの辞書
self.attack_patterns = {
21: [ # FTPコマンド
"USER admin\r\n",
"PASS admin123\r\n",
"LIST\r\n",
"STOR malware.exe\r\n"
],
22: [ # SSHの試行
"SSH-2.0-OpenSSH_7.9\r\n",
"admin:password123\n",
"root:toor\n"
],
80: [ # HTTPリクエスト
"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
"POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
"GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
]
}
# 強度設定はシミュレートされた攻撃の頻度とボリュームに影響を与えます
self.intensity_settings = {
"low": {"max_threads": 2, "delay_range": (1, 3)},
"medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
"high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
}
def simulate_connection(self, port):
"""
Simulates a connection attempt to a specific port with realistic attack patterns
"""
try:
# 新しいソケット接続を作成
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
print(f"[*] Attempting connection to {self.target_ip}:{port}")
sock.connect((self.target_ip, port))
# バナーがあれば取得
banner = sock.recv(1024)
print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")
# ポートに基づいて攻撃パターンを送信
if port in self.attack_patterns:
for command in self.attack_patterns[port]:
print(f"[*] Sending command to port {port}: {command.strip()}")
sock.send(command.encode())
# 応答を待つ
try:
response = sock.recv(1024)
print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
except socket.timeout:
print(f"[-] No response received from port {port}")
# コマンド間にリアルな遅延を追加
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
sock.close()
except ConnectionRefusedError:
print(f"[-] Connection refused on port {port}")
except socket.timeout:
print(f"[-] Connection timeout on port {port}")
except Exception as e:
print(f"[-] Error connecting to port {port}: {e}")
def simulate_port_scan(self):
"""
Simulates a basic port scan across common ports
"""
print(f"\n[*] Starting port scan simulation against {self.target_ip}")
for port in self.target_ports:
self.simulate_connection(port)
time.sleep(random.uniform(0.1, 0.3))
def simulate_brute_force(self, port):
"""
Simulates a brute force attack against a specific service
"""
common_usernames = ["admin", "root", "user", "test"]
common_passwords = ["password123", "admin123", "123456", "root"]
print(f"\n[*] Starting brute force simulation against port {port}")
for username in common_usernames:
for password in common_passwords:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((self.target_ip, port))
if port == 21: # FTP
sock.send(f"USER {username}\r\n".encode())
sock.recv(1024)
sock.send(f"PASS {password}\r\n".encode())
elif port == 22: # SSH
sock.send(f"{username}:{password}\n".encode())
sock.close()
time.sleep(random.uniform(0.1, 0.3))
except Exception as e:
print(f"[-] Error in brute force attempt: {e}")
def run_continuous_simulation(self, duration=300):
"""
Runs a continuous simulation for a specified duration
"""
print(f"\n[*] Starting continuous simulation for {duration} seconds")
print(f"[*] Intensity level: {self.intensity}")
end_time = time.time() + duration
with ThreadPoolExecutor(
max_workers=self.intensity_settings[self.intensity]["max_threads"]
) as executor:
while time.time() < end_time:
# 異なる攻撃パターンのミックス
simulation_choices = [
lambda: self.simulate_port_scan(),
lambda: self.simulate_brute_force(21),
lambda: self.simulate_brute_force(22),
lambda: self.simulate_connection(80)
]
# ランダムに選択して攻撃パターンを実行
executor.submit(random.choice(simulation_choices))
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
def main():
"""
Main function to run the honeypot simulator with command-line arguments
"""
parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
parser.add_argument(
"--intensity",
choices=["low", "medium", "high"],
default="medium",
help="Simulation intensity level"
)
parser.add_argument(
"--duration",
type=int,
default=300,
help="Simulation duration in seconds"
)
args = parser.parse_args()
simulator = HoneypotSimulator(args.target, args.intensity)
try:
simulator.run_continuous_simulation(args.duration)
except KeyboardInterrupt:
print("\n[*] Simulation interrupted by user")
except Exception as e:
print(f"[-] Simulation error: {e}")
finally:
print("\n[*] Simulation complete")
if __name__ == "__main__":
main()
このシミュレーションスクリプトには多くのことがあるので、一つずつ分解していきましょう。また、コードを少し読みやすくするために、すべての関数と操作にコメントを追加しました。
まず、HoneypotSimulator
というユーティリティクラスがあります。このクラスでは、シミュレーターの基本的な設定を行う__init__
関数があります。これは、ターゲットIPアドレス(デフォルトはlocalhost)と強度レベル(デフォルトは「中」)の2つのパラメータを取ります。
また、重要な3つのコンポーネントを定義しています。ターゲットポートのプローブ(FTP、SSH、HTTPなどの一般的なサービス)、各サービス固有の攻撃パターン(ログイン試行やコマンドなど)、そしてスレッド数やタイミング遅延を通じてシミュレーションの攻撃性を制御する強度設定です。
simulate_connection
関数は、特定のポートへの個別接続試行を処理します。ソケット接続を作成し、サービスバナー(SSHバージョン情報など)を取得しようとし、サービスの種類に基づいて適切な攻撃コマンドを送信します。一般的なネットワーク問題に対するエラーハンドリングを追加し、人間の相互作用を模倣するためにコマンド間に現実的な遅延を追加しました。
simulate_port_scan
関数は、ターゲットリスト内の各ポートを体系的にチェックする偵察ツールのように機能します。これは、nmap
のようなツールが動作する方法に似ており、利用可能なサービスを確認するためにポートを1つずつ通過します。各ポートに対して、simulate_connection
関数を呼び出し、スキャンパターンがより自然に見えるように小さなランダム遅延を加えます。
simulate_brute_force
関数は、一般的なユーザー名とパスワードのリストを維持し、FTPやSSHなどのサービスに対して異なる組み合わせを試みます。各試行のために新しい接続を作成し、そのサービスに対して正しい形式でログイン資格情報を送信し、その後接続を閉じます。これにより、ハニーポットが資格情報の詰め込み攻撃をどれだけ効果的に検出し、ログに記録するかをテストするのに役立ちます。
run_continuous_simulation
関数は指定された期間実行され、ポートスキャン、ブルートフォース、または特定のサービス攻撃などの異なる攻撃タイプの中からランダムに選択します。指定された強度レベルに基づいて複数の攻撃を同時に実行するために、PythonのThreadPoolExecutor
を使用します。
最後に、シミュレーターのコマンドラインインターフェースを提供するmain
関数があります。argparse
を使用してコマンドライン引数を処理し、ユーザーがターゲットIP、強度レベル、およびシミュレーションの期間を指定できるようにします。HoneypotSimulator
クラスのインスタンスを作成し、ユーザーの中断やエラーの適切な処理を含む全体の実行を管理します。
シミュレーターコードを別のスクリプトに分けた後、次のコマンドで実行します:
# デフォルト設定(中強度、ローカルホスト、5分)で実行
python honeypot_simulator.py
# カスタム設定で実行
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600
同じマシンでローカルにハニーポットとシミュレーターを実行しているので、ターゲットはlocalhost
になります。しかし、実際のシナリオや、ハニーポットをVMや別のマシンで実行している場合は、別のものになる可能性がありますので、シミュレーターを実行する前にIPを確認してください。
ハニーポットデータの分析方法
ハニーポットによって収集されたすべてのデータを分析するためのヘルパー関数を迅速に作成しましょう。これをJSONログファイルに保存しているので、組み込みのJSONパッケージを使用して便利に解析できます。
import datetime
import json
def analyze_logs(log_file):
"""Enhanced honeypot log analysis with temporal and behavioral patterns"""
ip_analysis = {}
port_analysis = {}
hourly_attacks = {}
data_patterns = {}
# セッションパターンの追跡
ip_sessions = {}
attack_timeline = []
with open(log_file, 'r') as f:
for line in f:
try:
activity = json.loads(line)
timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
ip = activity['remote_ip']
port = activity['port']
data = activity['data']
# 新しい場合はIPトラッキングを初期化
if ip not in ip_analysis:
ip_analysis[ip] = {
'total_attempts': 0,
'first_seen': timestamp,
'last_seen': timestamp,
'targeted_ports': set(),
'unique_payloads': set(),
'session_count': 0
}
# IP統計を更新
ip_analysis[ip]['total_attempts'] += 1
ip_analysis[ip]['last_seen'] = timestamp
ip_analysis[ip]['targeted_ports'].add(port)
ip_analysis[ip]['unique_payloads'].add(data.strip())
# 時間ごとのパターンを追跡
hour = timestamp.hour
hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1
# ポートターゲティングパターンを分析
if port not in port_analysis:
port_analysis[port] = {
'total_attempts': 0,
'unique_ips': set(),
'unique_payloads': set()
}
port_analysis[port]['total_attempts'] += 1
port_analysis[port]['unique_ips'].add(ip)
port_analysis[port]['unique_payloads'].add(data.strip())
# ペイロードパターンを追跡
if data.strip():
data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1
# 攻撃タイムラインを追跡
attack_timeline.append({
'timestamp': timestamp,
'ip': ip,
'port': port
})
except (json.JSONDecodeError, KeyError) as e:
continue
# 分析レポート生成
print("\n=== Honeypot Analysis Report ===")
# 1. IPベースの分析
print("\nTop 10 Most Active IPs:")
sorted_ips = sorted(ip_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)[:10]
for ip, stats in sorted_ips:
duration = stats['last_seen'] - stats['first_seen']
print(f"\nIP: {ip}")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Active Duration: {duration}")
print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 2. ポート分析
print("\nPort Targeting Analysis:")
sorted_ports = sorted(port_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)
for port, stats in sorted_ports:
print(f"\nPort {port}:")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Unique Attackers: {len(stats['unique_ips'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 3. 時間的分析
print("\nHourly Attack Distribution:")
for hour in sorted(hourly_attacks.keys()):
print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")
# 4. 攻撃の高度化分析
print("\nAttacker Sophistication Analysis:")
for ip, stats in sorted_ips:
sophistication_score = (
len(stats['targeted_ports']) * 0.4 + # ポートの多様性
len(stats['unique_payloads']) * 0.6 # ペイロードの多様性
)
print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")
# 5. 一般的なペイロードパターン
print("\nTop 10 Most Common Payloads:")
sorted_payloads = sorted(data_patterns.items(),
key=lambda x: x[1],
reverse=True)[:10]
for payload, count in sorted_payloads:
if len(payload) > 50: # 長いペイロードを切り捨てる
payload = payload[:50] + "..."
print(f"Count {count}: {payload}")
このコードを別のスクリプトファイルに置き、JSONログ上で関数を呼び出すことができます。この関数は収集したデータに基づいてJSONファイルから包括的な洞察を提供します。
私たちの分析は、データをIPベースの統計、ポートターゲティングパターン、時間ごとの攻撃分布、ペイロードの特性など、いくつかのカテゴリにグループ化することから始まります。各IPについて、総試行回数、最初と最後の確認時間、ターゲットとするポート、ユニークなペイロードを追跡しています。これにより、攻撃者のユニークなプロファイルを構築するのに役立ちます。
また、ここではポートベースの攻撃パターンも調査し、最も頻繁にターゲットとされるポートと、それに対するユニークな攻撃者の数を監視します。さらに、ターゲットとする攻撃者を特定する助けとなる攻撃の洗練度分析も行い、ターゲットにされたポートや使用されたユニークなペイロードなどの要因を考慮します。この分析は、単純なスキャン活動と洗練された攻撃を区別するために使用されます。
時間的分析は、時間ごとの攻撃試行におけるパターンを特定し、攻撃のタイミングや潜在的な自動ターゲティングキャンペーンのパターンを明らかにするのに役立ちます。最後に、一般的に見られるペイロードを公開し、一般的に見られる攻撃文字列やコマンドを特定します。
セキュリティに関する考慮事項
このハニーポットを展開する際には、以下のセキュリティ対策を考慮してください:
-
ハニーポットを隔離された環境で実行します。通常はVM内、またはNATとファイアウォールの背後にあるローカルマシンで実行します。
-
ハニーポットを最小限のシステム権限で実行します(通常はrootではなく)侵害された場合のリスクを軽減します。
-
本番環境や研究用のハニーポットとして展開する予定がある場合、収集したデータにはマルウェアや機密情報が含まれている可能性があるため、注意が必要です。
-
ハニーポット環境からの脱出を試みる行為を検出するために、堅牢な監視メカニズムを実装してください。
結論
これにより、ハニーポットを構築し、ハニーポットへの攻撃をシミュレートするシミュレーターを作成し、ハニーポットのログからデータを分析していくつかの簡単な推論を行いました。これは、攻撃的および防御的なセキュリティの概念を理解するための優れた方法です。これを基にして、より複雑な検出システムを構築し、以下のような機能を追加することを考慮できます:
-
攻撃の振る舞いに基づいた動的サービスエミュレーション
-
収集したハニーポットログのより良い推論分析を行う脅威インテリジェンスシステムとの統合
-
IP、ポート、ネットワークデータを超えた包括的なログを高度なロギングメカニズムを通じて収集する
-
攻撃パターンを検出するための機械学習機能を追加する
ハニーポットは強力なセキュリティツールですが、唯一の防御線ではなく、包括的な防御戦略の一部であるべきことを忘れないでください。
ハニーポットの仕組み、目的、そして少しのPythonプログラミングについて学んでいただけたことを願っています!
Source:
https://www.freecodecamp.org/news/build-a-honeypot-with-python/