Comment utiliser la capture de données modifiées avec Apache Kafka et ScyllaDB

Dans ce laboratoire pratique de l’Université ScyllaDB, vous apprendrez à utiliser le connecteur source CDC ScyllaDB pour envoyer les événements de changements au niveau des lignes dans les tables d’un cluster ScyllaDB vers un serveur Kafka.

Qu’est-ce que le CDC ScyllaDB?

Pour résumer, la Capture des Données de Changements (CDC) est une fonctionnalité qui vous permet non seulement de consulter l’état actuel d’une table de base de données, mais aussi de consulter l’historique de tous les changements effectués sur la table. Le CDC est prêt pour la production (GA) à partir de ScyllaDB Entreprise 2021.1.1 et ScyllaDB Open Source 4.3.

Dans ScyllaDB, le CDC est optionnel et activé au niveau de la table. L’historique des changements apportés à une table activée pour le CDC est stocké dans une table associée distincte.

Vous pouvez activer le CDC lors de la création ou de la modification d’une table en utilisant l’option CDC, par exemple:

 
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};

Connecteur Source ScyllaDB CDC

Le ScyllaDB CDC Source Connector est un connecteur source capturant les modifications au niveau des lignes dans les tables d’un cluster ScyllaDB. Il s’agit d’un Debezium connecteur, compatible avec Kafka Connect (avec Kafka 2.6.0+). Le connecteur lit le journal CDC pour les tables spécifiées et produit des messages Kafka pour chaque opération de niveau ligne INSERT, UPDATE, ou DELETE. Le connecteur est tolérant aux pannes, essayant de nouveau la lecture des données à partir de Scylla en cas d’échec. Il enregistre périodiquement la position actuelle dans le journal CDC ScyllaDB en utilisant le suivi des décalages Kafka Connect. Chaque message Kafka généré contient des informations sur la source, telles que l’horodatage et le nom de la table.

Note: au moment de la rédaction, il n’y a pas de support pour les types de collections (LIST, SET, MAP) et les UDTs—les colonnes avec ces types sont omises des messages générés. Restez à jour sur cette demande d’amélioration et autres développements dans le projet GitHub.

Confluent et Kafka Connect

Confluent est une plateforme complète de diffusion de données qui vous permet d’accéder, stocker et gérer des données sous forme de flux continus et en temps réel. Elle étend les avantages d’Apache Kafka avec des fonctionnalités de niveau entreprise. Confluent facilite la construction d’applications modernes basées sur des événements et offre un pipeline universel de données, soutenu par la scalabilité, la performance et la fiabilité.

Kafka Connect est un outil pour diffuser de manière scalable et fiable des données entre Apache Kafka et d’autres systèmes de données. Il simplifie la définition de connecteurs qui déplacent de grands ensembles de données dans et hors de Kafka. Il peut ingérer des bases de données entières ou collecter des métriques à partir des serveurs d’applications pour les intégrer dans des sujets Kafka, rendant les données disponibles pour le traitement en flux avec faible latence.

Kafka Connect comprend deux types de connecteurs:

  1. Connecteur source : Les connecteurs source ingèrent des bases de données entières et diffusent les mises à jour de table vers des sujets Kafka. Les connecteurs source peuvent également collecter des métriques à partir des serveurs d’applications et stocker les données dans des sujets Kafka, rendant les données disponibles pour le traitement en flux avec faible latence.
  2. Connecteur sink : Les connecteurs sink distribuent des données à partir de sujets Kafka vers des index secondaires, tels qu’Elasticsearch, ou des systèmes batch, tels que Hadoop, pour une analyse hors ligne.

Installation de service avec Docker

Dans ce laboratoire, vous utiliserez Docker.

Veuillez vous assurer que votre environnement répond aux prérequis suivants:

  • Docker pour Linux, Mac ou Windows.
    • Note :l’exécution de ScyllaDB dans Docker est seulement recommandée pour évaluer et tester ScyllaDB.
  • ScyllaDB open source. Pour obtenir les meilleures performances, il est recommandé d’installer régulièrement.
  • 8 Go de RAM ou plus pour les services Kafka et ScyllaDB.
  • docker-compose
  • Git

ScyllaDB Installation et Initialisation de Table

Tout d’abord, vous allez lancer un cluster ScyllaDB à trois nœuds et créer une table avec CDC activé.

Si vous ne l’avez pas déjà fait, téléchargez l’exemple de git:

 
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab

Ceci est le fichier docker-compose que vous utiliserez. Il démarre un cluster ScyllaDB à trois nœuds:

 

version: "3"

services:
  scylla-node1:
    container_name: scylla-node1
    image: scylladb/scylla:5.0.0
    ports:
      - 9042:9042
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node2:
    container_name: scylla-node2
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node3:
    container_name: scylla-node3
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

Démarrez le cluster ScyllaDB:

 
docker-compose -f docker-compose-scylladb.yml up -d

Attendez une minute environ, puis vérifiez que le cluster ScyllaDB est opérationnel et en état normal:

 
docker exec scylla-node1 nodetool status

Ensuite, vous utiliserez cqlsh pour interagir avec ScyllaDB. Créez un keyspace, une table avec CDC activé, et insérez une ligne dans la table:

 

docker exec -ti scylla-node1 cqlsh 
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}; 
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true}; 
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20); 
exit

 

[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d

Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec  scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Adresse     Charge       Jetons       Propres    ID de l'hôte                               Rack
UN  172.19.0.3  ?          256          ?       4d4eaad4-62a4-485b-9a05-61432516a737  rack1
UN  172.19.0.2  496 KB     256          ?       bec834b5-b0de-4d55-b13d-a8aa6800f0b9  rack1
UN  172.19.0.4  ?          256          ?       2788324e-548a-49e2-8337-976897c61238  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to  at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$ 

Configuration de Confluent et du Connecteur

Pour lancer un serveur Kafka, vous utiliserez la plateforme Confluent, qui offre une interface web conviviale pour suivre les sujets et les messages. La plateforme Confluent fournit un fichier docker-compose.yml pour configurer les services.

Note: ce n’est pas ainsi que vous utiliseriez Apache Kafka en production. L’exemple est utile uniquement à des fins de formation et de développement. Obtenez le fichier:

 
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml

Ensuite, téléchargez le connecteur CDC ScyllaDB:

 
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar

Ajoutez le connecteur CDC ScyllaDB au répertoire des plugins de service Confluent connect en utilisant un volume Docker en modifiant le fichier docker-compose-confluent.yml pour ajouter les deux lignes comme ci-dessous, en remplaçant le répertoire par le répertoire de votre fichier scylla-cdc-plugin.jar.

 

 image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
     hostname: connect
     container_name: connect
+    volumes:
+      - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
     depends_on:
       - broker
       - schema-registry

Lancez les services Confluent:

 
docker-compose -f docker-compose-confluent.yml up -d

Attendez une minute ou deux, puis accédez à http://localhost:9021 pour la GUI web Confluent.

Ajoutez le ScyllaConnector à l’aide du tableau de bord Confluent:

Ajoutez le Scylla Connector en cliquant sur le plugin:

Remplissez le champ “Hosts” avec l’adresse IP d’un des nœuds Scylla (vous pouvez la voir dans la sortie de la commande nodetool status) et le port 9042, qui est écouté par le service ScyllaDB.

Le “Namespace” est l’espace de clés que vous avez créé précédemment dans ScyllaDB.

Notez qu’il peut prendre une minute ou deux pour que ks.my_table apparaisse:

 

Testez les messages Kafka

Vous pouvez voir que MyScyllaCluster.ks.my_table est le sujet créé par le connecteur CDC ScyllaDB.

Maintenant, recherchez des messages Kafka à partir du panneau des sujets:

Sélectionnez le sujet, qui est le même que le nom de l’espace de clés et de la table que vous avez créés dans ScyllaDB:

À partir de l’onglet “Aperçu”, vous pouvez voir les informations du sujet. En bas, il indique que ce sujet est sur la partition 0.

A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.

Comme vous le savez déjà, les messages CDC de ScyllaDB sont envoyés au sujet ks.my_table, et l’identifiant de partition du sujet est 0. Ensuite, allez à l’onglet “Messages” et entrez l’identifiant de partition 0 dans le champ “offset” :

Vous pouvez voir à partir de la sortie des messages du sujet Kafka que l’événement de table ScyllaDB INSERT et les données ont été transférés aux messages Kafka par le connecteur Source CDC Scylla. Cliquez sur le message pour voir toutes les informations du message :

Le message contient le nom de la table ScyllaDB, le nom de l’espace de clés avec l’heure, ainsi que l’état des données avant et après l’action. Puisque c’est une opération d’insertion, les données avant l’insertion sont nulles.

Ensuite, insérez une autre ligne dans la table ScyllaDB :

 

docker exec -ti scylla-node1 cqlsh 
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);

Maintenant, dans Kafka, attendez quelques secondes et vous pourrez voir les détails du nouveau Message :

Nettoyage

Une fois que vous avez terminé de travailler sur ce laboratoire, vous pouvez arrêter et supprimer les conteneurs et images Docker.

Pour afficher la liste de tous les IDs de conteneur :

 
docker container ls -aq

Ensuite, vous pouvez arrêter et supprimer les conteneurs que vous n’utilisez plus :

 

docker stop <ID_or_Name> 
docker rm <ID_or_Name>

Plus tard, si vous souhaitez relancer le laboratoire, vous pouvez suivre les étapes et utiliser docker-compose comme précédemment.

Résumé

Avec le connecteur source CDC, un plugin Kafka compatible avec Kafka Connect, vous pouvez capturer tous les changements au niveau des lignes de la table ScyllaDB (INSERT, UPDATE, ou DELETE) et convertir ces événements en messages Kafka. Vous pouvez ensuite consommer les données à partir d’autres applications ou effectuer toute autre opération avec Kafka.

Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb