Deel 2: Veranderingen in Gegevensopname (CDC) voor MongoDB met Debezium en Memphis.dev

Dit is deel twee van een serie blogberichten over het bouwen van een moderne gebeurtenisgestuurde systeem met behulp van Memphis.dev.

Ons laatste blog bericht introduceerde een referentie-implementatie voor het vastleggen van change data capture (CDC) gebeurtenissen vanuit een PostgreSQL-database met behulp van Debezium Server en Memphis.dev. Door Apache Kafka te vervangen door Memphis.dev, heeft de oplossing aanzienlijk de operationele middelen en overhead verminderd – geld bespaard en ontwikkelaars vrijgemaakt om zich te concentreren op het bouwen van nieuwe functionaliteit.

PostgreSQL is echter de enige veelgebruikte database. Debezium biedt connectors voor verschillende databases, waaronder de niet-relationele documentdatabase MongoDB. MongoDB is populair onder ontwikkelaars, met name die werkzaam zijn in dynamische programmeertalen, aangezien het de object-relationele impedantie mismatch vermijdt. Ontwikkelaars kunnen objecten direct opslaan, opvragen en bijwerken in de database.

In dit blogbericht laten we zien hoe u de CDC-oplossing kunt aanpassen aan MongoDB.

Overzicht van de Oplossing

Hier beschrijven we de architectuur van de referentieoplossing voor het leveren van change data capture gebeurtenissen met Memphis.dev. De architectuur is niet veranderd ten opzichte van ons vorige blog bericht, behalve voor de vervanging van PostgreSQL door MongoDB.

A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.

  • Todo Item Generator: Voegt elke 0,5 seconden een willekeurig gegenereerde to-do-item toe aan de MongoDB-collectie. Elk to-do-item bevat een omschrijving, creatietijdstip, optionele vervaldatum en voltooiingsstatus.
  • MongoDB: Geconfigureerd met een enkele database met een enkele collectie (todo_items).
  • Debezium Server: Een instantie van Debezium Server die is geconfigureerd met MongoDB-bron en HTTP Client-sink connectors.
  • Memphis.dev REST Gateway: Gebruikt de standaardconfiguratie.
  • Memphis.dev: Geconfigureerd met één station (todo-cdc-events) en één gebruiker (todocdcservice).
  • Printing Consumer: Een script dat de Memphis.dev Python SDK gebruikt om berichten te consumeren en naar de console te printen.

Aan de slag

De implementatietutorial is beschikbaar in de mongodb-debezium-cdc-example directory van de Memphis Example Solutions repository. Docker Compose zal nodig zijn om het uit te voeren.

Implementatie uitvoeren

Bouw de Docker-installatiekopieën voor de Debezium Server, de printconsument en de database-setup (tabel- en gebruikerscreëring).

Op dit moment is de implementatie afhankelijk van een voorvertoningsversie van de Debezium Server voor ondersteuning van JWT-verificatie. Een Docker-installatiekopie wordt rechtstreeks gebouwd vanaf de hoofdbranch van de Debezium en Debezium Server repositories. Houd er rekening mee dat dit stapje behoorlijk lang kan duren (~20 minuten) om uit te voeren. Wanneer Debezium Server 2.3.0 wordt uitgebracht, zullen we overschakelen naar de upstream Docker-installatiekopie.

Stap 1: Bouw de Installatiekopieën

Shell

 

$ docker compose build --pull --no-cache

Stap 2: Start de Memphis.dev Broker en REST Gateway

Start de Memphis.dev broker en REST gateway. Houd er rekening mee dat de Memphis-rest-gateway service afhankelijk is van de Memphis broker service, dus de broker service wordt ook gestart.

Shell

 

$ docker compose up -d memphis-rest-gateway
Shell

 

 
[+] Running 4/4
 ⠿ Network mongodb-debezium-cdc-example_default                   Created                                                        0.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        6.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                       16.8s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Started

Stap 3: Creëer een Station en Bijbehorende Gebruiker in Memphis.dev

Berichten worden geleverd aan “stations” in Memphis.dev; ze zijn vergelijkbaar met “onderwerpen” die worden gebruikt door berichtenbrokers. Ga met je browser naar localhost:9000. Klik op de “inloggen met root” link onderaan de pagina.

Log in met root (gebruikersnaam) en memphis (wachtwoord). 

Volg de wizard om een station met de naam todo-cdc-events te creëren.

Maak een gebruiker met de naam todocdcservice met dezelfde waarde voor het wachtwoord.

Klik op “Volgende” totdat de wizard is afgerond:

Klik op “Naar stationoverzicht” om naar de overzichtspagina van het station te gaan.

Stap 4: Start de Printing Consumer

We hebben de Memphis.dev Python SDK gebruikt om een consumer script te maken die het todo-cdc-events station afvraagt en de berichten naar de console afdrukt.

Shell

 

$ docker compose up -d printing-consumer
Shell

 

 
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container printing-consumer                                Started     

Stap 5: MongoDB starten en configureren

Om veranderingen vast te leggen, moet de replicatie functionaliteit van MongoDB worden ingeschakeld. Er zijn verschillende stappen:

  • De naam van de replica set moet worden ingesteld. Dit kan worden gedaan door de naam van een replica set door te geven via de commandolijn of in het configuratiebestand. In het Docker Compose bestand voeren we MongoDB uit met de commandolijnargumenten –replSet rs0 om de naam van de replica set in te stellen.
  • Wanneer replicatie wordt gebruikt en autorisatie is ingeschakeld, moet een gemeenschappelijk sleutelbestand worden verstrekt aan elke replica-instantie. We hebben een sleutelbestand gegenereerd volgens de instructies in de MongoDB documentatie. Vervolgens bouwden we een afbeelding die de officiële MongoDB-afbeelding uitbreidt door het sleutelbestand op te nemen.
  • De replica-set moet worden geïnitialiseerd zodra MongoDB draait. We gebruiken een script dat de instantie bij het opstarten configureert. Het script roept de replSetInitiate opdracht aan met een lijst van de IP-adressen en poorten van elke MongoDB-instantie in de replica-set. Deze opdracht zorgt ervoor dat de MongoDB-instanties communiceren met elkaar en een leider kiezen.

Over het algemeen worden replica-sets gebruikt voor verhoogde betrouwbaarheid (hoge beschikbaarheid). De meeste documentatie die u zult vinden beschrijft hoe u een replica kunt instellen met meerdere MongoDB-instanties. In ons geval maakt de Debezium MongoDB-connector gebruik van de replicatiefunctie om gebeurtenissen van gegevenswijzigingen vast te leggen. Hoewel we de stappen doorlopen om een replica-set te configureren, gebruiken we slechts één MongoDB-instantie.

De todo item generator script creëert elke halve seconde een nieuw todo-item. De veldwaarden worden willekeurig gegenereerd. De items worden toegevoegd aan een MongoDB-verzameling met de naam “todo_items”.

In het Docker Compose-bestand is het todo item generator script geconfigureerd om afhankelijk te zijn van de Mongodb-instantie die in een gezond toestand draait en de succesvolle voltooiing van het database setup script. Door het starten van het todo item generator script zal Docker Compose ook MongoDB starten en het database setup script uitvoeren.

Shell

 

$ docker compose up -d todo-generator
Shell

 

[+] Running 3/3
 ⠿ Container mongodb                 Healthy                                                                                     8.4s
 ⠿ Container mongodb-database-setup  Exited                                                                                      8.8s
 ⠿ Container mongodb-todo-generator  Started   

Stap 6: Start de Debezium Server

De laatste service die gestart moet worden, is de Debezium Server. De server is geconfigureerd met een bronconnector voor MongoDB en de HTTP Client sink connector via een Java properties file:

Shell

 

 
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false

De meeste opties zijn zelfverklarend. De HTTP client sink URL is het best uit te leggen in detail. Memphis.dev REST gateway verwacht POST-verzoeken met een pad in het volgende formaat:

/stations/{station}/produce/{quantity}

De {station} placeholder wordt vervangen door de naam van het station waarnaar de boodschap moet worden verzonden. De {quantity} placeholder wordt vervangen door de waarde enkel (voor één boodschap) of batch (voor meerdere boodschappen).

De boodschap(en) wordt (worden) doorgegeven als de lading van de POST-aanvraag. De REST-gateway ondersteunt drie boodschapindelingen (tekst zonder opmaak, JSON of protocol buffer). De waarde (tekst/, application/json, application/x-protobuf) van het content-type kopveld bepaalt hoe de lading wordt geïnterpreteerd.

De HTTP Client sink van Debezium Server produceert REST-aanvragen die consistent zijn met deze patronen. Aanvragen gebruiken het POST-werkwoord; elke aanvraag bevat een enkele JSON-gecodeerde boodschap als lading, en het content-type kopveld is ingesteld op application/JSON. We gebruiken todo-CDC-events als stationsnaam en de enkele hoeveelheidswaarde in de eindpunt-URL om boodschappen te routeren en aan te geven hoe de REST-gateway de aanvragen moet interpreteren:

http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single

De eigenschap debezium.sink.http.authentication.type=jwt geeft aan dat de HTTP Client sink JWT-authenticatie moet gebruiken. De gebruikersnaam en wachtwoord eigenschappen zijn vanzelfsprekend, maar de debezium.sink.http.authentication.jwt.The URL eigenschap verdient enige uitleg. Een initiële token wordt verkregen met behulp van de /auth/authenticate eindpunt, terwijl de authenticatie wordt vernieuwd met behulp van het afzonderlijke /auth/refreshToken eindpunt. De JWT-authenticatie in de HTTP Client voegt het juiste eindpunt toe aan de gegeven basis-URL.

Debezium Server kan worden gestart met de volgende opdracht:

Shell

 

$ docker compose up -d debezium-server
Shell

 

[+] Running 5/5
 ⠿ Container mongodb                                              Healthy                                                        1.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                        1.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Running                                                        0.0s
 ⠿ Container debezium-server                                      Started

Stap 7: Controleer of het systeem werkt

Controleer het overzichtsschot van de todo-cdc-events-station in de Memphis.dev web-UI om te bevestigen dat de producent en consument zijn verbonden en dat boodschappen worden afgeleverd.

En druk de logs af voor de container van de print-consument:

Shell

 

$ docker logs --tail 2 printing-consumer

bericht:

Shell

 

bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')

Indeling van de CDC-berichten

De binnenkomende berichten zijn geformatteerd als JSON. De berichten hebben twee bovenliggende velden (schema en payload). Het schema beschrijft het recordschema (veldnamen en typen), terwijl de payload de wijziging in de record beschrijft. Het payload-object zelf bevat twee velden (voor en na) die aangeven wat de waarde van de record was voor en na de wijziging.

Voor MongoDB codeert Debezium Server de record als een tekenreeks van gserialiseerde JSON:

Shell

 

 
{
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}

Dit zal gevolgen hebben voor de verdere verwerking van berichten, die we in een toekomstige blogpost in deze serie zullen beschrijven.

Gefeliciteerd! Je hebt nu een werkend voorbeeld van hoe je gegevenswijzigingsgebeurtenissen van een MongoDB-database kunt vastleggen met Debezium Server en de gebeurtenissen kunt overdragen naar Memphis.dev voor verdere verwerking.

Deel 3 komt snel uit!

Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de