現代のシステム設計では、イベント駆動アーキテクチャ(EDA)は、システム内でのイベントの作成、検出、使用、および応答に焦点を当てています。イベントとは、ユーザーのアクション、状態の変更、またはデータの更新など、システムのハードウェアやソフトウェアに影響を与える重要な出来事です。
EDAは、アプリケーションの異なる部分が結合を緩和して相互作用することを可能にし、直接呼び出しではなくイベントを介して通信できるようにします。このセットアップにより、コンポーネントが独立して動作し、イベントに非同期に応答し、ビジネスのニーズの変化に柔軟に対応し、大規模なシステムの再構成なしで調整できるため、アジリティが高まります。
新しいモダンなアプリケーションは、現在、リアルタイムのデータ処理と迅速な応答に大きく依存しています。EDAの重要性は非常に大きく、これらの要件をサポートするフレームワークを提供しています。非同期通信とイベント駆動の相互作用を使用することで、システムは高いトランザクションの処理を効率的に行い、不安定な負荷下でのパフォーマンスを維持できます。これらの機能は、変更が非常に即座に発生する環境、例えばeコマースプラットフォームやIoTアプリケーションなどで特に評価されます。
EDAの主要なコンポーネントには、次のものがあります:
-
イベントソース:これらはシステム内で重要なアクションが発生したときにイベントを生成するプロデューサーです。ユーザーの相互作用やデータの変更などが例です。
-
リスナー:これらは特定のイベントに購読し、そのイベントが発生したときに応答するエンティティです。リスナーによりシステムは変更に動的に反応することができます。
-
ハンドラ:これらはリスナーによって検出されたイベントを処理し、イベントによってトリガされる必要なビジネスロジックやワークフローを実行する責任があります。
この記事では、Traefik、Kafka、およびDockerを使用してイベント駆動型のデータ処理を実装する方法について学びます。
ここには、今日構築する内容の概要を素早く把握するためにすぐに実行できるGitHubでホストされたシンプルなアプリケーションがあります。
目次
以下の内容をカバーします:
さあ始めましょう!
前提条件
始める前に:
-
少なくとも4GBのRAMと20GB以上の空きディスク容量を備えたUbuntu 24.04のインスタンスを展開してください。これはDockerイメージ、コンテナ、およびKafkaデータを収容するためです。
-
sudo権限を持つ非rootユーザーでインスタンスにアクセスしてください。
-
パッケージインデックスを更新してください。
sudo apt update
技術の理解
Apache Kafka
Apache Kafkaは、高スループットなデータパイプラインとリアルタイムストリーミングアプリケーション向けに構築された分散イベントストリーミングプラットフォームです。大量のイベントを効率的に管理するために、EDAの実装のバックボーンとして機能します。Kafkaはプロデューサーがイベントをトピックに送信し、コンシューマーがこれらのトピックに登録してイベントを受信するパブリッシュサブスクライブモデルを使用しています。
Kafkaの主な機能のいくつかには次のものがあります:
-
高スループット: Kafkaは低レイテンシーで秒間数百万のイベントを処理できるため、大容量のアプリケーションに適しています。
-
障害耐性:Kafkaの分散アーキテクチャは、サーバーの障害に直面してもデータの耐久性と可用性を確保します。クラスタ内の複数のブローカー間でデータをレプリケートします。
-
拡張性:Kafkaは、クラスタにさらにブローカーを追加したり、トピックにパーティションを追加することで、成長するデータニーズに対応するために水平方向に簡単にスケーリングできます。
Traefik
Traefikは、マイクロサービスアーキテクチャ向けに特に設計された現代的なHTTPリバースプロキシおよびロードバランサーです。インフラストラクチャで実行されているサービスを自動的に検出し、トラフィックを適切にルーティングします。Traefikは、サービスメタデータに基づいた動的ルーティング機能を提供することで、マイクロサービスの管理を簡素化します。
Traefikの主な機能のいくつかには以下があります:
-
動的設定:Traefikは、サービスが追加または削除されると自動的にルーティング設定を更新し、手動介入を排除します。
-
ロードバランシング:複数のサービスインスタンス間での受信リクエストを効率的に分散し、パフォーマンスと信頼性を向上させます。
-
統合ダッシュボード:Traefikは、トラフィックとサービスの健全性をリアルタイムで監視するためのユーザーフレンドリーなダッシュボードを提供します。
イベント駆動型アーキテクチャでKafkaとTraefikを使用することで、高可用性とスケーラビリティを維持しながら、リアルタイムデータ処理を効率的に処理する応答性のあるシステムを構築することができます。
環境の設定方法
Ubuntu 24.04にDockerをインストールする方法
- 必要なパッケージをインストールします。
sudo apt install ca-certificates curl gnupg lsb-release
- Dockerの公式GPGキーを追加します。
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
- DockerリポジトリをAPTソースに追加します。
echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
- パッケージインデックスを更新し、Docker EngineとDocker Composeプラグインをインストールします。
sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io docker-compose-plugin
- インストールが正しく行われたかを確認します。
sudo docker run hello-world
期待される出力:
Unable to find image 'hello-world:latest' locally
latest: Pulling from library/hello-world
c1ec31eb5944: Pull complete
Digest: sha256:305243c734571da2d100c8c8b3c3167a098cab6049c9a5b066b6021a60fcb966
Status: Downloaded newer image for hello-world:latest
Hello from Docker!
This message shows that your installation appears to be working correctly.
Docker Composeの設定方法
Docker Composeは、マルチコンテナアプリケーションの管理を簡素化し、サービスを単一のファイルで定義して実行することができます。
- プロジェクトディレクトリを作成します。
mkdir ~/kafka-traefik-setup && cd ~/kafka-traefik-setup
docker-compose.yml
ファイルを作成します。
nano docker-compose.yml
- 以下の設定をファイルに追加して、サービスを定義します。
version: '3.8'
services:
kafka:
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181"
traefik:
image: traefik:v2.9
ports:
- "80:80" # HTTPトラフィック
- "8080:8080" # Traefikダッシュボード(安全でない)
command:
- "--api.insecure=true"
- "--providers.docker=true"
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
変更を保存するにはctrl + o
を押し、終了するにはctrl + x
を押します。
- サービスを開始します。
docker compose up -d
期待される出力:
[+] Running 4/4
✔ Network kafka-traefik-setup_default Created 0.2s
✔ Container kafka-traefik-setup-zookeeper-1 Started 1.9s
✔ Container kafka-traefik-setup-traefik-1 Started 1.9s
✔ Container kafka-traefik-setup-kafka-1 Started 1.9s
イベント駆動型システムの構築方法
イベントプロデューサーの作成方法
Kafkaでイベントを生成するには、Kafkaプロデューサーを実装する必要があります。以下はJavaを使用した例です。
kafka-producer.java
というファイルを作成します。
nano kafka-producer.java
- 以下のKafkaプロデューサーの構成を追加します。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// プロデューサープロパティを設定
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// プロデューサーを作成
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// トピック"my-topic"にメッセージを送信
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key1", "Hello, Kafka!");
RecordMetadata metadata = producer.send(record).get(); // 同期送信
System.out.printf("Sent message with key %s to partition %d with offset %d%n",
record.key(), metadata.partition(), metadata.offset());
} catch (Exception e) {
e.printStackTrace();
} finally {
// プロデューサーを閉じる
producer.close();
}
}
}
変更を保存するにはctrl + o
を押し、終了するにはctrl + x
を押します。
上記の構成では、プロデューサーはキー”key1″と値”Hello, Kafka!”をトピック”my-topic”にメッセージを送信します。
Kafkaトピックの設定方法
メッセージの生成または消費を行う前に、Kafkaでトピックを作成する必要があります。
- Kafkaのインストールに含まれる
kafka-topics.sh
スクリプトを使用してトピックを作成します。
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic <TopicName> --partitions <NumberOfPartitions> --replication-factor <ReplicationFactor>
たとえば、トピック名をmy-topic
、パーティション数を3、レプリケーションファクターを1として作成したい場合は、次のコマンドを実行します:
docker exec <Kafka Container ID> /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1
期待される出力:
Created topic my-topic.
- トピックが正常に作成されたかどうかを確認してください。
docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
期待される出力:
my-topic
イベントコンシューマを作成する方法
プロデューサとトピックを作成した後、そのトピックからメッセージを読み取るためのコンシューマを作成できます。
kafka-consumer.java
というファイルを作成します。
nano kafka-consumer.java
- 次のKafkaコンシューマの構成を追加します。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
// コンシューマプロパティを設定
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// コンシューマを作成
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// トピックに登録
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
// 新しいレコードをポーリング
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed message with key %s and value %s from partition %d at offset %d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
// コンシューマを閉じる
consumer.close();
}
}
}
変更内容を保存するにはctrl + o
を使用し、終了するにはctrl + x
を使用します。
上記の構成では、コンシューマはmy-topic
に登録し、新しいメッセージを継続的にポーリングします。メッセージを受信すると、それらのキーと値とパーティションおよびオフセット情報を出力します。
TraefikをKafkaと統合する方法
Traefikをリバースプロキシとして設定します。
KafkaのリバースプロキシとしてTraefikを統合すると、動的ルーティングやSSL終了などの機能を提供しながら、着信トラフィックを効率的に管理できます。
docker-compose.yml
ファイルを更新します。
version: '3.8'
services:
kafka:
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
labels:
- "traefik.enable=true"
- "traefik.http.routers.kafka.rule=Host(`kafka.example.com`)"
- "traefik.http.services.kafka.loadbalancer.server.port=9092"
zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181"
traefik:
image: traefik:v2.9
ports:
- "80:80" # HTTPトラフィック
- "8080:8080" # Traefikダッシュボード(セキュリティ保護なし)
command:
- "--api.insecure=true"
- "--providers.docker=true"
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
この構成では、kafka.example.com
を実際のドメイン名に置き換えてください。ラベルは、TraefikがトラフィックをKafkaサービスに誘導するために使用するルーティングルールを定義します。
- サービスを再起動します。
docker compose up -d
-
Webブラウザで
http://localhost:8080
にアクセスしてTraefikダッシュボードにアクセスします。Traefikを使用したロードバランシング
Traefikには、複数のKafkaプロデューサーおよびコンシューマーの複数のインスタンスにリクエストを分散するのに役立つ組み込みのロードバランシング機能が備わっています。
イベント駆動型マイクロサービスのロードバランシング戦略
- ラウンドロビン:
デフォルトでは、Traefikはラウンドロビン戦略を使用して、サービスのすべての利用可能なインスタンスにリクエストを均等に分散します。これは、複数のKafkaプロデューサーやコンシューマーのインスタンスが実行されている場合に負荷を均等に分散するのに効果的です。
- スティッキーセッション:
特定のクライアントからのリクエストを常に同じインスタンスに送信する必要がある場合(たとえば、セッション状態を維持する場合)、Traefikでクッキーまたはヘッダーを使用してスティッキーセッションを構成できます。
- ヘルスチェック:
Traefikでヘルスチェックを構成して、Kafkaサービスの健全なインスタンスにのみトラフィックがルーティングされるようにします。これは、docker-compose.yml
ファイル内のサービス定義にヘルスチェックパラメータを追加することで行います。
labels:
- "traefik.http.services.kafka.loadbalancer.healthcheck.path=/health"
- "traefik.http.services.kafka.loadbalancer.healthcheck.interval=10s"
- "traefik.http.services.kafka.loadbalancer.healthcheck.timeout=3s"
セットアップのテスト
イベントの生成と消費の検証
- Kafkaにはテスト用の組み込みコマンドラインツールが用意されています。Console producerを起動します。
docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
このコマンドを実行した後、ターミナルにメッセージを入力して、指定されたKafkaトピックに送信されます。
- 別のターミナルセッションを開始して、コンソールコンシューマーを起動してください。
docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
このコマンドは、コンシューマーが開始される前に生成されたすべてのメッセージを表示します。
- 消費者が生産者との遅れ具合を確認するために、特定のコンシューマーグループの遅延をチェックするには、次のコマンドを実行できます。
docker exec -it kafka-traefik-setup-kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <your-consumer-group>
モニタリングとログ記録
- Kafkaメトリクス:
KafkaはJMX(Java Management Extensions)を使用して監視できる多くのメトリクスを公開しています。これらのメトリクスをPrometheusやGrafanaなどの監視システムにエクスポートするようにJMXを構成できます。監視するための主要なメトリクスには次のものがあります:
-
メッセージスループット: 生産および消費されるメッセージのレート。
-
コンシューマーの遅延: 最後に生成されたメッセージオフセットと最後に消費されたメッセージオフセットの差。
-
ブローカーの健康状態: リクエストレートやエラーレートなど、ブローカーパフォーマンスに関連するメトリクス。
- PrometheusとGrafanaの統合:
Kafkaメトリクスを視覚化するために、Prometheusを設定してKafkaブローカーからメトリクスをスクレイピングすることができます。以下の手順に従ってください。
-
KafkaブローカーでJMXエクスポーターを有効にするには、ブローカー設定にJavaエージェントとして追加します。
-
Prometheusを設定するには、JMXエクスポーターのエンドポイントを指すスクレイプジョブを設定ファイル(
prometheus.yml
)に追加します。 -
Grafanaを使用して、これらのメトリクスをリアルタイムで可視化するダッシュボードを作成します。
Traefikの監視を実装する方法
- Traefikメトリクスエンドポイント。
Traefikは、Prometheus経由でメトリクスをエクスポートするためのビルトインサポートを提供します。この機能を有効にするには、docker-compose.yml
内のTraefikサービス定義に以下の設定を追加します:
command:
- "--metrics.prometheus=true"
- "--metrics.prometheus.addservice=true"
- GrafanaでのTraefikメトリクスの可視化.
PrometheusがTraefikメトリクスを取得していると、Grafanaを使用してこれらを可視化できます:
-
Grafanaで新しいダッシュボードを作成し、以下のような主要なTraefikメトリクスを表示するパネルを追加します:
-
traefik_entrypoint_requests_total: 受信したリクエストの総数。
-
traefik_backend_request_duration_seconds: バックエンドサービスの応答時間。
-
traefik_service_requests_total: バックエンドサービスに転送された合計リクエスト数。
- アラートの設定。
特定の閾値(例:高い消費者の遅延や増加したエラー率)に基づいて、PrometheusまたはGrafanaでアラートを設定します。
結論
このガイドでは、Ubuntu 24.04環境内でKafkaとTraefikを使用したイベント駆動アーキテクチャ(EDA)を正常に実装しました。
追加リソース
詳細については、以下のサイトをご覧ください:
-
Apache Kafka 公式ドキュメント
-
Traefik 公式ドキュメント
-
Docker 公式ドキュメント
-
Ubuntu 24.04にTraefik Proxyを設定するためのVultrガイド
Source:
https://www.freecodecamp.org/news/how-to-implement-event-driven-data-processing/