Apache Kafka와 ScyllaDB를 사용한 변경 데이터 캡처 사용 방법

이 ScyllaDB University의 실습 실험실에서는 ScyllaDB CDC 소스 커넥터를 사용하여 ScyllaDB 클러스터의 테이블에서 발생하는 행 수준 변경 이벤트를 Kafka 서버로 푸시하는 방법을 배우게 됩니다.

ScyllaDB CDC란?

요약하자면, Change Data Capture(CDC)는 데이터베이스 테이블의 현재 상태를 쿼리할 수 있을 뿐만 아니라 테이블에 이루어진 모든 변경 이력을 쿼리할 수 있는 기능입니다. CDC는 ScyllaDB Enterprise 2021.1.1 및 ScyllaDB Open Source 4.3부터 생산 준비가 완료되었습니다(GA).

ScyllaDB에서 CDC는 옵션이며 테이블 단위로 설정됩니다. CDC가 활성화된 테이블에 대한 변경 이력은 별도의 관련 테이블에 저장됩니다.

다음과 같이 CDC 옵션을 사용하여 테이블을 생성하거나 변경할 때 CDC를 활성화할 수 있습니다.

 
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};

ScyllaDB CDC 소스 커넥터

ScyllaDB CDC Source Connector는 ScyllaDB 클러스터 내의 테이블에서 행 수준 변경 사항을 캡처하는 소스 커넥터입니다. 이는 Debezium 커넥터로, Kafka Connect와 호환되며(Kafka 2.6.0+ 기준). 커넥터는 지정된 테이블에 대한 CDC 로그를 읽고 각 행 수준 INSERT, UPDATE, DELETE 작업에 대한 Kafka 메시지를 생성합니다. 커넥터는 실패 시 Scylla에서 데이터를 재시도하여 고장 내성이 있으며, Kafka Connect 오프셋 추적을 사용하여 ScyllaDB CDC 로그에서 현재 위치를 주기적으로 저장합니다. 각 생성된 Kafka 메시지는 타임스탬프와 테이블 이름 등의 소스 정보를 포함합니다.

참고: 현재 작성 시점에서 LIST, SET, MAP 같은 컬렉션 유형 및 UDTs에 대한 지원이 없으며, 이러한 유형의 열은 생성된 메시지에서 생략됩니다. 이러한 개선 요청GitHub 프로젝트의 다른 개발 사항에 대해 최신 상태를 유지하세요.

Confluent 및 Kafka Connect

Confluent는 연속적이고 실시간 스트림으로 데이터에 쉽게 접근하고, 저장하며, 관리할 수 있는 전체 규모의 데이터 스트리밍 플랫폼입니다. 이는 Apache Kafka의 이점을 확장하면서 기업용 등급의 기능을 제공합니다. Confluent를 통해 현대적이고 이벤트 주도적인 애플리케이션을 구축하고, 확장성, 성능 및 신뢰성을 지원하는 보편적인 데이터 파이프라인을 확보하실 수 있습니다.

Kafka Connect는 Apache Kafka와 다른 데이터 시스템 간에 데이터를 효율적이고 안정적으로 스트리밍하기 위한 도구입니다. 대량의 데이터 세트를 Kafka로 이동하거나 나가는 커넥터를 간단하게 정의할 수 있습니다. 전체 데이터베이스를 포함하거나 애플리케이션 서버의 메트릭을 Kafka 토픽에 수집할 수 있으며, 이를 통해 낮은 레이턴시로 스트림 처리에 데이터를 활용할 수 있습니다.

Kafka Connect는 두 가지 유형의 커넥터를 포함합니다:

  1. 소스 커넥터: 소스 커넥터는 전체 데이터베이스를 인서트하고 테이블 업데이트를 Kafka 토픽으로 스트리밍합니다. 또한 애플리케이션 서버의 메트릭을 수집하여 Kafka 토픽에 저장하며, 이를 통해 낮은 레이턴시로 스트림 처리에 데이터를 활용할 수 있습니다.
  2. 싱크 커넥터: 싱크 커넥터는 Kafka 토픽의 데이터를 Elasticsearch와 같은 보조 인덱스나 Hadoop과 같은 배치 시스템으로 전달하여 오프라인 분석을 위해 사용할 수 있습니다.

Docker로 서비스 설정

이 실습에서는 Docker를 사용하게 됩니다.

다음 사전 조건을 충족하는지 확인하십시오:

  • Linux, Mac 또는 Windows용 Docker.
    • 참고: Docker에서 ScyllaDB를 실행하는 것은 ScyllaDB를 평가하고 시험해 보는 데에만 권장됩니다.
  • ScyllaDB 오픈 소스. 최상의 성능을 위해 일반 설치를 권장합니다.
  • Kafka 및 ScyllaDB 서비스에 대해 8GB 이상의 RAM.
  • docker-compose
  • Git

ScyllaDB 설치 및 테이블 초기화

먼저, 세 노드의 ScyllaDB 클러스터를 시작하고 CDC가 활성화된 테이블을 생성합니다.

아직 수행하지 않았다면 git에서 예제를 다운로드하세요:

 
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab

이것은 사용할 docker-compose 파일입니다. 이 파일은 세 노드 ScyllaDB 클러스터를 시작합니다:

 

version: "3"

services:
  scylla-node1:
    container_name: scylla-node1
    image: scylladb/scylla:5.0.0
    ports:
      - 9042:9042
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node2:
    container_name: scylla-node2
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node3:
    container_name: scylla-node3
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

ScyllaDB 클러스터 시작:

 
docker-compose -f docker-compose-scylladb.yml up -d

1분 정도 기다린 후 ScyllaDB 클러스터가 정상 상태로 올라오고 있는지 확인하세요:

 
docker exec scylla-node1 nodetool status

다음으로, cqlsh를 사용하여 ScyllaDB와 상호 작용합니다. 키스페이스를 생성하고 CDC가 활성화된 테이블을 생성하고 테이블에 행을 삽입하세요:

 

docker exec -ti scylla-node1 cqlsh 
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}; 
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true}; 
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20); 
exit

 

[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d

Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec  scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- 주소    부하      토큰      소유   호스트 ID                              랙
UN  172.19.0.3  ?          256          ?       4d4eaad4-62a4-485b-9a05-61432516a737  rack1
UN  172.19.0.2  496 KB     256          ?       bec834b5-b0de-4d55-b13d-a8aa6800f0b9  rack1
UN  172.19.0.4  ?          256          ?       2788324e-548a-49e2-8337-976897c61238  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to  at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$ 

Confluent 설정 및 커넥터 구성

Kafka 서버를 시작하려면 Confluent 플랫폼을 사용하세요. 이 플랫폼은 토픽과 메시지를 추적하기 위한 사용자 친화적인 웹 GUI를 제공합니다. Confluent 플랫폼은 서비스를 설정하기 위한 docker-compose.yml 파일을 제공합니다.

참고: 이것은 프로덕션에서 Apache Kafka를 사용하는 방법이 아닙니다. 이 예제는 교육 및 개발 목적으로만 유용합니다. 파일 다운로드:

 
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml

다음으로, ScyllaDB CDC 커넥터를 다운로드하세요:

 
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar

ScyllaDB CDC 커넥터를 Docker 볼륨을 사용하여 Confluent 커넥트 서비스 플러그인 디렉토리에 추가하려면 docker-compose-confluent.yml을 편집하여 다음과 같이 두 줄을 추가하세요. 여기서 디렉토리는 scylla-cdc-plugin.jar 파일의 디렉토리로 대체하세요.

 

 image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
     hostname: connect
     container_name: connect
+    volumes:
+      - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
     depends_on:
       - broker
       - schema-registry

Confluent 서비스를 시작하세요:

 
docker-compose -f docker-compose-confluent.yml up -d

1분 정도 기다린 후 http://localhost:9021에 접속하여 Confluent 웹 GUI를 사용하세요.

Confluent 대시보드를 사용하여 ScyllaConnector를 추가하세요:

플러그인을 클릭하여 Scylla 커넥터를 추가하세요:

“Hosts”에 Scylla 노드 중 하나의 IP 주소와 9042 포트를 입력하세요. 이는 ScyllaDB 서비스가 수신 대기하는 포트입니다.

“Namespace”는 이전에 ScyllaDB에서 생성한 키스페입니다.

ks.my_table가 나타나기까지 1분 정도 기다릴 수 있습니다.

 

Kafka 메시지 테스트

MyScyllaCluster.ks.my_table가 ScyllaDB CDC 커넥터에 의해 생성된 토픽임을 확인할 수 있습니다.

이제 Topics 패널에서 Kafka 메시지를 확인하세요:

ScyllaDB에서 생성한 키스페와 테이블 이름과 동일한 토픽을 선택하세요:

Overview” 탭에서 토픽 정보를 볼 수 있습니다. 맨 아래에는 이 토픽이 파티션 0에 있음이 표시됩니다.

A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.

아시다시피, ScyllaDB CDC 메시지는 ks.my_table 토픽으로 전송되며, 토픽의 파티션 ID는 0입니다. 다음으로 “메시지” 탭으로 이동하여 “오프셋” 필드에 파티션 ID 0을 입력하세요:

Kafka 토픽 메시지의 출력을 통해 ScyllaDB 테이블 INSERT 이벤트와 데이터가 Scylla CDC Source Connector에 의해 Kafka 메시지로 전송되었음을 확인할 수 있습니다. 메시지를 클릭하여 전체 메시지 정보를 볼 수 있습니다:

메시지에는 행동 전후의 데이터 상태와 함께 시간, ScyllaDB 테이블 이름 및 키스페이름이 포함되어 있습니다. 이 작업이 삽입 작업이므로 삽입 전의 데이터는 null입니다.

다음으로 ScyllaDB 테이블에 다른 행을 삽입하세요:

 

docker exec -ti scylla-node1 cqlsh 
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);

이제 Kafka에서 몇 초 기다리면 새 메시지의 세부 정보를 볼 수 있습니다:

정리

이 실습을 마치신 후 Docker 컨테이너와 이미지를 중지하고 제거하실 수 있습니다.

모든 컨테이너 ID 목록을 보려면:

 
docker container ls -aq

그런 다음 더 이상 사용하지 않는 컨테이너를 중지하고 제거하실 수 있습니다:

 

docker stop <ID_or_Name> 
docker rm <ID_or_Name>

나중에 실습을 재실행하고 싶으시면 이전과 같이 단계를 따르고 docker-compose를 사용하시면 됩니다.

요약

CDC 소스 커넥터와 Kafka Connect와 호환되는 Kafka 플러그인을 사용하면 ScyllaDB 테이블 행 수준 변경 사항(INSERT, UPDATE 또는 DELETE)을 캡처하고 해당 이벤트를 Kafka 메시지로 변환할 수 있습니다. 그런 다음 다른 애플리케이션에서 데이터를 소비하거나 Kafka로 다른 작업을 수행할 수 있습니다.

Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb