このチュートリアルでは、Pythonを使用して簡略化されたAIフレーバーのSIEMログ解析システムを構築します。焦点はログ解析と異常検出に置かれます。
ログの取り込み、軽量機械学習モデルを使用した異常の検出、システムが自動的に応答する方法について説明します。
この実証を通じて、AIがセキュリティ監視を実用的でアクセスしやすい方法で強化できることを示します。
目次
SIEMシステムとは?
セキュリティ情報およびイベント管理(SIEM)システムは、現代のセキュリティオペレーションの中枢神経系です。SIEMは、IT環境全体からのセキュリティログとイベントを集約し、相関させて、潜在的なインシデントに関するリアルタイムの洞察を提供します。これにより、組織は脅威をより早く検出し、迅速に対応することができます。
これらのシステムは、ファイアウォールのアラートからアプリケーションログまで、大量のログデータを集め、それらの中に問題の兆候がないかを分析します。この文脈における異常検知は重要であり、ログの中の異常なパターンは静的ルールをすり抜ける可能性のあるインシデントを明らかにすることがあります。たとえば、ネットワークリクエストの突然の急増はDDoS攻撃を示す可能性があり、複数の失敗したログイン試行は不正アクセスの試みを示すかもしれません。
AIはSIEMの機能を一歩進めます。高度なAIモデル(大規模言語モデルなど)を活用することで、AI駆動のSIEMはログをインテリジェントに解析・解釈し、「通常」の行動がどのようなものかを学び、注目に値する「異常な」事象をフラグ付けすることができます。
本質的に、AIはアナリストのスマートな共同操縦者として機能し、微妙な異常を見つけ出し、発見を平易な言葉で要約することさえ可能です。最近の大規模言語モデルの進歩により、SIEMは無数のデータポイントを人間のアナリストと同様に推論することができるようになりましたが、その速度と規模ははるかに優れています。その結果、ノイズを切り抜けて本当の脅威に焦点を当てる強力なデジタルセキュリティアシスタントが誕生しました。
前提条件
始める前に、次のものを用意してください:
-
システムにPython 3.xがインストールされていること。コード例は、最近のPythonバージョンで動作するはずです。
-
Pythonプログラミング(ループ、関数、ライブラリの使用)に関する基本的な知識と、ログ(たとえば、ログエントリがどのように見えるか)に対する理解が役立ちます。
-
Pythonライブラリ: 軽量で特別なハードウェアを必要としない一般的なライブラリをいくつか使用します:
-
pandas は基本的なデータ処理用です(ログがCSVまたは類似の形式の場合)。
-
numpy は数値演算用です。
-
scikit-learn は異常検知モデル用です(具体的には、IsolationForestアルゴリズムを使用します)。
-
-
分析するログデータのセット。プレーンテキストまたはCSV形式の任意のログファイル(システムログ、アプリケーションログなど)を使用できます。デモンストレーションのため、準備されたログファイルがなくても、小さなログデータセットをシミュレートします。
注意: 上記のライブラリがない場合は、pipを使用してインストールしてください。
pip install pandas numpy scikit-learn
プロジェクトのセットアップ
シンプルなプロジェクト構造を設定しましょう。このSIEM異常検知プロジェクトのために新しいディレクトリを作成し、そこに移動します。内部にはPythonスクリプト(例:siem_anomaly_demo.py
)またはコードをステップバイステップで実行するためのJupyter Notebookを配置できます。
作業ディレクトリにログデータが含まれているか、アクセスできることを確認してください。ログファイルを使用している場合、このプロジェクトフォルダにコピーを配置することが良いでしょう。私たちの概念実証のために、合成ログデータを生成するため、外部ファイルは必要ありませんが、実際のシナリオでは必要です。
プロジェクトのセットアップ手順:
-
環境を初期化する – 希望の場合は、このプロジェクト用に仮想環境を作成します(オプションですが、良い習慣です):
python -m venv venv source venv/bin/activate # Windowsの場合は「venv\Scripts\activate」を使用
その後、この仮想環境に必要なパッケージをインストールします。
-
データソースを準備する – 分析したいログソースを特定します。これはログファイルやデータベースへのパスである可能性があります。ログの形式を把握してください(たとえば、カンマ区切り、JSON行、またはプレーンテキストですか?)。説明のために、いくつかのログエントリを作成します。
-
スクリプトまたはノートブックを設定する – Pythonファイルまたはノートブックを開きます。必要なライブラリをインポートし、再現性のためにランダムシードなどの構成を設定します。
このセットアップの最後までに、SIEMログ解析コードを実行するためのPython環境が準備され、実際のログデータセットまたは私と一緒にデータをシミュレートする意図があるはずです。
ログ解析の実装
完全なSIEMシステムでは、ログ解析はさまざまなソースからログを収集し、さらなる処理のために一貫した形式にパースすることを含みます。ログにはタイムスタンプ、重要度レベル、ソース、イベントメッセージ、ユーザーID、IPアドレスなどのフィールドが含まれることがよくあります。最初のタスクは、これらのログを取り込んで前処理することです。
1. ログの取り込み
ログがテキストファイルにある場合は、Pythonでそれらを読み取ることができます。たとえば、各ログエントリがファイルの1行である場合は、次のようにできます:
with open("my_logs.txt") as f:
raw_logs = f.readlines()
ログが構造化されている場合(たとえば、列であるCSV形式など)、Pandasを使用すると大幅に簡素化できます:
import pandas as pd
df = pd.read_csv("my_logs.csv")
print(df.head())
これにより、ログエントリが列ごとに整理されたDataFrame df
が得られます。ただし、多くのログは半構造化されています(スペースや特殊文字で区切られたコンポーネントなど)。そのような場合は、各行を区切り文字で分割するか、正規表現を使用してフィールドを抽出する必要がある場合があります。たとえば、次のようなログ行を想像してみてください:
2025-03-06 08:00:00, INFO, User login success, user: admin
これにはタイムスタンプ、ログレベル、メッセージ、ユーザーが含まれています。Pythonの文字列メソッドを使用して、このような行をパースできます:
logs = [
"2025-03-06 08:00:00, INFO, User login success, user: admin",
"2025-03-06 08:01:23, INFO, User login success, user: alice",
"2025-03-06 08:02:45, ERROR, Failed login attempt, user: alice",
# ...(さらにログ行)
]
parsed_logs = []
for line in logs:
parts = [p.strip() for p in line.split(",")]
timestamp = parts[0]
level = parts[1]
message = parts[2]
user = parts[3].split(":")[1].strip() if "user:" in parts[3] else None
parsed_logs.append({"timestamp": timestamp, "level": level, "message": message, "user": user})
# 解析を容易にするためにDataFrameに変換
df_logs = pd.DataFrame(parsed_logs)
print(df_logs.head())
上記をサンプルリストに実行すると、次のような出力が得られます。
timestamp level message user
0 2025-03-06 08:00:00 INFO User login success admin
1 2025-03-06 08:01:23 INFO User login success alice
2 2025-03-06 08:02:45 ERROR Failed login attempt alice
...
今、ログをテーブル形式に整理しました。実際のシナリオでは、ログから関連するフィールド(例:IPアドレス、エラーコードなど)を解析し続けることがあります。解析したい内容に応じて、数値の特徴量やカテゴリを抽出します。
2. プリプロセッシングと特徴抽出
ログを構造化した形式にしたら、次のステップは異常検知のための特徴量を導出することです。生のログメッセージ(文字列)だけではアルゴリズムが直接学習するのは難しいです。しばしば、数値の特徴量やカテゴリを抽出します。特徴量の例としては、次のようなものがあります:
-
イベント数:1分/1時間あたりのイベント数、各ユーザーのログイン失敗数など。
-
期間またはサイズ:ログに期間やデータサイズ(ファイル転送サイズ、クエリ実行時間など)が含まれている場合、それらの数値は直接使用できます。
-
カテゴリエンコーディング:ログレベル(INFO、ERROR、DEBUG)を数値にマッピングしたり、特定のイベントタイプをワンホットエンコーディングできます。
この概念実証では、特定のユーザーの1分間あたりのログイン試行回数というシンプルな数値特徴に焦点を当てます。これを私たちの特徴データとしてシミュレーションします。
実際のシステムでは、解析されたログエントリを時間ウィンドウとユーザーでグループ化することでこれを計算します。目標は、各数字が「特定の1分間に発生したログイン試行の回数」を表す数字の配列を取得することです。ほとんどの場合、この数字は低く(正常な動作)、特定の1分間に異常に多くの試行があった場合、それは異常(おそらくブルートフォース攻撃)です。
シミュレーションするために、正常な動作を表す50の値のリストを生成し、その後、異常に高い値をいくつか追加します:
import numpy as np
# 正常なログイン試行回数の50分をシミュレーション(平均して1分あたり約5回)
np.random.seed(42) # 再現可能な例のために
normal_counts = np.random.poisson(lam=5, size=50)
# 異常をシミュレーション:ログイン試行のスパイク(例:攻撃者が1分間に30回以上試みる)
anomalous_counts = np.array([30, 40, 50])
# データを統合する
login_attempts = np.concatenate([normal_counts, anomalous_counts])
print("Login attempts per minute:", login_attempts)
上記を実行すると、login_attempts
は次のようになります:
Login attempts per minute: [ 5 4 4 5 5 3 5 ... 4 30 40 50]
ほとんどの値は1桁ですが、最後に30回、40回、50回の試行があった3分間があり、明らかな外れ値です。これが異常検出のために準備したデータです。実際のログ分析では、この種のデータは、時間の経過に伴ってログ内のイベントをカウントすることや、ログ内容から何らかの指標を抽出することから得られるかもしれません。
データが準備できたので、異常検出モデルの構築に進むことができます。
異常検知モデルの構築方法
ログから得られたデータの異常を検出するために、機械学習アプローチを使用します。具体的には、無監視異常検知に人気のあるアルゴリズムであるIsolation Forestを使用します。
Isolation Forestは、データをランダムに分割し、ポイントを孤立させることによって機能します。異常は、他のポイントから素早く孤立する(分離される)ポイントであり、それはより少ないランダムな分割で起こります。これにより、ラベルが必要なく(どのログエントリが「悪い」かを事前に知る必要がありません)、データセット内の外れ値を特定するのに最適です。
なぜIsolation Forestなのか?
-
効率的で、大量のデータがあっても良好に機能します。
-
特定のデータ分布を仮定しません(いくつかの統計的方法とは異なります)。
-
異常をスコアリングするための明確な方法を提供します。
それでは、login_attempts
データに基づいてIsolation Forestをトレーニングしましょう:
from sklearn.ensemble import IsolationForest
# モデルが期待する形にデータを準備する(サンプル、特徴)
X = login_attempts.reshape(-1, 1) # 各サンプルは1次元の[count]
# Isolation Forestモデルを初期化する
model = IsolationForest(contamination=0.05, random_state=42)
# contamination=0.05はデータの約5%が異常値であることを予想していることを意味する
# データを使ってモデルを訓練する
model.fit(X)
コードに関するいくつかの注意点:
-
login_attempts
を1つの特徴列を持つ2D配列X
に再形成しました。scikit-learnは訓練(fit
)に2D配列が必要です。 -
contamination=0.05
を設定して、モデルにデータの約5%が異常値であることを示します。合成データでは53ポイントのうち3つの異常値を追加しましたが、これは約5.7%ですので、5%は妥当な推測です(contaminationを指定しない場合、アルゴリズムは推定に基づいてデフォルトを選択するか、一部のバージョンではデフォルト0.1を使用します)。 -
random_state=42
は再現性を確保するだけです。
この時点で、Isolation Forestモデルはデータにトレーニングされています。内部で、データを分割するランダムツリーのアンサンブルが構築されます。孤立化が難しい点(つまり、通常の点の密なクラスタにある点)は、これらのツリーの奥深くに配置され、孤立化が容易な点(外れ値)は、短いパスで配置されます。
次に、このモデルを使用して、どのデータポイントが異常と見なされるかを特定します。
テストと結果の可視化
さて、興奮する部分がやってきました:トレーニング済みモデルを使用してログデータで異常を検出します。モデルを使用して各データポイントにラベルを予測し、そのうちの外れ値としてフラグ付けされたものをフィルタリングします。
# モデルを使用して異常を予測
labels = model.predict(X)
# モデルは通常の点に+1、異常に-1を出力します
# 異常のインデックスと値を抽出
anomaly_indices = np.where(labels == -1)[0]
anomaly_values = login_attempts[anomaly_indices]
print("Anomaly indices:", anomaly_indices)
print("Anomaly values (login attempts):", anomaly_values)
今回の場合、異常は挿入した大きな数値(30、40、50など)であると予想されます。出力は次のようになるかもしれません:
Anomaly indices: [50 51 52]
Anomaly values (login attempts): [30 40 50]
具体的に「ログイン試行」について何も知らなくても、Isolation Forestはこれらの値を他のデータと異なると認識しました。
これはセキュリティの文脈での異常検知の力です。新たな攻撃がどのようなものになるかはわかりませんが、通常のパターンから大きく逸脱するもの(例えば、ユーザーがいつもの10倍のログイン試行を突然行うなど)があれば、異常検知器はそれにスポットライトを当てます。
結果の可視化
実際の分析では、データと異常を可視化することがしばしば役立ちます。例えば、時間(分単位)ごとのlogin_attempts
の値をプロットし、異常を別の色で強調することができます。
この単純な場合では、折れ線グラフはほとんど平坦な線が3〜8回/分の周囲にあることを示し、最後に3つの大きなスパイクがあります。これらのスパイクが私たちの異常です。このノートブックで実行している場合、Matplotlibを使用してこれを実現できます:
import matplotlib.pyplot as plt
plt.plot(login_attempts, label="Login attempts per minute")
plt.scatter(anomaly_indices, anomaly_values, color='red', label="Anomalies")
plt.xlabel("Time (minute index)")
plt.ylabel("Login attempts")
plt.legend()
plt.show()
ここでのテキストベースの出力では、出力結果は既に高い値が検出されたことを確認しています。より複雑な場合では、異常検知モデルは各ポイントに対して異常スコア(例えば、正常範囲からどれだけ離れているか)も提供します。例えば、Scikit-learnのIsolationForestにはdecision_function
メソッドがあり、スコア(スコアが低いほど異常度が高い)を返します。
ここでは簡単のためにスコアには触れませんが、異常度によって異常をランク付けするためにそれらを取得することができることを知っておくと良いでしょう。
異常検知が機能している場合、異常を見つけたときにはどのような対応を取ることができるでしょうか?それは自動応答について考えることにつながります。
自動応答の可能性
異常を検出することは戦いの半分に過ぎません。次のステップはそれに対応することです。企業のSIEMシステムでは、自動応答(しばしばSOAR(セキュリティ・オーケストレーション、自動化、対応)と関連付けられる)によって、インシデントへの反応時間を劇的に短縮することができます。
異常なものをフラグ付けしたAI搭載のSIEMは何ができるでしょうか?以下にいくつかの可能性を示します:
-
警告: 最も簡単なアクションは、セキュリティ担当者にアラートを送信することです。これは電子メール、Slackメッセージ、またはインシデント管理システムでチケットを作成することがあります。アラートには異常の詳細が含まれます(例:「ユーザーaliceが1分間に50回の失敗したログイン試行を行いましたが、これは異常です」)。GenAIは、アナリストに対してインシデントの明確な自然言語の要約を生成することでここで役立つことができます。
-
自動化された緩和: より高度なシステムでは、直接的な対策を取ることがあります。たとえば、ログでIPアドレスが悪意のある動作を示している場合、システムはそのIPをファイアウォールで自動的にブロックすることができます。ログインスパイクの例では、システムはユーザーアカウントを一時的にロックしたり、追加の認証を求めたりする可能性があります。これはボット攻撃かもしれないという前提のもとです。今日のAIベースのSIEMは、特定の脅威が検出されたときに事前定義された応答アクションをトリガーしたり、複雑なワークフローをオーケストレートしたりすることができます(詳細については、AI SIEM: How SIEM with AI/ML is Revolutionizing the SOC | Exabeamを参照してください)。
-
調査支援: ジェネレーティブAIは、自動的にコンテキストを収集するためにも使用できます。例えば、異常を検出した際に、システムが関連するログ(周囲のイベント、同じユーザーや同じIPからの他のアクション)を引き出し、集計されたレポートを提供することができます。これにより、アナリストは複数のデータソースを手動で問い合わせる必要がなくなります。
自動応答を慎重に実装することが重要です — システムが誤検知に過剰反応してしまってはなりません。一般的な戦略は段階的な応答です:低信頼度の異常は警告を記録するか、低優先度のアラートを送信するだけですが、高信頼度の異常(または異常の組み合わせ)は、積極的な防御措置を引き起こします。
実際には、AI搭載のSIEMは、これらのアクションを実行するためにあなたのインフラストラクチャと統合されます(APIやスクリプトなどを通じて)。私たちのPythonのPoCでは、異常が検出されたときにメッセージを印刷するか、ダミー関数を呼び出すことで自動応答をシミュレーションできます。例えば:
if len(anomaly_indices) > 0:
print(f"Alert! Detected {len(anomaly_indices)} anomalous events. Initiating response procedures...")
# ここで、ユーザーを無効にしたり、管理者に通知するためのコードを追加できます。
私たちのデモはシンプルですが、これをスケールアップすることは容易に想像できます。SIEMは、例えば、異常をより大きな生成モデルに供給し、状況を評価して最適な行動方針を決定することができます(あなたの運用手順を知っているチャットボットOpsアシスタントのように)。AIがより洗練されるにつれて、自動化の可能性は広がっています。
結論
このチュートリアルでは、ログデータを取り込み、機械学習モデルを使用して異常を分析し、セキュリティ脅威を表す可能性のある異常なイベントを特定する基本的なAI搭載SIEMコンポーネントを構築しました。
ログデータをパースして準備し、次にIsolation Forestモデルを使用してログイン試行数のストリーム内の外れ値を検出しました。このモデルは、「攻撃」がどのようなものかについての事前知識なしに、標準から外れた動作を成功裏にフラッグしました — 学習した通常のパターンからの逸脱のみに依存しました。
また、そのようなシステムが異常を検知した場合には、人間に通知するだけでなく、自動的に対処する方法についても議論しました。
AI/MLを活用した現代のSIEMシステムは、この方向に進化しています。問題を検知するだけでなく、トリアージや対応も支援します。生成型AIはさらにこれを強化し、アナリストから学習して知的な要約や意思決定を提供し、セキュリティオペレーションセンターで不疲労のアシスタントとなります。
次のステップと改善策について:
-
実際のログデータでこのアプローチを試してみることができます。たとえば、システムのログファイルから「1時間あたりのエラーログ数」や「セッションごとの転送バイト数」といった特徴量を抽出し、それに対して異常検知を実行することができます。
-
One-Class SVMやLocal Outlier Factorなどの他のアルゴリズムを使用して異常検知を行い、比較してみることもおすすめです。
-
ログの行を解析したり、異常を説明するために簡単な言語モデルを組み込むことも検討してみてください。たとえば、異常なログエントリーを読み取り、何が問題かを提案することができます(「このエラーは通常、データベースにアクセスできないことを意味します」といった具体的なメッセージ)。
-
機能を拡張する:実際のSIEMでは、同時に多くの信号を使用します(失敗したログイン数、異常なIPジオロケーション、ログ内の珍しいプロセス名など)。より多くの機能とデータが検出のコンテキストを改善します。
Source:
https://www.freecodecamp.org/news/how-to-create-a-python-siem-system-using-ai-and-llms/