Parte 3: Transformando Mensagens de Eventos CDC do MongoDB

Em nosso último post no blog, apresentamos uma implementação de referência para capturar eventos de captura de dados em mudança (CDC) de um banco de dados MongoDB usando Debezium Server e Memphis.dev. No final do post, observamos que os registros do MongoDB são serializados como strings nos mensagens CDC do Debezium, da seguinte forma:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

Queremos usar a funcionalidade Schemaverse de Memphis.dev para verificar mensagens contra um esquema esperado. Mensagens que não correspondem ao esquema são encaminhadas para uma estação de carta morta para que não afetem os consumidores subsequentes. Se tudo isso parecer grego antigo, não se preocupe! Explicaremos os detalhes em nosso próximo post no blog.

Para usar funcionalidades como o Schemaverse, precisamos desserializar os registros do MongoDB como documentos JSON. Este post descreve uma modificação em nossa pipeline CDC do MongoDB que adiciona um serviço de transformador para desserializar os registros do MongoDB em documentos JSON.

Visão Geral da Solução

A solução anterior consistia em seis componentes:

1. Gerador de Itens de Tarefa: Insere um item de tarefa gerado aleatoriamente na coleção MongoDB a cada 0,5 segundos. Cada item de tarefa contém uma descrição, timestamp de criação, data de vencimento opcional e status de conclusão.

2. MongoDB: Configurado com um único banco de dados contendo uma única coleção (todo_items).

3. Debezium Server: Instância do Debezium Server configurada com fonte MongoDB e conectores de sink HTTP Client.

4. Gateway REST Memphis.dev: Utiliza a configuração pronta.

5. Memphis.dev: Configurado com uma única estação (todo-cdc-events) e um único usuário (todocdcservice)

6. Consumidor de Impressão: Um script que usa a SDK Python do Memphis.dev para consumir mensagens e imprimi-las no console.

Nesta iteração, estamos adicionando dois componentes adicionais:

1. Serviço de Transformador: Um serviço de transformação que consome mensagens da estação todo-cdc-events, desserializa os registros do MongoDB e os envia para a estação cleaned-todo-cdc-events.

2. Consumidor de Impressão Limpa: Uma segunda instância do consumidor de impressão que imprime mensagens enviadas para a estação cleaned-todo-cdc-events.

A arquitetura atualizada é assim:

A Deep Dive Into the Transformer Service

Estrutura do Serviço de Transformador de Mensagens

O serviço transformador utiliza o SDK Python do Memphis.dev. Vamos percorrer a implementação do transformador. O método main() de nosso transformador se conecta primeiro ao broker Memphis.dev. Os detalhes da conexão são obtidos de variáveis ambientais. O host, nome de usuário, senha, nome da estação de entrada e nome da estação de saída são passados usando variáveis ambientais de acordo com as sugestões do manifesto Twelve-Factor App.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

Após estabelecer uma conexão, criamos objetos de consumidor e produtor. No Memphis.dev, consumidores e produtores têm nomes. Esses nomes aparecem na UI do Memphis.dev, oferecendo transparência nas operações do sistema. 

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

A API do consumidor utiliza o padrão de projeto da função de callback. Quando mensagens são extraídas do broker, a função fornecida é chamada com uma lista de mensagens como argumento.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

Após configurar a callback, iniciamos o loop de eventos asyncio. Neste momento, o serviço de transformador fica em pausa até que haja mensagens disponíveis para serem extraídas do broker.

  • Mantenha sua thread principal ativa para que o consumidor continue recebendo dados.
Python

await asyncio.Event().wait()

Criando a Função de Manipulador de Mensagens

A função create para o manipulador de mensagens recebe um objeto produtor e retorna uma função de callback. Como a função de callback só recebe um argumento, utilizamos o padrão de fechamento para passar implicitamente o produtor para a função msg_handler quando a criamos.

A função msg_handler é passada três argumentos quando chamada: uma lista de mensagens, um erro (se ocorrer) e um contexto composto por um dicionário. Nosso manipulador percorre as mensagens, chama a função de transformação em cada uma, envia as mensagens para a segunda estação usando o produtor e confirma que a mensagem foi processada. No Memphis.dev, as mensagens não são marcadas como entregues até que o consumidor as confirme. Isso evita que as mensagens sejam perdidas se ocorrer um erro durante o processamento.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

A Função de Transformador de Mensagens

Agora, chegamos ao cerne do serviço: a função de transformação de mensagens. Os payloads de mensagens (retornados pelo método get_data()) são armazenados como objetos bytearray. Utilizamos a biblioteca json do Python para desserializar as mensagens em uma hierarquia de coleções do Python (listas e dicionários) e tipos primitivos (int, float, str e None).

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

Esperamos que o objeto possua uma propriedade payload com um objeto como valor. Esse objeto possui então duas propriedades (“antes” e “depois”), que são ou None ou strings contendo objetos JSON serializados. Usamos novamente a biblioteca JSON para desserializar e substituir as strings pelos objetos.

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

Por fim, reserializamos todo o registro JSON e o convertemos de volta em um bytearray para transmissão ao broker.

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

Uhuul! Nossos objetos agora têm a seguinte aparência:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

Executando o Serviço de Transformador

Se você seguiu os sete passos no post anterior do blog, só precisa executar mais três passos para iniciar o serviço de transformação e verificar se está funcionando:

Passo 8: Iniciar o Serviço de Transformador

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

Passo 9: Iniciar o Segundo Consumidor de Impressão

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

Passo 10: Verificar a IU do Memphis

Quando o transformador começar a produzir mensagens para o Memphis.dev, uma segunda estação chamada “cleaned-todo-cdc-events” será criada. Você deve ver esta nova estação na página de Visão Geral de Estações na IU do Memphis.dev, assim:

A página de detalhes para a “cleaned-todo-cdc-events” deve exibir o transformador anexado como produtor, o consumidor de impressão e as mensagens transformadas: 

Parabéns! Estamos prontos para começar a validar mensagens usando o Schemaverse em nosso próximo post de blog. Fique ligado!

Caso você tenha perdido as partes 1 e 2:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages