第三部分:轉換MongoDB CDC事件消息

在我們的上一篇部落格文章中,我們介紹了一個參考實現,用於從MongoDB資料庫捕獲變更資料捕獲(CDC)事件,使用Debezium Server和Memphis.dev。在文章結尾,我們提到MongoDB記錄在Debezium CDC消息中被序列化為字串,如下所示:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

我們希望利用Memphis.dev的Schemaverse功能來檢查消息是否符合預期模式。不符合模式的消息將被路由到死信站,以免影響下游消費者。如果這一切聽起來像是古希臘語,不用擔心!我們將在下一篇部落格文章中解釋細節。

為了使用Schemaverse這樣的功能,我們需要將MongoDB記錄反序列化為JSON文檔。本篇部落格文章描述了我們的MongoDB CDC管道的修改,增加了用於將MongoDB記錄反序列化為JSON文檔的轉換器服務。

解決方案概覽

先前的解決方案包括六個組件:

1. 待辦事項生成器:每0.5秒在MongoDB集合中插入一個隨機生成的待辦事項。每個待辦事項包含描述、創建時間戳、可選截止日期和完成狀態。

2. MongoDB: 配置單一資料庫,內含單一集合(todo_items)。

3. Debezium Server: Debezium Server 實例,配置了MongoDB來源與HTTP Client接收器連接器。

4. Memphis.dev REST Gateway: 使用預設配置。

5. Memphis.dev: 配置單一車站(todo-cdc-events)及單一用戶(todocdcservice)。

6. Printing Consumer: 使用Memphis.dev Python SDK的腳本,消費訊息並將其打印至控制台。

在此迭代中,我們新增了兩個組件:

1. Transformer Service: 一個轉換器服務,從todo-cdc-events車站消費訊息,反序列化MongoDB記錄,並推送至cleaned-todo-cdc-events車站。

2. Cleaned Printing Consumer: 第二個打印消費者實例,打印推送至cleaned-todo-cdc-events車站的訊息。

更新後的架構如下:

A Deep Dive Into the Transformer Service

消息轉換器服務的骨架

轉換器服務使用Memphis.dev Python SDK。讓我們逐步了解轉換器的實現。我們的轉換器主()方法首先連接到Memphis.dev代理。連接詳情從環境變量中獲取。根據十二要素應用宣言的建議,主機、用戶名、密碼、輸入站名稱和輸出站名稱通過環境變量傳遞。

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

一旦建立連接,我們創建消費者和生產者對象。在Memphis.dev中,消費者和生產者都有名稱。這些名稱在Memphis.dev UI中顯示,提供了系統操作的透明度。

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

消費者API採用回調函數設計模式。當從代理拉取消息時,提供的函數被調用,其參數是一個消息列表。

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

設定回調後,我們啟動asyncio事件循環。此時,轉換器服務暫停,等待從代理拉取消息。

  • 保持主線程活躍,以便消費者持續接收數據。
Python

await asyncio.Event().wait()

建立消息處理函數

消息處理器的create函數接收一個生產者對象並返回一個回調函數。由於回調函數僅接受單一參數,我們使用閉包模式在創建msg_handler函數時隱式傳遞生產者。

msg_handler函數被調用時,它接收三個參數:消息列表、錯誤(如有發生)及包含字典的上下文。我們的處理器遍歷消息,對每條消息調用轉換函數,使用生產者將消息發送到第二站,並確認消息已處理。在Memphis.dev中,直到消費者確認,消息才會被標記為已送達,這防止了處理過程中出錯時消息的丟失。

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

消息轉換函數

現在,我們進入服務的核心部分:訊息轉換器功能。訊息負載(由get_data()方法返回)以bytearray對象的形式儲存。我們使用Python的json庫將訊息反序列化為一個由Python集合(列表和字典)及原始類型(整數、浮點數、字符串和None)組成的層次結構。

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

我們預期該對象具有一個payload屬性,其值為一個對象。該對象接著具有兩個屬性(“before”和“after”),它們要麼是None,要麼是包含序列化JSON對象的字符串。我們再次使用JSON庫來反序列化並將這些字符串替換為對象。

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

最後,我們將整個JSON記錄重新序列化,並將其轉換回bytearray,以便傳輸給代理。

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

太棒了!我們的對象現在看起來是這樣的:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

運行轉換器服務

如果你遵循了上一篇博客文章中的七個步驟,你只需要再執行三個額外步驟,就可以啟動轉換器服務並驗證其運作:

第8步:啟動轉換器服務

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

第9步:啟動第二個打印消費者

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

第10步:檢查孟菲斯UI

當轉換器開始向Memphis.dev生產訊息時,將創建一個名為“cleaned-todo-cdc-events”的第二個站點。你應該能在Memphis.dev UI的站點概覽頁面上看到這個新站點,就像這樣:

“cleaned-todo-cdc-events”頁面的詳情應展示作為生產者的轉換器、列印消費者及已轉換的訊息:

恭喜!我們即將在下一篇部落格文章中,使用Schemaverse進行訊息驗證。敬請期待!

若您錯過了第一、二部分:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages