En nuestro último artículo de blog, presentamos una implementación de referencia para capturar eventos de captura de datos de cambio (CDC) desde una base de datos MongoDB utilizando Debezium Server y Memphis.dev. Al final del post, señalamos que los registros de MongoDB se serializan como cadenas en mensajes CDC de Debezium de la siguiente manera:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
...
}
}
Queremos utilizar la funcionalidad Schemaverse de Memphis.dev para verificar los mensajes contra un esquema esperado. Los mensajes que no coincidan con el esquema se enrutarán a una estación de cartel muertos para que no afecten a los consumidores en cadena. Si todo esto suena como griego antiguo, no te preocupes. Explicaremos los detalles en nuestro próximo artículo de blog.
Para utilizar funcionalidades como Schemaverse, necesitamos deserializar los registros de MongoDB como documentos JSON. Este blog post describe una modificación a nuestra tubería CDC de MongoDB que agrega un servicio transformador para deserializar los registros de MongoDB en documentos JSON.
Resumen de la Solución
La solución anterior consistía en seis componentes:
1. Generador de Elementos Pendientes: Inserta un elemento de pendientes generado aleatoriamente en la colección de MongoDB cada 0,5 segundos. Cada elemento de pendientes contiene una descripción, marca de tiempo de creación, fecha de vencimiento opcional y estado de finalización.
2. MongoDB: Configurado con una base de datos única que contiene una sola colección (todo_items).
3. Debezium Server: Instancia de Debezium Server configurada con conectores de origen MongoDB y de receptor HTTP Client.
4. Memphis.dev REST Gateway: Utiliza la configuración out-of-the-box.
5. Memphis.dev: Configurado con una sola estación (todo-cdc-events) y un solo usuario (todocdcservice)
6. Consumidor de Impresión: Un script que utiliza el SDK de Python de Memphis.dev para consumir mensajes e imprimirlos en la consola.
En esta iteración, estamos agregando dos componentes adicionales:
1. Servicio Transformador: Un servicio transformador que consume mensajes de la estación todo-cdc-events, deserializa los registros de MongoDB y los envía a la estación cleaned-todo-cdc-events.
2. Consumidor de Impresión Limpiado: Una segunda instancia del consumidor de impresión que imprime mensajes enviados a la estación cleaned-todo-cdc-events.
La arquitectura actualizada se ve así:
A Deep Dive Into the Transformer Service
Esqueleto del Servicio Transformador de Mensajes
El servicio transformador utiliza el SDK de Python de Memphis.dev. Vamos a revisar la implementación del transformador. El método main() de nuestro transformador se conecta primero al broker de Memphis.dev. Los detalles de conexión se obtienen de las variables de entorno. El host, nombre de usuario, contraseña, nombre de la estación de entrada y nombre de la estación de salida se pasan mediante variables de entorno de acuerdo con las sugerencias del Manifiesto de la Aplicación de Doce Factores.
async def main():
try:
print("Waiting on messages...")
memphis = Memphis()
await memphis.connect(host=os.environ[HOST_KEY],
username=os.environ[USERNAME_KEY],
password=os.environ[PASSWORD_KEY])
Una vez establecida la conexión, creamos objetos de consumidor y productor. En Memphis.dev, los consumidores y productores tienen nombres. Estos nombres aparecen en la interfaz de usuario de Memphis.dev, ofreciendo transparencia en las operaciones del sistema.
print("Creating consumer")
consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
consumer_name="transformer",
consumer_group="")
print("Creating producer")
producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
producer_name="transformer")
La API del consumidor utiliza el patrón de diseño de la función callback. Cuando se extraen mensajes del broker, se llama a la función proporcionada con una lista de mensajes como argumento.
print("Creating handler")
msg_handler = create_handler(producer)
print("Setting handler")
consumer.consume(msg_handler)
Después de configurar la devolución de llamada, iniciamos el bucle de eventos asyncio. En este punto, el servicio de transformador se detiene y espera hasta que haya mensajes disponibles para extraer del broker.
- Mantén tu hilo principal activo para que el consumidor siga recibiendo datos.
await asyncio.Event().wait()
Creación de la Función Manejador de Mensajes
La función create para el manejador de mensajes toma un objeto productor y devuelve una función de devolución de llamada. Dado que la función de devolución de llamada solo toma un argumento, utilizamos el patrón de clausura para pasar implícitamente el productor a la función msg_handler cuando la creamos.
La función msg_handler se pasa tres argumentos cuando se llama: una lista de mensajes, un error (si ocurrió) y un contexto que consiste en un diccionario. Nuestro controlador recorre los mensajes, llama a la función de transformación en cada uno, envía los mensajes a la segunda estación utilizando el productor y confirma que el mensaje ha sido procesado. En Memphis.dev, los mensajes no se marcan como entregados hasta que el consumidor los confirma. Esto evita que se pierdan mensajes si ocurre un error durante el procesamiento.
def create_handler(producer):
async def msg_handler(msgs, error, context):
try:
for msg in msgs:
transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
await producer.produce(message=transformed_msg)
await msg.ack()
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
print(e)
return
return msg_handler
La Función Transformadora de Mensajes
Ahora, llegamos al núcleo del servicio: la función transformadora de mensajes. Los datos de carga útil de los mensajes (devueltos por el método get_data()) se almacenan como objetos bytearray. Utilizamos la biblioteca json de Python para deserializar los mensajes en una jerarquía de colecciones de Python (listas y diccionarios) y tipos primitivos (int, float, str y None).
def deserialize_mongodb_cdc_event(input_msg):
obj = json.loads(input_msg)
Esperamos que el objeto tenga una propiedad de carga útil con un objeto como valor. Ese objeto tiene dos propiedades (“antes” y “después”), que son o bien None o cadenas que contienen objetos JSON serializados. Volvemos a utilizar la biblioteca JSON para deserializar y reemplazar las cadenas con los objetos.
if "payload" in obj:
payload = obj["payload"]
if "before" in payload:
before_payload = payload["before"]
if before_payload is not None:
payload["before"] = json.loads(before_payload)
if "after" in payload:
after_payload = payload["after"]
if after_payload is not None:
payload["after"] = json.loads(after_payload)
Por último, re-serializamos toda la entrada JSON y la convertimos de nuevo en un bytearray para su transmisión al broker.
output_s = json.dumps(obj)
output_msg = bytearray(output_s, "utf-8")
return output_msg
¡Hurra! Nuestros objetos ahora lucen así:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},
...
}
}
Ejecutando el Servicio Transformador
Si seguiste los siete pasos en el post anterior del blog, solo necesitas ejecutar tres pasos adicionales para iniciar el servicio transformador y verificar que está funcionando:
Paso 8: Iniciar el Servicio Transformador
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
Paso 9: Iniciar el Segundo Consumidor de Impresión
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
Paso 10: Verificar la IU de Memphis
Cuando el transformador comience a producir mensajes a Memphis.dev, se creará una segunda estación llamada “cleaned-todo-cdc-events”. Deberías ver esta nueva estación en la página de Resumen de Estaciones en la IU de Memphis.dev de la siguiente manera:
La página de detalles para la página “cleaned-todo-cdc-events” debe mostrar el transformador adjunto como productor, el consumidor de impresión y los mensajes transformados:
¡Felicitaciones! Ahora estamos listos para abordar la validación de mensajes utilizando Schemaverse en nuestra próxima entrada de blog. ¡Estén atentos!
En caso de que haya perdido las partes 1 y 2:
Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages