Parte 3: Transformación de Mensajes de Eventos CDC de MongoDB

En nuestro último artículo de blog, presentamos una implementación de referencia para capturar eventos de captura de datos de cambio (CDC) desde una base de datos MongoDB utilizando Debezium Server y Memphis.dev. Al final del post, señalamos que los registros de MongoDB se serializan como cadenas en mensajes CDC de Debezium de la siguiente manera:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

Queremos utilizar la funcionalidad Schemaverse de Memphis.dev para verificar los mensajes contra un esquema esperado. Los mensajes que no coincidan con el esquema se enrutarán a una estación de cartel muertos para que no afecten a los consumidores en cadena. Si todo esto suena como griego antiguo, no te preocupes. Explicaremos los detalles en nuestro próximo artículo de blog.

Para utilizar funcionalidades como Schemaverse, necesitamos deserializar los registros de MongoDB como documentos JSON. Este blog post describe una modificación a nuestra tubería CDC de MongoDB que agrega un servicio transformador para deserializar los registros de MongoDB en documentos JSON.

Resumen de la Solución

La solución anterior consistía en seis componentes:

1. Generador de Elementos Pendientes: Inserta un elemento de pendientes generado aleatoriamente en la colección de MongoDB cada 0,5 segundos. Cada elemento de pendientes contiene una descripción, marca de tiempo de creación, fecha de vencimiento opcional y estado de finalización.

2. MongoDB: Configurado con una base de datos única que contiene una sola colección (todo_items).

3. Debezium Server: Instancia de Debezium Server configurada con conectores de origen MongoDB y de receptor HTTP Client.

4. Memphis.dev REST Gateway: Utiliza la configuración out-of-the-box.

5. Memphis.dev: Configurado con una sola estación (todo-cdc-events) y un solo usuario (todocdcservice)

6. Consumidor de Impresión: Un script que utiliza el SDK de Python de Memphis.dev para consumir mensajes e imprimirlos en la consola.

En esta iteración, estamos agregando dos componentes adicionales:

1. Servicio Transformador: Un servicio transformador que consume mensajes de la estación todo-cdc-events, deserializa los registros de MongoDB y los envía a la estación cleaned-todo-cdc-events.

2. Consumidor de Impresión Limpiado: Una segunda instancia del consumidor de impresión que imprime mensajes enviados a la estación cleaned-todo-cdc-events.

La arquitectura actualizada se ve así:

A Deep Dive Into the Transformer Service

Esqueleto del Servicio Transformador de Mensajes

El servicio transformador utiliza el SDK de Python de Memphis.dev. Vamos a revisar la implementación del transformador. El método main() de nuestro transformador se conecta primero al broker de Memphis.dev. Los detalles de conexión se obtienen de las variables de entorno. El host, nombre de usuario, contraseña, nombre de la estación de entrada y nombre de la estación de salida se pasan mediante variables de entorno de acuerdo con las sugerencias del Manifiesto de la Aplicación de Doce Factores.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

Una vez establecida la conexión, creamos objetos de consumidor y productor. En Memphis.dev, los consumidores y productores tienen nombres. Estos nombres aparecen en la interfaz de usuario de Memphis.dev, ofreciendo transparencia en las operaciones del sistema. 

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

La API del consumidor utiliza el patrón de diseño de la función callback. Cuando se extraen mensajes del broker, se llama a la función proporcionada con una lista de mensajes como argumento.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

Después de configurar la devolución de llamada, iniciamos el bucle de eventos asyncio. En este punto, el servicio de transformador se detiene y espera hasta que haya mensajes disponibles para extraer del broker.

  • Mantén tu hilo principal activo para que el consumidor siga recibiendo datos.
Python

await asyncio.Event().wait()

Creación de la Función Manejador de Mensajes

La función create para el manejador de mensajes toma un objeto productor y devuelve una función de devolución de llamada. Dado que la función de devolución de llamada solo toma un argumento, utilizamos el patrón de clausura para pasar implícitamente el productor a la función msg_handler cuando la creamos.

La función msg_handler se pasa tres argumentos cuando se llama: una lista de mensajes, un error (si ocurrió) y un contexto que consiste en un diccionario. Nuestro controlador recorre los mensajes, llama a la función de transformación en cada uno, envía los mensajes a la segunda estación utilizando el productor y confirma que el mensaje ha sido procesado. En Memphis.dev, los mensajes no se marcan como entregados hasta que el consumidor los confirma. Esto evita que se pierdan mensajes si ocurre un error durante el procesamiento.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

La Función Transformadora de Mensajes

Ahora, llegamos al núcleo del servicio: la función transformadora de mensajes. Los datos de carga útil de los mensajes (devueltos por el método get_data()) se almacenan como objetos bytearray. Utilizamos la biblioteca json de Python para deserializar los mensajes en una jerarquía de colecciones de Python (listas y diccionarios) y tipos primitivos (int, float, str y None).

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

Esperamos que el objeto tenga una propiedad de carga útil con un objeto como valor. Ese objeto tiene dos propiedades (“antes” y “después”), que son o bien None o cadenas que contienen objetos JSON serializados. Volvemos a utilizar la biblioteca JSON para deserializar y reemplazar las cadenas con los objetos.

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

Por último, re-serializamos toda la entrada JSON y la convertimos de nuevo en un bytearray para su transmisión al broker.

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

¡Hurra! Nuestros objetos ahora lucen así:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

Ejecutando el Servicio Transformador

Si seguiste los siete pasos en el post anterior del blog, solo necesitas ejecutar tres pasos adicionales para iniciar el servicio transformador y verificar que está funcionando:

Paso 8: Iniciar el Servicio Transformador

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

Paso 9: Iniciar el Segundo Consumidor de Impresión

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

Paso 10: Verificar la IU de Memphis

Cuando el transformador comience a producir mensajes a Memphis.dev, se creará una segunda estación llamada “cleaned-todo-cdc-events”. Deberías ver esta nueva estación en la página de Resumen de Estaciones en la IU de Memphis.dev de la siguiente manera:

La página de detalles para la página “cleaned-todo-cdc-events” debe mostrar el transformador adjunto como productor, el consumidor de impresión y los mensajes transformados: 

¡Felicitaciones! Ahora estamos listos para abordar la validación de mensajes utilizando Schemaverse en nuestra próxima entrada de blog. ¡Estén atentos!

En caso de que haya perdido las partes 1 y 2:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages