如何保护并监控Kafka

介绍

Apache Kafka 支持各种安全协议和身份验证工作流程,以确保只有授权的人员和应用能够连接到集群。在其默认配置中,Kafka 允许所有人访问,但未启用任何安全检查。虽然这对于探索和开发很有用,但在生产部署对外公开之前,必须对其进行适当的安全加固。此外,还必须监控这些环境以确保平稳运行并防止可能的故障。

在本教程中,您将通过配置TLS流量加密和SASL身份验证来加强您的Kafka安装,以提供标准的用户名和密码登录流程。您将了解如何配置提供的生产者和消费者脚本来连接到安全集群。然后,您将学习如何导出Kafka指标并将其在Grafana中可视化。您还将学习如何通过AKHQ提供的易于使用的基于网页的接口访问您的集群节点和主题。

先决条件

要完成本教程,您需要:

  • 至少具有4GB RAM和2个CPU的核心云服务器。对于Ubuntu服务器,请按照初始服务器设置中的说明进行设置。
  • Apache Kafka已安装在您的核心云服务器上并完成配置。有关设置说明,请遵循Kafka入门教程。您只需完成第1步第2步
  • 了解Java如何处理密钥和证书。更多信息,请访问Java Keytool必备知识:使用Java密钥库教程。
  • Grafana已安装在您的服务器或本地计算机上。请访问如何在Ubuntu上安装和保护Grafana教程以获取说明。您只需完成前四步。
  • 一个完全注册的域名已指向您的核心云服务器。本教程将使用your_domain,并将其作为Grafana的先决条件提到的同一域名。您可以在Namecheap购买域名,在Freenom免费获取一个,或使用您选择的域名注册商。

步骤1 – 配置Kafka安全协议

在默认配置下,Kafka允许任何人无需验证请求的来源即可连接到它。这意味着默认情况下您的集群对所有人开放。虽然这对于测试来说很好,因为它减少了本地机器和私有安装的维护负担,但生产环境和面向公众的Kafka安装必须启用安全特性以防止未经授权的访问。

在本步骤中,您将配置Kafka代理以使用TLS加密代理和消费者之间的流量。您还将设置SASL作为验证连接到集群时凭据的认证框架。

生成TLS证书和存储

为了生成设置TLS所需的证书和密钥,您将使用来自Confluent Platform Security Tools存储库的脚本。首先,通过运行以下命令将其克隆到您的家目录:

git clone https://github.com/confluentinc/confluent-platform-security-tools.git ~/kafka-ssl

导航到它:

cd ~/kafka-ssl

您将要使用的脚本名为 kafka-generate-ssl-automatic.sh,它需要您以环境变量的形式提供国家、州、组织和城市信息。这些参数用于创建证书,但它们的内容并不重要。您还需要提供一个密码,该密码用于保护将要生成的 Java 信任库和密钥库。

运行以下命令来设置所需的环境变量,将 your_tls_password 替换为您期望的值:

export COUNTRY=US
export STATE=NY
export ORGANIZATION_UNIT=SE
export CITY=New York
export PASSWORD=your_tls_password

请注意,PASSWORD 的长度必须至少为六个字符。

通过运行以下命令给脚本执行权限:

chmod +x kafka-generate-ssl-automatic.sh

然后,执行它以生成所需的文件:

./kafka-generate-ssl-automatic.sh

将会输出大量信息。完成后,列出目录中的文件:

ls -l

输出内容应类似于这样:

Output
rw-rw-r-- 1 kafka kafka 964 May 13 09:33 README.md -rw-rw-r-- 1 kafka kafka 1063 May 13 09:34 cert-file -rw-rw-r-- 1 kafka kafka 1159 May 13 09:34 cert-signed -rwxrw-r-- 1 kafka kafka 6016 May 13 09:33 kafka-generate-ssl-automatic.sh -rwxrwxr-x 1 kafka kafka 7382 May 13 09:33 kafka-generate-ssl.sh drwxrwxr-x 2 kafka kafka 4096 May 13 09:34 keystore -rw-rw-r-- 1 kafka kafka 184929 May 13 09:33 single-trust-store-diagram.pages -rw-rw-r-- 1 kafka kafka 36980 May 13 09:33 single-trust-store-diagram.pdf drwxrwxr-x 2 kafka kafka 4096 May 13 09:34 truststore

您会看到证书、信任库和密钥库已成功创建。

为 Kafka 配置 TLS 和 SASL

现在您已经有了启用 TLS 加密所需的文件,接下来将配置 Kafka 使用这些文件,并通过 SASL 对用户进行身份验证。

您需要修改安装目录下config/kraft文件夹中的server.properties文件。您已经将它在您的家目录下的kafka目录中安装,作为先决条件的一部分。通过运行以下命令导航到它:

cd ~/kafka

打开主配置文件进行编辑:

nano config/kraft/server.properties

找到以下行:

config/kraft/server.properties
...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# 用于经纪人之间通信的监听器的名称。
inter.broker.listener.name=PLAINTEXT

# 监听器名称、主机名和端口,经纪人将向客户端宣传。
# 如果未设置,它将使用“listeners”的值。
advertised.listeners=PLAINTEXT://localhost:9092

将它们修改为如下所示,将PLAINTEXT替换为BROKER

config/kraft/server.properties
...
listeners=BROKER://:9092,CONTROLLER://:9093

# 用于经纪人之间通信的监听器的名称。
inter.broker.listener.name=BROKER

# 监听器名称、主机名和端口,经纪人将向客户端宣传。
# 如果未设置,它将使用“listeners”的值。
advertised.listeners=BROKER://localhost:9092

然后,找到listener.security.protocol.map行:

config/kraft/server.properties
# 将监听器名称映射到安全协议,默认情况下它们是相同的。有关更多详细信息,请参见配置文档
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

通过在值前加上定义,将BROKER映射到SASL_SSL

config/kraft/server.properties
# 将监听器名称映射到安全协议,默认情况下它们是相同的。有关更多详细信息,请参见配置文档
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

在这里,您为在监听器中使用的BROKER别名添加了定义,并将其映射到SASL_SSL,这表示将同时使用SSL(TLS的早期名称)和SASL。

接下来,导航到文件的末尾,并添加以下行:

config/kraft/server.properties
ssl.truststore.location=/home/kafka/kafka-ssl/truststore/kafka.truststore.jks
ssl.truststore.password=your_tls_password
ssl.keystore.location=/home/kafka/kafka-ssl/keystore/kafka.keystore.jks
ssl.keystore.password=your_tls_password
ssl.key.password=your_tls_password
ssl.client.auth=required

sasl.enabled.mechanisms=PLAIN
sasl.mechanism.controller.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin

首先,您定义了生成的信任和密钥库的位置和密码。您将ssl.client.auth参数设置为required,指示Kafka不允许任何不提供有效TLS证书的连接。然后,您将SASL机制设置为PLAIN,以启用它。PLAINPLAINTEXT不同,它要求使用加密连接,并且两者都依赖于用户名和密码凭据组合。

最后,您将StandardAuthorizer设置为授权类,它将凭据与您即将创建的配置文件进行核对。然后,您将allow.everyone.if.no.acl.found参数设置为false,限制具有不正确凭据的连接的访问。您还将admin用户标识为超级用户,因为在集群中执行管理任务至少需要一个超级用户。

请记住,将your_tls_password替换为在前一部分中传递给脚本的密码,然后保存并关闭文件。

现在您已经配置了Kafka,接下来需要创建一个文件来定义连接时允许的凭据。Kafka支持Java身份验证和授权服务(JAAS),这是一个实现身份验证工作流程的框架,并且接受以JAAS格式定义的凭据。

您将把它们存储在名为kafka-server-jaas.conf的文件中,该文件位于config/kraft目录下。通过运行以下命令来创建并打开它进行编辑:

nano config/kraft/kafka-server-jaas.conf

添加以下内容:

config/kraft/kafka-server-jaas.conf
KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="admin"
 user_admin="admin";
};

其中的usernamepassword定义了主要凭据,这些凭据在集群中有多个节点时用于节点间的通信。user_admin行定义了一个用户,其用户名为admin,密码为admin,可以从外部连接到代理。完成编辑后,请保存并关闭文件。

Kafka需要知道kafka-server-jaas.conf文件,因为它补充了主要配置。您需要修改kafka systemd服务配置,并将其引用传递进去。运行以下命令以打开服务进行编辑:

sudo systemctl edit --full kafka

通过传入--full,您可以访问服务的完整内容。找到ExecStart行:

kafka.service
...
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
...

在它上面添加以下行,使其看起来像这样:

kafka.service
...
User=kafka
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/home/kafka/kafka/config/kraft/kafka-server-jaas.conf"
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
...

这样,您在配置中将java.security.auth.login.config参数设置为JAAS配置文件的路径,将其与主Kafka配置分离。完成后,保存并关闭文件。通过运行以下命令重新加载服务定义:

sudo systemctl daemon-reload

然后,重新启动Kafka:

sudo systemctl restart kafka

您现在已经为您的Kafka安装配置了TLS加密和SASL身份验证,接下来将学习如何使用提供的控制台脚本连接到它。

第二步 – 连接到受保护的集群

在这一步中,您将学习如何使用提供的控制台脚本,使用JAAS配置文件连接到受保护的Kafka集群。

用于操作主题的生产和消费消息的脚本内部也使用Java,因此接受一个JAAS配置,详细说明信任和密钥库的位置,以及SASL凭据。

您将在您的家目录中创建一个名为client-jaas.conf的文件来存储此配置。创建并打开它以进行编辑:

nano ~/client-jaas.conf

添加以下行:

~/client-jaas.conf
security.protocol=SASL_SSL
ssl.truststore.location=/home/kafka/kafka-ssl/truststore/kafka.truststore.jks
ssl.truststore.password=your_tls_password
ssl.keystore.location=/home/kafka/kafka-ssl/keystore/kafka.keystore.jks
ssl.keystore.password=your_tls_password

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
ssl.endpoint.identification.algorithm=

与之前类似,您将协议设置为SASL_SSL,并提供您创建的密钥和信任存储的路径和密码。然后,您将SASL机制设置为PLAIN,并提供用户admin的凭据。为了防止连接问题,您显式清除ssl.endpoint.identification.algorithm参数,因为初始脚本将运行它的机器的主机名设置为证书端点,这可能是不正确的。

your_tls_password替换为正确的值,然后保存并关闭文件。

要将此文件传递给脚本,您可以使用--command-config参数。尝试使用以下命令在集群中创建新主题:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic new_topic --command-config ~/client-jaas.conf

该命令应该成功执行:

Output
... Created topic new_topic.

为了验证它是否已创建,请运行以下命令列出集群中的所有主题:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config ~/client-jaas.conf

输出将显示new_topic已存在:

Output
__consumer_offsets new_topic ...

在本节中,您已配置Kafka安装使用TLS加密流量和SASL进行身份验证,结合用户名和密码。接下来,您将学习如何使用Prometheus通过JMX导出各种Kafka指标。

第3步 – 使用Prometheus监控Kafka JMX指标

在本节中,您将使用普罗米修斯收集Kafka指标,并在Grafana中使其可查询。这涉及为Kafka设置JMX导出器并将其连接到普罗米修斯。

[Java管理扩展(JMX)是一个用于Java应用程序的框架,它允许开发者在标准格式中收集关于应用程序运行时的通用和自定义指标。由于Kafka是用Java编写的,它支持JMX协议并通过它暴露其自定义指标,例如主题和代理的状态。

配置Kafka和普罗米修斯

在继续之前,您需要安装普罗米修斯。在Ubuntu机器上,您可以使用apt。通过运行:

sudo apt update

然后,安装普罗米修斯:

sudo apt install prometheus -y

对于其他平台,请遵循官方网站上的安装说明。

安装完成后,您需要将JMX导出器库添加到您的Kafka安装中,以供Prometheus使用。导航到发布页面,并选择名称中包含javaagent的最新版本。在撰写本文时,最新的可用版本是0.20.0。使用以下命令将其下载到Kafka安装位置的libs/目录:

curl https://repo.maven.apache.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.20.0/jmx_prometheus_javaagent-0.20.0.jar -o ~/kafka/libs/jmx_prometheus_javaagent.jar

JMX导出器库现在将被Kafka获取。

在激活导出器之前,您需要定义它将向Prometheus报告哪些指标,并将该配置存储在Kafka安装位置的config/目录下名为jmx-exporter.yml的文件中。JMX导出器项目提供了一个合适的默认配置,因此运行以下命令将其存储为Kafka安装位置的config/目录下的jmx-exporter.yml

curl https://raw.githubusercontent.com/prometheus/jmx_exporter/main/example_configs/kafka-2_0_0.yml -o ~/kafka/config/jmx-exporter.yml

接下来,要激活导出器,您需要修改Kafka systemd服务。您需要修改KAFKA_OPTS环境变量,以包括导出器及其配置。运行以下命令以编辑服务:

sudo systemctl edit --full kafka

Environment行修改为如下所示:

kafka.service
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/home/kafka/kafka/config/kraft/kafka-server-jaas.conf -javaagent:/home/kafka/kafka/libs/jmx_prometheus_javaagent.jar=7075:/home/kafka/kafka/config/jmx-exporter.yml"

在这里,您使用-javaagent参数来初始化带有其配置的JMX导出器。

完成修改后保存并关闭文件,然后通过运行以下命令重新启动Kafka:

sudo systemctl restart kafka

一分钟后,通过查询端口7075是否被占用,验证JMX导出器是否正在运行:

sudo ss -tunelp | grep 7075

这是翻译结果:

Output
tcp LISTEN 0 3 *:7075 *:* users:(("java",pid=6311,fd=137)) uid:1000 ino:48151 sk:8 cgroup:/system.slice/kafka.service v6only:0 <->

这行显示端口7075被由Kafka服务启动的Java进程使用,该进程指的是JMX导出器。

接下来,您将配置Prometheus以监控导出的JMX指标。其主配置文件位于/etc/prometheus/prometheus.yml,因此请打开它进行编辑:

sudo nano /etc/prometheus/prometheus.yml

找到以下行:

/etc/prometheus/prometheus.yml
...
# 包含一个确切的端点来抓取的抓取配置:
# 这里它是Prometheus本身。
scrape_configs:
  # 作业名称作为标签`job=`添加到从该配置抓取的任何时间序列中。
  - job_name: 'prometheus'

    # 覆盖全局默认值,并每隔5秒从该作业抓取目标。
    scrape_interval: 5s
    scrape_timeout: 5s

    # metrics_path 默认为 '/metrics'
    # 方案默认为 'http'。

    static_configs:
      - targets: ['localhost:9090']

  - job_name: node
    # 如果安装了prometheus-node-exporter,可以通过默认方式获取本地机器的统计信息。
    # 如果安装了prometheus-node-exporter,可以通过默认方式获取本地机器的统计信息。
    static_configs:
      - targets: ['localhost:9100']

scrape_configs下面,指定Prometheus应该监控哪些端点的部分,为抓取Kafka指标添加一个新的部分:

/etc/prometheus/prometheus.yml
# 包含一个精确抓取端点的抓取配置:
# 这里是Prometheus本身。
scrape_configs:
  # 此配置抓取的任何时间序列都会添加一个标签 `job=<job_name>`。
  - job_name: 'prometheus'

    # 覆盖全局默认设置,并从此任务中每5秒抓取一次目标。
    scrape_interval: 5s
    scrape_timeout: 5s

    # metrics_path 默认为 '/metrics'
    # scheme 默认为 'http'。

    static_configs:
      - targets: ['localhost:9090']

  - job_name: node
    # 如果安装了prometheus-node-exporter,默认情况下获取本地
    # 机器的统计信息。
    static_configs:
      - targets: ['localhost:9100']

  - job_name: 'kafka'
    static_configs:
    - targets: ['your_domain:7075']

kafka 任务有一个目标,指向JMX导出器端点。

记得将 your_domain 替换为您的域名,然后保存并关闭文件。然后,通过运行以下命令重新启动Prometheus:

sudo systemctl restart prometheus

在您的浏览器中,导航到您域名的端口 9090。您将访问Prometheus UI。在 状态 下,点击 目标 以列出作业:

注意,Prometheus已经接受了 kafka 任务并开始抓取其指标。现在,您将学习如何在Grafana中访问它们。

在Grafana中查询指标

作为先决条件的一部分,您已经在您的Droplet上安装了Grafana,并且通过your_domain暴露了它。在浏览器中导航到它,然后在侧边栏中的连接下,点击添加新连接,然后在搜索字段中输入Prometheus

点击Prometheus,然后在上方右侧点击添加新数据源按钮。系统将提示您填写Prometheus实例的地址:

输入http://your_domain_name:9090,用您的实际域名替换,然后滚动到底部并点击保存 & 测试。您应该会收到成功消息:

已将Prometheus连接添加到Grafana中。在侧边栏中点击探索,然后系统将提示您选择一个指标。您可以输入kafka_以列出与集群相关的所有指标,如下所示:

例如,选择kafka_log_log_size指标,该指标显示每个分区磁盘上内部日志的大小,然后在上方右侧点击运行查询。您将看到每个可用主题随时间变化的结果大小:

在这个步骤中,您已经设置好了导出Kafka提供的JMX指标,并配置了Prometheus来抓取它们。然后,您从Grafana内部连接到它,并对Kafka指标运行了查询。现在,您将学习如何使用网页界面管理Kafka集群。

步骤4 – 使用AKHQ管理Kafka集群

在这一步中,您将学习如何设置和使用AKHQ,这是一个用于管理Kafka集群的网页应用程序。它允许您列出和操作主题、分区、消费者组和配置参数,以及从单一位置生产和消费主题中的消息。

您将把可执行文件及其配置存储在一个名为akhq的目录中。通过运行以下命令,在您的家目录中创建它:

mkdir ~/akhq

导航到它:

cd ~/akhq

在您的浏览器中,访问官方发布页面,并复制最新版本的JAR文件的链接。在撰写本文时,最新版本是0.24.0。运行以下命令将其下载到您的家目录:

curl -L https://github.com/tchiotludo/akhq/releases/download/0.24.0/akhq-0.24.0-all.jar -o ~/akhq/akhq.jar

您现在已经下载了AKHQ,并准备好定义其配置以连接到您的集群。您将把它存储在一个名为akhq-config.yml的文件中。通过运行以下命令创建并打开它进行编辑:

nano ~/akhq/akhq-config.yml

添加以下几行:

~/akhq/akhq-config.yml
akhq:
  connections:
    localhost-sasl:
      properties:
        bootstrap.servers: "localhost:9092"
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
        ssl.truststore.location: /home/kafka/kafka-ssl/truststore/kafka.truststore.jks
        ssl.truststore.password: secret
        ssl.keystore.location: /home/kafka/kafka-ssl/keystore/kafka.keystore.jks
        ssl.keystore.password: secret
        ssl.key.password: secret
        ssl.endpoint.identification.algorithm: ""

这是一个基本的AKHQ配置,指定了位于localhost:9092的一个集群,并配备了相应的SASL和TLS参数。同时支持多个集群,因为你可以定义尽可能多的连接。这使得AKHQ在管理Kafka方面具有多样性。完成后,保存并关闭文件。

下一步,你需要为在后台运行AKHQ定义一个systemd服务。systemd服务可以一致地启动、停止和重新启动。

你将在/lib/systemd/system目录中创建一个名为code-server.service的文件来存储服务配置,这是systemd存储其服务的目录。使用你的文本编辑器创建它:

sudo nano /etc/systemd/system/akhq.service

添加以下行:

/etc/systemd/system/akhq.service
[Unit]
Description=akhq

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c 'java -Dmicronaut.config.files=/home/kafka/akhq/akhq-config.yml -jar /home/kafka/akhq/akhq.jar'
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

首先指定服务的描述。然后在[Service]部分,定义服务的类型(simple意味着命令应该简单执行)并提供将要运行的命令。你还指定它以kafka用户运行,如果它退出,服务应该自动重新启动。

[Install]部分,指示systemd在你登录到你的服务器时启动这个服务。完成后保存并关闭文件。

通过运行:

sudo systemctl daemon-reload

通过运行以下命令启动AKHQ服务:

sudo systemctl start akhq

然后,通过观察其状态来确认它是否正确启动:

sudo systemctl status akhq

输出应该看起来像这样:

Output
● akhq.service - akhq Loaded: loaded (/etc/systemd/system/akhq.service; disabled; vendor preset: enabled) Active: active (running) since Wed 2024-05-15 07:37:10 UTC; 3s ago Main PID: 3241 (sh) Tasks: 21 (limit: 4647) Memory: 123.3M CPU: 4.474s CGroup: /system.slice/akhq.service ├─3241 /bin/sh -c "java -Dmicronaut.config.files=/home/kafka/akhq/akhq-config.yml -jar /home/kafka/akhq/akhq.jar" └─3242 java -Dmicronaut.config.files=/home/kafka/akhq/akhq-config.yml -jar /home/kafka/akhq/akhq.jar

AKHQ现在正在后台运行。默认情况下,它暴露在8080端口上。在您的浏览器中,导航到带有该端口的域名来访问它。您将看到默认视图,显示主题列表:

您可以通过双击表格中主题对应的行来访问它,以获得详细视图:

AKHQ允许您查看主题中的消息,以及分区、消费者组和它们的配置。您还可以使用右下角的按钮清空或复制主题。

由于new_topic主题为空,请按生产到主题按钮,这将打开用于选择新消息参数的界面:

AKHQ将自动为您填充主题名称。在字段中,输入Hello World!,然后按生产。消息将被发送到Kafka,并且您将在数据选项卡中看到它:

由于消息内容可能非常大,AKHQ只显示第一行。要查看完整消息,请点击行后的深色区域以展开它。

在左侧边栏中,您还可以通过点击节点来列出集群中的代理。现在,集群只有一个节点:

双击一个节点将打开其配置,使您能够远程更改任何设置:

一旦您进行了更改,您可以通过点击右下角的更新配置按钮来应用它们。同样,您可以通过访问它们并切换到配置标签来查看和修改任何主题的配置。

在此部分,您设置了AKHQ,一个提供易于使用的UI以便远程管理和观察Kafka节点和主题的Web应用。它允许您在主题和节点上实时生产和消费消息以及更新配置参数。

结论

在本教程中,您通过为加密配置TLS和为用户身份验证配置SASL来保护您的Kafka安装。您还设置了使用Prometheus进行度量导出并在Grafana中可视化它们。然后,您学习了如何使用AKHQ,一个用于管理Kafka集群的Web应用。


作者选择了Apache软件基金会,作为写作为捐赠计划的一部分接收捐赠。

Source:
https://www.digitalocean.com/community/developer-center/how-to-secure-and-monitor-kafka