在我們的上一篇部落格文章中,我們介紹了一個參考實現,用於從MongoDB資料庫捕獲變更資料捕獲(CDC)事件,使用Debezium Server和Memphis.dev。在文章結尾,我們提到MongoDB記錄在Debezium CDC消息中被序列化為字串,如下所示:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
...
}
}
我們希望利用Memphis.dev的Schemaverse功能來檢查消息是否符合預期模式。不符合模式的消息將被路由到死信站,以免影響下游消費者。如果這一切聽起來像是古希臘語,不用擔心!我們將在下一篇部落格文章中解釋細節。
為了使用Schemaverse這樣的功能,我們需要將MongoDB記錄反序列化為JSON文檔。本篇部落格文章描述了我們的MongoDB CDC管道的修改,增加了用於將MongoDB記錄反序列化為JSON文檔的轉換器服務。
解決方案概覽
先前的解決方案包括六個組件:
1. 待辦事項生成器:每0.5秒在MongoDB集合中插入一個隨機生成的待辦事項。每個待辦事項包含描述、創建時間戳、可選截止日期和完成狀態。
2. MongoDB: 配置單一資料庫,內含單一集合(todo_items)。
3. Debezium Server: Debezium Server 實例,配置了MongoDB來源與HTTP Client接收器連接器。
4. Memphis.dev REST Gateway: 使用預設配置。
5. Memphis.dev: 配置單一車站(todo-cdc-events)及單一用戶(todocdcservice)。
6. Printing Consumer: 使用Memphis.dev Python SDK的腳本,消費訊息並將其打印至控制台。
在此迭代中,我們新增了兩個組件:
1. Transformer Service: 一個轉換器服務,從todo-cdc-events車站消費訊息,反序列化MongoDB記錄,並推送至cleaned-todo-cdc-events車站。
2. Cleaned Printing Consumer: 第二個打印消費者實例,打印推送至cleaned-todo-cdc-events車站的訊息。
更新後的架構如下:
A Deep Dive Into the Transformer Service
消息轉換器服務的骨架
該轉換器服務使用Memphis.dev Python SDK。讓我們逐步了解轉換器的實現。我們的轉換器主()方法首先連接到Memphis.dev代理。連接詳情從環境變量中獲取。根據十二要素應用宣言的建議,主機、用戶名、密碼、輸入站名稱和輸出站名稱通過環境變量傳遞。
async def main():
try:
print("Waiting on messages...")
memphis = Memphis()
await memphis.connect(host=os.environ[HOST_KEY],
username=os.environ[USERNAME_KEY],
password=os.environ[PASSWORD_KEY])
一旦建立連接,我們創建消費者和生產者對象。在Memphis.dev中,消費者和生產者都有名稱。這些名稱在Memphis.dev UI中顯示,提供了系統操作的透明度。
print("Creating consumer")
consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
consumer_name="transformer",
consumer_group="")
print("Creating producer")
producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
producer_name="transformer")
消費者API採用回調函數設計模式。當從代理拉取消息時,提供的函數被調用,其參數是一個消息列表。
print("Creating handler")
msg_handler = create_handler(producer)
print("Setting handler")
consumer.consume(msg_handler)
設定回調後,我們啟動asyncio事件循環。此時,轉換器服務暫停,等待從代理拉取消息。
- 保持主線程活躍,以便消費者持續接收數據。
await asyncio.Event().wait()
建立消息處理函數
消息處理器的create函數接收一個生產者對象並返回一個回調函數。由於回調函數僅接受單一參數,我們使用閉包模式在創建msg_handler函數時隱式傳遞生產者。
當msg_handler函數被調用時,它接收三個參數:消息列表、錯誤(如有發生)及包含字典的上下文。我們的處理器遍歷消息,對每條消息調用轉換函數,使用生產者將消息發送到第二站,並確認消息已處理。在Memphis.dev中,直到消費者確認,消息才會被標記為已送達,這防止了處理過程中出錯時消息的丟失。
def create_handler(producer):
async def msg_handler(msgs, error, context):
try:
for msg in msgs:
transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
await producer.produce(message=transformed_msg)
await msg.ack()
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
print(e)
return
return msg_handler
消息轉換函數
現在,我們進入服務的核心部分:訊息轉換器功能。訊息負載(由get_data()方法返回)以bytearray對象的形式儲存。我們使用Python的json庫將訊息反序列化為一個由Python集合(列表和字典)及原始類型(整數、浮點數、字符串和None)組成的層次結構。
def deserialize_mongodb_cdc_event(input_msg):
obj = json.loads(input_msg)
我們預期該對象具有一個payload屬性,其值為一個對象。該對象接著具有兩個屬性(“before”和“after”),它們要麼是None,要麼是包含序列化JSON對象的字符串。我們再次使用JSON庫來反序列化並將這些字符串替換為對象。
if "payload" in obj:
payload = obj["payload"]
if "before" in payload:
before_payload = payload["before"]
if before_payload is not None:
payload["before"] = json.loads(before_payload)
if "after" in payload:
after_payload = payload["after"]
if after_payload is not None:
payload["after"] = json.loads(after_payload)
最後,我們將整個JSON記錄重新序列化,並將其轉換回bytearray,以便傳輸給代理。
output_s = json.dumps(obj)
output_msg = bytearray(output_s, "utf-8")
return output_msg
太棒了!我們的對象現在看起來是這樣的:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},
...
}
}
運行轉換器服務
如果你遵循了上一篇博客文章中的七個步驟,你只需要再執行三個額外步驟,就可以啟動轉換器服務並驗證其運作:
第8步:啟動轉換器服務
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
第9步:啟動第二個打印消費者
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
第10步:檢查孟菲斯UI
當轉換器開始向Memphis.dev生產訊息時,將創建一個名為“cleaned-todo-cdc-events”的第二個站點。你應該能在Memphis.dev UI的站點概覽頁面上看到這個新站點,就像這樣:
“cleaned-todo-cdc-events”頁面的詳情應展示作為生產者的轉換器、列印消費者及已轉換的訊息:
恭喜!我們即將在下一篇部落格文章中,使用Schemaverse進行訊息驗證。敬請期待!
若您錯過了第一、二部分:
Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages