Nel nostro ultimo post sul blog, abbiamo presentato una implementazione di riferimento per catturare eventi di change data capture (CDC) da un database MongoDB utilizzando Debezium Server e Memphis.dev. Alla fine del post, abbiamo notato che i record MongoDB vengono serializzati come stringhe nei messaggi CDC di Debezium in questo modo:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
...
}
}
Vogliamo utilizzare la funzionalità Schemaverse di Memphis.dev per verificare i messaggi contro uno schema previsto. I messaggi che non corrispondono allo schema vengono indirizzati a una stazione per lettere morte in modo che non influiscano sui consumatori downstream. Se tutto ciò sembra antico greco, non preoccuparti! Spiegheremo i dettagli nel nostro prossimo post sul blog.
Per utilizzare funzionalità come Schemaverse, dobbiamo deserializzare i record MongoDB come documenti JSON. Questo post descrive una modifica alla nostra pipeline CDC MongoDB che aggiunge un servizio trasformatore per deserializzare i record MongoDB in documenti JSON.
Panoramica della Soluzione
La soluzione precedente consisteva di sei componenti:
1. Generatore di Elementi da Fare: Inserisce un elemento da fare generato casualmente nella collezione MongoDB ogni 0,5 secondi. Ogni elemento da fare contiene una descrizione, un timestamp di creazione, una data dovuta opzionale e uno stato di completamento.
2. MongoDB: Configurato con un unico database contenente una singola collezione (todo_items).
3. Debezium Server: Istanza di Debezium Server configurata con origine MongoDB e connettori sink per client HTTP.
4. Gateway REST di Memphis.dev: Utilizza la configurazione predefinita.
5. Memphis.dev: Configurato con una singola stazione (todo-cdc-events) e un solo utente (todocdcservice)
6. Consumatore di Stampa: Uno script che utilizza l’SDK Python di Memphis.dev per consumare messaggi e stamparli sul console.
In questa iterazione, stiamo aggiungendo due componenti aggiuntivi:
1. Servizio Trasformatore: Un servizio trasformatore che consuma messaggi dalla stazione todo-cdc-events, deserializza i record MongoDB e li inserisce nella stazione cleaned-todo-cdc-events.
2. Consumatore Pulito di Stampa: Una seconda istanza del consumatore di stampa che stampa i messaggi inseriti nella stazione cleaned-todo-cdc-events.
L’architettura aggiornata è così:
A Deep Dive Into the Transformer Service
Scheletro del Servizio Trasformatore di Messaggi
Il servizio trasformatore utilizza l’SDK Python di Memphis.dev. Esaminiamo l’implementazione del trasformatore. Il metodo main() del nostro trasformatore si connette prima al broker Memphis.dev. I dettagli di connessione vengono acquisiti da variabili ambientali. L’host, il nome utente, la password, il nome della stazione di input e il nome della stazione di output vengono passati utilizzando variabili ambientali, conformemente ai suggerimenti del manifesto Twelve-Factor App.
async def main():
try:
print("Waiting on messages...")
memphis = Memphis()
await memphis.connect(host=os.environ[HOST_KEY],
username=os.environ[USERNAME_KEY],
password=os.environ[PASSWORD_KEY])
Una volta stabilita la connessione, creiamo oggetti consumer e producer. In Memphis.dev, consumer e producer hanno nomi. Questi nomi appaiono nell’interfaccia utente di Memphis.dev, offrendo trasparenza sulle operazioni del sistema.
print("Creating consumer")
consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
consumer_name="transformer",
consumer_group="")
print("Creating producer")
producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
producer_name="transformer")
L’API consumer utilizza il design pattern della funzione callback. Quando i messaggi vengono estratti dal broker, la funzione fornita viene chiamata con un elenco di messaggi come argomento.
print("Creating handler")
msg_handler = create_handler(producer)
print("Setting handler")
consumer.consume(msg_handler)
Dopo aver configurato il callback, avviamo il ciclo degli eventi asyncio. A questo punto, il servizio di trasformazione si mette in pausa e attende fino a quando non ci sono messaggi disponibili da estrarre dal broker.
- Mantieni attiva la tua thread principale affinché il consumer continui a ricevere dati.
await asyncio.Event().wait()
Creazione della Funzione Gestore di Messaggi
La funzione create per il gestore di messaggi prende un oggetto produttore e restituisce una funzione callback. Poiché la funzione callback accetta un solo argomento, utilizziamo il pattern di chiusura per passare implicitamente il produttore alla funzione msg_handler quando la creiamo.
La funzione msg_handler viene passata tre argomenti quando viene chiamata: una lista di messaggi, un errore (se si è verificato) e un contesto costituito da un dizionario. Il nostro gestore scorre i messaggi, chiama la funzione di trasformazione su ciascuno, invia i messaggi alla seconda stazione utilizzando il produttore e conferma che il messaggio è stato elaborato. Su Memphis.dev, i messaggi non vengono contrassegnati come consegnati fino a quando il consumer non li conferma. Ciò previene la perdita di messaggi in caso di errore durante l’elaborazione.
def create_handler(producer):
async def msg_handler(msgs, error, context):
try:
for msg in msgs:
transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
await producer.produce(message=transformed_msg)
await msg.ack()
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
print(e)
return
return msg_handler
La Funzione di Trasformazione dei Messaggi
Ora arriviamo al cuore del servizio: la funzione di trasformazione dei messaggi. I payload dei messaggi (restituiti dal metodo get_data()) sono memorizzati come oggetti bytearray. Utilizziamo la libreria json di Python per deserializzare i messaggi in una gerarchia di collezioni Python (liste e dizionari) e tipi primitivi (int, float, str e None).
def deserialize_mongodb_cdc_event(input_msg):
obj = json.loads(input_msg)
Ci aspettiamo che l’oggetto abbia una proprietà payload con un oggetto come valore. Tale oggetto ha poi due proprietà (“before” e “after”), che sono o None o stringhe contenenti oggetti JSON serializzati. Utilizziamo nuovamente la libreria JSON per deserializzare e sostituire le stringhe con gli oggetti.
if "payload" in obj:
payload = obj["payload"]
if "before" in payload:
before_payload = payload["before"]
if before_payload is not None:
payload["before"] = json.loads(before_payload)
if "after" in payload:
after_payload = payload["after"]
if after_payload is not None:
payload["after"] = json.loads(after_payload)
Infine, reserializziamo l’intero record JSON e lo convertiamo di nuovo in un bytearray per la trasmissione al broker.
output_s = json.dumps(obj)
output_msg = bytearray(output_s, "utf-8")
return output_msg
Hooray! I nostri oggetti ora hanno questo aspetto:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},
...
}
}
Esecuzione del Servizio di Trasformazione
Se hai seguito i sette passaggi nel post precedente del blog, hai solo bisogno di eseguire tre passaggi aggiuntivi per avviare il servizio di trasformazione e verificare che funzioni:
Passo 8: Avvia il Servizio di Trasformazione
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
Passo 9: Avvia il Secondo Consumatore di Stampa
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
Passo 10: Controlla l’Interfaccia Utente di Memphis
Quando il trasformatore inizia a produrre messaggi su Memphis.dev, verrà creata una seconda stazione denominata “cleaned-todo-cdc-events”. Dovresti vedere questa nuova stazione sulla pagina Panoramica della Stazione nell’interfaccia utente di Memphis.dev in questo modo:
La pagina dei dettagli per la pagina “cleaned-todo-cdc-events” dovrebbe mostrare il trasformatore allegato come produttore, il consumatore di stampa e i messaggi trasformati:
Congratulazioni! Ora siamo pronti a affrontare la validazione dei messaggi utilizzando Schemaverse nel nostro prossimo post di blog. Restate sintonizzati!
Nel caso abbiate perso la parte 1 e 2:
Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages