Partie 3 : Transformation des messages d’événements CDC de MongoDB

Dans notre dernier billet de blog, nous avons présenté une implémentation de référence pour capturer des événements de capture de données modifiées (CDC) à partir d’une base de données MongoDB en utilisant Debezium Server et Memphis.dev. À la fin du post, nous avons noté que les enregistrements MongoDB sont sérialisés en chaînes de caractères dans les messages CDC Debezium de la manière suivante :

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

Nous voulons utiliser la fonctionnalité Schemaverse de Memphis.dev pour vérifier les messages par rapport à un schéma attendu. Les messages qui ne correspondent pas au schéma sont acheminés vers une station de lettres mortes afin qu’ils n’impacte pas les consommateurs en aval. Si tout cela semble être du grec ancien, ne vous inquiétez pas ! Nous expliquerons les détails dans notre prochain billet de blog.

Pour utiliser des fonctionnalités comme Schemaverse, nous devons désérialiser les enregistrements MongoDB en documents JSON. Ce billet de blog décrit une modification de notre pipeline CDC MongoDB qui ajoute un service de transformateur pour désérialiser les enregistrements MongoDB en documents JSON.

Aperçu de la Solution

La solution précédente se composait de six composants :

1. Générateur d’éléments Todo : Insère un élément Todo généré aléatoirement dans la collection MongoDB toutes les 0,5 secondes. Chaque élément Todo contient une description, une heure de création, une date d’échéance facultative et un statut de complétion.

2. MongoDB: Configuré avec une base de données unique contenant une seule collection (todo_items).

3. Debezium Server: Instance de Debezium Server configurée avec une source MongoDB et des connecteurs de sink HTTP Client.

4. Gateway REST Memphis.dev: Utilise la configuration prête à l’emploi.

5. Memphis.dev: Configuré avec une seule station (todo-cdc-events) et un seul utilisateur (todocdcservice)

6. Consommateur d’impression: Un script qui utilise l’API Python de Memphis.dev pour consommer des messages et les imprimer dans la console.

Dans cette itération, nous ajoutons deux composants supplémentaires:

1. Service de transformation: Un service de transformation qui consomme des messages de la station todo-cdc-events, désérialise les enregistrements MongoDB et les envoie à la station cleaned-todo-cdc-events.

2. Consommateur d’impression nettoyé: Une deuxième instance du consommateur d’impression qui imprime les messages envoyés à la station cleaned-todo-cdc-events.

L’architecture mise à jour ressemble à ceci :

A Deep Dive Into the Transformer Service

Squelette du Service de Transformation de Messages

Le service transformer utilise l’SDK Python de Memphis.dev. Parcourons l’implémentation de la transformation. La méthode main() de notre transformateur se connecte d’abord au courtier Memphis.dev. Les détails de la connexion sont récupérés à partir des variables d’environnement. L’hôte, le nom d’utilisateur, le mot de passe, le nom de la station d’entrée et le nom de la station de sortie sont transmis à l’aide de variables d’environnement conformément aux suggestions du manifeste des Twelve-Factor App.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

Une fois qu’une connexion est établie, nous créons des objets consommateur et producteur. Chez Memphis.dev, les consommateurs et les producteurs ont des noms. Ces noms apparaissent dans l’interface utilisateur de Memphis.dev, offrant une transparence sur les opérations du système. 

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

L’API consommateur utilise le modèle de fonction de rappel. Lorsque les messages sont tirés du courtier, la fonction fournie est appelée avec une liste de messages en argument.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

Après avoir configuré le rappel, nous lançons le asyncio boucle des événements. À ce stade, le service de transformation s’arrête et attend jusqu’à ce que des messages soient disponibles pour être tirés du courtier.

  •  Maintenez votre thread principal actif pour que le consommateur continue de recevoir des données.
Python

await asyncio.Event().wait()

Création de la fonction de gestion des messages

La fonction de création pour le gestionnaire de messages prend un objet producteur et renvoie une fonction de rappel. Puisque la fonction de rappel ne prend qu’un seul argument, nous utilisons le modèle de fermeture pour transmettre implicitement le producteur à la fonction msg_handler lors de sa création.

La msg_handler fonction est passée trois arguments lorsqu’elle est appelée : une liste de messages, une erreur (si elle s’est produite) et un contexte composé d’un dictionnaire. Notre gestionnaire parcourt les messages, appelle la fonction de transformation sur chacun, envoie les messages à la deuxième station à l’aide du producteur et reconnaît que le message a été traité. Chez Memphis.dev, les messages ne sont pas marqués comme livrés jusqu’à ce que le consommateur les reconnaisse. Cela empêche les messages d’être perdus en cas d’erreur lors du traitement.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

La fonction de transformation des messages

Maintenant, nous arrivons au cœur du service : la fonction de transformation des messages. Les données des messages (retournées par la méthode get_data()) sont stockées sous forme d’objets bytearray. Nous utilisons la bibliothèque json de Python pour désérialiser les messages en une hiérarchie de collections Python (liste et dictionnaire) et de types primitifs (entier, flottant, chaîne de caractères et None).

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

Nous nous attendons à ce que l’objet possède une propriété payload avec un objet en tant que valeur. Cet objet possède ensuite deux propriétés (“avant” et “après”), qui sont soit None, soit des chaînes de caractères contenant des objets JSON sérialisés. Nous utilisons à nouveau la bibliothèque JSON pour désérialiser et remplacer les chaînes de caractères par les objets.

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

Enfin, nous resérialisons l’ensemble du enregistrement JSON et le convertissons à nouveau en bytearray pour la transmission au broker.

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

Hourra ! Nos objets ressemblent maintenant à ceci :

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

Exécution du Service de Transformation

Si vous avez suivi les sept étapes dans le billet de blog précédent, vous n’avez besoin que de trois étapes supplémentaires pour démarrer le service de transformation et vérifier qu’il fonctionne correctement :

Étape 8 : Démarrer le Service de Transformation

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

Étape 9 : Démarrer le Deuxième Consommateur d’Impression

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

Étape 10 : Consulter l’interface utilisateur de Memphis

Lorsque le transformateur commence à produire des messages pour Memphis.dev, une deuxième station nommée “cleaned-todo-cdc-events” sera créée. Vous devriez voir cette nouvelle station sur la page d’accueil des stations dans l’interface utilisateur de Memphis.dev de cette façon :

La page détaillée pour la page « cleaned-todo-cdc-events » devrait afficher le transformateur attaché en tant que producteur, le consommateur d’impression et les messages transformés : 

Félicitations ! Nous sommes maintenant prêts à aborder la validation des messages à l’aide de Schemaverse dans notre prochain article de blog. Restez à l’écoute !

En cas de manque des parties 1 et 2 : 

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages