在本期ScyllaDB大学提供的实践实验室中,您将学习如何运用ScyllaDB CDC源连接器,将ScyllaDB集群中各表的行级变更事件推送至Kafka服务器。
**ScyllaDB CDC是什么?**
简而言之,变更数据捕获(CDC)是一项功能,它不仅允许您查询数据库表的当前状态,还能查询该表所有变更的历史记录。自ScyllaDB企业版2021.1.1及开源版4.3起,CDC功能已正式投入生产(GA)。
在ScyllaDB中,CDC为可选配置,按表逐个启用。对启用CDC的表所做的变更历史,将存储在与之关联的独立表中。
您可以在创建或修改表时,通过CDC选项来启用CDC,例如:
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
**ScyllaDB CDC源连接器**
ScyllaDB CDC源连接器是一款能够捕获ScyllaDB集群表中行级变更的源连接器。它基于Debezium技术,兼容Kafka Connect(需Kafka 2.6.0及以上版本)。该连接器针对指定的表读取CDC日志,并为每项行级INSERT
、UPDATE
或DELETE
操作生成Kafka消息。连接器具备容错能力,若读取数据失败,将重试从Scylla获取数据。它通过Kafka Connect偏移量跟踪功能定期保存当前在ScyllaDB CDC日志中的位置。每个生成的Kafka消息均包含源信息,如时间戳和表名。
注意:截至撰写时,尚不支持集合类型(LIST
、SET
、MAP
)和UDTs——具有这些类型的列将从生成的消息中省略。请关注此增强请求及GitHub项目中的其他进展。
Confluent与Kafka Connect
Confluent 是一个全面的数据流平台,它使您能够轻松地访问、存储和管理作为连续实时流的数据。它扩展了 Apache Kafka 的优势,并增加了企业级功能。Confluent 简化了构建现代事件驱动应用程序的过程,并提供了支持可扩展性、性能和可靠性的通用数据管道。
Kafka Connect 是一个用于在 Apache Kafka 和其他数据系统之间可扩展且可靠地流式传输数据的工具。它使得定义将大量数据集移入和移出 Kafka 的连接器变得简单。它可以摄取整个数据库或从应用程序服务器收集指标到 Kafka 主题中,使数据能够以低延迟进行流处理。
Kafka Connect 包含两种类型的连接器:
- 源连接器:源连接器将整个数据库摄取并流式传输表更新到 Kafka 主题。源连接器还可以从应用程序服务器收集指标,并将数据存储在 Kafka 主题中,使数据能够以低延迟进行流处理。
- 接收连接器:接收连接器将数据从 Kafka 主题传递到辅助索引,如 Elasticsearch,或批处理系统,如 Hadoop,用于离线分析。
使用 Docker 设置服务
在本实验中,您将使用 Docker。
请确保您的环境满足以下先决条件:
- 适用于 Linux、Mac 或 Windows 的 Docker。
- 注意:在 Docker 中运行 ScyllaDB 仅推荐用于评估和尝试 ScyllaDB。
- ScyllaDB开源。为获得最佳性能,建议进行常规安装。
- Kafka和ScyllaDB服务至少需要8 GB的RAM。
- docker-compose
- Git
ScyllaDB安装与初始化表
首先,您将启动一个三节点的ScyllaDB集群,并创建一个启用CDC的表。
如果您尚未这样做,请从git下载示例:
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab
这是您将使用的docker-compose文件。它启动一个三节点ScyllaDB集群:
version: "3"
services:
scylla-node1:
container_name: scylla-node1
image: scylladb/scylla:5.0.0
ports:
- 9042:9042
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node2:
container_name: scylla-node2
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node3:
container_name: scylla-node3
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
启动ScyllaDB集群:
docker-compose -f docker-compose-scylladb.yml up -d
等待一分钟或更长时间,并检查ScyllaDB集群是否已启动并处于正常状态:
docker exec scylla-node1 nodetool status
接下来,您将使用cqlsh与ScyllaDB交互。创建一个keyspace,一个启用CDC的表,并向表中插入一行:
docker exec -ti scylla-node1 cqlsh
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
exit
[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d
Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- 地址 负载 令牌 拥有 主机ID 机架
UN 172.19.0.3 ? 256 ? 4d4eaad4-62a4-485b-9a05-61432516a737 rack1
UN 172.19.0.2 496 KB 256 ? bec834b5-b0de-4d55-b13d-a8aa6800f0b9 rack1
UN 172.19.0.4 ? 256 ? 2788324e-548a-49e2-8337-976897c61238 rack1
Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$
Confluent设置与连接器配置
要启动Kafka服务器,您将使用Confluent平台,该平台提供了一个用户友好的Web GUI来跟踪主题和消息。Confluent平台提供了一个docker-compose.yml
文件来设置服务。
注意:这不是您在生产中使用Apache Kafka的方式。该示例仅对培训和开发目的有用。获取文件:
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml
接下来,下载ScyllaDB CDC连接器:
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar
将 ScyllaDB CDC 连接器添加到 Confluent 连接服务插件目录中,通过编辑 docker-compose-confluent.yml
文件,添加如下两行代码,将目录替换为您的 scylla-cdc-plugin.jar
文件所在目录。
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
+ volumes:
+ - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
depends_on:
- broker
- schema-registry
启动 Confluent 服务:
docker-compose -f docker-compose-confluent.yml up -d
等待片刻,然后访问 http://localhost:9021
进入 Confluent 网页界面。
通过 Confluent 仪表板添加 ScyllaConnector:
点击插件添加 Scylla Connector:
在“Hosts”中填写其中一个 Scylla 节点的 IP 地址(可通过 nodetool status 命令输出查看)和端口 9042,该端口由 ScyllaDB 服务监听。
“Namespace”填写之前在 ScyllaDB 中创建的键空间。
注意,可能需要等待一分钟 ks.my_table
才会显示出来:
测试 Kafka 消息
您可以看到 MyScyllaCluster.ks.my_table
是由 ScyllaDB CDC 连接器创建的主题。
现在,从 Topics 面板检查 Kafka 消息:
选择与您在 ScyllaDB 中创建的键空间和表名相同的主题:
在“概览”标签页中,您可以查看主题信息。底部显示此主题位于分区 0。
A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.
如您所知,ScyllaDB的CDC消息被发送到ks.my_table
主题,该主题的分区ID为0。接下来,切换到“消息”标签页,在“偏移量”字段中输入分区ID 0:
从Kafka主题消息的输出中可以看出,ScyllaDB表的INSERT
事件及其数据已被Scylla CDC Source Connector转换为Kafka消息。点击该消息可查看完整信息:
消息中包含了ScyllaDB表名、键空间名及时间,以及操作前后的数据状态。由于这是一次插入操作,插入前的数据为空。
接下来,向ScyllaDB表中插入另一行数据:
docker exec -ti scylla-node1 cqlsh
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);
现在,在Kafka中稍等片刻,您将能看到新消息的详细内容:
清理工作
完成本实验后,您可以停止并移除Docker容器及镜像。
要查看所有容器ID的列表:
docker container ls -aq
然后,您可以停止并移除不再使用的容器:
docker stop <ID_or_Name>
docker rm <ID_or_Name>
若日后需重新运行实验,只需按照步骤操作,并如前使用docker-compose。
总结
借助与Kafka Connect兼容的CDC源连接器——Kafka插件,您能够捕获ScyllaDB表行级别的所有变更(INSERT
、UPDATE
或DELETE
),并将这些事件转换为Kafka消息。随后,您可以从其他应用程序消费这些数据,或利用Kafka执行其他任何操作。
Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb