이것은 Memphis.dev를 사용하여 현대적인 이벤트 기반 시스템을 구축하는 일련의 블로그 게시물의 두 번째 부분입니다.
우리의 마지막 블로그 게시물은 Debezium Server와 Memphis.dev를 사용하여 PostgreSQL 데이터베이스에서 변경 데이터 캡처(CDC) 이벤트를 캡처하기 위한 참조 구현을 소개했습니다. Apache Kafka를 Memphis.dev로 대체함으로써 해당 솔루션은 운영 자원 및 오버헤드를 상당히 감소시켰습니다 – 돈을 절약하고 개발자들이 새로운 기능 구축에 집중할 수 있도록 해주었습니다.
그러나 PostgreSQL은 일반적으로 사용되는 유일한 데이터베이스는 아닙니다. Debezium은 MongoDB와 같은 비관계형 문서 데이터베이스를 포함한 다양한 데이터베이스용 커넥터를 제공합니다. MongoDB는 객체-관계 비호환성을 피함으로써 동적 프로그래밍 언어로 작업하는 개발자들 사이에서 인기가 있습니다. 개발자들은 데이터베이스에서 직접 객체를 저장, 쿼리 및 업데이트할 수 있습니다.
이 블로그 게시물에서는 CDC 솔루션을 MongoDB에 적용하는 방법을 보여줍니다.
솔루션 개요
여기서 우리는 Memphis.dev로 변경 데이터 캡처 이벤트를 전달하기 위한 참조 솔루션의 아키텍처를 설명합니다. 아키텍처는 이전 블로그 게시물에서 변경된 것이 없으며, PostgreSQL이 MongoDB로 대체되었을 뿐입니다.
A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.
- Todo Item Generator: 0.5초마다 MongoDB 컬렉션에 임의로 생성된 할 일 항목을 삽입합니다. 각 할 일 항목은 설명, 생성 타임스탬프, 선택적 마감일 및 완료 상태를 포함합니다.
- MongoDB: 단일 데이터베이스에 단일 컬렉션(todo_items)이 구성되어 있습니다.
- Debezium Server: MongoDB 소스 및 HTTP 클라이언트 싱크 커넥터로 구성된 Debezium Server 인스턴스입니다.
- Memphis.dev REST Gateway: 표준 설정을 사용합니다.
- Memphis.dev: 단일 스테이션(todo-cdc-events)과 단일 사용자(todocdcservice)로 구성되어 있습니다.
- Printing Consumer: Memphis.dev Python SDK를 사용하여 메시지를 소비하고 콘솔에 출력하는 스크립트입니다.
시작하기
구현 튜토리얼은 Memphis Example Solutions 저장소의 mongodb-debezium-cdc-example 디렉토리에서 확인할 수 있습니다. Docker Compose를 실행하는 데 필요합니다.
구현 실행
데브비지움 서버, 인쇄 소비자 및 데이터베이스 설정(테이블 및 사용자 생성)용 Docker 이미지를 빌드합니다.현재 구현은 JWT 인증 지원을 위한 Debezium 서버의 미리 출시 버전에 의존합니다. Docker 이미지는 Debezium 및 Debezium 서버 저장소의 메인 분기에서 직접 빌드됩니다. 이 단계가 상당히 오래 걸릴 수 있다는 점(약 20분)에 유의하세요. Debezium 서버 2.3.0이 출시되면 우리는 상류 Docker 이미지를 사용하기로 결정할 것입니다.
1단계: 이미지 빌드
$ docker compose build --pull --no-cache
2단계: Memphis.dev 브로커 및 REST 게이트웨이 시작
Memphis.dev 브로커 및 REST 게이트웨이를 시작합니다. Memphis-rest-gateway 서비스는 Memphis 브로커 서비스에 의존하므로 브로커 서비스도 시작됩니다.
$ docker compose up -d memphis-rest-gateway
[+] Running 4/4
⠿ Network mongodb-debezium-cdc-example_default Created 0.0s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 6.0s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 16.8s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Started
3단계: Memphis.dev에서 스테이션 및 해당 사용자 생성
메시지는 Memphis.dev의 “스테이션”으로 전달됩니다. 이는 메시지 브로커에서 사용하는 “토픽”과 동일합니다. 브라우저를 localhost:9000에 가리킵니다. 페이지 하단의 “root로 로그인” 링크를 클릭하세요.
root(사용자 이름) 및 memphis(암호)로 로그인합니다.
todo-cdc-events라는 스테이션을 만들기 위해 마법사를 따르세요.
사용자 이름 todocdcservice를 생성하고 비밀번호에 동일한 값을 사용합니다.
마법사가 완료될 때까지 “다음”을 클릭합니다:
“스테이션 개요로 이동”을 클릭하여 스테이션 개요 페이지로 이동합니다.
4단계: 인쇄 소비자 시작
우리는 Memphis.dev Python SDK를 사용하여 소비자 스크립트를 생성하여 todo-cdc-events 스테이션을 폴링하고 콘솔에 메시지를 인쇄했습니다.
$ docker compose up -d printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container printing-consumer Started
5단계: MongoDB 시작 및 구성
변경 사항을 캡처하려면 MongoDB의 복제 기능을 활성화해야 합니다. 여러 단계가 있습니다:
- 복제 세트 이름을 설정해야 합니다. 이는 복제 세트의 이름을 전달하여 명령줄 또는 구성 파일에서 수행할 수 있습니다. Docker Compose 파일에서 우리는 MongoDB를 명령줄 인수 –replSet rs0로 실행하여 복제 세트 이름을 설정합니다.
- 복제가 사용되고 인증이 활성화된 경우, 각 복제 인스턴스에는 일반 키 파일을 제공해야 합니다. 우리는 MongoDB 문서의 지침에 따라 키 파일을 생성하였습니다. 그런 다음 이미지를 구축하여 공식 MongoDB 이미지를 확장하고 키 파일을 포함시켰습니다.
- MongoDB가 실행되면 복제 세트를 한 번 초기화해야 합니다. 우리는 스크립트를 사용하여 시작 시 인스턴스를 구성합니다. 이 스크립트는 replSetInitiate 명령어를 호출하여 복제 세트의 각 MongoDB 인스턴스의 IP 주소와 포트 목록을 전달합니다. 이 명령어는 MongoDB 인스턴스들이 서로 통신하고 리더를 선출하게 합니다.
일반적으로 복제 세트는 신뢰성(고가용성)을 높이기 위해 사용됩니다. 대부분의 문서에서는 여러 MongoDB 인스턴스로 복제를 설정하는 방법을 설명합니다. 우리의 경우, Debezium의 MongoDB 커넥터는 복제 기능을 이용하여 데이터 변경 이벤트를 캡처합니다. 복제 세트를 구성하는 단계를 거치지만, 실제로는 하나의 MongoDB 인스턴스만 사용합니다.
todo item generator script는 매 0.5초마다 새로운 todo 항목을 생성합니다. 필드 값은 무작위로 생성되며, 이 항목들은 “todo_items”라는 MongoDB 컬렉션에 추가됩니다.
Docker Compose 파일에서, todo 항목 생성기 스크립트는 데이터베이스 설정 스크립트의 성공적인 완료와 건강한 상태에서 실행 중인 Mongodb 인스턴스에 의존하도록 구성되어 있습니다. todo 항목 생성기 스크립트를 시작하면, Docker Compose는 MongoDB를 시작하고 데이터베이스 설정 스크립트를 실행합니다.
$ docker compose up -d todo-generator
[+] Running 3/3
⠿ Container mongodb Healthy 8.4s
⠿ Container mongodb-database-setup Exited 8.8s
⠿ Container mongodb-todo-generator Started
6단계: Debezium 서버 시작
마지막으로 시작해야 하는 서비스는 Debezium 서버입니다. 이 서버는 MongoDB용 소스 커넥터와 HTTP 클라이언트 싱크 커넥터를 Java 속성 파일로 구성되어 있습니다:
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false
대부분의 옵션은 직관적입니다. HTTP 클라이언트 싱크 URL은 자세히 설명하기에 가치가 있습니다. memphis.dev REST 게이트웨이는 다음 형식의 경로를 가진 POST 요청을 기대합니다:
/stations/{station}/produce/{quantity}
{station} 자리 표시자는 메시지를 보낼 스테이션의 이름으로 대체됩니다. {quantity} 자리 표시자는 단일 메시지(단일) 또는 여러 메시지(배치)에 대한 값으로 대체됩니다.
메시지(들)은 POST 요청의 페이로드로 전달됩니다. REST 게이트웨이는 세 가지 메시지 형식(플레인 텍스트, JSON, 또는 프로토콜 버퍼)을 지원합니다. 콘텐츠 타입 헤더 필드의 값(text/, application/json, application/x-protobuf)이 페이로드의 해석 방법을 결정합니다.
Debezium Server의 HTTP Client 싱크는 이러한 패턴과 일치하는 REST 요청을 생성합니다. 요청은 POST 동사를 사용하며, 각 요청은 페이로드로 단일 JSON 인코딩 메시지를 포함하고, 콘텐츠 타입 헤더는 application/JSON으로 설정됩니다. 우리는 메시지 라우팅 및 REST 게이트웨이가 요청을 해석하는 방법을 나타내기 위해 엔드포인트 URL에서 스테이션 이름으로 todo-CDC-events를 사용하고 단일 수량 값을 사용합니다:
http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
속성 debezium.sink.http.authentication.type=jwt은 HTTP Client 싱크가 JWT 인증을 사용해야 함을 나타냅니다. 사용자 이름과 암호 속성은 명확하지만, debezium.sink.http.authentication.jwt.The URL 속성은 설명이 필요합니다. 초기 토큰은 /auth/authenticate 엔드포인트를 사용하여 획득되며, 인증은 별도의 /auth/refreshToken 엔드포인트를 사용하여 갱신됩니다. HTTP Client에서의 JWT 인증은 주어진 기본 URL에 적절한 엔드포인트를 추가합니다.
Debezium Server는 다음 명령으로 시작할 수 있습니다:
$ docker compose up -d debezium-server
[+] Running 5/5
⠿ Container mongodb Healthy 1.5s
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Healthy 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1 Running 0.0s
⠿ Container debezium-server Started
7단계: 시스템이 작동하는지 확인
Memphis.dev 웹 UI의 todo-cdc-events 스테이션 개요 화면을 확인하여 생산자와 소비자가 연결되어 있고 메시지가 전달되고 있는지 확인합니다.
그리고, 프린팅-컨슈머 컨테이너의 로그를 출력하세요:
$ docker logs --tail 2 printing-consumer
메시지:
bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')
CDC 메시지의 형식
들어오는 메시지는 JSON 형식으로 포맷되어 있습니다. 메시지에는 두 개의 최상위 필드(스키마와 페이로드)가 있습니다. 스키마는 레코드 스키마(필드 이름과 타입)를 설명하고, 페이로드는 레코드의 변경 사항을 설명합니다. 페이로드 객체 자체는 레코드의 변경 전과 후의 값을 나타내는 두 개의 필드(before와 after)를 포함하고 있습니다.
MongoDB의 경우, Debezium Server는 레코드를 직렬화된 JSON 문자열로 인코딩합니다:
{
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}
이는 이후 메시지 처리에 대한 영향을 미칠 것이며, 이는 이 시리즈의 향후 블로그 게시물에서 설명할 예정입니다.
축하합니다! 이제 Debezium Server를 사용하여 MongoDB 데이터베이스에서 데이터 변경 이벤트를 캡처하고, 이벤트를 Memphis.dev에 전달하여 다운스트림 처리를 하는 예제를 작동할 수 있게 되었습니다.
Part 3은 곧 출시됩니다!
Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de