Parte 3: Trasformazione dei Messaggi di Eventi CDC di MongoDB

Nel nostro ultimo post sul blog, abbiamo presentato una implementazione di riferimento per catturare eventi di change data capture (CDC) da un database MongoDB utilizzando Debezium Server e Memphis.dev. Alla fine del post, abbiamo notato che i record MongoDB vengono serializzati come stringhe nei messaggi CDC di Debezium in questo modo:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

Vogliamo utilizzare la funzionalità Schemaverse di Memphis.dev per verificare i messaggi contro uno schema previsto. I messaggi che non corrispondono allo schema vengono indirizzati a una stazione per lettere morte in modo che non influiscano sui consumatori downstream. Se tutto ciò sembra antico greco, non preoccuparti! Spiegheremo i dettagli nel nostro prossimo post sul blog.

Per utilizzare funzionalità come Schemaverse, dobbiamo deserializzare i record MongoDB come documenti JSON. Questo post descrive una modifica alla nostra pipeline CDC MongoDB che aggiunge un servizio trasformatore per deserializzare i record MongoDB in documenti JSON.

Panoramica della Soluzione

La soluzione precedente consisteva di sei componenti:

1. Generatore di Elementi da Fare: Inserisce un elemento da fare generato casualmente nella collezione MongoDB ogni 0,5 secondi. Ogni elemento da fare contiene una descrizione, un timestamp di creazione, una data dovuta opzionale e uno stato di completamento.

2. MongoDB: Configurato con un unico database contenente una singola collezione (todo_items).

3. Debezium Server: Istanza di Debezium Server configurata con origine MongoDB e connettori sink per client HTTP.

4. Gateway REST di Memphis.dev: Utilizza la configurazione predefinita.

5. Memphis.dev: Configurato con una singola stazione (todo-cdc-events) e un solo utente (todocdcservice)

6. Consumatore di Stampa: Uno script che utilizza l’SDK Python di Memphis.dev per consumare messaggi e stamparli sul console.

In questa iterazione, stiamo aggiungendo due componenti aggiuntivi:

1. Servizio Trasformatore: Un servizio trasformatore che consuma messaggi dalla stazione todo-cdc-events, deserializza i record MongoDB e li inserisce nella stazione cleaned-todo-cdc-events.

2. Consumatore Pulito di Stampa: Una seconda istanza del consumatore di stampa che stampa i messaggi inseriti nella stazione cleaned-todo-cdc-events.

L’architettura aggiornata è così:

A Deep Dive Into the Transformer Service

Scheletro del Servizio Trasformatore di Messaggi

Il servizio trasformatore utilizza l’SDK Python di Memphis.dev. Esaminiamo l’implementazione del trasformatore. Il metodo main() del nostro trasformatore si connette prima al broker Memphis.dev. I dettagli di connessione vengono acquisiti da variabili ambientali. L’host, il nome utente, la password, il nome della stazione di input e il nome della stazione di output vengono passati utilizzando variabili ambientali, conformemente ai suggerimenti del manifesto Twelve-Factor App.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

Una volta stabilita la connessione, creiamo oggetti consumer e producer. In Memphis.dev, consumer e producer hanno nomi. Questi nomi appaiono nell’interfaccia utente di Memphis.dev, offrendo trasparenza sulle operazioni del sistema. 

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

L’API consumer utilizza il design pattern della funzione callback. Quando i messaggi vengono estratti dal broker, la funzione fornita viene chiamata con un elenco di messaggi come argomento.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

Dopo aver configurato il callback, avviamo il ciclo degli eventi asyncio. A questo punto, il servizio di trasformazione si mette in pausa e attende fino a quando non ci sono messaggi disponibili da estrarre dal broker.

  • Mantieni attiva la tua thread principale affinché il consumer continui a ricevere dati.
Python

await asyncio.Event().wait()

Creazione della Funzione Gestore di Messaggi

La funzione create per il gestore di messaggi prende un oggetto produttore e restituisce una funzione callback. Poiché la funzione callback accetta un solo argomento, utilizziamo il pattern di chiusura per passare implicitamente il produttore alla funzione msg_handler quando la creiamo.

La funzione msg_handler viene passata tre argomenti quando viene chiamata: una lista di messaggi, un errore (se si è verificato) e un contesto costituito da un dizionario. Il nostro gestore scorre i messaggi, chiama la funzione di trasformazione su ciascuno, invia i messaggi alla seconda stazione utilizzando il produttore e conferma che il messaggio è stato elaborato. Su Memphis.dev, i messaggi non vengono contrassegnati come consegnati fino a quando il consumer non li conferma. Ciò previene la perdita di messaggi in caso di errore durante l’elaborazione.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

La Funzione di Trasformazione dei Messaggi

Ora arriviamo al cuore del servizio: la funzione di trasformazione dei messaggi. I payload dei messaggi (restituiti dal metodo get_data()) sono memorizzati come oggetti bytearray. Utilizziamo la libreria json di Python per deserializzare i messaggi in una gerarchia di collezioni Python (liste e dizionari) e tipi primitivi (int, float, str e None).

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

Ci aspettiamo che l’oggetto abbia una proprietà payload con un oggetto come valore. Tale oggetto ha poi due proprietà (“before” e “after”), che sono o None o stringhe contenenti oggetti JSON serializzati. Utilizziamo nuovamente la libreria JSON per deserializzare e sostituire le stringhe con gli oggetti.

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

Infine, reserializziamo l’intero record JSON e lo convertiamo di nuovo in un bytearray per la trasmissione al broker.

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

Hooray! I nostri oggetti ora hanno questo aspetto:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

Esecuzione del Servizio di Trasformazione

Se hai seguito i sette passaggi nel post precedente del blog, hai solo bisogno di eseguire tre passaggi aggiuntivi per avviare il servizio di trasformazione e verificare che funzioni:

Passo 8: Avvia il Servizio di Trasformazione

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

Passo 9: Avvia il Secondo Consumatore di Stampa

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

Passo 10: Controlla l’Interfaccia Utente di Memphis

Quando il trasformatore inizia a produrre messaggi su Memphis.dev, verrà creata una seconda stazione denominata “cleaned-todo-cdc-events”. Dovresti vedere questa nuova stazione sulla pagina Panoramica della Stazione nell’interfaccia utente di Memphis.dev in questo modo:

La pagina dei dettagli per la pagina “cleaned-todo-cdc-events” dovrebbe mostrare il trasformatore allegato come produttore, il consumatore di stampa e i messaggi trasformati: 

Congratulazioni! Ora siamo pronti a affrontare la validazione dei messaggi utilizzando Schemaverse nel nostro prossimo post di blog. Restate sintonizzati!

Nel caso abbiate perso la parte 1 e 2:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages