الجزء 3: تحويل رسائل أحداث CDC في MongoDB

في مقالنا الأخير، عرضنا تنفيذًا مرجعيًا للتقاط أحداث تعقب التغيير (CDC) من قاعدة بيانات MongoDB باستخدام Debezium Server و Memphis.dev. في نهاية المقال، أشرنا إلى أن سجلات MongoDB تتم ترميزها كسلاسل أحرف في رسائل CDC لـ Debezium بهذه الطريقة:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

نرغب في استخدام ميزة Schemaverse في Memphis.dev للتحقق من الرسائل ضد مخطط متوقع. تتم توجيه الرسائل التي لا تتطابق مع المخطط إلى محطة الرسائل الميتة بحيث لا تؤثر على المستهلكين التاليين. إذا كان هذا يبدو وكأنه يوناني عجيب، فلا تقلق! سنشرح التفاصيل في مقالنا التالي.

لاستخدام ميزات مثل Schemaverse، نحتاج إلى تحليل السجلات MongoDB إلى وثائق JSON. يصف هذا المقال تعديلًا على خطنا الإنتاجي لتعقب تغييرات MongoDB يضيف خدمة محولة لتحليل السجلات MongoDB إلى وثائق JSON.

نظرة عامة على الحل

كان الحل السابق يتألف من ستة مكونات:

1. مولد عناصر المهمة: يدخل عنصر مهمة مُولد عشوائيًا في مجموعة MongoDB كل 0.5 ثانية. يحتوي كل عنصر مهمة على وصف وطاقم زمنية الإنشاء وتاريخ استحقاق اختياري وحالة الإكتمال.

2. MongoDB: مُهيئة مع قاعدة بيانات واحدة تحتوي على مجموعة واحدة (todo_items).

3. Debezium Server: مثيل من خادم Debezium مُهيئ مع مصدر MongoDB وموصلات مصدر HTTP Client.

4. Gateway REST في Memphis.dev: يستخدم التكوين القياسي.

5. Memphis.dev: مُهيئة مع محطة واحدة (todo-cdc-events) ومستخدم واحد (todocdcservice)

6. مستهلك الطباعة: سكريبت يستخدم SDK Python لـ Memphis.dev لاستهلاك الرسائل وطباعتها على وحدة التحكم.

في هذه التكرار، نضيف مكونين إضافيين:

1. خدمة التحويل: خدمة تحويل تستهلك رسائل من محطة todo-cdc-events، تحلل سجلات MongoDB، وتُدفعها إلى محطة cleaned-todo-cdc-events.

2. مستهلك الطباعة المُنظَف: مثيل ثانٍ من مستهلك الطباعة يطبع رسائل تُدفع إلى محطة cleaned-todo-cdc-events.

الهيكل المُحدث يبدو كما يلي:

A Deep Dive Into the Transformer Service

هيكل خدمة التحويل الرسائل

تستخدم خدمة المحول باستخدام SDK البايثون لـ Memphis.dev. دعونا نستعرض تنفيذ المحول. تتصل الطريقة الرئيسية () لمحولنا أولاً بوكر Memphis.dev. تم الحصول على تفاصيل الاتصال من المتغيرات البيئية. تم تمرير المضيف واسم المستخدم وكلمة المرور واسم المحطة الداخلية واسم المحطة الخارجية من خلال المتغيرات البيئية بناءً على توصيات كتاب التفوق الاثني عشر.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

بمجرد إقامة الاتصال ، ننشئ كائنات المستهلك والمنتج. في Memphis.dev ، للمستهلكين والمنتجين أسماء. تظهر هذه الأسماء في واجهة Memphis.dev ، مما يوفر شفافية في عمليات النظام. 

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

تستخدم واجهة برمجة التطبيقات (API) المستهلك نمط وظيفة الاستدعاء. عندما يتم سحب الرسائل من الوكيل ، يتم استدعاء الوظيفة المقدمة مع قائمة بالرسائل كحجة لها.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

بعد تعيين الاستدعاء المرجعي، نبدأ حلقة الحدث asyncio. في هذه المرحلة، تتوقف خدمة المحول وتنتظر حتى تتوفر رسائل لسحبها من الوكيل.

  •  حافظ على تنشيط الخط الرئيسي للحفاظ على استقبال المستهلك للبيانات.
Python

await asyncio.Event().wait()

إنشاء وظيفة معالجة الرسائل

تأخذ الدالة الخاصة بإنشاء معالج الرسائل كائن منتج وتُرجع دالة استدعاء مرجعية. نظرًا لأن الدالة المرجعية تأخذ وسيطة واحدة فقط، نستخدم نمط الإغلاق لتمرير المُنتج إلى دالة msg_handler بشكل ضمني عند إنشائها.

يتم تمرير دالة msg_handler ثلاثة وسيطات عند الاستدعاء: قائمة بالرسائل، خطأ (في حالة حدوثه)، وسياق يتكون من قاموس. يتكرر معالجنا على الرسائل، يستدعي دالة التحويل على كل منها، يرسل الرسائل إلى المحطة الثانية باستخدام المُنتج، ويعلن أن الرسالة قد تم معالجتها. في Memphis.dev، لا يُعتبر الرسائل مُسلّمًا إلى التواصل حتى يقوم المستهلك بالإعلام بها. هذا يمنع إسقاط الرسائل في حالة حدوث خطأ أثناء المعالجة.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

دالة المحول للرسائل

الآن، نصل إلى جوهر الخدمة: وظيفة تحويل الرسائل. تتم تخزين بيانات الرسائل (التي يتم إرجاعها بواسطة الطريقة get_data()) ككائنات bytearray. نستخدم مكتبة json الخاصة ببايثون لتحليل الرسائل إلى تنظيم بايثون للمجموعات (قائمة وقاموس) والأنواع البسيطة (int، float، str، و None).

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

نتوقع أن يكون للكائن خاصية payload بقيمة كائن. ثم لديها هذا الكائن خاصتان (“قبل” و “بعد”)، والتي هي إما None أو سلاسل أحرف تحتوي على كائنات JSON مجسمة. نستخدم مكتبة JSON مرة أخرى لتحليل واستبدال السلاسل بالكائنات.

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

أخيرًا، نجسد مرة أخرى السجل الكامل JSON ونحوله مرة أخرى إلى bytearray للإرسال إلى الوكيل.

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

مبروك! تبدو كائناتنا الآن كما يلي:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

تشغيل خدمة المحول

إذا قمت بمتابعة الخطوات السبع في المنشور السابق، فأنت تحتاج فقط لتنفيذ ثلاث خطوات إضافية لبدء خدمة المحول والتأكد من أنها تعمل:

الخطوة 8: بدء خدمة المحول

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

الخطوة 9: بدء المستهلك الطباعة الثاني

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

الخطوة 10: فحص واجهة Memphis UI

عندما يبدأ المحول في إنتاج رسائل إلى Memphis.dev، سيتم إنشاء محطة ثانية باسم “cleaned-todo-cdc-events”. يجب أن ترى هذه المحطة الجديدة على صفحة OverView للمحطات في واجهة Memphis.dev هكذا:

صفحة التفاصيل لصفحة “cleaned-todo-cdc-events” يجب أن تظهر المحول المعلق كمزود، والمستهلك الطابع، والرسائل المحولة: 

تهانينا! نحن الآن جاهزون لمعالجة تحقق من صحة الرسائل باستخدام Schemaverse في مقالتنا التالية. ترقبوا!

في حال فاتتك أجزاء 1 و 2:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages