在现代系统设计中,事件驱动架构(EDA)侧重于在系统内部创建、检测、使用和响应事件。事件是可能影响系统硬件或软件的重要发生,如用户操作、状态更改或数据更新。

EDA使应用程序的不同部分以一种解耦的方式进行交互,使它们能够通过事件而不是直接调用进行通信。这种设置让组件可以独立工作,异步响应事件,并根据不断变化的业务需求进行调整,而无需进行重大系统重构,从而促进了敏捷性。

新的和现代应用程序现在在很大程度上依赖实时数据处理和响应。EDA的重要性不可言喻,因为它提供了支持这些需求的框架。通过使用异步通信和事件驱动的交互,系统可以高效处理大量事务,并在不稳定的负载下保持性能。这些特性在变化非常突发的环境中特别受欢迎,例如电子商务平台或物联网应用。

EDA的一些关键组件包括:

  • 事件来源:这些是在系统内发生重要动作时生成事件的生产者。例如用户交互或数据更改。

  • 监听器:这些是订阅特定事件并在事件发生时做出响应的实体。监听器使系统能够动态地对变化做出反应。

  • 处理程序:这些负责在监听器检测到事件后处理事件,执行由事件触发的必要业务逻辑或工作流。

在本文中,您将学习如何使用Traefik、Kafka和Docker实现事件驱动的数据处理。

这里有一个您可以快速运行以了解今天将构建的内容概述的托管在GitHub上的简单应用程序

目录

以下是我们将要涵盖的内容:

让我们开始吧!

先决条件

开始之前:

  • 部署一个 Ubuntu 24.04 实例,至少拥有 4GB RAM 和至少 20GB 空闲磁盘空间,以容纳 Docker 镜像、容器和 Kafka 数据。

  • 使用具有 sudo 权限的非 root 用户访问该实例。

  • 更新软件包索引。

sudo apt update

了解技术

Apache Kafka

Apache Kafka 是一个为高吞吐量数据管道和实时流应用程序构建的分布式事件流平台。它作为实现 EDA 的支柱,通过有效管理大量事件来发挥作用。Kafka 使用发布-订阅模型,生产者将事件发送到主题,而消费者订阅这些主题以接收事件。

Kafka 的一些关键特性包括:

  • 高吞吐量:Kafka 能够处理每秒数百万事件,延迟低,适用于高容量应用。

  • 容错性:Kafka的分布式架构确保数据在服务器故障时仍具有持久性和可用性。它会在集群内的多个代理节点之间复制数据。

  • 可扩展性:Kafka可以通过向集群添加更多代理节点或向主题添加分区来轻松实现水平扩展,满足不断增长的数据需求,无需进行重大重新配置。

Traefik

Traefik是一种现代的HTTP反向代理和负载均衡器,专为微服务架构设计。它会自动发现运行在您基础设施中的服务,并相应地路由流量。Traefik通过基于服务元数据提供动态路由功能,简化了微服务的管理。

Traefik的一些关键特性包括:

  • 动态配置:Traefik会随着服务的增加或移除自动更新其路由配置,消除了手动干预。

  • 负载均衡:它能够高效地将传入请求分发到多个服务实例,提高性能和可靠性。

  • 集成仪表板:Traefik提供了一个用户友好的仪表板,用于实时监控流量和服务健康状况。

通过在事件驱动架构中使用Kafka和Traefik,您可以构建响应迅速的系统,高效处理实时数据处理,同时保持高可用性和可扩展性。

如何设置环境

如何在Ubuntu 24.04上安装Docker

  1. 安装所需的软件包。
sudo apt install ca-certificates curl gnupg lsb-release
  1. 添加Docker的官方GPG密钥。
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
  1. 将Docker存储库添加到您的APT源中。
echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
  1. 再次更新软件包索引,并使用Docker Compose插件安装Docker Engine。
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-compose-plugin
  1. 检查以验证安装。
sudo docker run hello-world

预期输出:

Unable to find image 'hello-world:latest' locally
latest: Pulling from library/hello-world
c1ec31eb5944: Pull complete
Digest: sha256:305243c734571da2d100c8c8b3c3167a098cab6049c9a5b066b6021a60fcb966
Status: Downloaded newer image for hello-world:latest

Hello from Docker!
This message shows that your installation appears to be working correctly.

如何配置Docker Compose

Docker Compose简化了多容器应用程序的管理,允许您在单个文件中定义和运行服务。

  1. 创建项目目录
mkdir ~/kafka-traefik-setup && cd ~/kafka-traefik-setup
  1. 创建一个docker-compose.yml文件。
nano docker-compose.yml
  1. 将以下配置添加到文件中以定义您的服务。
version: '3.8'

services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  traefik:
    image: traefik:v2.9
    ports:
      - "80:80"       # HTTP流量
      - "8080:8080"   # Traefik仪表板(不安全)
    command:
      - "--api.insecure=true"
      - "--providers.docker=true"
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"

使用ctrl + o保存更改,然后使用ctrl + x退出。

  1. 启动您的服务。
docker compose up -d

预期输出:

[+] Running 4/4
 ✔ Network kafka-traefik-setup_default        Created                  0.2s
 ✔ Container kafka-traefik-setup-zookeeper-1  Started                  1.9s
 ✔ Container kafka-traefik-setup-traefik-1    Started                  1.9s
 ✔ Container kafka-traefik-setup-kafka-1      Started                  1.9s

如何构建事件驱动系统

如何创建事件生产者

要在Kafka中生成事件,您需要实现一个Kafka生产者。以下是使用Java的示例。

  1. 创建一个文件kafka-producer.java
nano kafka-producer.java
  1. 为Kafka生产者添加以下配置。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // 设置生产者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            // 发送消息到主题"my-topic"
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key1", "Hello, Kafka!");
            RecordMetadata metadata = producer.send(record).get(); // 同步发送
            System.out.printf("Sent message with key %s to partition %d with offset %d%n", 
                              record.key(), metadata.partition(), metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭生产者
            producer.close();
        }
    }
}

使用ctrl + o保存更改,然后使用ctrl + x退出。

在上述配置中,生产者使用键”key1″和值”Hello, Kafka!”发送消息到主题”my-topic”。

如何设置Kafka主题

在生成或消费消息之前,您需要在Kafka中创建主题。

  1. 使用随Kafka安装包含的kafka-topics.sh脚本创建一个主题。
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic <TopicName> --partitions <NumberOfPartitions> --replication-factor <ReplicationFactor>

例如,如果您想创建一个名为my-topic的主题,分区数为3,复制因子为1,请运行:

docker exec <Kafka Container ID> /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1

预期输出:

Created topic my-topic.
  1. 检查确认主题是否成功创建。
docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

预期输出:

my-topic

如何创建事件消费者

在创建生产者和主题之后,您可以创建消费者以从这些主题中读取消息。

  1. 创建一个名为kafka-consumer.java的文件。
nano kafka-consumer.java
  1. 添加以下Kafka消费者的配置。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // 设置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                // 拉取新记录
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message with key %s and value %s from partition %d at offset %d%n",
                                      record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } finally {
            // 关闭消费者
            consumer.close();
        }
    }
}

保存更改:ctrl + o,然后退出:ctrl + x

在上述配置中,消费者订阅my-topic并持续拉取新消息。当接收到消息时,它会打印出它们的键和值以及分区和偏移信息。

如何将Traefik与Kafka集成

配置 Traefik 作为反向代理。

将 Traefik 集成为 Kafka 的反向代理,可以有效管理传入流量,同时提供动态路由和 SSL 终止等功能。

  1. 更新 docker-compose.yml 文件。
version: '3.8'

services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.kafka.rule=Host(`kafka.example.com`)"
      - "traefik.http.services.kafka.loadbalancer.server.port=9092"

  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  traefik:
    image: traefik:v2.9
    ports:
      - "80:80"        # HTTP 流量
      - "8080:8080"    # Traefik 仪表板(非安全)
    command:
      - "--api.insecure=true"
      - "--providers.docker=true"
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"

在此配置中,将kafka.example.com替换为您实际的域名。这些标签定义了 Traefik 将用于将流量定向到 Kafka 服务的路由规则。

  1. 重新启动您的服务。
docker compose up -d
  1. 通过在您的 Web 浏览器中访问http://localhost:8080来访问 Traefik 仪表板。

    使用 Traefik 进行负载均衡

    Traefik 提供了内置的负载均衡功能,可帮助在多个 Kafka 生产者和消费者实例之间分配请求。

    事件驱动微服务的负载均衡策略

    1. 轮询(Round Robin)

默认情况下,Traefik 使用轮询法策略来均匀分发所有可用服务实例的传入请求。当运行多个 Kafka 生产者或消费者实例时,这是均衡负载的有效方法。

  1. 粘性会话:

如果您需要来自特定客户端的请求始终发送到同一实例(例如,维护会话状态),您可以在 Traefik 中使用 Cookie 或标头配置粘性会话。

  1. 健康检查:

在 Traefik 中配置健康检查,以确保流量仅路由到健康的 Kafka 服务实例。您可以通过在您的docker-compose.yml文件中的服务定义中添加健康检查参数来实现这一点:

    labels:
      - "traefik.http.services.kafka.loadbalancer.healthcheck.path=/health"
      - "traefik.http.services.kafka.loadbalancer.healthcheck.interval=10s"
      - "traefik.http.services.kafka.loadbalancer.healthcheck.timeout=3s"

测试设置

验证事件生产和消费

  1. Kafka 提供了用于测试的内置命令行工具。启动一个控制台生产者。
    docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

运行此命令后,您可以在终端中输入消息,这些消息将被发送到指定的 Kafka 主题。

  1. 开始另一个终端会话并启动控制台消费者。
    docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

这个命令将显示my-topic中的所有消息,包括消费者启动前产生的消息。

  1. 要查看消费者与生产者的同步情况,可以运行以下命令来检查特定消费者组的滞后情况。
    docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <your-consumer-group>

监控和日志记录

  1. Kafka指标:

Kafka公开了许多可以使用JMX(Java管理扩展)监控的指标。您可以配置JMX将这些指标导出到像Prometheus或Grafana这样的监控系统。需要监控的关键指标包括:

  • 消息吞吐量:产生和消费的消息速率。

  • 消费者滞后:最后产生的消息偏移量与最后消费的消息偏移量之间的差异。

  • Broker健康:与代理性能相关的指标,如请求速率和错误率。

  1. Prometheus和Grafana集成:

要可视化Kafka指标,您可以设置Prometheus来抓取来自Kafka代理的指标。请按照以下步骤操作:

  • 通过将其作为Java代理添加到经纪人配置中,在Kafka经纪人上启用JMX Exporter。

  • 通过在其配置文件(prometheus.yml)中添加一个指向您的JMX Exporter端点的抓取作业,配置Prometheus。

  • 使用Grafana创建实时可视化这些指标的仪表板。

如何实现Traefik的监控

  1. Traefik指标端点。

Traefik通过Prometheus提供内置支持以导出指标。要启用此功能,请在docker-compose.yml中的Traefik服务定义中添加以下配置:

    command:
      - "--metrics.prometheus=true"
      - "--metrics.prometheus.addservice=true"
  1. 使用Grafana可视化Traefik指标

一旦Prometheus抓取Traefik指标,您可以使用Grafana将其可视化:

  • 在Grafana中创建一个新仪表板,并添加显示关键Traefik指标的面板,例如:

  • traefik_entrypoint_requests_total:收到的请求总数。

  • traefik_backend_request_duration_seconds:后端服务的响应时间。

  • traefik_service_requests_total:转发到后端服务的总请求数。

  1. 设置警报

在Prometheus或Grafana中根据特定阈值配置警报(例如,消费者滞后高或错误率增加)。

结论

在本指南中,您成功地在Ubuntu 24.04环境中使用Kafka和Traefik实现了事件驱动架构(EDA)。

其他资源

要了解更多信息,您可以访问: