私たちの前回のブログ記事では、MongoDBデータベースから変更データキャプチャ(CDC)イベントをキャプチャするためのリファレンス実装をDebezium ServerおよびMemphis.devを使用して紹介しました。記事の終わりに、MongoDBレコードはDebezium CDCメッセージ内で文字列としてシリアル化されることを指摘しました。
{
"schema" : ...,
"payload" : {
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
...
}
}
私たちは、Memphis.devのSchemaverse機能を使用して、メッセージが期待されるスキーマに一致するかをチェックしたいと考えています。スキーマに一致しないメッセージは、デッドレターステーションにルーティングされ、下流のコンシューマーに影響を与えないようにします。これが古代ギリシャ語のように聞こえる場合は心配しないでください!次回のブログ記事で詳細を説明します。
Schemaverseのような機能を使用するためには、MongoDBレコードをJSONドキュメントとして逆シリアル化する必要があります。このブログ記事では、MongoDB CDCパイプラインに変換サービスを追加することで、MongoDBレコードをJSONドキュメントに逆シリアル化する修正について説明します。
ソリューションの概要
以前のソリューションは、以下の6つのコンポーネントで構成されていました:
1. Todoアイテムジェネレータ:MongoDBコレクションに0.5秒ごとにランダムに生成されたTodoアイテムを挿入します。各Todoアイテムには、説明、作成日時、任意の期限日、および完了状態が含まれます。
2. MongoDB: 単一のデータベースに単一のコレクション(todo_items)を含むように設定されています。
3. Debezium Server: MongoDBソースとHTTPクライアントシンクコネクタを備えたDebezium Serverのインスタンスが設定されています。
4. Memphis.dev REST Gateway: オプションの設定を使用しています。
5. Memphis.dev: 単一のステーション(todo-cdc-events)と単一のユーザー(todocdcservice)で設定されています。
6. Printing Consumer: Memphis.dev Python SDKを使用してメッセージを消費し、コンソールに出力するスクリプトです。
このイテレーションでは、2つの追加コンポーネントを追加しています:
1. Transformer Service: 変換サービスで、todo-cdc-eventsステーションからメッセージを消費し、MongoDBレコードを逆シリアル化して、cleaned-todo-cdc-eventsステーションにプッシュします。
2. Cleaned Printing Consumer: cleaned-todo-cdc-eventsステーションにプッシュされたメッセージを出力する印刷消費者の2番目のインスタンス。
更新されたアーキテクチャは以下のようになります:
A Deep Dive Into the Transformer Service
メッセージ変換サービスのスケルトン
この変換サービスはMemphis.devのPython SDKを使用しています。変換器の実装を見てみましょう。変換器のmain()メソッドは最初にMemphis.devブローカーに接続します。接続の詳細は環境変数から取得されます。ホスト、ユーザー名、パスワード、入力ステーション名、出力ステーション名はTwelve-Factor App宣言の提案に従って環境変数を介して渡されます。
async def main():
try:
print("Waiting on messages...")
memphis = Memphis()
await memphis.connect(host=os.environ[HOST_KEY],
username=os.environ[USERNAME_KEY],
password=os.environ[PASSWORD_KEY])
接続が確立されると、コンシューマーとプロデューサーのオブジェクトを作成します。Memphis.devでは、コンシューマーとプロデューサーに名前があります。これらの名前はMemphis.dev UIに表示され、システムの操作に関する透明性を提供します。
print("Creating consumer")
consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
consumer_name="transformer",
consumer_group="")
print("Creating producer")
producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
producer_name="transformer")
コンシューマーAPIはコールバック関数のデザインパターンを使用しています。ブローカーからメッセージが引き出されると、提供された関数がメッセージのリストを引数として呼び出されます。
print("Creating handler")
msg_handler = create_handler(producer)
print("Setting handler")
consumer.consume(msg_handler)
コールバックを設定した後、asyncio イベントループを開始します。この時点で、トランスフォーマーサービスは一時停止し、ブローカーからメッセージを引き出す準備ができるまで待機します。
- メインスレッドを生かし続けることで、コンシューマーはデータを引き続き受信できます。
await asyncio.Event().wait()
メッセージハンドラー関数の作成
メッセージハンドラーの作成関数は、プロデューサーオブジェクトを受け取り、コールバック関数を返します。コールバック関数は単一の引数のみを取るため、クロージャーパターンを使用して、msg_handler関数を作成する際にプロデューサーを暗黙的に渡します。
msg_handler 関数は、呼び出される際に3つの引数を受け取ります: メッセージのリスト、発生したエラー(もしあれば)、そして辞書で構成されるコンテキスト。ハンドラーはメッセージのリストをループし、各メッセージに対して変換関数を呼び出し、プロデューサーを使用して2番目のステーションにメッセージを送信し、メッセージの処理が完了したことを確認します。Memphis.devでは、メッセージはコンシューマーがそれらを確認するまで配信済みとしてマークされません。これにより、処理中にエラーが発生した場合でもメッセージが失われることを防ぎます。
def create_handler(producer):
async def msg_handler(msgs, error, context):
try:
for msg in msgs:
transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
await producer.produce(message=transformed_msg)
await msg.ack()
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
print(e)
return
return msg_handler
メッセージトランスフォーマー関数
さて、サービスの核心であるメッセージ変換機能について説明します。メッセージのペイロード(get_data()メソッドによって返される)はbytearrayオブジェクトとして保存されます。Pythonのjsonライブラリを使用して、メッセージをPythonのコレクション(リストと辞書)とプリミティブ型(int、float、str、None)の階層にデシリアライズします。
def deserialize_mongodb_cdc_event(input_msg):
obj = json.loads(input_msg)
オブジェクトが値としてオブジェクトを持つpayloadプロパティを持つことを期待します。そのオブジェクトは、Noneまたはシリアル化されたJSONオブジェクトを含む文字列である”before”と”after”という2つのプロパティを持ちます。再度JSONライブラリを使用して文字列をオブジェクトにデシリアライズし、置き換えます。
if "payload" in obj:
payload = obj["payload"]
if "before" in payload:
before_payload = payload["before"]
if before_payload is not None:
payload["before"] = json.loads(before_payload)
if "after" in payload:
after_payload = payload["after"]
if after_payload is not None:
payload["after"] = json.loads(after_payload)
最後に、全体のJSONレコードを再シリアライズし、ブローカーに送信するためにbytearrayに戻します。
output_s = json.dumps(obj)
output_msg = bytearray(output_s, "utf-8")
return output_msg
やったね!これでオブジェクトは次のようになりました:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},
...
}
}
変換サービスの実行
前回のブログ記事の7つのステップに従った場合、変換サービスを開始し、その動作を確認するために追加で3つのステップを実行するだけです。
ステップ8: 変換サービスの開始
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
ステップ9: 2番目の印刷コンシューマーの開始
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
ステップ10: メンフィスUIの確認
変換器がMemphis.devにメッセージを生成し始めると、”cleaned-todo-cdc-events”という名前の2番目のステーションが作成されます。この新しいステーションがMemphis.dev UIのステーション概要ページに表示されるはずです。
“cleaned-todo-cdc-events”ページの詳細ページには、プロデューサーとして接続されたトランスフォーマー、印刷コンシューマー、そして変換されたメッセージが表示されるべきです。
おめでとう!次回のブログ投稿では、Schemaverseを使用してメッセージを検証する準備ができました。お楽しみに!
パート1と2を見逃した場合は、
Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages