在本教程中,我们将使用Python构建一个简化的、AI风格的SIEM日志分析系统。我们的重点将放在日志分析和异常检测上。

我们将介绍如何摄入日志,使用轻量级机器学习模型检测异常,并且甚至触及系统如何自动响应。

这个实践性的概念验证将展示AI如何以实际、可访问的方式增强安全监控。

目录

什么是SIEM系统?

安全信息与事件管理(SIEM)系统是现代安全运营的中枢神经系统。SIEM聚合并关联来自IT环境的安全日志和事件,以提供对潜在事件的实时洞察。这帮助组织更快地检测威胁并更早地做出响应。

这些系统汇集了大量的日志数据——从防火墙警报到应用程序日志——并分析它们以寻找潜在问题的迹象。在这种情况下,异常检测至关重要,日志中的异常模式可以揭示可能被静态规则忽视的事件。例如,网络请求的突然激增可能表示DDoS攻击,而多次登录失败的尝试可能指向未授权访问的尝试。

AI将SIEM功能提升到一个新水平。通过利用先进的AI模型(如大型语言模型),AI驱动的SIEM能够智能解析和解释日志,学习“正常”行为的样子,并标记出需要关注的“异常”情况。

本质上,AI可以作为分析师的智能副驾驶,发现微妙的异常,甚至用简单的语言总结发现的内容。最近在大型语言模型方面的进展,使得SIEM能够像人类分析师一样对无数数据点进行推理——但速度和规模要大得多。结果是一个强大的数字安全助手,帮助排除噪音,专注于真正的威胁。

先决条件

在我们深入之前,请确保您具备以下条件:

  • 在您的系统上安装Python 3.x。代码示例应适用于任何较新的Python版本。

  • 对Python编程(循环、函数、使用库)有基本了解,并理解日志(例如,日志条目的样子)将会有所帮助。

  • Python库:我们将使用一些常见的轻量级库,不需要特殊硬件:

    • pandas 用于基本数据处理(如果您的日志是CSV或类似格式)。

    • numpy 用于数值运算。

    • scikit-learn 用于异常检测模型(具体来说,我们将使用IsolationForest算法)。

  • 要分析的一组日志数据。您可以使用任何以纯文本或CSV格式的日志文件(系统日志、应用程序日志等)。为了演示方便,我们将模拟一个小的日志数据集,以便您可以在没有现成的日志文件的情况下跟随进行。

注意:如果您没有上述的库,请通过pip安装它们:

pip install pandas numpy scikit-learn

设置项目

让我们建立一个简单的项目结构。为这个SIEM异常检测项目创建一个新目录并进入其中。在其中,您可以有一个Python脚本(例如,siem_anomaly_demo.py)或一个Jupyter笔记本,以逐步运行代码。

确保您的工作目录包含或可以访问您的日志数据。如果您正在使用日志文件,将其副本放在此项目文件夹中可能是一个好主意。对于我们的概念验证,由于我们将生成合成日志数据,我们不需要外部文件-但在实际情况下,您可能需要。

项目设置步骤:

  1. 初始化环境 – 如果你愿意,为这个项目创建一个虚拟环境(可选但是好的实践):

     python -m venv venv
     source venv/bin/activate  # 在Windows上使用 "venv\Scripts\activate"
    

    然后在这个虚拟环境中安装所需的包。

  2. 准备数据源 – 确定你想要分析的日志源。这可以是日志文件或数据库的路径。确保你知道日志的格式(例如,是逗号分隔、JSON行还是纯文本?)。为了说明,我们将编造一些日志条目。

  3. 设置你的脚本或笔记本 – 打开你的Python文件或笔记本。我们将首先导入必要的库并设置任何配置(例如用于可重复性的随机种子)。

完成这个设置后,你应该有一个准备好运行我们的SIEM日志分析代码的Python环境,以及一个真实的日志数据集或者模拟数据的意图。

实现日志分析

在一个完整的SIEM系统中,日志分析涉及从各种来源收集日志并将其解析为统一的格式进行进一步处理。日志通常包含时间戳、严重程度级别、来源、事件消息、用户ID、IP地址等字段。第一个任务是摄取和预处理这些日志。

1. 日志摄取

如果你的日志是文本文件,你可以在Python中读取它们。例如,如果每个日志条目是文件中的一行,你可以这样做:

with open("my_logs.txt") as f:
    raw_logs = f.readlines()

如果日志是结构化的(比如,带有列的CSV格式),Pandas可以极大地简化读取过程:

import pandas as pd
df = pd.read_csv("my_logs.csv")
print(df.head())

这将给你一个名为df的数据帧,其中你的日志条目按列组织。但是许多日志是半结构化的(例如,由空格或特殊字符分隔的组件)。在这种情况下,你可能需要通过分隔符拆分每一行,或者使用正则表达式提取字段。例如,想象一下一行日志:

2025-03-06 08:00:00, INFO, User login success, user: admin

我们可以使用Python的字符串方法解析这样的行:

logs = [
    "2025-03-06 08:00:00, INFO, User login success, user: admin",
    "2025-03-06 08:01:23, INFO, User login success, user: alice",
    "2025-03-06 08:02:45, ERROR, Failed login attempt, user: alice",
    # ...(更多的日志行)
]
parsed_logs = []
for line in logs:
    parts = [p.strip() for p in line.split(",")]
    timestamp = parts[0]
    level = parts[1]
    message = parts[2]
    user = parts[3].split(":")[1].strip() if "user:" in parts[3] else None
    parsed_logs.append({"timestamp": timestamp, "level": level, "message": message, "user": user})

# 转换为数据帧以便更容易进行分析
df_logs = pd.DataFrame(parsed_logs)
print(df_logs.head())

在我们的示例列表上运行上述代码将输出类似于:

            timestamp  level                 message   user
0  2025-03-06 08:00:00   INFO    User login success   admin
1  2025-03-06 08:01:23   INFO    User login success   alice
2  2025-03-06 08:02:45  ERROR  Failed login attempt   alice
...

现在我们已经将日志结构化成了表格。在实际场景中,您可以继续解析日志中的所有相关字段(例如IP地址、错误代码等),具体取决于您想要分析的内容。

2. 预处理和特征提取

有了结构化格式的日志,下一步是为异常检测提取特征。单独的原始日志消息(字符串)对于算法来说很难直接学习。我们通常提取可以量化的数值特征或类别特征。一些特征的示例包括:

  • 事件计数:每分钟/每小时的事件数量,每个用户的登录失败次数等。

  • 持续时间或大小:如果日志包括持续时间或数据大小(例如文件传输大小,查询执行时间),那些数值可以直接使用。

  • 分类编码:日志级别(INFO、ERROR、DEBUG)可以映射为数字,或者特定事件类型可以进行独热编码。

对于这个概念验证,让我们专注于一个简单的数值特征:给定用户每分钟登录尝试的次数。我们将将此模拟为我们的特征数据。

在真实系统中,您将通过时间窗口和用户对解析的日志条目进行分组来计算此值。目标是获得一个数字数组,其中每个数字表示“在给定的一分钟内发生了多少次登录尝试”。大多数情况下,这个数字会很低(正常行为),但如果某一分钟出现异常高的尝试次数,那就是一个异常(可能是暴力攻击)。

为了模拟,我们将生成一个包含50个代表正常行为的值的列表,然后附加一些异常高的值:

import numpy as np

# 模拟50分钟的正常登录尝试次数(平均每分钟约5次)
np.random.seed(42)  # 用于可重复的示例
normal_counts = np.random.poisson(lam=5, size=50)

# 模拟异常情况:登录尝试激增(例如,攻击者在一分钟内尝试30次以上)
anomalous_counts = np.array([30, 40, 50])

# 组合数据
login_attempts = np.concatenate([normal_counts, anomalous_counts])
print("Login attempts per minute:", login_attempts)

运行上述代码后,login_attempts可能如下:

Login attempts per minute: [ 5  4  4  5  5  3  5  ...  4 30 40 50]

大多数值都在个位数,但在最后三分钟中,有30、40和50次尝试 – 明显的异常值。这是我们用于异常检测的准备数据。在真实的日志分析中,这种数据可能来自于按时间计数日志中的事件或从日志内容中提取某些指标。

现在我们的数据准备好了,我们可以开始构建异常检测模型。

如何构建异常检测模型

为了检测我们的日志数据中的异常,我们将采用机器学习方法。具体而言,我们将使用孤立森林(Isolation Forest)——一种流行的无监督异常检测算法。

孤立森林通过随机分割数据并隔离数据点来工作。异常点是那些被迅速隔离(与其他点分离)的点,即在较少的随机分割次数内被隔离。这使得它非常适合在数据集中识别异常值,而无需任何标签(我们不需要事先知道哪些日志条目是“坏的”)。

为什么选择孤立森林?

  • 它高效且在数据量很大时也能很好地工作。

  • 它不假设任何特定的数据分布(不像某些统计方法)。

  • 它为我们提供了一种简单直接的评分异常的方法。

让我们在我们的login_attempts数据上训练一个孤立森林:

from sklearn.ensemble import IsolationForest

# 准备数据以符合模型的期望形状(样本,特征)
X = login_attempts.reshape(-1, 1)  # 每个样本是一个一维数组[数量]

# 初始化隔离森林模型
model = IsolationForest(contamination=0.05, random_state=42)
# 污染度=0.05 表示我们预计大约有5%的数据是异常值

# 在数据上训练模型
model.fit(X)

关于代码的一些说明:

  • 我们将login_attempts重塑为一个具有一个特征列的二维数组X,因为scikit-learn在训练(fit)时需要一个二维数组。

  • 我们设置contamination=0.05,以向模型提示大约5%的数据可能是异常值。在我们的合成数据中,我们添加了53个点中的3个异常值,约占5.7%,因此5%是一个合理的猜测。(如果不指定污染度,算法将根据假设选择一个默认值,或在某些版本中使用默认值0.1。)

  • random_state=42 只是确保可重现性。

此时,隔离森林模型已在我们的数据上进行了训练。内部构建了一个随机树的集合来对数据进行划分。难以隔离的点(即普通点的密集群)最终位于这些树的深处,而易于隔离的点(异常值)最终具有较短的路径。

接下来,我们将使用该模型识别哪些数据点被视为异常。

测试和可视化结果

现在是令人兴奋的部分:使用我们训练好的模型来检测日志数据中的异常。我们将使模型为每个数据点预测标签,然后过滤出被标记为异常值的数据点。

# 使用模型预测异常值
labels = model.predict(X)
# 模型输出+1表示普通点,-1表示异常值

# 提取异常值的索引和数值
anomaly_indices = np.where(labels == -1)[0]
anomaly_values = login_attempts[anomaly_indices]

print("Anomaly indices:", anomaly_indices)
print("Anomaly values (login attempts):", anomaly_values)

在我们的情况下,我们预计异常值将是我们插入的大数字(30、40、50)。输出可能如下所示:

Anomaly indices: [50 51 52]
Anomaly values (login attempts): [30 40 50]

即使不了解“登录尝试”具体情况,隔离森林也将这些值识别为与其余数据不一致的值。

这是在安全环境中异常检测的力量:我们并不总是知道新的攻击会是什么样子,但如果它导致某些事情偏离正常模式太远(例如,一个用户突然尝试登录的次数比平常多出10倍),异常检测器就会对其进行关注。

可视化结果

在实际分析中,通常可视化数据和异常是有用的。例如,我们可以绘制随时间变化的login_attempts值(逐分钟)并用不同的颜色突出显示异常。

在这个简单的例子中,折线图将显示大约3-8次登录/分钟的基本平稳线,并在最后出现三个巨大的峰值。这些峰值就是我们的异常。如果你在笔记本中运行这个,可以使用Matplotlib来实现:

import matplotlib.pyplot as plt

plt.plot(login_attempts, label="Login attempts per minute")
plt.scatter(anomaly_indices, anomaly_values, color='red', label="Anomalies")
plt.xlabel("Time (minute index)")
plt.ylabel("Login attempts")
plt.legend()
plt.show()

对于我们这里的文本输出,打印的结果已经确认高值被捕获。在更复杂的情况下,异常检测模型还为每个点提供异常分数(例如,它距离正常范围有多远)。例如,Scikit-learn的IsolationForest有一个decision_function方法,可以输出一个分数(分数越低表示越异常)。

为了简化,我们不会深入探讨这些分数,但知道可以检索它们以按严重性对异常进行排名是很好的。

异常检测工作正常后,当我们发现异常时我们可以做什么?这引导我们思考自动响应。

自动响应可能性

检测到异常只是战斗的一半——下一步是对此做出响应。在企业SIEM系统中,自动响应(通常与SOAR——安全编排、自动化和响应相关)可以显著减少对事件的反应时间。

当AI驱动的SIEM标记出异常时,它可以做些什么?以下是一些可能性:

  • 警报:最简单的行动是向安全人员发送警报。这可以是电子邮件、Slack消息或在事件管理系统中创建工单。警报将包含异常的详细信息(例如,“用户 alice 在 1 分钟内有 50 次登录失败尝试,这很不寻常”)。GenAI可以帮助生成清晰的自然语言摘要,以供分析人员使用。

  • 自动缓解: 更先进的系统可能会采取直接行动。例如,如果日志中显示某个IP地址表现出恶意行为,系统可以自动在防火墙上阻止该IP。在我们登录激增的示例中,系统可能会暂时锁定用户账户或要求额外的身份验证,假定这可能是一次机器人攻击。基于人工智能的SIEM系统今天确实可以在检测到特定威胁时触发预定义的响应动作,甚至在检测到威胁时编排复杂的工作流程(有关更多信息,请参阅AI SIEM:AI/ML 如何彻底改变 SOC 的 SIEM | Exabeam)。

  • 调查支持: 生成式AI还可以用于自动收集上下文。例如,在检测到异常时,系统可以提取相关日志(周围事件、同一用户或同一IP的其他操作)并提供汇总报告。这可以节省分析师手动查询多个数据源的时间。

实施自动响应时需要谨慎——你不希望系统对假阳性反应过度。一种常见的策略是分层响应:低置信度的异常可能仅记录警告或发送低优先级的警报,而高置信度的异常(或异常组合)则触发主动防御措施。

在实践中,一个基于AI的SIEM将通过API、脚本等与您的基础设施集成,以执行这些操作。对于我们的Python概念验证,你可以通过在检测到异常时打印消息或调用一个虚拟函数来模拟自动响应。例如:

if len(anomaly_indices) > 0:
    print(f"Alert! Detected {len(anomaly_indices)} anomalous events. Initiating response procedures...")
    # 在这里,你可以添加代码来禁用用户或通知管理员等。

虽然我们的演示很简单,但很容易想象将其扩展。SIEM可以,例如,将异常输入到一个更大的生成模型中,该模型评估情况并决定最佳行动方案(就像一个知道你运行手册的聊天机器人Ops助手)。随着AI变得越来越复杂,自动化的可能性正在扩大。

结论

在本教程中,我们构建了一个基本的AI驱动的SIEM组件,负责获取日志数据,使用机器学习模型分析异常,并识别可能代表安全威胁的异常事件。

我们首先解析和准备日志数据,然后使用孤立森林模型检测登录尝试计数流中的异常值。该模型成功标记了不正常的行为,而无需事先了解“攻击”的样子——它完全依赖于从学习到的正常模式中的偏差。

我们还讨论了这样的系统如何应对检测到的异常,从警告人类到自动采取行动。

现代的SIEM系统通过AI/ML增强,正朝这个方向发展:它们不仅能检测问题,还能帮助分类和响应。生成性AI进一步通过向分析师学习并提供智能摘要和决策,有效地成为安全运营中心中不知疲倦的助手。

接下来的步骤和改进:

  • 您可以在真实的日志数据上尝试这种方法。例如,获取一个系统日志文件,提取“每小时错误日志数量”或“每个会话传输的字节数”等特征,并对其进行异常检测。

  • 尝试使用其他算法,如一类SVM或局部异常因子进行异常检测,以查看它们的比较效果。

  • 结合一个简单的语言模型来解析日志行或解释异常。例如,一个大型语言模型可以读取异常的日志条目并建议可能出错的地方(“这个错误通常意味着数据库无法访问”)。

  • 扩展功能:在真实的SIEM中,您会同时使用多个信号(失败的登录次数、异常的IP地理位置、日志中罕见的进程名称等)。更多功能和数据可以提高检测的上下文。