Este é o segundo post de uma série de artigos sobre a construção de um sistema orientado a eventos moderno usando Memphis.dev.
Nosso último blog post apresentou uma implementação de referência para capturar eventos de captura de dados de mudança (CDC) de um banco de dados PostgreSQL usando o Debezium Server e Memphis.dev. Ao substituir o Apache Kafka pelo Memphis.dev, a solução reduziu substancialmente os recursos operacionais e a sobrecarga – economizando dinheiro e liberando desenvolvedores para se concentrar na construção de novas funcionalidades.
No entanto, o PostgreSQL é o único banco de dados comumente usado. O Debezium fornece conectores para vários bancos de dados, incluindo o banco de dados de documentos não relacionais MongoDB. O MongoDB é popular entre desenvolvedores, especialmente aqueles que trabalham em linguagens de programação dinâmicas, uma vez que evita a descontinuidade entre objetos e relacionamentos. Desenvolvedores podem armazenar, consultar e atualizar objetos diretamente no banco de dados.
Neste post do blog, demonstramos como adaptar a solução CDC para o MongoDB.
Visão Geral da Solução
Aqui, descrevemos a arquitetura da solução de referência para entregar eventos de captura de dados de mudança com Memphis.dev. A arquitetura não mudou desde nosso post anterior do blog exceto pela substituição do PostgreSQL pelo MongoDB.
A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.
- Gerador de Itens de Tarefa: Insere um item de tarefa gerado aleatoriamente na coleção MongoDB a cada 0,5 segundos. Cada item de tarefa contém uma descrição, timestamp de criação, data de vencimento opcional e status de conclusão.
- MongoDB: Configurado com um banco de dados contendo uma única coleção (todo_items).
- Servidor Debezium: Instância do Servidor Debezium configurada com fontes MongoDB e conectores de saída HTTP Client.
- Gateway REST do Memphis.dev: Utiliza a configuração pronta para uso.
- Memphis.dev: Configurado com uma única estação (todo-cdc-events) e um único usuário (todocdcservice).
- Consumidor de Impressão: Um script que usa a SDK Python do Memphis.dev para consumir mensagens e imprimi-las no console.
Começando
O tutorial de implementação está disponível na pasta mongodb-debezium-cdc-example do repositório de Soluções Exemplo do Memphis. Docker Compose será necessário para executá-lo.
Executando a Implementação
Construa as imagens Docker para o Debezium Server, o consumidor de impressão e a configuração do banco de dados (criação de tabela e usuário).
Atualmente, a implementação depende de uma versão pré-lançamento do Debezium Server para o suporte à autenticação JWT. Uma imagem Docker será construída diretamente a partir do branch principal dos repositórios Debezium e Debezium Server. Note que este passo pode levar bastante tempo (~20 minutos) para ser executado. Quando o Debezium Server 2.3.0 for lançado, mudaremos para usar a imagem Docker oficial.
Passo 1: Construir as Imagens
$ docker compose build --pull --no-cache
Passo 2: Iniciar o Broker e o Gateway REST do Memphis.dev
Inicie o broker do Memphis.dev e o gateway REST. Note que o serviço memphis-rest-gateway depende do serviço de broker do Memphis, então o serviço de broker também será iniciado.
$ docker compose up -d memphis-rest-gateway
[+] Running 4/4
⠿ Network mongodb-debezium-cdc-example_default Created 0.0s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 6.0s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 16.8s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Started
Passo 3: Criar uma Estação e Usuário Correspondente no Memphis.dev
Mensagens são entregues a “estações” no Memphis.dev; elas são equivalentes a “tópicos” usados por brokers de mensagens. Acesse o localhost:9000 em seu navegador. Clique no link “sign in with root” no final da página.
Faça login com root (nome de usuário) e memphis (senha).
Siga o assistente para criar uma estação chamada todo-cdc-events.
Crie um usuário chamado todocdcservice com o mesmo valor para a senha.
Clique em “Próximo” até que o assistente esteja concluído:
Clique em “Ir para visão geral da estação” para ir à página de visão geral da estação.
Etapa 4: Iniciar o Consumidor de Impressão
Utilizamos o SDK Python do Memphis.dev para criar um script de consumidor que verifica a estação de eventos todo-cdc e imprime as mensagens no console.
$ docker compose up -d printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container printing-consumer Started
Etapa 5: Iniciando e Configurando o MongoDB
Para capturar mudanças, a funcionalidade replicação do MongoDB deve ser habilitada. Existem vários passos:
- O nome do conjunto de réplicas deve ser definido. Isso pode ser feito passando o nome de um conjunto de réplicas na linha de comando ou no arquivo de configuração. No arquivo Docker Compose, executamos o MongoDB com o argumento de linha de comando –replSet rs0 para definir o nome do conjunto de réplicas.
- Quando a replicação é utilizada e o controle de acesso está habilitado, deve-se fornecer um arquivo de chave comum a cada instância de réplica. Geramos um arquivo de chave seguindo as instruções na documentação do MongoDB. Em seguida, construímos uma imagem que estende a imagem oficial do MongoDB, incluindo o arquivo de chave.
- O conjunto de réplicas precisa ser inicializado assim que o MongoDB estiver em funcionamento. Utilizamos um script que configura a instância no início. O script chama o comando replSetInitiate com uma lista dos endereços IP e portas de cada instância do MongoDB no conjunto de réplicas. Esse comando faz com que as instâncias do MongoDB se comuniquem entre si e selecionem um líder.
De modo geral, os conjuntos de réplicas são usados para aumentar a confiabilidade (alta disponibilidade). A maioria das documentações que você encontrará descreve como configurar uma réplica com múltiplas instâncias do MongoDB. No nosso caso, o conector do MongoDB do Debezium se aproveita da funcionalidade de replicação para capturar eventos de alteração de dados. Embora passemos pelos passos para configurar um conjunto de réplicas, utilizamos apenas uma instância do MongoDB.
O script de gerador de itens de tarefa cria um novo item de tarefa a cada meio segundo. Os valores dos campos são gerados aleatoriamente. Os itens são adicionados a uma coleção do MongoDB chamada “todo_items”.
No arquivo Docker Compose, o script de gerador de itens de tarefa está configurado para depender do estado saudável da instância do Mongodb e do término bem-sucedido do script de configuração do banco de dados. Ao iniciar o script de gerador de itens de tarefa, o Docker Compose também iniciará o MongoDB e executará o script de configuração do banco de dados.
$ docker compose up -d todo-generator
[+] Running 3/3
⠿ Container mongodb Healthy 8.4s
⠿ Container mongodb-database-setup Exited 8.8s
⠿ Container mongodb-todo-generator Started
Passo 6: Iniciar o Debezium Server
O último serviço que precisa ser iniciado é o Debezium Server. O servidor está configurado com um conector de origem para MongoDB e o conector de receptor HTTP Client através de um arquivo de propriedades Java:
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false
A maioria das opções é autoexplicativa. A URL do receptor HTTP é digna de explicação detalhada. A gateway REST do Memphis.dev espera receber solicitações POST com um caminho no seguinte formato:
/stations/{station}/produce/{quantity}
O espaço reservado {station} é substituído pelo nome da estação para enviar a mensagem. O espaço reservado {quantity} é substituído pelo valor single (para uma única mensagem) ou batch (para múltiplas mensagens).
A mensagem(ns) é (são) passada(s) como payload da solicitação POST. O gateway REST suporta três formatos de mensagem (texto simples, JSON ou protocol buffer). O valor (text/, application/json, application/x-protobuf) do campo de cabeçalho content-type determina como o payload é interpretado.
O cliente HTTP do Debezium Server produz solicitações REST que são consistentes com esses padrões. As solicitações usam o verbo POST; cada solicitação contém uma única mensagem codificada em JSON como payload, e o cabeçalho content-type é definido como application/JSON. Usamos todo-CDC-events como nome da estação e o valor único na URL do endpoint para encaminhar mensagens e indicar como o gateway REST deve interpretar as solicitações:
http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
A propriedade debezium.sink.http.authentication.type=jwt indica que o cliente HTTP deve usar a autenticação JWT. As propriedades de nome de usuário e senha são autoexplicativas, mas a propriedade debezium.sink.http.authentication.jwt.The URL merece alguma explicação. Um token inicial é adquirido usando o endpoint /auth/authenticate, enquanto a autenticação é atualizada usando o endpoint separado /auth/refreshToken. A autenticação JWT no cliente HTTP acrescenta o endpoint apropriado à URL base dada.
O Debezium Server pode ser iniciado com o seguinte comando:
$ docker compose up -d debezium-server
[+] Running 5/5
⠿ Container mongodb Healthy 1.5s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Running 0.0s
⠿ Container debezium-server Started
Etapa 7: Confirmar o Funcionamento do Sistema
Verifique a tela de visão geral da estação todo-cdc-events no Memphis.dev web UI para confirmar que o produtor e o consumidor estão conectados e que as mensagens estão sendo entregues.
E, imprima os logs para o contêiner do consumidor de impressão:
$ docker logs --tail 2 printing-consumer
mensagem:
bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')
Formato das Mensagens CDC
As mensagens recebidas são formatadas como JSON. As mensagens têm dois campos de nível superior (esquema e payload). O esquema descreve o esquema do registro (nomes e tipos de campos), enquanto o payload descreve a mudança no registro. O objeto payload em si contém dois campos (antes e depois) indicando o valor do registro antes e depois da mudança.
Para o MongoDB, o Debezium Server codifica o registro como uma string de JSON serializado:
{
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}
Isso terá implicações para o processamento downstream das mensagens, que descreveremos em um futuro post de blog nesta série.
Parabéns! Agora você tem um exemplo funcional de como capturar eventos de alteração de dados de um banco de dados MongoDB usando o Debezium Server e transferir os eventos para o Memphis.dev para processamento downstream.
A Parte 3 está saindo em breve!
Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de