在現代系統設計中,事件驅動架構(EDA)專注於在系統內創建、檢測、使用和響應事件。事件是可以影響系統硬體或軟體的重要事件,例如用戶操作、狀態變化或數據更新。

EDA使應用程序的不同部分以解耦合的方式互動,使它們可以通過事件而不是直接調用進行通信。這種設置讓組件能夠獨立工作,異步響應事件,並根據變化的業務需求進行調整,而無需進行主要系統重構,促進敏捷性。

新的和現代應用程序現在大量依賴實時數據處理和響應性。EDA的重要性不言而喻,因為它提供了支持這些需求的框架。通過使用異步通信和事件驅動交互,系統可以有效處理大量交易並在不穩定負載下保持性能。這些功能在變化非常迅速的環境中特別受到讚賞,例如電子商務平台或物聯網應用程序。

EDA的一些關鍵組件包括:

  • 事件源:這些是在系統內發生重要操作時生成事件的生產者。例如用戶交互或數據變化。

  • 監聽器:這些是訂閱特定事件並在事件發生時作出反應的實體。監聽器使系統能夠動態地對變化做出反應。

  • 處理器:這些負責在監聽器檢測到事件後處理事件,執行事件觸發的必要業務邏輯或工作流程。

在本文中,您將了解如何使用 Traefik、Kafka 和 Docker 實現事件驅動的數據處理。

這裡有一個您可以快速運行以瞭解今天將要搭建的內容的 GitHub 上托管的簡單應用程式

目錄

以下是我們將涵蓋的內容:

讓我們開始吧!

先決條件

開始之前:

  • 部署一個至少具有 4GB RAM 和最少 20GB 可用硬碟空間的 Ubuntu 24.04 實例,以容納 Docker 映像、容器和 Kafka 資料。

  • 使用具有 sudo 權限的非 root 用戶訪問實例。

  • 更新套件索引。

sudo apt update

了解技術

Apache Kafka

Apache Kafka 是一個為高吞吐量數據管道和實時流應用程序而構建的分佈式事件流平台。它作為實現事件驅動架構的基礎,通過有效管理大量事件來實現。Kafka 使用發布-訂閱模型,其中生產者將事件發送到主題,而消費者訂閱這些主題以接收事件。

Kafka 的一些關鍵特性包括:

  • 高吞吐量:Kafka 能夠處理每秒數百萬個事件並具有低延遲,適用於高容量應用程序。

  • 容錯性:Kafka的分佈式架構確保了數據的耐用性和可用性,即使在伺服器故障的情況下也是如此。它會在集群內的多個代理節點之間複製數據。

  • 可擴展性:Kafka可以通過向集群添加更多代理節點或將分區添加到主題來輕鬆地進行水平擴展,滿足不斷增長的數據需求而無需進行重大重新配置。

Traefik

Traefik是一個現代的HTTP反向代理和負載平衡器,專門為微服務架構設計。它會自動發現運行在您基礎設施中的服務並相應地路由流量。Traefik通過根據服務元數據提供動態路由功能,從而簡化了微服務的管理。

Traefik的一些關鍵功能包括:

  • 動態配置:Traefik會隨著服務的添加或刪除自動更新其路由配置,消除了手動干預。

  • 負載平衡:有效地將傳入請求分發到多個服務實例,提高性能和可靠性。

  • 集成儀表板:Traefik提供了一個用戶友好的儀表板,用於即時監控流量和服務健康狀況。

通過在事件驅動架構中使用Kafka和Traefik,您可以構建響應靈敏的系統,有效處理實時數據處理,同時保持高可用性和可擴展性。

如何設置環境

如何在Ubuntu 24.04上安裝Docker

  1. 安裝所需的套件。
sudo apt install ca-certificates curl gnupg lsb-release
  1. 添加Docker的官方GPG金鑰。
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
  1. 將Docker存儲庫添加到您的APT源。
echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
  1. 再次更新套件索引並使用Docker Compose插件安裝Docker Engine。
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-compose-plugin
  1. 檢查以驗證安裝。
sudo docker run hello-world

預期輸出:

Unable to find image 'hello-world:latest' locally
latest: Pulling from library/hello-world
c1ec31eb5944: Pull complete
Digest: sha256:305243c734571da2d100c8c8b3c3167a098cab6049c9a5b066b6021a60fcb966
Status: Downloaded newer image for hello-world:latest

Hello from Docker!
This message shows that your installation appears to be working correctly.

如何配置Docker Compose

Docker Compose簡化了多容器應用程序的管理,允許您在單個文件中定義和運行服務。

  1. 創建一個項目目錄
mkdir ~/kafka-traefik-setup && cd ~/kafka-traefik-setup
  1. 創建一個docker-compose.yml文件。
nano docker-compose.yml
  1. 將以下配置添加到文件中以定義您的服務。
version: '3.8'

services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  traefik:
    image: traefik:v2.9
    ports:
      - "80:80"       # HTTP流量
      - "8080:8080"   # Traefik儀表板(不安全)
    command:
      - "--api.insecure=true"
      - "--providers.docker=true"
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"

使用ctrl + o保存更改,然後使用ctrl + x退出。

  1. 啟動您的服務。
docker compose up -d

預期輸出:

[+] Running 4/4
 ✔ Network kafka-traefik-setup_default        Created                  0.2s
 ✔ Container kafka-traefik-setup-zookeeper-1  Started                  1.9s
 ✔ Container kafka-traefik-setup-traefik-1    Started                  1.9s
 ✔ Container kafka-traefik-setup-kafka-1      Started                  1.9s

如何構建事件驅動系統

如何創建事件生產者

要在Kafka中生成事件,您需要實現一個Kafka生產者。以下是使用Java的示例。

  1. 創建一個名為kafka-producer.java的文件。
nano kafka-producer.java
  1. 添加以下配置以用於Kafka生產者。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // 設置生產者屬性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 創建生產者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            // 向主題"my-topic"發送消息
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key1", "Hello, Kafka!");
            RecordMetadata metadata = producer.send(record).get(); // 同步發送
            System.out.printf("Sent message with key %s to partition %d with offset %d%n", 
                              record.key(), metadata.partition(), metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 關閉生產者
            producer.close();
        }
    }
}

使用ctrl + o保存更改,然後使用ctrl + x退出。

在上面的配置中,生產者將使用鍵”key1″和值”Hello, Kafka!”向主題”my-topic”發送消息。

如何設置Kafka主題

在生成或消費消息之前,您需要在Kafka中創建主題。

  1. 使用隨附於您的Kafka安裝中的kafka-topics.sh腳本來創建一個主題。
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic <TopicName> --partitions <NumberOfPartitions> --replication-factor <ReplicationFactor>

例如,如果您想要創建一個名為my-topic、擁有3個分區和複製因子為1的主題,執行:

docker exec <Kafka Container ID> /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1

期望的輸出:

Created topic my-topic.
  1. 檢查以確認主題是否成功創建。
docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

期望的輸出:

my-topic

如何創建事件消費者

在創建了生產者和主題之後,您可以創建消費者來從這些主題中讀取消息。

  1. 創建一個文件kafka-consumer.java
nano kafka-consumer.java
  1. 添加以下Kafka消費者的配置。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // 設置消費者屬性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 創建消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 訂閱主題
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                // 輪詢新記錄
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message with key %s and value %s from partition %d at offset %d%n",
                                      record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } finally {
            // 關閉消費者
            consumer.close();
        }
    }
}

保存更改,使用ctrl + o保存,然後使用ctrl + x退出。

在上述配置中,消費者訂閱my-topic,並持續輪詢新消息。當接收到消息時,它會打印出它們的鍵和值以及分區和偏移信息。

如何將Traefik與Kafka集成

配置 Traefik 作為反向代理。

將 Traefik 集成為 Kafka 的反向代理,可以有效管理傳入流量,同時提供動態路由和 SSL 終結等功能。

  1. 更新 docker-compose.yml 文件。
version: '3.8'

services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.kafka.rule=Host(`kafka.example.com`)"
      - "traefik.http.services.kafka.loadbalancer.server.port=9092"

  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  traefik:
    image: traefik:v2.9
    ports:
      - "80:80"        # HTTP 流量
      - "8080:8080"    # Traefik 控制台(不安全)
    command:
      - "--api.insecure=true"
      - "--providers.docker=true"
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"

在此配置中,將kafka.example.com替換為您的實際域名。這些標籤定義了 Traefik 將用於將流量引導到 Kafka 服務的路由規則。

  1. 重新啟動您的服務。
docker compose up -d
  1. 通過在網頁瀏覽器中訪問http://localhost:8080來訪問您的 Traefik 控制台。

    使用 Traefik 進行負載均衡

    Traefik 提供了內置的負載均衡功能,可以幫助在多個 Kafka 生產者和消費者實例之間分發請求。

    事件驅動微服務的負載均衡策略

    1. 輪詢

默認情況下,Traefik 使用輪詢策略將傳入的請求均勻地分發到服務的所有可用實例上。這對於在運行多個 Kafka 生產者或消費者實例時平衡負載是有效的。

  1. 黏性會話:

如果您需要從特定客戶端始終將請求發送到同一個實例(例如,保持會話狀態),您可以在 Traefik 中使用 cookies 或標頭來配置黏性會話。

  1. 健康檢查:

在 Traefik 中配置健康檢查,以確保流量僅路由到您的 Kafka 服務的健康實例。您可以通過在您的docker-compose.yml文件中的服務定義中添加健康檢查參數來執行此操作:

    labels:
      - "traefik.http.services.kafka.loadbalancer.healthcheck.path=/health"
      - "traefik.http.services.kafka.loadbalancer.healthcheck.interval=10s"
      - "traefik.http.services.kafka.loadbalancer.healthcheck.timeout=3s"

測試設置

驗證事件的生產和消費

  1. Kafka 提供了用於測試的內置命令行工具。啟動控制台生產者。
    docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

執行此命令後,您可以在終端機中輸入消息,這些消息將被發送到指定的 Kafka 主題。

  1. 開始另一個終端會話並啟動控制台消費者。
    docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

這個命令將顯示所有在my-topic中的訊息,包括消費者啟動之前產生的訊息。

  1. 要查看消費者與生產者的同步情況,您可以運行以下命令來檢查特定消費者群組的滯後。
    docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <your-consumer-group>

監控和記錄

  1. Kafka指標:

Kafka公開了許多指標,可以使用JMX(Java Management Extensions)進行監控。您可以配置JMX將這些指標導出到監控系統,如Prometheus或Grafana。要監控的關鍵指標包括:

  • 訊息吞吐量: 被生產和被消耗的訊息速率。

  • 消費者滯後: 最後一個生產的訊息位移和最後一個消費的訊息位移之間的差異。

  • Broker健康: 與代理器性能相關的指標,如請求速率和錯誤率。

  1. Prometheus和Grafana整合:

要視覺化Kafka指標,您可以設置Prometheus從您的Kafka代理器中採集指標。請按照以下步驟進行:

  • 通過將其添加為代理在您的代理配置中,在您的Kafka代理上啟用JMX Exporter。

  • 通過在其配置文件(prometheus.yml)中添加一個抓取任務來配置Prometheus,指向您的JMX Exporter端點。

  • 使用Grafana創建實時可視化這些指標的儀表板。

如何實現Traefik的監控

  1. Traefik指標端點。

Traefik通過Prometheus提供內置支持以導出指標。要啟用此功能,請在您的Traefik服務定義中的docker-compose.yml中添加以下配置:

    command:
      - "--metrics.prometheus=true"
      - "--metrics.prometheus.addservice=true"
  1. 使用Grafana可視化Traefik指標

一旦Prometheus正在抓取Traefik指標,您可以使用Grafana可視化它們:

  • 在Grafana中創建一個新的儀表板,並添加顯示關鍵Traefik指標的面板。

  • traefik_entrypoint_requests_total:收到的請求總數。

  • traefik_backend_request_duration_seconds:後端服務的響應時間。

  • traefik_service_requests_total:轉發到後端服務的總請求數。

  1. 設置警報

根據特定閾值(例如高用戶延遲或增加的錯誤率)在Prometheus或Grafana中配置警報。

結論

在本指南中,您成功在Ubuntu 24.04環境中使用Kafka和Traefik實現了事件驅動架構(EDA)。

其他資源

欲了解更多,請訪問:

  • Apache Kafka官方文檔

  • Traefik官方文檔

  • Docker官方文檔

  • Vultr 指南,用於在 Ubuntu 24.04 上設置 Traefik 代理