这是关于使用Memphis.dev构建现代事件驱动系统的系列博客文章的第二部分。
我们上一篇博客文章介绍了如何使用Debezium Server和Memphis.dev从PostgreSQL数据库捕获变更数据捕获(CDC)事件的参考实现。通过用Memphis.dev替换Apache Kafka,该解决方案显著减少了操作资源和开销——节省了成本,并使开发者能够专注于构建新功能。
然而,PostgreSQL并不是唯一常用的数据库。Debezium提供了多种数据库的连接器,包括非关系型文档数据库MongoDB。MongoDB在开发者中尤其受欢迎,特别是那些使用动态编程语言的开发者,因为它避免了对象关系阻抗失配。开发者可以直接在数据库中存储、查询和更新对象。
在本篇博客中,我们将展示如何将CDC解决方案适配到MongoDB。
解决方案概览
在这里,我们描述了使用Memphis.dev进行变更数据捕获事件交付的参考解决方案的架构。除了将PostgreSQL替换为MongoDB之外,该架构与我们之前的博客文章中的架构没有变化。
A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.
- 待办事项生成器:每0.5秒向MongoDB集合中插入一个随机生成的待办事项。每个待办事项包含描述、创建时间戳、可选截止日期和完成状态。
- MongoDB:配置了一个包含单一集合(todo_items)的单一数据库。
- Debezium服务器:配置了Debezium服务器的实例,带有MongoDB源和HTTP客户端接收器连接器。
- Memphis.dev REST网关:使用现成配置。
- Memphis.dev:配置了一个单一站点(todo-cdc-events)和一个单一用户(todocdcservice)。
- 打印消费者:一个使用Memphis.dev Python SDK的消费脚本,用于接收消息并将其打印到控制台。
开始使用
实施教程可在Memphis示例解决方案仓库的mongodb-debezium-cdc-example目录中找到。Docker Compose将用于运行它。
实施运行
构建Debezium Server、打印消费者以及数据库设置(表格和用户创建)的Docker镜像。
当前实现依赖于Debezium Server的预发布版本以支持JWT认证。Docker镜像将从Debezium和Debezium Server仓库的主分支直接构建。请注意,此步骤可能需要较长时间(约20分钟)来执行。待Debezium Server 2.3.0正式发布后,我们将切换至使用官方Docker镜像。
步骤1:构建镜像
$ docker compose build --pull --no-cache
步骤2:启动Memphis.dev Broker与REST网关
启动Memphis.dev broker和REST网关。需注意,Memphis-rest-gateway服务依赖于Memphis broker服务,因此broker服务也将一同启动。
$ docker compose up -d memphis-rest-gateway
[+] Running 4/4
⠿ Network mongodb-debezium-cdc-example_default Created 0.0s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 6.0s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 16.8s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Started
步骤3:在Memphis.dev中创建站点及对应用户
在Memphis.dev中,消息被传递至“站点”,其等同于消息代理所使用的“主题”。通过浏览器访问localhost:9000。点击页面底部的“使用root登录”链接。
使用root(用户名)和memphis(密码)进行登录。
按照向导创建一个名为todo-cdc-events的站点。
创建一个名为todocdcservice的用户,密码设置为相同值。
点击“下一步”直至向导完成:
点击“前往站点概览”以进入站点概览页面。
步骤4:启动打印消费者
我们利用Memphis.dev Python SDK创建了一个消费者脚本,该脚本轮询todo-cdc-events站点并将消息打印到控制台。
$ docker compose up -d printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container printing-consumer Started
步骤5:启动并配置MongoDB
为了捕捉变更,必须启用MongoDB的复制功能。具体步骤包括:
- 需设置副本集名称,这可以通过在命令行或配置文件中传递副本集名称来实现。在Docker Compose文件中,我们通过命令行参数–replSet rs0运行MongoDB,以设定副本集名称。
- 在采用复制且启用授权的情况下,必须为每个副本集实例提供一个共同的密钥文件。我们根据MongoDB文档中的指南生成了密钥文件,并构建了一个镜像,该镜像在官方MongoDB镜像基础上加入了密钥文件。
- 启动MongoDB后,需要初始化副本集。我们使用一个脚本,在实例启动时进行配置。该脚本调用replSetInitiate命令,附带副本集中每个MongoDB实例的IP地址和端口列表。此命令促使MongoDB实例间相互通信并选出主节点。
通常,副本集用于提升可靠性(高可用性)。大多数文档描述了如何设置包含多个MongoDB实例的副本集。在我们的场景中,Debezium的MongoDB连接器利用复制功能来捕获数据变更事件。尽管我们经历了配置副本集的步骤,但实际上仅使用了一个MongoDB实例。
该待办事项生成器脚本每隔半秒创建一个新的待办事项。字段值是随机生成的。这些事项被添加到一个名为“todo_items”的MongoDB集合中。
在Docker Compose文件中,待办事项生成器脚本被配置为依赖于运行状态良好的Mongodb实例及数据库设置脚本的顺利完成。通过启动待办事项生成器脚本,Docker Compose还将启动MongoDB并运行数据库设置脚本。
$ docker compose up -d todo-generator
[+] Running 3/3
⠿ Container mongodb Healthy 8.4s
⠿ Container mongodb-database-setup Exited 8.8s
⠿ Container mongodb-todo-generator Started
步骤6:启动Debezium服务器
需要启动的最后一个服务是Debezium服务器。该服务器通过一个Java属性文件配置了MongoDB源连接器和HTTP客户端接收器连接器:
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false
大多数选项都是不言自明的。HTTP客户端接收器URL值得详细解释。Memphis.dev REST网关期望收到具有以下格式的POST请求路径:
<代码>/stations/{station}/produce/{quantity}
其中{station}占位符被替换为要发送消息到的站点的名称。{quantity}占位符被替换为值single(对于单个消息)或batch(对于多个消息)。
消息作为POST请求的载荷进行传递。REST网关支持三种消息格式(纯文本、JSON或协议缓冲区)。内容类型头部字段的值(text/, application/json, application/x-protobuf)决定了载荷的解析方式。
Debezium Server的HTTP客户端接收器生成的REST请求遵循这些模式。请求使用POST动词;每个请求包含一个作为载荷的单个JSON编码消息,且内容类型头部设置为application/JSON。我们使用todo-CDC-events作为站点名称,并在端点URL中的单数量值用于路由消息,指示REST网关应如何解析请求:
http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
属性debezium.sink.http.authentication.type=jwt表明HTTP客户端接收器应使用JWT认证。用户名和密码属性不言自明,但debezium.sink.http.authentication.jwt.URL属性需要一些解释。初始令牌通过/auth/authenticate端点获取,而认证刷新则通过独立的/auth/refreshToken端点进行。HTTP客户端中的JWT认证会将相应的端点附加到给定的基础URL上。
可以使用以下命令启动Debezium Server:
$ docker compose up -d debezium-server
[+] Running 5/5
⠿ Container mongodb Healthy 1.5s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Running 0.0s
⠿ Container debezium-server Started
步骤7:确认系统运行正常
在Memphis.dev的Web UI中检查todo-cdc-events站点概览屏幕,以确认生产者和消费者已连接,并且消息正在被传递。
同时,打印打印消费者容器的日志:
$ docker logs --tail 2 printing-consumer
消息内容:
bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')
CDC消息格式
传入的消息以JSON格式呈现。这些消息包含两个顶级字段(模式和负载)。模式描述了记录的模式(字段名称和类型),而负载描述了对记录的变更。负载对象本身包含两个字段(之前和之后),指示记录变更前后的值。
对于MongoDB,Debezium Server将记录编码为序列化JSON字符串:
{
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}
这将对消息的下游处理产生影响,我们将在本系列未来的博客文章中详细描述。
祝贺你!你现在拥有了一个工作示例,展示了如何使用Debezium Server从MongoDB数据库捕获数据变更事件,并将这些事件传输到Memphis.dev进行下游处理。
第三部分即将发布!
Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de