Это вторая часть серии блогов о создании современной системы с событийным управлением с использованием Memphis.dev.
Наш последний блог пост представил ссылочное реализацию для захвата событий изменения данных (CDC) из базы данных PostgreSQL с использованием Debezium Server и Memphis.dev. Заменив Apache Kafka на Memphis.dev, решение значительно снизило операционные ресурсы и накладные расходы, сэкономив деньги и освободив разработчиков для создания нового функционала.
Однако PostgreSQL — единственная широко используемая база данных. Debezium предоставляет соединители для различных баз данных, включая нереляционную документную базу данных MongoDB. MongoDB популярен среди разработчиков, особенно работающих с динамическими языками программирования, поскольку он избегает несоответствия между объектами и реляционными данными. Разработчики могут напрямую хранить, запрашивать и обновлять объекты в базе данных.
В этом блоге мы демонстрируем, как адаптировать решение CDC для MongoDB.
Обзор решения
Здесь мы описываем архитектуру ссылочного решения для доставки событий захвата изменений данных с использованием Memphis.dev. Архитектура не изменилась по сравнению с нашим предыдущим блогом, за исключением замены PostgreSQL на MongoDB.
A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.
- Генератор задач Todo: Вставляет случайно сгенерированную задачу todo в коллекцию MongoDB каждые 0,5 секунд. Каждая задача todo содержит описание, временную метку создания, необязательную дату выполнения и статус выполнения.
- MongoDB: Настроен с одной базой данных, содержащей одну коллекцию (todo_items).
- Сервер Debezium: Экземпляр сервера Debezium, настроенный с источником MongoDB и соединителем приемника HTTP Client.
- REST Gateway Memphis.dev: Использует конфигурацию out-of-the-box.
- Memphis.dev: Настроен с одним станцией (todo-cdc-events) и одним пользователем (todocdcservice).
- Потребитель печати: Скрипт, использующий Python SDK Memphis.dev для потребления сообщений и вывода их в консоль.
Начало работы
Руководство по реализации доступно в директории mongodb-debezium-cdc-example репозитория Memphis Example Solutions. Docker Compose потребуется для его запуска.
Запуск реализации
Создайте образы Docker для Debezium Server, потребителя печати и настройки базы данных (создание таблиц и пользователя).
В настоящее время реализация зависит от предварительной версии Debezium Server для поддержки аутентификации JWT. Образ Docker будет создан непосредственно из основной ветки репозиториев Debezium и Debezium Server. Обратите внимание, что этот шаг может занять немало времени (~20 минут) для выполнения. Когда Debezium Server 2.3.0 будет выпущен, мы перейдем к использованию образа Docker от основного репозитория.
Шаг 1: Создание образов
$ docker compose build --pull --no-cache
Шаг 2: Запуск брокера Memphis.dev и шлюза REST
Запустите брокер Memphis.dev и шлюз REST. Обратите внимание, что служба memphis-rest-gateway зависит от службы брокера Memphis, поэтому служба брокера также будет запущена.
$ docker compose up -d memphis-rest-gateway
[+] Running 4/4
⠿ Network mongodb-debezium-cdc-example_default Created 0.0s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 6.0s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 16.8s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Started
Шаг 3: Создание станции и соответствующего пользователя в Memphis.dev
Сообщения доставляются на “станции” в Memphis.dev; они эквивалентны “топикам”, используемым брокерами сообщений. Откройте браузер по адресу localhost:9000. Нажмите ссылку “войти с root” в нижней части страницы.
Войдите с использованием имени пользователя root и пароля memphis.
Следуйте пошаговой инструкции для создания станции с названием todo-cdc-events.
Создайте пользователя с именем todocdcservice с одинаковым значением пароля.
Нажмите “Далее” до завершения мастера:
Нажмите “Перейти к обзору станции”, чтобы перейти на страницу обзора станции.
Шаг 4: Запуск Потребителя Печати
Мы использовали Python SDK от Memphis.dev для создания скрипта потребителя, который проверяет станцию todo-cdc-events и выводит сообщения в консоль.
$ docker compose up -d printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container printing-consumer Started
Шаг 5: Запуск и Настройка MongoDB
Для захвата изменений необходимо включить функцию репликации MongoDB. Существует несколько шагов:
- Название реплики должно быть установлено. Это можно сделать, передав имя реплики в командной строке или в конфигурационном файле. В файле Docker Compose мы запускаем MongoDB с аргументом командной строки –replSet rs0, чтобы установить имя реплики.
- При использовании репликации и включенной авторизации необходимо предоставить общий файл ключа каждой инстанции реплики. Мы создали файл ключа, следуя инструкциям в документации MongoDB. Затем мы создали образ, который расширяет официальный образ MongoDB, включая файл ключа.
- Набор реплик должен быть инициализирован после запуска MongoDB. Мы используем скрипт, который настраивает инстанцию при запуске. Скрипт вызывает команду replSetInitiate с перечислением IP-адресов и портов каждой инстанции MongoDB в наборе реплик. Эта команда заставляет инстанции MongoDB общаться между собой и выбрать лидера.
Обычно наборы реплик используются для повышения надежности (высокая доступность). Большинство документов, которые вы найдете, описывают, как настроить реплику с несколькими инстанциями MongoDB. В нашем случае соединитель Debezium для MongoDB использует функциональность репликации для захвата событий изменения данных. Хотя мы проходим шаги для настройки набора реплик, мы используем только одну инстанцию MongoDB.
скрипт генератора задач по TODO создает новую задачу по TODO каждые полсекунды. Значения полей генерируются случайным образом. Элементы добавляются в коллекцию MongoDB с именем “todo_items”.
В файле Docker Compose скрипт генератора задач по TODO настроен так, что зависит от работоспособного состояния экземпляра Mongodb и успешного завершения скрипта настройки базы данных. Запуск скрипта генератора задач по TODO приведет к запуску MongoDB и выполнению скрипта настройки базы данных через Docker Compose.
$ docker compose up -d todo-generator
[+] Running 3/3
⠿ Container mongodb Healthy 8.4s
⠿ Container mongodb-database-setup Exited 8.8s
⠿ Container mongodb-todo-generator Started
Шаг 6: Запустить сервер Debezium
Последним сервисом, который нужно запустить, является сервер Debezium. Сервер настроен с использованием источника соединителя для MongoDB и соединителя приемника HTTP Client через файл свойств Java:
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false
Большинство опций понятны сами по себе. URL-адрес приемника HTTP клиента стоит объяснить подробнее. Gateway REST на memphis.dev ожидает получать POST-запросы с путем в следующем формате:
/stations/{station}/produce/{quantity}
Заполнитель {station} заменяется на имя станции, куда отправляется сообщение. Заполнитель {quantity} заменяется на значение single (для одного сообщения) или batch (для нескольких сообщений).
Сообщение(я) передается в качестве полезной нагрузки запроса POST. REST-шлюз поддерживает три формата сообщений (простое текстовое содержимое, JSON или протокольные буферы). Значение (text/, application/json, application/x-protobuf) поля заголовка content-type определяет, как интерпретировать полезную нагрузку.
HTTP-клиент Debezium Server, выступающий в качестве приемника, создает запросы REST, соответствующие этим шаблонам. Запросы используют метод POST; каждый запрос содержит одно сообщение, закодированное в формате JSON, в качестве полезной нагрузки, а заголовок content-type установлен в application/JSON. Мы используем todo-CDC-events в качестве имени станции и единственного значения количества в URL-адресе конечной точки для маршрутизации сообщений и указания, как REST-шлюз должен интерпретировать запросы:
http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
Свойство debezium.sink.http.authentication.type=jwt указывает, что HTTP-клиент приемника должен использовать аутентификацию JWT. Свойства username и password очевидны, однако свойство debezium.sink.http.authentication.jwt.The URL требует некоторого объяснения. Инициальный токен получают с использованием конечной точки /auth/authenticate, в то время как аутентификация обновляется с помощью отдельной конечной точки /auth/refreshToken. Аутентификация JWT в HTTP-клиенте приемника добавляет соответствующую конечную точку к заданному базовому URL.
Debezium Server можно запустить с помощью следующей команды:
$ docker compose up -d debezium-server
[+] Running 5/5
⠿ Container mongodb Healthy 1.5s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Running 0.0s
⠿ Container debezium-server Started
Шаг 7: Убедитесь, что система работает
Проверьте экран обзора станции todo-cdc-events в веб-интерфейсе Memphis.dev, чтобы убедиться, что производитель и потребитель подключены, и сообщения доставляются.
И, выведите логи для контейнера printing-consumer:
$ docker logs --tail 2 printing-consumer
сообщение:
bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')
Формат сообщений CDC
Входящие сообщения форматируются в формате JSON. Сообщения имеют два основных поля (схема и данные). Схема описывает схему записи (названия полей и их типы), а данные описывают изменение записи. Объект данных содержит два поля (до и после), указывающие значение записи до и после изменения.
Для MongoDB, Debezium Server кодирует запись как строку сериализованного JSON:
{
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}
Это будет иметь последствия для последующей обработки сообщений, о которых мы расскажем в будущей статье в этой серии.
Поздравляем! Теперь у вас есть работающий пример того, как отслеживать изменения данных в базе данных MongoDB с помощью Debezium Server и передавать события в Memphis.dev для последующей обработки.
Часть 3 выйдет скоро!
Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de