如何利用Apache Kafka與ScyllaDB實現變更數據捕獲

在ScyllaDB大學提供的這個實踐實驗室中,您將學習如何使用ScyllaDB CDC源連接器將ScyllaDB集群中表的行級變更事件推送至Kafka服務器。

什麼是ScyllaDB CDC?

簡而言之,變更數據捕獲(CDC)是一項功能,允許您不僅可以查詢數據庫表的當前狀態,還可以查詢對該表所做的所有變更的歷史記錄。從ScyllaDB企業版2021.1.1和ScyllaDB開源版4.3開始,CDC功能已經生產就緒(GA)。

在ScyllaDB中,CDC是可選的,並且是按表啟用的。對CDC啟用的表所做的變更歷史記錄存儲在單獨的相關聯表中。

您可以在創建或修改表時使用CDC選項來啟用CDC,例如:

 
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};

ScyllaDB CDC源連接器

ScyllaDB CDC Source Connector 是一個來源連接器,用於捕捉ScyllaDB集群中表的行級變更。它是一個Debezium連接器,與Kafka Connect兼容(需Kafka 2.6.0+)。該連接器讀取指定表的CDC日誌,並為每個行級INSERTUPDATEDELETE操作生成Kafka消息。連接器具有容錯性,在失敗時會重試從Scylla讀取數據。它定期使用Kafka Connect偏移追蹤保存ScyllaDB CDC日誌中的當前位置。每個生成的Kafka消息都包含有關源的信息,例如時間戳和表名稱。

注意:截至撰寫本文時,尚不支持集合類型(LISTSETMAP)和UDTs——具有這些類型的列將從生成的消息中省略。請關注此增強請求GitHub項目中的其他進展。

Confluent和Kafka Connect

Confluent 是一個全面性的資料串流平台,讓您能輕鬆存取、儲存及管理資料,以連續、即時的串流形式呈現。它擴展了 Apache Kafka 的優勢,加入企業級功能。Confluent 使得建立現代事件驅動應用程式變得容易,並獲得一個支援可擴展性、效能和可靠性的通用資料管道。

Kafka Connect 是一種工具,用於在 Apache Kafka 與其他資料系統之間可擴展且可靠地串流資料。它使得定義連接器變得簡單,這些連接器能夠將大量資料集在 Kafka 之間移動。它可將整個資料庫或從應用程式伺服器收集的指標匯入 Kafka 主題,使資料能以低延遲進行串流處理。

Kafka Connect 包含兩種類型的連接器:

  1. 來源連接器(Source connector):來源連接器將整個資料庫的資料匯入並將表更新串流至 Kafka 主題。來源連接器還能收集應用程式伺服器的指標並將資料儲存於 Kafka 主題,使資料能以低延遲進行串流處理。
  2. 接收連接器(Sink connector):接收連接器將 Kafka 主題的資料傳送至次要索引,如 Elasticsearch,或批次系統,如 Hadoop,以便進行離線分析。

使用 Docker 進行服務設置

在本實驗室中,您將使用 Docker。

請確保您的環境符合以下先決條件:

  • 適用於 Linux、Mac 或 Windows 的 Docker。
    • 注意:在 Docker 中運行 ScyllaDB 僅建議用於評估和嘗試 ScyllaDB。
  • ScyllaDB開源。為了獲得最佳性能,建議進行常規安裝。
  • Kafka和ScyllaDB服務需要8 GB以上的RAM。
  • docker-compose
  • Git

ScyllaDB安裝與初始化表格

首先,您將啟動一個三節點的ScyllaDB集群並創建一個啟用CDC的表格。

如果您還未這樣做,請從git下載示例:

 
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab

這是您將使用的docker-compose文件。它啟動一個三節點的ScyllaDB集群:

 

version: "3"

services:
  scylla-node1:
    container_name: scylla-node1
    image: scylladb/scylla:5.0.0
    ports:
      - 9042:9042
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node2:
    container_name: scylla-node2
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node3:
    container_name: scylla-node3
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

啟動ScyllaDB集群:

 
docker-compose -f docker-compose-scylladb.yml up -d

等待一分鐘左右,並檢查ScyllaDB集群是否已啟動並處於正常狀態:

 
docker exec scylla-node1 nodetool status

接下來,您將使用cqlsh與ScyllaDB交互。創建一個keyspace,一個啟用CDC的表格,並向表格中插入一條記錄:

 

docker exec -ti scylla-node1 cqlsh 
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}; 
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true}; 
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20); 
exit

 

[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d

Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec  scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- 地址    負載      令牌      擁有   主機ID                              機架
UN  172.19.0.3  ?          256          ?       4d4eaad4-62a4-485b-9a05-61432516a737  rack1
UN  172.19.0.2  496 KB     256          ?       bec834b5-b0de-4d55-b13d-a8aa6800f0b9  rack1
UN  172.19.0.4  ?          256          ?       2788324e-548a-49e2-8337-976897c61238  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to  at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$ 

Confluent設置與連接器配置

要啟動Kafka服務器,您將使用Confluent平台,該平台提供了一個用戶友好的網頁GUI來追蹤主題和消息。Confluent平台提供了一個docker-compose.yml文件來設置服務。

注意:這不是您在生產環境中使用Apache Kafka的方式。此示例僅用於培訓和開發目的。獲取文件:

 
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml

接下來,下載ScyllaDB CDC連接器:

 
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar

將 ScyllaDB CDC 連接器添加到 Confluent 連接服務插件目錄中,使用 Docker 卷通過編輯 docker-compose-confluent.yml 添加以下兩行,替換目錄為您的 scylla-cdc-plugin.jar 文件目錄。

 

 image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
     hostname: connect
     container_name: connect
+    volumes:
+      - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
     depends_on:
       - broker
       - schema-registry

啟動 Confluent 服務:

 
docker-compose -f docker-compose-confluent.yml up -d

等待一分鐘左右,然後訪問 http://localhost:9021 以進入 Confluent 網頁界面。

通過 Confluent 儀表板添加 ScyllaConnector:

點擊插件添加 Scylla 連接器:

在“Hosts”中填寫 Scylla 節點之一的 IP 地址(您可以從 nodetool status 命令的輸出中看到)和端口 9042,這是 ScyllaDB 服務監聽的端口。

“Namespace”是在 ScyllaDB 中之前創建的 keyspace。

注意,ks.my_table 可能需要一分鐘左右的時間才會出現:

 

測試 Kafka 消息

您可以看到 MyScyllaCluster.ks.my_table 是由 ScyllaDB CDC 連接器創建的話題。

現在,從 Topics 面板檢查 Kafka 消息:

選擇與您在 ScyllaDB 中創建的 keyspace 和表名相同的話題:

從“概覽”標籤,您可以看到話題信息。在底部,顯示此話題位於分區 0。

A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.

如您所知,ScyllaDB CDC 訊息已傳送至ks.my_table主題,該主題的分區 ID 為 0。接下來,前往“訊息”標籤,並將分區 ID 0 輸入到“偏移量”欄位中:

從 Kafka 主題訊息的輸出可以看出,ScyllaDB 表的INSERT事件及其數據已通過 Scylla CDC Source Connector 轉換為 Kafka 訊息。點擊訊息以查看完整的訊息資訊:

訊息中包含 ScyllaDB 表名、keyspace 名稱以及時間,以及操作前後的數據狀態。由於這是一次插入操作,插入前的數據為空。

接下來,向 ScyllaDB 表中插入另一行:

 

docker exec -ti scylla-node1 cqlsh 
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);

現在,在 Kafka 中等待幾秒鐘,您將能看到新訊息的詳細資訊:

清理

完成本實驗室工作後,您可以停止並移除 Docker 容器和映像。

要查看所有容器 ID 的列表:

 
docker container ls -aq

然後,您可以停止並移除不再使用的容器:

 

docker stop <ID_or_Name> 
docker rm <ID_or_Name>

稍後,如果您想重新運行實驗,可以按照步驟並像之前一樣使用 docker-compose。

總結

通過 CDC 源連接器,一個與 Kafka Connect 兼容的 Kafka 插件,您可以捕獲 ScyllaDB 表中所有行級別的變更(INSERTUPDATEDELETE),並將這些事件轉換為 Kafka 訊息。然後,您可以從其他應用程序消費數據或使用 Kafka 執行任何其他操作。

Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb