Deel 3: Transformeren van MongoDB CDC Gebeurtenisberichten

In onze vorige blogpost hebben we een referentie-implementatie geïntroduceerd voor het vastleggen van change data capture (CDC) gebeurtenissen vanuit een MongoDB database met behulp van Debezium Server en Memphis.dev. Aan het einde van de post merkten we op dat MongoDB-records worden gserialiseerd als strings in Debezium CDC-berichten op deze manier:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

We willen de Schemaverse functionaliteit van Memphis.dev gebruiken om berichten te controleren op basis van een verwacht schema. Berichten die niet overeenkomen met het schema worden doorgestuurd naar een dead letter station zodat ze geen invloed hebben op downstream consumenten. Als dit allemaal klinkt als oud Grieks, maak je geen zorgen! We zullen de details in onze volgende blogpost uitleggen.

Om functionaliteiten zoals Schemaverse te gebruiken, moeten we de MongoDB-records deserialiseren als JSON-documenten. Deze blogpost beschrijft een aanpassing van onze MongoDB CDC-pijplijn die een transformatiedienst toevoegt om de MongoDB-records te deserialiseren naar JSON-documenten.

Overzicht van de oplossing

De vorige oplossing bestond uit zes componenten:

1. Todo Item Generator: Voegt elke 0,5 seconden een willekeurig gegenereerde todo-item toe aan de MongoDB-collectie. Elk todo-item bevat een beschrijving, creatietijdstip, optionele vervaldatum en voltooiingsstatus.

2. MongoDB: Geconfigureerd met één database die één verzameling (todo_items) bevat.

3. Debezium Server: Een instantie van Debezium Server die is geconfigureerd met MongoDB-bron en HTTP Client-sink connectors.

4. Memphis.dev REST Gateway: Gebruikt de standaardconfiguratie.

5. Memphis.dev: Geconfigureerd met één station (todo-cdc-events) en één gebruiker (todocdcservice)

6. Printing Consumer: Een script dat de Memphis.dev Python SDK gebruikt om berichten te consumeren en af te drukken op de console.

In deze iteratie voegen we twee extra componenten toe:

1. Transformer Service: Een transformer service die berichten consumeren van het todo-cdc-events station, de MongoDB-records deserialiseert, en ze naar het cleaned-todo-cdc-events station pusht.

2. Cleaned Printing Consumer: Een tweede instantie van de printing consumer die berichten afdrukt die naar het cleaned-todo-cdc-events station worden gepusht.

De bijgewerkte architectuur ziet er zo uit:

A Deep Dive Into the Transformer Service

Skeleton van de Message Transformer Service

Het transformerservice maakt gebruik van de Memphis.dev Python SDK. Laten we de implementatie van de transformer bekijken. De main() methode van onze transformer maakt eerst verbinding met de Memphis.dev broker. De verbindingsgegevens worden opgehaald uit milieuvariabelen. Het host, gebruikersnaam, wachtwoord, invoer stationnaam en uitvoer stationnaam worden doorgegeven met behulp van milieuvariabelen volgens de suggesties van het Twelve-Factor App manifesto.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

Zodra een verbinding tot stand is gebracht, maken we consumenten- en produktieobjecten aan. In Memphis.dev hebben consumenten en producenten namen. Deze namen verschijnen in de Memphis.dev UI, waardoor transparantie in de systeemoperaties wordt geboden.

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

De consumenten-API maakt gebruik van het callback functiepatroon. Wanneer berichten worden opgehaald van de broker, wordt de meegeleverde functie aangeroepen met een lijst van berichten als argument.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

Na het instellen van de callback starten we de asyncio event loop. Op dit moment pauzeert de transformer service en wacht tot er berichten beschikbaar zijn om van de broker te halen.

  •  Houd je hoofdthread in leven zodat de consument blijft ontvangen van gegevens.
Python

await asyncio.Event().wait()

Maken van de Message Handler Function

De create-functie voor de message handler neemt een producer-object en retourneert een callback-functie. Omdat de callback-functie slechts één argument accepteert, gebruiken we het closure patroon om de producer impliciet door te geven aan de msg_handler-functie wanneer we deze maken.

De msg_handler functie krijgt drie argumenten wanneer deze wordt aangeroepen: een lijst met berichten, een fout (indien optredend) en een context die bestaat uit een woordenboek. On咱 handler loopt over de berichten, roept de transform-functie voor elk aan, stuurt de berichten naar het tweede station met behulp van de producer en erkent dat het bericht is verwerkt. Op Memphis.dev worden berichten niet gemarkeerd als geleverd totdat de consument ze erkent. Dit voorkomt dat berichten verloren gaan als er een fout optreedt tijdens de verwerking.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

De Message Transformer Function

Nu komen we bij de kern van de dienst: de berichttransformatiefunctie. Berichtladingen (teruggegeven door de get_data() methode) worden opgeslagen als bytearray objecten. We gebruiken de Python json bibliotheek om de berichten te deserialiseren in een hiërarchie van Python verzamelingen (lijst en dict) en primitieve types (int, float, str, en None).

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

We verwachten dat het object een payload-eigenschap heeft met een object als waarde. Dat object heeft vervolgens twee eigenschappen (“voor” en “na”), die ofwel None zijn of tekststrings bevatten die geserialiseerde JSON-objecten bevatten. We gebruiken de JSON-bibliotheek opnieuw om te deserialiseren en de strings te vervangen door de objecten. 

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

Tot slot serialiseren we de volledige JSON-record opnieuw en converteren deze terug naar een bytearray voor overdracht naar de broker. 

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

Hooray! Onze objecten zien er nu zo uit:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

Het uitvoeren van de Transformer Service

Als je de zeven stappen in de vorige blogpost hebt gevolgd, heb je slechts drie extra stappen nodig om de transformer service te starten en te verifiëren dat deze werkt:

Stap 8: Start de Transformer Service

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

Stap 9: Start de Tweede Print Consumer

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

Stap 10: Controleer de Memphis UI

Wanneer de transformer begint met het produceren van berichten naar Memphis.dev, wordt een tweede station met de naam “gezuiverde-todo-cdc-events” aangemaakt. Je zou deze nieuwe station op de Station Overview pagina in de Memphis.dev UI zo moeten zien:

De details pagina voor de “gecleaned-todo-cdc-events” pagina zou de transformator moeten laten zien die is vastgemaakt als producent, de afdrukconsument en de getransformeerde berichten: 

Gefeliciteerd! We zijn nu klaar om berichten te valideren met behulp van Schemaverse in onze volgende blogpost. Blijf op de hoogte!

Voor het geval je delen 1 en 2 hebt gemist:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages