Questa è la seconda parte di una serie di post sul blog sulla creazione di un sistema orientato agli eventi moderno utilizzando Memphis.dev.
Il nostro ultimo blog post ha introdotto una implementazione di riferimento per la raccolta di eventi di capture di dati di cambiamento (CDC) da un database PostgreSQL utilizzando Debezium Server e Memphis.dev. Sostituendo Apache Kafka con Memphis.dev, la soluzione ha ridotto notevolmente le risorse operative e l’onere – risparmiando denaro e liberando gli sviluppatori per concentrarsi sulla creazione di nuove funzionalità.
Tuttavia, PostgreSQL è l’unico database comunemente utilizzato. Debezium fornisce connettori per vari database, inclusa la base di dati documento non relazionale MongoDB. MongoDB è popolare tra gli sviluppatori, specialmente quelli che lavorano con linguaggi di programmazione dinamici, poiché evita la disparità tra oggetti e relazioni. Gli sviluppatori possono memorizzare, interrogare e aggiornare direttamente gli oggetti nel database.
In questo post sul blog, mostriamo come adattare la soluzione CDC a MongoDB.
Panoramica della Soluzione
Qui descriviamo l’architettura della soluzione di riferimento per la consegna di eventi di capture di dati di cambiamento con Memphis.dev. L’architettura non è cambiata dall’nostro precedente blog post, ad eccezione della sostituzione di PostgreSQL con MongoDB.
A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.
- Generatore di Elementi da Fare: Inserisce un elemento da fare generato casualmente nella collezione MongoDB ogni 0,5 secondi. Ogni elemento da fare contiene una descrizione, un timestamp di creazione, una data di scadenza opzionale e uno stato di completamento.
- MongoDB: Configurato con un unico database contenente una sola collezione (todo_items).
- Server Debezium: Un’istanza di Debezium Server configurata con connettori sorgente MongoDB e sink Client HTTP.
- Gateway REST di Memphis.dev: Utilizza la configurazione predefinita.
- Memphis.dev: Configurato con una singola stazione (todo-cdc-events) e un solo utente (todocdcservice).
- Consumatore di Stampa: Uno script che utilizza l’SDK Python di Memphis.dev per consumare messaggi e stamparli sul console.
Iniziare
La guida alla realizzazione è disponibile nella directory mongodb-debezium-cdc-example del repository di Soluzioni Esempi di Memphis. Docker Compose sarà necessario per eseguirlo.
Esecuzione della Implementazione
Costruire le immagini Docker per il Debezium Server, il consumatore di stampa e l’installazione del database (creazione di tabelle e utenti).
Attualmente, l’implementazione dipende da una versione pre-rilascio del Debezium Server per il supporto dell’autenticazione JWT. Un’immagine Docker sarà costruita direttamente dalla branch principale dei repository Debezium e Debezium Server. Si noti che questo passaggio può richiedere un po’ di tempo (~20 minuti) per essere eseguito. Quando Debezium Server 2.3.0 sarà rilasciato, cambieremo all’uso dell’immagine Docker upstream.
Passo 1: Costruire le Immagini
$ docker compose build --pull --no-cache
Passo 2: Avviare il Broker e il Gateway REST di Memphis.dev
Avviare il broker di Memphis.dev e il gateway REST. Si noti che il servizio memphis-rest-gateway dipende dal servizio broker di Memphis, quindi il servizio broker verrà avviato anche esso.
$ docker compose up -d memphis-rest-gateway
[+] Running 4/4
⠿ Network mongodb-debezium-cdc-example_default Created 0.0s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 6.0s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 16.8s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Started
Passo 3: Creare una Stazione e l’Utente Corrispondente in Memphis.dev
I messaggi vengono consegnati a “stazioni” in Memphis.dev; sono equivalenti ai “topic” utilizzati dai broker di messaggi. Aprire il browser su localhost:9000. Fare clic sul link “accedi con root” in fondo alla pagina.
Accedere con root (username) e memphis (password).
Seguire il wizard per creare una stazione denominata todo-cdc-events.
Creare un utente chiamato todocdcservice con lo stesso valore per la password.
Fare clic su “Avanti” fino a quando il wizard non è terminato:
Fare clic su “Vai alla panoramica della stazione” per andare alla pagina della panoramica della stazione.
Passo 4: Avviare il Consumer di Stampa
Abbiamo utilizzato l’SDK Python di Memphis.dev per creare uno script consumer che scandisce la stazione todo-cdc-events e stampa i messaggi sul console.
$ docker compose up -d printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container printing-consumer Started
Passo 5: Avvio e Configurazione di MongoDB
Per catturare le modifiche, la funzionalità di replica di MongoDB deve essere abilitata. Ci sono diversi passaggi:
- Il nome del set di repliche deve essere impostato. Questo può essere fatto passando il nome di un set di repliche sulla riga di comando o nel file di configurazione. Nel file Docker Compose, eseguiamo MongoDB con l’argomento della riga di comando –replSet rs0 per impostare il nome del set di repliche.
- Quando viene utilizzata la replicazione e l’autorizzazione è abilitata, è necessario fornire un file chiave comune a ciascuna istanza replica. Abbiamo generato un file chiave seguendo le istruzioni nella documentazione di MongoDB. Successivamente, abbiamo creato un’immagine che estende l’immagine ufficiale di MongoDB includendo il file chiave.
- Il set di replica deve essere inizializzato una volta che MongoDB è in esecuzione. Utilizziamo uno script che configura l’istanza al momento del startup. Lo script chiama il comando replSetInitiate con un elenco degli indirizzi IP e delle porte di ciascuna istanza MongoDB nel set di replica. Questo comando fa in modo che le istanze MongoDB comunichino tra loro e selezionino un leader.
In generale, i set di replica vengono utilizzati per aumentare la affidabilità (disponibilità elevata). La maggior parte della documentazione che troverai descrive come configurare una replica con più istanze MongoDB. Nel nostro caso, il connettore MongoDB di Debezium si avvale della funzionalità di replicazione per catturare eventi di cambiamento dei dati. Anche se seguiamo i passaggi per configurare un set di replica, utilizziamo solo un’istanza MongoDB.
Il script generator di item todo crea un nuovo elemento todo ogni mezzo secondo. I valori dei campi vengono generati in modo casuale. Gli elementi vengono aggiunti a una collezione MongoDB denominata “todo_items”.
Nel file Docker Compose, lo script generator di item todo è configurato in modo da dipendere dall’istanza di Mongodb funzionante in uno stato sano e dal completamento riuscito dello script di setup del database. Avviando lo script generator di item todo, Docker Compose avvierà anche MongoDB e eseguirà lo script di setup del database.
$ docker compose up -d todo-generator
[+] Running 3/3
⠿ Container mongodb Healthy 8.4s
⠿ Container mongodb-database-setup Exited 8.8s
⠿ Container mongodb-todo-generator Started
Passo 6: Avviare il Server Debezium
L’ultimo servizio che deve essere avviato è il Server Debezium. Il server è configurato con un connettore sorgente per MongoDB e il connettore sink HTTP Client attraverso un file di proprietà Java:
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false
La maggior parte delle opzioni sono di auto-spiegazione. L’URL del client HTTP sink vale la pena spiegare in dettaglio. La porta di ingresso REST di memphis.dev si aspetta di ricevere richieste POST con un percorso nel formato seguente:
Il segnaposto {station} viene sostituito con il nome della stazione a cui inviare il messaggio. Il segnaposto {quantity} viene sostituito con il valore single (per un singolo messaggio) o batch (per più messaggi).
Il messaggio(i) viene(vengono) trasmesso(i) come payload della richiesta POST. Il gateway REST supporta tre formati di messaggi (testo normale, JSON o protocol buffer). Il valore (text/, application/json, application/x-protobuf) dell’header Content-Type determina come il payload viene interpretato.
Il client HTTP sink di Debezium Server produce richieste REST che sono coerenti con questi schemi. Le richieste utilizzano il verbo POST; ogni richiesta contiene un singolo messaggio codificato in JSON come payload e l’header Content-Type è impostato su application/JSON. Utilizziamo todo-CDC-events come nome della stazione e il valore della quantità singola nell’URL dell’endpoint per instradare i messaggi e indicare come il gateway REST dovrebbe interpretare le richieste:
http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
La proprietà debezium.sink.http.authentication.type=jwt indica che il sink HTTP Client dovrebbe utilizzare l’autenticazione JWT. Le proprietà username e password sono evidenti, ma la proprietà debezium.sink.http.authentication.jwt.The URL merita qualche spiegazione. Un token iniziale viene acquisito utilizzando l’endpoint /auth/authenticate, mentre l’autenticazione viene aggiornata utilizzando l’endpoint separato /auth/refreshToken. L’autenticazione JWT nel client HTTP aggiunge l’endpoint appropriato alla URL base data.
Debezium Server può essere avviato con il seguente comando:
$ docker compose up -d debezium-server
[+] Running 5/5
⠿ Container mongodb Healthy 1.5s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Running 0.0s
⠿ Container debezium-server Started
Passo 7: Verifica del Funzionamento del Sistema
Controlla la schermata dell’overview della stazione todo-cdc-events in Memphis.dev web UI per confermare che il produttore e il consumatore sono connessi e che i messaggi vengono consegnati.
E, stampare i log per il contenitore printing-consumer:
$ docker logs --tail 2 printing-consumer
messaggio:
bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')
Formato dei messaggi CDC
I messaggi in ingresso sono formattati come JSON. I messaggi hanno due campi di primo livello (schema e payload). Lo schema descrive lo schema del record (nomi e tipi dei campi), mentre il payload descrive il cambiamento al record. L’oggetto payload contiene esso stesso due campi (prima e dopo) che indicano il valore del record prima e dopo il cambiamento.
Per MongoDB, Debezium Server codifica il record come una stringa di JSON serializzato:
{
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}
Ciò avrà implicazioni per il processamento downstream dei messaggi, che descriveremo in un prossimo post del blog in questa serie.
Congratulazioni! Ora hai un esempio funzionante di come catturare eventi di cambiamento dei dati da un database MongoDB utilizzando Debezium Server e trasferire gli eventi a Memphis.dev per il processamento downstream.
Parte 3 uscirà presto!
Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de