在我们上一篇博客中,我们介绍了一种参考实现,用于通过Debezium Server和Memphis.dev从MongoDB数据库捕获变更数据捕获(CDC)事件。在文章末尾,我们提到MongoDB记录在Debezium CDC消息中被序列化为字符串,如下所示:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
...
}
}
我们希望利用Memphis.dev的Schemaverse功能来检查消息是否符合预期模式。不符合模式的消息将被路由到死信站,以避免影响下游消费者。如果这一切听起来像是古希腊语,请不要担心!我们将在下一篇博客中详细解释。
为了使用Schemaverse这样的功能,我们需要将MongoDB记录反序列化为JSON文档。本篇博客将介绍对我们的MongoDB CDC流水线进行的一项修改,该修改添加了一个转换器服务,用于将MongoDB记录反序列化为JSON文档。
解决方案概述
之前的解决方案由六个组件组成:
1. 待办事项生成器:每0.5秒向MongoDB集合中插入一个随机生成的待办事项。每个待办事项包含描述、创建时间戳、可选截止日期和完成状态。
2. MongoDB: 配置了一个包含单个集合(todo_items)的数据库。
3. Debezium Server: Debezium Server的一个实例,配置了MongoDB源和HTTP客户端接收器连接器。
4. Memphis.dev REST Gateway: 使用默认配置。
5. Memphis.dev: 配置了一个单一的站台(todo-cdc-events)和一个单一用户(todocdcservice)。
6. Printing Consumer: 一个使用Memphis.dev Python SDK来消费消息并将其打印到控制台的脚本。
在此迭代中,我们新增了两个组件:
1. Transformer Service: 一个转换器服务,它从todo-cdc-events站台消费消息,反序列化MongoDB记录,并将其推送到cleaned-todo-cdc-events站台。
2. Cleaned Printing Consumer: 第二个打印消费者实例,用于打印被推送到cleaned-todo-cdc-events站台的消息。
更新后的架构如下:
A Deep Dive Into the Transformer Service
消息转换器服务框架
该转换器服务采用Memphis.dev Python SDK。接下来,我们将逐步解析转换器的实现过程。在转换器的主()方法中,首先会连接到Memphis.dev代理。连接细节从环境变量中获取。主机、用户名、密码、输入站名称和输出站名称均通过环境变量传递,这与《十二要素应用宣言》的建议相符。
async def main():
try:
print("Waiting on messages...")
memphis = Memphis()
await memphis.connect(host=os.environ[HOST_KEY],
username=os.environ[USERNAME_KEY],
password=os.environ[PASSWORD_KEY])
连接建立后,我们创建消费者和生产者对象。在Memphis.dev中,消费者和生产者都有名称,这些名称会在Memphis.dev的用户界面中显示,从而提供系统操作的透明度。
print("Creating consumer")
consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
consumer_name="transformer",
consumer_group="")
print("Creating producer")
producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
producer_name="transformer")
消费者API采用回调函数设计模式。当从代理拉取消息时,提供的函数会以消息列表作为参数被调用。
print("Creating handler")
msg_handler = create_handler(producer)
print("Setting handler")
consumer.consume(msg_handler)
设置回调后,我们启动asyncio事件循环。此时,转换器服务暂停,等待从代理拉取消息。
- 保持主线程活跃,以便消费者持续接收数据。
await asyncio.Event().wait()
创建消息处理函数
消息处理器的创建函数接收一个生产者对象并返回一个回调函数。由于回调函数仅接受单一参数,我们采用闭包模式,在创建msg_handler函数时隐式传递生产者。
当调用msg_handler函数时,它接收三个参数:消息列表、可能发生的错误(如果有)以及包含字典的上下文。我们的处理器遍历消息,对每条消息调用转换函数,使用生产者将消息发送到第二站点,并确认消息已处理。在Memphis.dev中,除非消费者确认,否则消息不会被标记为已交付,这防止了处理过程中发生错误时消息丢失。
def create_handler(producer):
async def msg_handler(msgs, error, context):
try:
for msg in msgs:
transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
await producer.produce(message=transformed_msg)
await msg.ack()
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
print(e)
return
return msg_handler
消息转换函数
现在,我们深入到服务的核心:消息转换器函数。消息负载(由get_data()方法返回)被存储为bytearray对象。我们使用Python的json库将这些消息反序列化为Python集合(列表和字典)以及基本类型(整数、浮点数、字符串和None)的层次结构。
def deserialize_mongodb_cdc_event(input_msg):
obj = json.loads(input_msg)
我们期望对象具有一个payload属性,其值为一个对象。该对象随后具有两个属性(“before”和“after”),它们要么是None,要么是包含序列化JSON对象的字符串。我们再次使用JSON库来反序列化这些字符串,并用对应的对象替换它们。
if "payload" in obj:
payload = obj["payload"]
if "before" in payload:
before_payload = payload["before"]
if before_payload is not None:
payload["before"] = json.loads(before_payload)
if "after" in payload:
after_payload = payload["after"]
if after_payload is not None:
payload["after"] = json.loads(after_payload)
最后,我们将整个JSON记录重新序列化,并将其转换回bytearray,以便传输给代理。
output_s = json.dumps(obj)
output_msg = bytearray(output_s, "utf-8")
return output_msg
太棒了!我们的对象现在看起来是这样的:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},
...
}
}
运行转换器服务
如果你遵循了上一篇博客文章中的七个步骤,你只需要再执行三个额外步骤,来启动转换器服务并验证其运行:
第8步:启动转换器服务
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
第9步:启动第二个打印消费者
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
第10步:检查Memphis UI
当转换器开始向Memphis.dev生成消息时,将创建一个名为“cleaned-todo-cdc-events”的第二个站点。你应该能在Memphis.dev UI的站点概览页面上看到这个新站点,就像这样:
“cleaned-todo-cdc-events”页面的详情页应展示作为生产者的转换器、打印消费者以及转换后的消息:
恭喜!我们已准备好,在下一篇博客文章中使用Schemaverse验证消息。敬请期待!
若您错过了第1和第2部分:
Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages