Neste laboratório prático da ScyllaDB University, você aprenderá a usar o ScyllaDB CDC source connector para enviar eventos de alterações de linha nas tabelas de um cluster ScyllaDB para um servidor Kafka.
O que é ScyllaDB CDC?
Para recapitular, a Captura de Dados de Alteração (CDC) é um recurso que permite não apenas consultar o estado atual de uma tabela de banco de dados, mas também consultar a história de todas as alterações feitas na tabela. A CDC está pronta para produção (GA) a partir de ScyllaDB Enterprise 2021.1.1 e ScyllaDB Open Source 4.3.
No ScyllaDB, a CDC é opcional e habilitada em uma base por tabela. O histórico de alterações feitas em uma tabela habilitada para CDC é armazenado em uma tabela associada separada.
Você pode habilitar a CDC ao criar ou alterar uma tabela usando a opção CDC, por exemplo:
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
ScyllaDB CDC Source Connector
O ScyllaDB CDC Source Connector é um conector de origem que captura alterações em nível de linha nas tabelas de um cluster ScyllaDB. É um conector Debezium, compatível com o Kafka Connect (com o Kafka 2.6.0+). O conector lê o log de CDC para tabelas especificadas e produz mensagens do Kafka para cada operação de nível de linha INSERT
, UPDATE
ou DELETE
. O conector é tolerante a falhas, tentando novamente a leitura de dados do Scylla em caso de falha. Ele periodicamente salva a posição atual no log de CDC do ScyllaDB usando o rastreamento de deslocamento do Kafka Connect. Cada mensagem gerada do Kafka contém informações sobre a fonte, como a marcação de tempo e o nome da tabela.
Nota: no momento da redação, não há suporte para tipos de coleção (LIST
, SET
, MAP
) e UDTs—colunas com esses tipos são omitidas das mensagens geradas. Fique atento à solicitação de melhoria e outras novidades no projeto do GitHub.
Confluent e Kafka Connect
O Confluent é uma plataforma completa de streaming de dados que permite acessar, armazenar e gerenciar dados como fluxos contínuos e em tempo real. Amplia os benefícios do Apache Kafka com recursos de nível corporativo. O Confluent facilita a construção de aplicativos modernos orientados a eventos e oferece uma pipeline de dados universal, suportando escalabilidade, desempenho e confiabilidade.
O Kafka Connect é uma ferramenta para transmitir dados de forma escalável e confiável entre o Apache Kafka e outros sistemas de dados. Simplifica a definição de conectores que movem grandes conjuntos de dados para dentro e para fora do Kafka. Pode ingerir bancos de dados inteiros ou coletar métricas de servidores de aplicativos nos tópicos do Kafka, disponibilizando os dados para processamento em fluxo com baixa latência.
O Kafka Connect inclui dois tipos de conectores:
- Conector de origem: Os conectores de origem ingerem bancos de dados inteiros e transmitem atualizações de tabelas para tópicos do Kafka. Os conectores de origem também podem coletar métricas de servidores de aplicativos e armazenar os dados em tópicos do Kafka, disponibilizando os dados para processamento em fluxo com baixa latência.
- Conector de destino: Os conectores de destino entregam dados de tópicos do Kafka para índices secundários, como o Elasticsearch, ou sistemas em lote, como o Hadoop, para análise offline.
Configuração de Serviço com Docker
Neste laboratório, você usará o Docker.
Certifique-se de que seu ambiente atenda aos seguintes pré-requisitos:
- Docker para Linux, Mac ou Windows.
- Nota: executar o ScyllaDB no Docker é recomendado apenas para avaliação e teste do ScyllaDB.
- ScyllaDB de código aberto. Para o melhor desempenho, é recomendada a instalação regular.
- 8 GB de RAM ou superior para os serviços Kafka e ScyllaDB.
- docker-compose
- Git
Instalação e Inicialização da Tabela ScyllaDB
Primeiro, você iniciará um cluster ScyllaDB de três nós e criará uma tabela com CDC habilitado.
Se ainda não o fez, baixe o exemplo do git:
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab
Este é o arquivo docker-compose que você usará. Ele inicia um Cluster ScyllaDB de três nós:
version: "3"
services:
scylla-node1:
container_name: scylla-node1
image: scylladb/scylla:5.0.0
ports:
- 9042:9042
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node2:
container_name: scylla-node2
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node3:
container_name: scylla-node3
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
Inicie o cluster ScyllaDB:
docker-compose -f docker-compose-scylladb.yml up -d
Aguarde um minuto ou mais e verifique se o cluster ScyllaDB está ativo e em status normal:
docker exec scylla-node1 nodetool status
Em seguida, você usará o cqlsh para interagir com o ScyllaDB. Crie um keyspace, uma tabela com CDC habilitado e insira uma linha na tabela:
docker exec -ti scylla-node1 cqlsh
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
exit
[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d
Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Endereço Carga Tokens Possui ID do Host Rack
UN 172.19.0.3 ? 256 ? 4d4eaad4-62a4-485b-9a05-61432516a737 rack1
UN 172.19.0.2 496 KB 256 ? bec834b5-b0de-4d55-b13d-a8aa6800f0b9 rack1
UN 172.19.0.4 ? 256 ? 2788324e-548a-49e2-8337-976897c61238 rack1
Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$
Configuração do Confluent e do Conector
Para iniciar um servidor Kafka, você usará a plataforma Confluent, que fornece uma GUI web amigável para acompanhar tópicos e mensagens. A plataforma Confluent fornece um arquivo docker-compose.yml
para configurar os serviços.
Nota: não é assim que você usaria o Apache Kafka em produção. O exemplo é útil apenas para fins de treinamento e desenvolvimento. Obtenha o arquivo:
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml
Em seguida, baixe o conector CDC do ScyllaDB:
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar
Adicione o conector CDC do ScyllaDB ao diretório de plugins do serviço Confluent Connect usando um volume Docker, editando docker-compose-confluent.ym
l para adicionar as duas linhas conforme abaixo, substituindo o diretório pelo diretório do seu arquivo scylla-cdc-plugin.jar
.
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
+ volumes:
+ - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
depends_on:
- broker
- schema-registry
Inicie os serviços Confluent:
docker-compose -f docker-compose-confluent.yml up -d
Aguarde um minuto ou mais, depois acesse http://localhost:9021
para a GUI da web do Confluent.
Adicione o ScyllaConnector usando o dashboard do Confluent:
Adicione o Scylla Connector clicando no plugin:
Preencha o “Hosts” com o endereço IP de um dos nós Scylla (você pode ver isso no resultado do comando nodetool status) e porta 9042, que é escutada pelo serviço ScyllaDB.
O “Namespace” é o keyspace que você criou anteriormente no ScyllaDB.
Observe que pode levar um minuto ou mais para que ks.my_table
apareça:
Teste de Mensagens Kafka
Você pode ver que MyScyllaCluster.ks.my_table
é o tópico criado pelo conector CDC do ScyllaDB.
Agora, verifique as mensagens Kafka a partir do painel de Tópicos:
Selecione o tópico, que é o mesmo que o nome do keyspace e da tabela que você criou no ScyllaDB:
Na aba “Visão Geral”, você pode ver as informações do tópico. No final, mostra que este tópico está no partition 0.
A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.
Como você já sabe, as mensagens CDC do ScyllaDB são enviadas para o tópico ks.my_table
, e o id da partição do tópico é 0. Em seguida, vá para a aba “Mensagens” e insira o id da partição 0 no campo “offset”:
Você pode ver na saída das mensagens do tópico Kafka que o evento de INSERT
da tabela ScyllaDB e os dados foram transferidos para mensagens Kafka pelo Scylla CDC Source Connector. Clique na mensagem para ver a informação completa da mensagem:
A mensagem contém o nome da tabela ScyllaDB, o nome do keyspace com a hora, bem como o status dos dados antes e depois da ação. Como se trata de uma operação de inserção, os dados antes da inserção são nulos.
Em seguida, insira outra linha na tabela ScyllaDB:
docker exec -ti scylla-node1 cqlsh
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);
Agora, no Kafka, espere alguns segundos e você poderá ver os detalhes da nova Mensagem:
Limpeza
Depois de concluir este laboratório, você pode parar e remover os contêineres e imagens do Docker.
Para ver uma lista de todos os IDs de contêiner:
docker container ls -aq
Então você pode parar e remover os contêineres que não estão mais em uso:
docker stop <ID_or_Name>
docker rm <ID_or_Name>
Mais tarde, se quiser reexecutar o laboratório, você pode seguir os passos e usar o docker-compose como antes.
Resumo
Com o conector de origem CDC, um plugin Kafka compatível com Kafka Connect, você pode capturar todas as alterações de nível de linha da tabela ScyllaDB (INSERT
, UPDATE
ou DELETE
) e converter esses eventos em mensagens Kafka. Você pode então consumir os dados de outras aplicações ou realizar qualquer outra operação com o Kafka.
Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb