第三部分:转换MongoDB CDC事件消息

在我们上一篇博客中,我们介绍了一种参考实现,用于通过Debezium Server和Memphis.dev从MongoDB数据库捕获变更数据捕获(CDC)事件。在文章末尾,我们提到MongoDB记录在Debezium CDC消息中被序列化为字符串,如下所示:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

我们希望利用Memphis.dev的Schemaverse功能来检查消息是否符合预期模式。不符合模式的消息将被路由到死信站,以避免影响下游消费者。如果这一切听起来像是古希腊语,请不要担心!我们将在下一篇博客中详细解释。

为了使用Schemaverse这样的功能,我们需要将MongoDB记录反序列化为JSON文档。本篇博客将介绍对我们的MongoDB CDC流水线进行的一项修改,该修改添加了一个转换器服务,用于将MongoDB记录反序列化为JSON文档。

解决方案概述

之前的解决方案由六个组件组成:

1. 待办事项生成器:每0.5秒向MongoDB集合中插入一个随机生成的待办事项。每个待办事项包含描述、创建时间戳、可选截止日期和完成状态。

2. MongoDB: 配置了一个包含单个集合(todo_items)的数据库。

3. Debezium Server: Debezium Server的一个实例,配置了MongoDB源和HTTP客户端接收器连接器。

4. Memphis.dev REST Gateway: 使用默认配置。

5. Memphis.dev: 配置了一个单一的站台(todo-cdc-events)和一个单一用户(todocdcservice)。

6. Printing Consumer: 一个使用Memphis.dev Python SDK来消费消息并将其打印到控制台的脚本。

在此迭代中,我们新增了两个组件:

1. Transformer Service: 一个转换器服务,它从todo-cdc-events站台消费消息,反序列化MongoDB记录,并将其推送到cleaned-todo-cdc-events站台。

2. Cleaned Printing Consumer: 第二个打印消费者实例,用于打印被推送到cleaned-todo-cdc-events站台的消息。

更新后的架构如下:

A Deep Dive Into the Transformer Service

消息转换器服务框架

转换器服务采用Memphis.dev Python SDK。接下来,我们将逐步解析转换器的实现过程。在转换器的主()方法中,首先会连接到Memphis.dev代理。连接细节从环境变量中获取。主机、用户名、密码、输入站名称和输出站名称均通过环境变量传递,这与《十二要素应用宣言》的建议相符。

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

连接建立后,我们创建消费者和生产者对象。在Memphis.dev中,消费者和生产者都有名称,这些名称会在Memphis.dev的用户界面中显示,从而提供系统操作的透明度。

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

消费者API采用回调函数设计模式。当从代理拉取消息时,提供的函数会以消息列表作为参数被调用。

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

设置回调后,我们启动asyncio事件循环。此时,转换器服务暂停,等待从代理拉取消息。

  •  保持主线程活跃,以便消费者持续接收数据。
Python

await asyncio.Event().wait()

创建消息处理函数

消息处理器的创建函数接收一个生产者对象并返回一个回调函数。由于回调函数仅接受单一参数,我们采用闭包模式,在创建msg_handler函数时隐式传递生产者。

当调用msg_handler函数时,它接收三个参数:消息列表、可能发生的错误(如果有)以及包含字典的上下文。我们的处理器遍历消息,对每条消息调用转换函数,使用生产者将消息发送到第二站点,并确认消息已处理。在Memphis.dev中,除非消费者确认,否则消息不会被标记为已交付,这防止了处理过程中发生错误时消息丢失。

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

消息转换函数

现在,我们深入到服务的核心:消息转换器函数。消息负载(由get_data()方法返回)被存储为bytearray对象。我们使用Python的json库将这些消息反序列化为Python集合(列表和字典)以及基本类型(整数、浮点数、字符串和None)的层次结构。

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

我们期望对象具有一个payload属性,其值为一个对象。该对象随后具有两个属性(“before”和“after”),它们要么是None,要么是包含序列化JSON对象的字符串。我们再次使用JSON库来反序列化这些字符串,并用对应的对象替换它们。

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

最后,我们将整个JSON记录重新序列化,并将其转换回bytearray,以便传输给代理。

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

太棒了!我们的对象现在看起来是这样的:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

运行转换器服务

如果你遵循了上一篇博客文章中的七个步骤,你只需要再执行三个额外步骤,来启动转换器服务并验证其运行:

第8步:启动转换器服务

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

第9步:启动第二个打印消费者

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

第10步:检查Memphis UI

当转换器开始向Memphis.dev生成消息时,将创建一个名为“cleaned-todo-cdc-events”的第二个站点。你应该能在Memphis.dev UI的站点概览页面上看到这个新站点,就像这样:

“cleaned-todo-cdc-events”页面的详情页应展示作为生产者的转换器、打印消费者以及转换后的消息:

恭喜!我们已准备好,在下一篇博客文章中使用Schemaverse验证消息。敬请期待!

若您错过了第1和第2部分:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages