В нашем последнем блоге мы представили ссылочное исполнение для захвата событий изменения данных (CDC) из базы данных MongoDB с использованием Сервера Debezium и Memphis.dev. В конце поста мы отметили, что записи MongoDB сериализуются в виде строк в сообщениях CDC Debezium, например:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
...
}
}
Мы хотим использовать функциональность Schemaverse Memphis.dev для проверки сообщений на соответствие ожидаемой схеме. Сообщения, которые не соответствуют схеме, направляются на станцию мёртвых писем, чтобы они не влияли на конечных потребителей. Если это звучит как древнегреческий язык, не волнуйтесь! Мы объясним детали в нашем следующем блоге.
Для использования функциональности, такой как Schemaverse, нам нужно десериализовать записи MongoDB в виде JSON-документов. В этом посте описывается модификация нашего конвейера CDC MongoDB, которая добавляет сервис-трансформер для десериализации записей MongoDB в JSON-документы.
Обзор решения
Предыдущее решение состояло из шести компонентов:
1. Генератор задач: Вставляет случайно сгенерированную задачу в коллекцию MongoDB каждые 0,5 секунды. Каждая задача содержит описание, время создания, необязательный срок выполнения и статус выполнения.
2. MongoDB: Настроен с одной базой данных, содержащей одну коллекцию (todo_items).
3. Debezium Server: Экземпляр Debezium Server, настроенный с источником MongoDB и соединителем приемника HTTP Client.
4. Memphis.dev REST Gateway: Использует конфигурацию “из коробки”.
5. Memphis.dev: Настроен с одной станцией (todo-cdc-events) и одним пользователем (todocdcservice)
6. Printing Consumer: Скрипт, использующий Python SDK Memphis.dev для приема сообщений и вывода их в консоль.
В этой итерации мы добавляем два дополнительных компонента:
1. Transformer Service: Сервис трансформации, который потребляет сообщения от станции todo-cdc-events, десериализует записи MongoDB и отправляет их на станцию cleaned-todo-cdc-events.
2. Cleaned Printing Consumer: Второй экземпляр потребителя печати, который печатает сообщения, отправленные на станцию cleaned-todo-cdc-events.
Обновленная архитектура выглядит следующим образом:
A Deep Dive Into the Transformer Service
Скелет Службы Трансформации Сообщений
Служба трансформации использует Python SDK от Memphis.dev. Давайте рассмотрим реализацию трансформатора. Метод main() нашего трансформатора сначала соединяется с брокером Memphis.dev. Детали подключения берутся из переменных окружения. Хост, имя пользователя, пароль, имя входной станции и имя выходной станции передаются с использованием переменных окружения в соответствии с рекомендациями из манифеста Twelve-Factor App.
async def main():
try:
print("Waiting on messages...")
memphis = Memphis()
await memphis.connect(host=os.environ[HOST_KEY],
username=os.environ[USERNAME_KEY],
password=os.environ[PASSWORD_KEY])
После установления соединения мы создаем объекты потребителя и производителя. В Memphis.dev потребители и производители имеют имена. Эти имена отображаются в пользовательском интерфейсе Memphis.dev, обеспечивая прозрачность операций системы.
print("Creating consumer")
consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
consumer_name="transformer",
consumer_group="")
print("Creating producer")
producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
producer_name="transformer")
API потребителя использует шаблон функции обратного вызова. Когда сообщения извлекаются из брокера, вызывается предоставленная функция с передачей списка сообщений в качестве аргумента.
print("Creating handler")
msg_handler = create_handler(producer)
print("Setting handler")
consumer.consume(msg_handler)
После настройки callback мы запускаем цикл событий asyncio. В этот момент сервис трансформации приостанавливается и ожидает, пока сообщения будут доступны для извлечения из брокера.
- Поддерживайте главный поток выполнения активным, чтобы потребитель продолжал получать данные.
await asyncio.Event().wait()
Создание функции обработчика сообщений
Функция create для обработчика сообщений принимает объект производителя и возвращает callback-функцию. Поскольку callback-функция принимает только один аргумент, мы используем шаблон замыкания для неявного передачи производителя в функцию msg_handler при её создании.
Функция msg_handler при вызове передаётся три аргумента: список сообщений, ошибка (если она произошла) и контекст, состоящий из словаря. Наш обработчик проходится по сообщениям, вызывает функцию трансформации для каждого, отправляет сообщения на вторую станцию с использованием производителя и подтверждает, что сообщение было обработано. В Memphis.dev сообщения не отмечаются как доставленные, пока потребитель не подтвердит их. Это предотвращает потерю сообщений в случае возникновения ошибки во время обработки.
def create_handler(producer):
async def msg_handler(msgs, error, context):
try:
for msg in msgs:
transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
await producer.produce(message=transformed_msg)
await msg.ack()
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
print(e)
return
return msg_handler
Функция трансформации сообщений
Теперь мы подошли к суть услуг: функции преобразования сообщений. Данные сообщений (возвращаемые методом get_data()) хранятся в виде объектов bytearray. Мы используем библиотеку json Python для десериализации сообщений в иерархию коллекций Python (список и словарь) и примитивных типов (int, float, str и None).
def deserialize_mongodb_cdc_event(input_msg):
obj = json.loads(input_msg)
Ожидается, что объект будет иметь свойство payload со значением объекта. Этот объект затем имеет два свойства (“before” и “after”), которые являются либо None, либо строками, содержащими сериализованные JSON-объекты. Мы снова используем библиотеку JSON для десериализации и замены строк на объекты.
if "payload" in obj:
payload = obj["payload"]
if "before" in payload:
before_payload = payload["before"]
if before_payload is not None:
payload["before"] = json.loads(before_payload)
if "after" in payload:
after_payload = payload["after"]
if after_payload is not None:
payload["after"] = json.loads(after_payload)
Наконец, мы повторно сериализуем весь JSON-отчет и преобразуем его обратно в bytearray для передачи брокеру.
output_s = json.dumps(obj)
output_msg = bytearray(output_s, "utf-8")
return output_msg
Ура! Теперь наши объекты выглядят следующим образом:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},
...
}
}
Запуск сервиса преобразования сообщений
Если вы следовали семи шагам в предыдущей статье блога, вам нужно будет выполнить только три дополнительных шага, чтобы запустить сервис преобразования сообщений и убедиться, что он работает:
Шаг 8: Запустите сервис преобразования сообщений
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
Шаг 9: Запустите второго потребителя печати
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
Шаг 10: Проверьте пользовательский интерфейс Memphis
Когда преобразователь начнет передавать сообщения на Memphis.dev, будет создана вторая станция с названием “cleaned-todo-cdc-events”. Вы должны увидеть эту новую станцию на странице Обзор станций в пользовательском интерфейсе Memphis.dev следующим образом:
Страница деталей для страницы “cleaned-todo-cdc-events” должна отображать трансформер, присоединенный как продюсер, печатного потребителя и трансформированные сообщения:
Поздравляем! Теперь мы готовы к проверке сообщений с использованием Schemaverse в нашем следующем блоге. Оставайтесь с нами!
Если вы пропустили части 1 и 2:
Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages