Partie 2 : Capture de Données Modifiées (CDC) pour MongoDB avec Debezium et Memphis.dev

Ceci est la deuxième partie d’une série d’articles de blog sur la construction d’un système moderne basé sur des événements à l’aide de Memphis.dev.

Notre dernier blog post a présenté une implémentation de référence pour capturer des événements de capture de données de changement (CDC) à partir d’une base de données PostgreSQL à l’aide de Debezium Server et de Memphis.dev. En remplaçant Apache Kafka par Memphis.dev, la solution a considérablement réduit les ressources opérationnelles et les frais généraux – économisant de l’argent et libérant les développeurs pour se concentrer sur la construction de nouvelles fonctionnalités.

Cependant, PostgreSQL est la seule base de données couramment utilisée. Debezium fournit des connecteurs pour diverses bases de données, y compris la base de données documentaire non relationnelle MongoDB. MongoDB est populaire auprès des développeurs, en particulier ceux travaillant dans des langages de programmation dynamiques, car il évite l’incompatibilité d’impedance objet-relation. Les développeurs peuvent stocker, interroger et mettre à jour directement des objets dans la base de données.

Dans cet article de blog, nous montrons comment adapter la solution CDC à MongoDB.

Aperçu de la Solution

Ici, nous décrivons l’architecture de la solution de référence pour livrer des événements de capture de données de changement avec Memphis.dev. L’architecture n’a pas changé par rapport à notre précédent blog sauf pour le remplacement de PostgreSQL par MongoDB.

A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.

  • Générateur d’éléments à faire: Insère un élément à faire généré aléatoirement dans la collection MongoDB toutes les 0,5 secondes. Chaque élément à faire contient une description, une heure de création, une date limite facultative et un statut de réalisation.
  • MongoDB: Configurer avec une seule base de données contenant une seule collection (todo_items).
  • Debezium Server: Instance de Debezium Server configurée avec des connecteurs source MongoDB et de sortie HTTP Client.
  • Gateway REST de Memphis.dev: Utilise la configuration prête à l’emploi.
  • Memphis.dev: Configurer avec une seule station (todo-cdc-events) et un seul utilisateur (todocdcservice).
  • Consommateur d’impression: Un script qui utilise le SDK Python de Memphis.dev pour consommer des messages et les imprimer dans la console.

Premiers pas

Le tutoriel de mise en œuvre est disponible dans le répertoire mongodb-debezium-cdc-example du référentiel de solutions d’exemple de Memphis. Docker Compose sera nécessaire pour le faire fonctionner.

Exécution de la mise en œuvre

Construire les images Docker pour le Debezium Server, le consommateur d’impression, et la configuration de la base de données (création de table et d’utilisateur).

Actuellement, l’implémentation dépend d’une version préliminaire du Debezium Server pour la prise en charge de l’authentification JWT. Une image Docker sera construite directement à partir de la branche principale des dépôts Debezium et Debezium Server. Notez que cette étape peut prendre un certain temps (~20 minutes) pour s’exécuter. Lorsque Debezium Server 2.3.0 sera publié, nous passerons à l’utilisation de l’image Docker d’origine.

Étape 1 : Construire les Images

Shell

 

$ docker compose build --pull --no-cache

Étape 2 : Démarrer le Memphis.dev Broker et la Gateway REST

Démarrer le broker Memphis.dev et le gateway REST. Notez que le service Memphis-rest-gateway dépend du service de broker Memphis, donc le service de broker sera également démarré.

Shell

 

$ docker compose up -d memphis-rest-gateway
Shell

 

 
[+] Running 4/4
 ⠿ Network mongodb-debezium-cdc-example_default                   Created                                                        0.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        6.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                       16.8s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Started

Étape 3 : Créer une Station et un Utilisateur Correspondant dans Memphis.dev

Les messages sont livrés à des « stations » dans Memphis.dev; elles sont équivalentes aux « sujets » utilisés par les brokers de messages. Dirigez votre navigateur vers localhost:9000. Cliquez sur le lien « sign in with root » en bas de la page.

Connectez-vous avec root (nom d’utilisateur) et memphis (mot de passe).

Suivez le guide pour créer une station nommée todo-cdc-events.

Créez un utilisateur nommé todocdcservice avec la même valeur pour le mot de passe.

Cliquez sur « Suivant » jusqu’à ce que l’assistant soit terminé:

Cliquez sur « Aller à l’aperçu de la station » pour accéder à la page d’aperçu de la station.

Étape 4 : Démarrer le Consommateur d’Impression

Nous avons utilisé le SDK Python de Memphis.dev pour créer un script consommateur qui interroge la station todo-cdc-events et imprime les messages dans la console.

Shell

 

$ docker compose up -d printing-consumer
Shell

 

 
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container printing-consumer                                Started     

Étape 5 : Démarrage et Configuration de MongoDB

Pour capturer les modifications, la fonctionnalité de réplication de MongoDB doit être activée. Plusieurs étapes sont nécessaires:

  • Le nom du lot de réplicas doit être défini. Cela peut être fait en passant le nom d’un lot de réplicas sur la ligne de commande ou dans le fichier de configuration. Dans le fichier Docker Compose, nous exécutons MongoDB avec l’argument de ligne de commande –replSet rs0 pour définir le nom du lot de réplicas.
  • Lorsque la réplication est utilisée et que l’autorisation est activée, il est nécessaire de fournir un fichier de clé commun à chaque instance de réplica. Nous avons généré un fichier de clé en suivant les instructions dans la documentation de MongoDB. Nous avons ensuite construit une image qui étend l’image officielle de MongoDB en incluant le fichier de clé.
  • Le jeu de réplicas doit être initialisé une fois que MongoDB est en cours d’exécution. Nous utilisons un script qui configure l’instance au démarrage. Le script appelle la commande replSetInitiate avec une liste des adresses IP et des ports de chaque instance MongoDB dans le jeu de réplicas. Cette commande permet aux instances MongoDB de communiquer entre elles et de sélectionner un leader.

Généralement, les jeux de réplicas sont utilisés pour augmenter la fiabilité (haute disponibilité). La plupart des documentations décrivent comment configurer un réplica avec plusieurs instances MongoDB. Dans notre cas, le connecteur MongoDB de Debezium se sert de la fonctionnalité de réplication pour capturer les événements de changement de données. Bien que nous suivions les étapes pour configurer un jeu de réplicas, nous n’utilisons qu’une seule instance MongoDB.

Le script de génération d’éléments à faire crée un nouvel élément à faire toutes les demi-secondes. Les valeurs des champs sont générées de manière aléatoire. Les éléments sont ajoutés à une collection MongoDB nommée “todo_items”.

Dans le fichier Docker Compose, le script de génération d’éléments à faire est configuré pour dépendre de l’instance MongoDB fonctionnant dans un état sain et de la réussite de l’exécution du script de configuration de la base de données. En démarrant le script de génération d’éléments à faire, Docker Compose démarrera également MongoDB et exécutera le script de configuration de la base de données.

Shell

 

$ docker compose up -d todo-generator
Shell

 

[+] Running 3/3
 ⠿ Container mongodb                 Healthy                                                                                     8.4s
 ⠿ Container mongodb-database-setup  Exited                                                                                      8.8s
 ⠿ Container mongodb-todo-generator  Started   

Étape 6 : Démarrer le serveur Debezium

Le dernier service qui doit être démarré est le serveur Debezium. Le serveur est configuré avec un connecteur source pour MongoDB et le connecteur de sortie du client HTTP via un fichier de propriétés Java:

Shell

 

 
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false

La plupart des options sont explicites. L’URL du client HTTP de sortie mérite d’être expliquée en détail. La passerelle REST de Memphis.dev s’attend à recevoir des requêtes POST avec un chemin dans le format suivant:

/stations/{station}/produce/{quantity}

Le {station} espace réservé est remplacé par le nom de la station à laquelle envoyer le message. Le {quantity} espace réservé est remplacé par la valeur single (pour un seul message) ou batch (pour plusieurs messages).

Le message(s) est (sont) transmis en tant que charge utile de la requête POST. Le gateway REST prend en charge trois formats de message (texte brut, JSON ou protocole de tampon). La valeur (text/, application/json, application/x-protobuf) du champ d’en-tête content-type détermine comment la charge utile est interprétée.

Le client HTTP de Debezium Server génère des requêtes REST qui sont conformes à ces modèles. Les requêtes utilisent le verbe POST ; chaque requête contient un seul message encodé en JSON en tant que charge utile, et l’en-tête content-type est défini sur application/JSON. Nous utilisons todo-CDC-events comme nom de station et la valeur de quantité unique dans l’URL de l’endpoint pour acheminer les messages et indiquer comment le gateway REST doit interpréter les requêtes:

http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single

La propriété debezium.sink.http.authentication.type=jwt indique que le client HTTP doit utiliser l’authentification JWT. Les propriétés nom d’utilisateur et mot de passe sont évidentes, mais la propriété debezium.sink.http.authentication.jwt.The URL mérite une explication. Un jeton initial est acquis en utilisant l’endpoint /auth/authenticate, tandis que l’authentification est rafraîchie en utilisant l’endpoint séparé /auth/refreshToken. L’authentification JWT dans le client HTTP ajoute l’endpoint approprié à l’URL de base donnée.

Debezium Server peut être démarré avec la commande suivante:

Shell

 

$ docker compose up -d debezium-server
Shell

 

[+] Running 5/5
 ⠿ Container mongodb                                              Healthy                                                        1.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                        1.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Running                                                        0.0s
 ⠿ Container debezium-server                                      Started

Étape 7: Vérifier que le système fonctionne

Vérifiez l’écran de présentation de la station todo-cdc-events dans l’interface web Memphis.dev pour confirmer que le producteur et le consommateur sont connectés et que les messages sont livrés.

Et, imprimez les journaux pour le conteneur de consommation d’impression :

Shell

 

$ docker logs --tail 2 printing-consumer

message:

Shell

 

bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')

Format des messages CDC

Les messages entrants sont formatés en JSON. Les messages comportent deux champs de niveau supérieur (schéma et charge utile). Le schéma décrit le schéma du enregistrement (noms de champs et types), tandis que la charge utile décrit le changement apporté au enregistrement. L’objet de charge utile contient lui-même deux champs (avant et après) indiquant la valeur de l’enregistrement avant et après le changement.

Pour MongoDB, le serveur Debezium encode l’enregistrement sous forme de chaîne de JSON sérialisé :

Shell

 

 
{
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}

Cela aura des implications pour le traitement en aval des messages, que nous décrirons dans un prochain article de blog de cette série. 

Félicitations ! Vous disposez maintenant d’un exemple fonctionnel de capture d’événements de changement de données à partir d’une base de données MongoDB à l’aide du serveur Debezium et de transférer ces événements vers Memphis.dev pour un traitement en aval.

La partie 3 sortira bientôt !

Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de