Teil 3: Umwandeln von MongoDB CDC-Ereignismeldungen

In unserem letzten Blog-Beitrag haben wir eine Referenzimplementierung zur Erfassung von Change Data Capture (CDC) Ereignissen aus einer MongoDB-Datenbank mithilfe von Debezium Server und Memphis.dev vorgestellt. Am Ende des Beitrags erwähnten wir, dass MongoDB-Einträge in Debezium CDC-Nachrichten als Zeichenfolgen serialisiert werden, wie folgt:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

Wir möchten die Schemaverse-Funktionalität von Memphis.dev nutzen, um Nachrichten anhand eines erwarteten Schemas zu überprüfen. Nachrichten, die nicht mit dem Schema übereinstimmen, werden an eine Dead Letter Station weitergeleitet, damit sie die nachgelagerten Verbraucher nicht beeinträchtigen. Falls das alles wie antikes Griechisch klingt, keine Sorge! Wir werden die Details in unserem nächsten Blog-Beitrag erklären.

Um Funktionalitäten wie Schemaverse zu nutzen, müssen wir die MongoDB-Einträge als JSON-Dokumente deserialisieren. Dieser Blog-Beitrag beschreibt eine Modifikation unserer MongoDB CDC-Pipeline, die einen Transformationsdienst hinzufügt, um die MongoDB-Einträge in JSON-Dokumente zu deserialisieren.

Überblick über die Lösung

Die vorherige Lösung bestand aus sechs Komponenten:

1. Todo Item Generator: Fügt alle 0,5 Sekunden ein zufällig generiertes Todo-Element in die MongoDB-Sammlung ein. Jedes Todo-Element enthält eine Beschreibung, Erstellungszeitstempel, optionalen Fälligkeitsdatum und Vollzugsstatus.

2. MongoDB: Konfiguriert mit einer einzigen Datenbank, die eine einzige Sammlung (todo_items) enthält.

3. Debezium Server: Instanz des Debezium Servers, konfiguriert mit MongoDB-Quelle und HTTP-Client-Senken-Connectors.

4. Memphis.dev REST Gateway: Nutzt die vorkonfigurierte Einstellung.

5. Memphis.dev: Konfiguriert mit einer einzigen Station (todo-cdc-events) und einem einzigen Benutzer (todocdcservice)

6. Druckverbraucher: Ein Skript, das die Memphis.dev Python SDK verwendet, um Nachrichten zu verbrauchen und sie in der Konsole auszugeben.

In dieser Iteration fügen wir zwei zusätzliche Komponenten hinzu:

1. Transformer Service: Ein Transformer-Service, der Nachrichten von der todo-cdc-events-Station empfängt, die MongoDB-Datensätze deserialisiert und sie an die cleaned-todo-cdc-events-Station pusht.

2. Gereinigter Druckverbraucher: Eine zweite Instanz des Druckverbrauchers, der Nachrichten druckt, die an die cleaned-todo-cdc-events-Station gepusht werden.

Die aktualisierte Architektur sieht folgendermaßen aus:

A Deep Dive Into the Transformer Service

Gerüst des Nachrichten-Transformationsdienstes

Der Transformations-Dienst verwendet die Memphis.dev Python SDK. Gehen wir die Implementierung des Transformers durch. Die main()-Methode unseres Transformers verbindet sich zuerst mit dem Memphis.dev-Broker. Die Verbindungsdetails werden aus Umgebungsvariablen abgerufen. Das Host, Benutzername, Passwort, Eingangsbahnname und Ausgangsbahnname werden gemäß den Empfehlungen aus dem Zwölf-Faktor-App-Manifesto mithilfe von Umgebungsvariablen übergeben.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

Nachdem eine Verbindung hergestellt wurde, erstellen wir Verbraucher- und Produzentenobjekte. In Memphis.dev haben Verbraucher und Produzenten Namen. Diese Namen sind in der Memphis.dev-UI sichtbar und bieten Transparenz in die Systembetriebsabläufe. 

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

Die Verbraucher-API verwendet das Callback-Funktions-Designmuster. Wenn Nachrichten vom Broker abgerufen werden, wird die bereitgestellte Funktion mit einer Liste von Nachrichten als Argument aufgerufen.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

Nach dem Einrichten des Callback starten wir die asyncio-Ereignisschleife. Ab diesem Zeitpunkt pausiert der Transformer-Service und wartet, bis Nachrichten verfügbar sind, um sie vom Broker abzurufen.

  • Halten Sie Ihren Hauptthread am Leben, damit der Verbraucher weiterhin Daten empfangen kann.
Python

await asyncio.Event().wait()

Erstellen der Nachrichtenhandler-Funktion

Die Erstellungsfunktion für den Nachrichtenhandler nimmt ein Produzentenobjekt entgegen und gibt eine Callback-Funktion zurück. Da die Callback-Funktion nur ein Argument akzeptiert, verwenden wir das Closure-Pattern zur impliziten Übergabe des Produzenten an die msg_handler-Funktion, wenn wir sie erstellen.

Die msg_handler-Funktion wird bei ihrer Ausführung drei Argumenten übergeben: eine Liste von Nachrichten, einen Fehler (falls einer aufgetreten ist) und ein Kontext, der aus einem Wörterbuch besteht. Unser Handler durchläuft die Nachrichten, ruft die Transformationsfunktion für jede auf, sendet die Nachrichten zum zweiten Station mit Hilfe des Produzenten und bestätigt, dass die Nachricht verarbeitet wurde. Bei Memphis.dev werden Nachrichten erst als zugestellt markiert, nachdem der Verbraucher sie bestätigt hat. Dies verhindert, dass Nachrichten bei einem Fehler während der Verarbeitung verloren gehen.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

Die Nachrichtentransformationsfunktion

Nun kommen wir zum Kern des Dienstes: der Nachrichten-Transformationsfunktion. Nachrichteninhalte (zurückgegeben vom get_data()-Methode) werden als bytearray-Objekte gespeichert. Wir verwenden die Python-json-Bibliothek, um die Nachrichten in eine Hierarchie von Python-Sammlungen (Listen und Wörterbüchern) und primitiven Typen (Integer, Fließkommazahlen, Zeichenketten und None) zu deserialisieren.

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

Wir erwarten, dass das Objekt eine payload-Eigenschaft mit einem Objekt als Wert aufweist. Dieses Objekt hat dann zwei Eigenschaften („before“ und „after“), die entweder None oder Zeichenketten mit serialisierten JSON-Objekten sind. Wir verwenden die JSON-Bibliothek erneut, um die Zeichenketten zu deserialisieren und durch die Objekte zu ersetzen.

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

Zuletzt serialisieren wir die gesamte JSON-Datei erneut und konvertieren sie zurück in ein bytearray zur Übertragung an den Broker.

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

Hurra! Unsere Objekte sehen nun so aus:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

Ausführung des Transformationsdienstes

Wenn Sie die sieben Schritte im vorherigen Blog-Beitrag befolgt haben, müssen Sie nur drei zusätzliche Schritte ausführen, um den Transformationsdienst zu starten und zu überprüfen, ob er funktioniert:

Schritt 8: Starten des Transformationsdienstes

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

Schritt 9: Starten des zweiten Druckkonsumenten

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

Schritt 10: Überprüfen Sie die Memphis-UI

Wenn der Transformator anfängt, Nachrichten an Memphis.dev zu produzieren, wird eine zweite Station namens „cleaned-todo-cdc-events“ erstellt. Sie sollten diese neue Station auf der Übersichtsseite der Station in der Memphis.dev-UI so sehen:

Die Detailseite für die „cleaned-todo-cdc-events“-Seite sollte den angehängten Transformer als Produzenten, den Druckverbraucher und die transformierten Nachrichten anzeigen: 

Herzlichen Glückwunsch! Wir sind jetzt bereit, Nachrichten mithilfe von Schemaverse zu validieren, wie in unserem nächsten Blog-Post beschrieben. Bleibt gespannt!

Falls Sie Teile 1 und 2 verpasst haben:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages