Часть 3: Преобразование сообщений CDC MongoDB

В нашем последнем блоге мы представили ссылочное исполнение для захвата событий изменения данных (CDC) из базы данных MongoDB с использованием Сервера Debezium и Memphis.dev. В конце поста мы отметили, что записи MongoDB сериализуются в виде строк в сообщениях CDC Debezium, например:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

Мы хотим использовать функциональность Schemaverse Memphis.dev для проверки сообщений на соответствие ожидаемой схеме. Сообщения, которые не соответствуют схеме, направляются на станцию мёртвых писем, чтобы они не влияли на конечных потребителей. Если это звучит как древнегреческий язык, не волнуйтесь! Мы объясним детали в нашем следующем блоге.

Для использования функциональности, такой как Schemaverse, нам нужно десериализовать записи MongoDB в виде JSON-документов. В этом посте описывается модификация нашего конвейера CDC MongoDB, которая добавляет сервис-трансформер для десериализации записей MongoDB в JSON-документы.

Обзор решения

Предыдущее решение состояло из шести компонентов:

1. Генератор задач: Вставляет случайно сгенерированную задачу в коллекцию MongoDB каждые 0,5 секунды. Каждая задача содержит описание, время создания, необязательный срок выполнения и статус выполнения.

2. MongoDB: Настроен с одной базой данных, содержащей одну коллекцию (todo_items).

3. Debezium Server: Экземпляр Debezium Server, настроенный с источником MongoDB и соединителем приемника HTTP Client.

4. Memphis.dev REST Gateway: Использует конфигурацию “из коробки”.

5. Memphis.dev: Настроен с одной станцией (todo-cdc-events) и одним пользователем (todocdcservice)

6. Printing Consumer: Скрипт, использующий Python SDK Memphis.dev для приема сообщений и вывода их в консоль.

В этой итерации мы добавляем два дополнительных компонента:

1. Transformer Service: Сервис трансформации, который потребляет сообщения от станции todo-cdc-events, десериализует записи MongoDB и отправляет их на станцию cleaned-todo-cdc-events.

2. Cleaned Printing Consumer: Второй экземпляр потребителя печати, который печатает сообщения, отправленные на станцию cleaned-todo-cdc-events.

Обновленная архитектура выглядит следующим образом:

A Deep Dive Into the Transformer Service

Скелет Службы Трансформации Сообщений

Служба трансформации использует Python SDK от Memphis.dev. Давайте рассмотрим реализацию трансформатора. Метод main() нашего трансформатора сначала соединяется с брокером Memphis.dev. Детали подключения берутся из переменных окружения. Хост, имя пользователя, пароль, имя входной станции и имя выходной станции передаются с использованием переменных окружения в соответствии с рекомендациями из манифеста Twelve-Factor App.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

После установления соединения мы создаем объекты потребителя и производителя. В Memphis.dev потребители и производители имеют имена. Эти имена отображаются в пользовательском интерфейсе Memphis.dev, обеспечивая прозрачность операций системы. 

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

API потребителя использует шаблон функции обратного вызова. Когда сообщения извлекаются из брокера, вызывается предоставленная функция с передачей списка сообщений в качестве аргумента.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

После настройки callback мы запускаем цикл событий asyncio. В этот момент сервис трансформации приостанавливается и ожидает, пока сообщения будут доступны для извлечения из брокера.

  • Поддерживайте главный поток выполнения активным, чтобы потребитель продолжал получать данные.
Python

await asyncio.Event().wait()

Создание функции обработчика сообщений

Функция create для обработчика сообщений принимает объект производителя и возвращает callback-функцию. Поскольку callback-функция принимает только один аргумент, мы используем шаблон замыкания для неявного передачи производителя в функцию msg_handler при её создании.

Функция msg_handler при вызове передаётся три аргумента: список сообщений, ошибка (если она произошла) и контекст, состоящий из словаря. Наш обработчик проходится по сообщениям, вызывает функцию трансформации для каждого, отправляет сообщения на вторую станцию с использованием производителя и подтверждает, что сообщение было обработано. В Memphis.dev сообщения не отмечаются как доставленные, пока потребитель не подтвердит их. Это предотвращает потерю сообщений в случае возникновения ошибки во время обработки.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

Функция трансформации сообщений

Теперь мы подошли к суть услуг: функции преобразования сообщений. Данные сообщений (возвращаемые методом get_data()) хранятся в виде объектов bytearray. Мы используем библиотеку json Python для десериализации сообщений в иерархию коллекций Python (список и словарь) и примитивных типов (int, float, str и None).

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

Ожидается, что объект будет иметь свойство payload со значением объекта. Этот объект затем имеет два свойства (“before” и “after”), которые являются либо None, либо строками, содержащими сериализованные JSON-объекты. Мы снова используем библиотеку JSON для десериализации и замены строк на объекты.

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

Наконец, мы повторно сериализуем весь JSON-отчет и преобразуем его обратно в bytearray для передачи брокеру.

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

Ура! Теперь наши объекты выглядят следующим образом:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

Запуск сервиса преобразования сообщений

Если вы следовали семи шагам в предыдущей статье блога, вам нужно будет выполнить только три дополнительных шага, чтобы запустить сервис преобразования сообщений и убедиться, что он работает:

Шаг 8: Запустите сервис преобразования сообщений

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

Шаг 9: Запустите второго потребителя печати

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

Шаг 10: Проверьте пользовательский интерфейс Memphis

Когда преобразователь начнет передавать сообщения на Memphis.dev, будет создана вторая станция с названием “cleaned-todo-cdc-events”. Вы должны увидеть эту новую станцию на странице Обзор станций в пользовательском интерфейсе Memphis.dev следующим образом:

Страница деталей для страницы “cleaned-todo-cdc-events” должна отображать трансформер, присоединенный как продюсер, печатного потребителя и трансформированные сообщения: 

Поздравляем! Теперь мы готовы к проверке сообщений с использованием Schemaverse в нашем следующем блоге. Оставайтесь с нами!

Если вы пропустили части 1 и 2:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages