In onze vorige blogpost hebben we een referentie-implementatie geïntroduceerd voor het vastleggen van change data capture (CDC) gebeurtenissen vanuit een MongoDB database met behulp van Debezium Server en Memphis.dev. Aan het einde van de post merkten we op dat MongoDB-records worden gserialiseerd als strings in Debezium CDC-berichten op deze manier:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
...
}
}
We willen de Schemaverse functionaliteit van Memphis.dev gebruiken om berichten te controleren op basis van een verwacht schema. Berichten die niet overeenkomen met het schema worden doorgestuurd naar een dead letter station zodat ze geen invloed hebben op downstream consumenten. Als dit allemaal klinkt als oud Grieks, maak je geen zorgen! We zullen de details in onze volgende blogpost uitleggen.
Om functionaliteiten zoals Schemaverse te gebruiken, moeten we de MongoDB-records deserialiseren als JSON-documenten. Deze blogpost beschrijft een aanpassing van onze MongoDB CDC-pijplijn die een transformatiedienst toevoegt om de MongoDB-records te deserialiseren naar JSON-documenten.
Overzicht van de oplossing
De vorige oplossing bestond uit zes componenten:
1. Todo Item Generator: Voegt elke 0,5 seconden een willekeurig gegenereerde todo-item toe aan de MongoDB-collectie. Elk todo-item bevat een beschrijving, creatietijdstip, optionele vervaldatum en voltooiingsstatus.
2. MongoDB: Geconfigureerd met één database die één verzameling (todo_items) bevat.
3. Debezium Server: Een instantie van Debezium Server die is geconfigureerd met MongoDB-bron en HTTP Client-sink connectors.
4. Memphis.dev REST Gateway: Gebruikt de standaardconfiguratie.
5. Memphis.dev: Geconfigureerd met één station (todo-cdc-events) en één gebruiker (todocdcservice)
6. Printing Consumer: Een script dat de Memphis.dev Python SDK gebruikt om berichten te consumeren en af te drukken op de console.
In deze iteratie voegen we twee extra componenten toe:
1. Transformer Service: Een transformer service die berichten consumeren van het todo-cdc-events station, de MongoDB-records deserialiseert, en ze naar het cleaned-todo-cdc-events station pusht.
2. Cleaned Printing Consumer: Een tweede instantie van de printing consumer die berichten afdrukt die naar het cleaned-todo-cdc-events station worden gepusht.
De bijgewerkte architectuur ziet er zo uit:
A Deep Dive Into the Transformer Service
Skeleton van de Message Transformer Service
Het transformerservice maakt gebruik van de Memphis.dev Python SDK. Laten we de implementatie van de transformer bekijken. De main() methode van onze transformer maakt eerst verbinding met de Memphis.dev broker. De verbindingsgegevens worden opgehaald uit milieuvariabelen. Het host, gebruikersnaam, wachtwoord, invoer stationnaam en uitvoer stationnaam worden doorgegeven met behulp van milieuvariabelen volgens de suggesties van het Twelve-Factor App manifesto.
async def main():
try:
print("Waiting on messages...")
memphis = Memphis()
await memphis.connect(host=os.environ[HOST_KEY],
username=os.environ[USERNAME_KEY],
password=os.environ[PASSWORD_KEY])
Zodra een verbinding tot stand is gebracht, maken we consumenten- en produktieobjecten aan. In Memphis.dev hebben consumenten en producenten namen. Deze namen verschijnen in de Memphis.dev UI, waardoor transparantie in de systeemoperaties wordt geboden.
print("Creating consumer")
consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
consumer_name="transformer",
consumer_group="")
print("Creating producer")
producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
producer_name="transformer")
De consumenten-API maakt gebruik van het callback functiepatroon. Wanneer berichten worden opgehaald van de broker, wordt de meegeleverde functie aangeroepen met een lijst van berichten als argument.
print("Creating handler")
msg_handler = create_handler(producer)
print("Setting handler")
consumer.consume(msg_handler)
Na het instellen van de callback starten we de asyncio event loop. Op dit moment pauzeert de transformer service en wacht tot er berichten beschikbaar zijn om van de broker te halen.
- Houd je hoofdthread in leven zodat de consument blijft ontvangen van gegevens.
await asyncio.Event().wait()
Maken van de Message Handler Function
De create-functie voor de message handler neemt een producer-object en retourneert een callback-functie. Omdat de callback-functie slechts één argument accepteert, gebruiken we het closure patroon om de producer impliciet door te geven aan de msg_handler-functie wanneer we deze maken.
De msg_handler functie krijgt drie argumenten wanneer deze wordt aangeroepen: een lijst met berichten, een fout (indien optredend) en een context die bestaat uit een woordenboek. On咱 handler loopt over de berichten, roept de transform-functie voor elk aan, stuurt de berichten naar het tweede station met behulp van de producer en erkent dat het bericht is verwerkt. Op Memphis.dev worden berichten niet gemarkeerd als geleverd totdat de consument ze erkent. Dit voorkomt dat berichten verloren gaan als er een fout optreedt tijdens de verwerking.
def create_handler(producer):
async def msg_handler(msgs, error, context):
try:
for msg in msgs:
transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
await producer.produce(message=transformed_msg)
await msg.ack()
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
print(e)
return
return msg_handler
De Message Transformer Function
Nu komen we bij de kern van de dienst: de berichttransformatiefunctie. Berichtladingen (teruggegeven door de get_data() methode) worden opgeslagen als bytearray objecten. We gebruiken de Python json bibliotheek om de berichten te deserialiseren in een hiërarchie van Python verzamelingen (lijst en dict) en primitieve types (int, float, str, en None).
def deserialize_mongodb_cdc_event(input_msg):
obj = json.loads(input_msg)
We verwachten dat het object een payload-eigenschap heeft met een object als waarde. Dat object heeft vervolgens twee eigenschappen (“voor” en “na”), die ofwel None zijn of tekststrings bevatten die geserialiseerde JSON-objecten bevatten. We gebruiken de JSON-bibliotheek opnieuw om te deserialiseren en de strings te vervangen door de objecten.
if "payload" in obj:
payload = obj["payload"]
if "before" in payload:
before_payload = payload["before"]
if before_payload is not None:
payload["before"] = json.loads(before_payload)
if "after" in payload:
after_payload = payload["after"]
if after_payload is not None:
payload["after"] = json.loads(after_payload)
Tot slot serialiseren we de volledige JSON-record opnieuw en converteren deze terug naar een bytearray voor overdracht naar de broker.
output_s = json.dumps(obj)
output_msg = bytearray(output_s, "utf-8")
return output_msg
Hooray! Onze objecten zien er nu zo uit:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},
...
}
}
Het uitvoeren van de Transformer Service
Als je de zeven stappen in de vorige blogpost hebt gevolgd, heb je slechts drie extra stappen nodig om de transformer service te starten en te verifiëren dat deze werkt:
Stap 8: Start de Transformer Service
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
Stap 9: Start de Tweede Print Consumer
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
Stap 10: Controleer de Memphis UI
Wanneer de transformer begint met het produceren van berichten naar Memphis.dev, wordt een tweede station met de naam “gezuiverde-todo-cdc-events” aangemaakt. Je zou deze nieuwe station op de Station Overview pagina in de Memphis.dev UI zo moeten zien:
De details pagina voor de “gecleaned-todo-cdc-events” pagina zou de transformator moeten laten zien die is vastgemaakt als producent, de afdrukconsument en de getransformeerde berichten:
Gefeliciteerd! We zijn nu klaar om berichten te valideren met behulp van Schemaverse in onze volgende blogpost. Blijf op de hoogte!
Voor het geval je delen 1 en 2 hebt gemist:
Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages