パート2: Debezium と Memphis.dev を使用した MongoDB の Change Data Capture (CDC)

これは、Memphis.devを使用して現代のイベント駆動型システムを構築するシリーズのブログ記事の第2部です。

前回のブログ記事では、Debezium ServerとMemphis.devを使用してPostgreSQLデータベースから変更データキャプチャ(CDC)イベントをキャプチャするための参照実装を紹介しました。Apache KafkaをMemphis.devに置き換えることで、ソリューションは運用リソースとオーバーヘッドを大幅に削減し、コストを節約し、開発者が新機能の構築に集中できるようになりました。

しかし、PostgreSQLは一般的に使用される唯一のデータベースではありません。Debeziumは、非リレーショナルドキュメントデータベースであるMongoDBを含むさまざまなデータベース用のコネクタを提供しています。MongoDBは、オブジェクト関係インピーダンスミスマッチを回避するため、動的プログラミング言語で作業している開発者に人気があります。開発者は、データベース内のオブジェクトを直接保存、照会、更新できます。

このブログ記事では、CDCソリューションをMongoDBに適応させる方法を実演します。

ソリューションの概要

ここでは、Memphis.devで変更データキャプチャイベントを配信するための参照ソリューションのアーキテクチャについて説明します。アーキテクチャは、前回のブログ記事から変更されていませんが、MongoDBに置き換えられたPostgreSQLを除いています。

A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.

  • Todo Item Generator: 0.5秒ごとにランダムに生成されたTODOアイテムをMongoDBコレクションに挿入します。各TODOアイテムには、説明、作成タイムスタンプ、任意の締切日、および完了状態が含まれます。
  • MongoDB: 単一のデータベースと単一のコレクション(todo_items)で構成されています。
  • Debezium Server: MongoDBソースとHTTPクライアントシンクコネクタで構成されたDebezium Serverのインスタンス。
  • Memphis.dev REST Gateway: オプションなしのデフォルト設定を使用します。
  • Memphis.dev: 単一のステーション(todo-cdc-events)と単一のユーザー(todocdcservice)で構成されています。
  • Printing Consumer: メッセージを消費し、コンソールに出力するMemphis.dev Python SDKを使用するスクリプト。

はじめに

実装チュートリアルはMemphis Example Solutionsリポジトリのmongodb-debezium-cdc-exampleディレクトリにあります。実行するにはDocker Composeが必要です。

実装の実行

Dockerイメージを構築するために、Debezium Server、印刷コンシューマ、およびデータベースセットアップ(テーブルとユーザーの作成)が必要です。

現在の実装は、JWT認証サポートのためのDebezium Serverのプレリリースバージョンに依存しています。Dockerイメージは、DebeziumおよびDebezium Serverリポジトリのメインブランチから直接構築されます。このステップはかなり時間がかかる場合があります(約20分)。Debezium Server 2.3.0がリリースされると、アップストリームのDockerイメージを使用するように切り替えます。

ステップ1: イメージの構築

Shell

 

$ docker compose build --pull --no-cache

ステップ2: Memphis.devブローカーとRESTゲートウェイの起動

Memphis.devブローカーRESTゲートウェイを起動します。Memphis-rest-gatewayサービスはMemphisブローカーサービスに依存しているため、ブローカーサービスも起動します。

Shell

 

$ docker compose up -d memphis-rest-gateway
Shell

 

 
[+] Running 4/4
 ⠿ Network mongodb-debezium-cdc-example_default                   Created                                                        0.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        6.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                       16.8s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Started

ステップ3: Memphis.devでのステーションと対応するユーザーの作成

メッセージはMemphis.devの「ステーション」に配信されます。これは、メッセージブローカーで使用される「トピック」に相当します。ブラウザをlocalhost:9000に向けてください。ページ下部の「rootでサインイン」リンクをクリックしてください。

root(ユーザー名)とmemphis(パスワード)でログインしてください。

todo-cdc-eventsという名前のステーションを作成するためにウィザードに従ってください。

ユーザー名をtodocdcserviceに作成し、パスワードに同じ値を設定します。

ウィザードが完了するまで「次へ」をクリックします。

「ステーションの概要に移動」をクリックしてステーションの概要ページに移動します。

ステップ4: 印刷コンシューマーの起動

Memphis.dev Python SDKを使用して、todo-cdc-eventsステーションをポーリングし、コンソールにメッセージを印刷するコンシューマースクリプトを作成しました。

Shell

 

$ docker compose up -d printing-consumer
Shell

 

 
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container printing-consumer                                Started     

ステップ5: MongoDBの起動と設定

変更をキャプチャするために、MongoDBのレプリケーション機能を有効にする必要があります。手順は以下の通りです。

  • レプリカセット名を設定する必要があります。これは、コマンドラインまたは設定ファイルでレプリカセットの名前を渡すことで行えます。Docker Composeファイルでは、レプリカセット名を設定するために–replSet rs0というコマンドライン引数でMongoDBを実行しています。
  • 複製が使用され、認証が有効になっている場合、各レプリカインスタンスに共通のキーファイルを提供する必要があります。MongoDBのドキュメントの手順に従ってキーファイルを生成しました。そして、そのキーファイルを含む公式のMongoDBイメージを拡張するイメージを構築しました。
  • MongoDBが実行された後、レプリカセットを初期化する必要があります。私たちは、起動時にインスタンスを構成するスクリプトを使用しています。このスクリプトは、レプリカセット内の各MongoDBインスタンスのIPアドレスとポートのリストを持つreplSetInitiateコマンドを呼び出します。このコマンドにより、MongoDBインスタンスは互いに通信し、リーダーを選出します。

一般的に、レプリカセットは信頼性(高可用性)を向上させるために使用されます。ほとんどのドキュメントでは、複数のMongoDBインスタンスでレプリカを設定する方法を説明しています。私たちの場合、DebeziumのMongoDBコネクタは、データ変更イベントをキャプチャするために複製機能に乗っ取ります。レプリカセットを構成する手順を行いますが、実際には1つのMongoDBインスタンスのみを使用します。

todo item generator scriptは、新しいtodoアイテムを半秒ごとに生成します。フィールドの値はランダムに生成され、これらのアイテムは「todo_items」という名前のMongoDBコレクションに追加されます。

Docker Composeファイルでは、todoアイテム生成スクリプトが、MongoDBインスタンスの正常な状態とデータベースセットアップスクリプトの成功完了に依存して構成されています。todoアイテム生成スクリプトを起動すると、Docker ComposeはMongoDBを起動し、データベースセットアップスクリプトを実行します。

Shell

 

$ docker compose up -d todo-generator
Shell

 

[+] Running 3/3
 ⠿ Container mongodb                 Healthy                                                                                     8.4s
 ⠿ Container mongodb-database-setup  Exited                                                                                      8.8s
 ⠿ Container mongodb-todo-generator  Started   

ステップ6: Debezium Serverを起動する

最後に起動するサービスはDebezium Serverです。サーバーは、Javaプロパティファイルを介してMongoDB用のソースコネクタとHTTPクライアントシンクコネクタで構成されています。

Shell

 

 
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false

ほとんどのオプションは直感的に理解できますが、HTTPクライアントシンクURLについては詳細に説明する価値があります。Memphis.dev RESTゲートウェイは、次の形式のパスを持つPOSTリクエストを受け入れることを期待しています。

/stations/{station}/produce/{quantity}

ここで、{station}プレースホルダは、メッセージを送信するステーションの名前で置き換えられます。{quantity}プレースホルダは、単一のメッセージの場合はsingle、複数のメッセージの場合はbatchの値で置き換えられます。

メッセージはPOSTリクエストのペイロードとして渡されます。RESTゲートウェイは3つのメッセージ形式(プレーンテキスト、JSON、またはプロトコルバッファ)をサポートしています。コンテンツタイプヘッダフィールドの値(text/, application/json, application/x-protobuf)によって、ペイロードの解釈方法が決まります。

Debezium ServerのHTTPクライアントシンクは、これらのパターンに一致するRESTリクエストを生成します。リクエストはPOSTバイアスを使用し、各リクエストはペイロードとして単一のJSONエンコードされたメッセージを含み、コンテンツタイプヘッダはapplication/JSONに設定されます。メッセージのルーティングとRESTゲートウェイがリクエストを解釈する方法を示すために、ステーション名としてtodo-CDC-eventsを使用し、エンドポイントURLの単一数量値を使用します:

http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single

プロパティdebezium.sink.http.authentication.type=jwtは、HTTPクライアントシンクがJWT認証を使用すべきであることを示します。ユーザー名とパスワードのプロパティは自明ですが、debezium.sink.http.authentication.jwt.The URLプロパティには説明が必要です。初期トークンは/auth/authenticateエンドポイントを使用して取得され、認証は別の/auth/refreshTokenエンドポイントを使用して更新されます。HTTPクライアントのJWT認証は、指定されたベースURLに適切なエンドポイントを追加します。

Debezium Serverは以下のコマンドで起動できます:

Shell

 

$ docker compose up -d debezium-server
Shell

 

[+] Running 5/5
 ⠿ Container mongodb                                              Healthy                                                        1.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                        1.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Running                                                        0.0s
 ⠿ Container debezium-server                                      Started

ステップ7: システムが動作していることを確認

Memphis.dev Web UIのtodo-cdc-eventsステーションの概要画面を確認し、プロデューサーとコンシューマーが接続され、メッセージが配信されていることを確認してください。

そして、印刷-コンシューマーコンテナのログを印刷してください:

Shell

 

$ docker logs --tail 2 printing-consumer

メッセージ:

Shell

 

bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')

CDCメッセージのフォーマット

入力されたメッセージはJSON形式であり、2つのトップレベルフィールド(スキーマとペイロード)を持っています。スキーマはフィールド名とタイプを記述するレコードスキーマを表しますが、ペイロードはレコードへの変更を表します。ペイロードオブジェクト自体は、レコードの変更前後の値を示す2つのフィールド(beforeとafter)を含んでいます。

MongoDBの場合、Debezium Serverはレコードをシリアル化されたJSON文字列としてエンコードします:

Shell

 

 
{
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}

これは、メッセージの下流処理に影響を与えることになります。このシリーズの今後のブログ投稿で説明します。

おめでとうございます!Debezium Serverを使用してMongoDBデータベースからデータ変更イベントをキャプチャし、それらのイベントをMemphis.devに転送して下流処理する方法の実践例を手に入れました。

第3部は近日公開予定です!

Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de