كيفية استخدام تغيير البيانات التتبع مع أباتشي كافكا وسكيلا دي بي

في هذا المختبر العملي من جامعة ScyllaDB، ستتعلم كيفية استخدام مولد الاتصال ScyllaDB CDC source connector لدفع أحداث التغييرات على مستوى الصف في جداول المجموعة ScyllaDB إلى خادم Kafka.

ما هو ScyllaDB CDC؟

لتلخيص، تم استخدام تتبع البيانات المتغيرة (CDC) كميزة تسمح لك ليس فقط بالاستعلام عن الحالة الحالية لجدول قاعدة البيانات ولكن أيضًا الاستعلام عن تاريخ جميع التغييرات التي تم إجراؤها على الجدول. تم إصدار CDC للإنتاج (GA) بدءًا من ScyllaDB Enterprise 2021.1.1 و ScyllaDB Open Source 4.3.

في ScyllaDB، يعتبر CDC اختياريًا ويتم تمكينه على أساس لكل جدول. يتم تخزين تاريخ التغييرات التي تم إجراؤها على جدول تم تمكين CDC له في جدول مستقل مرتبط.

يمكنك تمكين CDC عند إنشاء جدول أو تعديله باستخدام خيار CDC، مثل:

 
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};

مولد الاتصال ScyllaDB CDC Source Connector

موصل مصدر ScyllaDB CDC هو موصل مصدر يلتقط تغييرات الصفوف على مستوى الصفوف في جداول مجموعة ScyllaDB. إنه موصل Debezium، متوافق مع Kafka Connect (مع Kafka 2.6.0+). يقرأ الموصل سجل CDC للجداول المحددة وينتج رسائل Kafka لكل عملية INSERT، UPDATE، أو DELETE على مستوى الصف. الموصل مرن للغاية ويعيد قراءة البيانات من Scylla في حالة الفشل. يقوم بتوقيع الموقع الحالي في سجل ScyllaDB CDC بشكل دوري باستخدام تتبع الإزاحة في Kafka Connect. تحتوي كل رسالة Kafka المولدة على معلومات عن المصدر، مثل التوقيت واسم الجدول.

ملاحظة: في وقت الكتابة، لا يوجد دعم لأنواع المجموعة (LIST، SET، MAP) وUDTs – الأعمدة ذات تلك الأنواع تُهمل من الرسائل المولدة. تابع التحديث على طلب التحسين هذا والتطورات الأخرى في مشروع GitHub.

Confluent وKafka Connect

Confluent هو منصة تدفق البيانات بالكامل تمكنك من الوصول بسهولة إلى وتخزين وإدارة البيانات كتيارات مستمرة وفي الوقت الفعلي. فهو يوسع مزايا Apache Kafka بميزات تصنيف المؤسسات. يجعل Confluent بسهولة البناء في تطبيقات الحدث الحديثة والحصول على خطوط عادلة للبيانات، مؤازرة للقابلية للتوسع، الأداء والموثوقية.

Kafka Connect هو أداة لتدفق البيانات بشكل موسع وموثوق بين Apache Kafka ونظم البيانات الأخرى. يجعل التعريف بسهولة موصلات تنقل مجموعات كبيرة من البيانات في وخارج Kafka. يمكن أن تبطئ القواعد بأكملها أو جمع الإحصائيات من خوادم تطبيقات البيانات في مواضع Kafka، مما يجعل البيانات متاحة لمعالجة التيار بتأخير قصير.

Kafka Connect يشمل نوعين من الموصلات:

  1. موصل المصدر: تأخذ موصلات المصدر القواعد بأكملها وتدفق تحديثات الجدول إلى مواضع Kafka. يمكن أن تجمع موصلات المصدر أيضًا الإحصائيات من خوادم تطبيقات البيانات وتخزين البيانات في مواضع Kafka، مما يجعل البيانات متاحة لمعالجة التيار بتأخير قصير.
  2. موصل الميناء: توصيلات الميناء تقدم البيانات من مواضع Kafka إلى الفهارس الثانوية، مثل Elasticsearch، أو الأنظمة النصفية، مثل Hadoop، للتحليل خارج الخط.

إعداد الخدمة باستخدام Docker

في هذا المختبر، سوف تستخدم Docker.

يرجى التأكد من أن بيئتك تلبي الشروط الأساسية التالية:

  • Docker لـ Linux، Mac، أو Windows.
    • ملاحظة: تشغيل ScyllaDB في Docker يوصى فقط لتقييم وتجربة ScyllaDB.
  • ScyllaDB مفتوح المصدر. للحصول على أفضل أداء، يوصى بالتثبيت العادي.
  • 8 جيجابايت من الذاكرة أو أكثر لخدمات Kafka وScyllaDB.
  • docker-compose
  • Git

تثبيت ScyllaDB وتهيئة الجدول

أولاً، ستقوم بإطلاق مجموعة ScyllaDB ثلاثية العقد وإنشاء جدول مع تمكين CDC.

إذا لم تقم بذلك بعد، قم بتنزيل المثال من git:

 
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab

هذا هو ملف docker-compose الذي ستستخدمه. يبدأ مجموعة ScyllaDB ثلاثية العقد:

 

version: "3"

services:
  scylla-node1:
    container_name: scylla-node1
    image: scylladb/scylla:5.0.0
    ports:
      - 9042:9042
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node2:
    container_name: scylla-node2
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node3:
    container_name: scylla-node3
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

اطلق مجموعة ScyllaDB:

 
docker-compose -f docker-compose-scylladb.yml up -d

انتظر دقيقة أو نحو ذلك، وتحقق من أن مجموعة ScyllaDB متصلة وفي حالة طبيعية:

 
docker exec scylla-node1 nodetool status

بعد ذلك، ستستخدم cqlsh للتفاعل مع ScyllaDB. قم بإنشاء مساحة مفتاح، وجدول مع تمكين CDC، وإدراج سجل في الجدول:

 

docker exec -ti scylla-node1 cqlsh 
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}; 
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true}; 
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20); 
exit

 

[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d

Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec  scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
 -- عنوان    الحمل      العدد المحدد      الملكية    معرف المضيف                              الراك
UN  172.19.0.3  ?          256          ?       4d4eaad4-62a4-485b-9a05-61432516a737  rack1
UN  172.19.0.2  496 KB     256          ?       bec834b5-b0de-4d55-b13d-a8aa6800f0b9  rack1
UN  172.19.0.4  ?          256          ?       2788324e-548a-49e2-8337-976897c61238  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to  at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$ 

إعداد Confluent وتكوين الواجهة

لتشغيل خادم Kafka، ستستخدم منصة Confluent، التي توفر واجهة ويب صديقة للمستخدم لتتبع المواضيع والرسائل. توفر منصة Confluent ملف docker-compose.yml لإعداد الخدمات.

ملاحظة: هذه ليست الطريقة التي يمكنك استخدام Apache Kafka في الإنتاج. المثال مفيد فقط للتدريب والتطوير. احصل على الملف:

 
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml

بعد ذلك، قم بتنزيل موصل ScyllaDB CDC:

 
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar

أضف موصِل ScyllaDB CDC إلى دليل مكتبات خدمة Confluent connect باستخدام حجرة Docker عن طريق تحرير ملف docker-compose-confluent.yml لإضافة السطرين التاليين، واستبدال الدليل بالدليل الخاص بملف scylla-cdc-plugin.jar الخاص بك.

 

 image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
     hostname: connect
     container_name: connect
+    volumes:
+      - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
     depends_on:
       - broker
       - schema-registry

قم بتشغيل خدمات Confluent:

 
docker-compose -f docker-compose-confluent.yml up -d

انتظر دقيقة أو نحو ذلك، ثم قم بالوصول إلى http://localhost:9021 لواجهة الويب Confluent.

أضف ScyllaConnector باستخدام tableau Confluent:

أضف موصِل Scylla بالنقر على المكون:

قم بملء “المضيفون” بعنوان IP لأحد عقد Scylla (يمكنك رؤيته في ناتج أمر nodetool status) ومنفذ 9042، الذي يستمع إليه خدمة ScyllaDB.

الـ “فضاء الاسم” هو المساحة الرئيسية التي أنشأتها من قبل في ScyllaDB.

لاحظ أنه قد يستغرق دقيقة أو نحو ذلك حتى يظهر ks.my_table:

 

اختبر رسائل Kafka

يمكنك رؤية أن MyScyllaCluster.ks.my_table هو الموضوع الذي أنشأه موصِل ScyllaDB CDC.

الآن، تحقق من رسائل Kafka من لوحة المواضيع:

اختر الموضوع، وهو نفس اسم المساحة الرئيسية واسم الجدول الذي أنشأته في ScyllaDB:

من علامة تبويب “نظرة عامة“، يمكنك رؤية معلومات الموضوع. في الأسفل، يظهر أن هذا الموضوع على القسمة 0.

A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.

كما تعلم بالفعل، تتم إرسال رسائل ScyllaDB CDC إلى موضوع ks.my_table، ومعرّف الفرع من الموضوع هو 0. بعد ذلك، انتقل إلى علامة التبويب “Messages” وأدخل معرّف الفرع 0 في حقل “offset”:

يمكنك ملاحظة من خروج مواضع رسائل Kafka أن جدول ScyllaDB INSERT الحدث والبيانات تم نقلها إلى رسائل Kafka من قبل مولد Scylla CDC Source Connector. انقر على الرسالة لعرض معلومات الرسالة الكاملة:

الرسالة تحتوي على اسم جدول ScyllaDB واسم keyspace مع الوقت، وكذلك حالة البيانات قبل العمل وبعده. نظرًا لأن هذه عملية إدخال، فإن البيانات قبل الإدخال هي فارغة.

بعد ذلك، أدخل صفًا آخر في جدول ScyllaDB:

 

docker exec -ti scylla-node1 cqlsh 
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);

الآن، في Kafka، انتظر بضع ثوانٍ ويمكنك رؤية تفاصيل الرسالة الجديدة:

تنظيف

بمجرد الانتهاء من العمل في هذا المختبر، يمكنك التوقف وإزالة الأوعية والصور Docker.

لعرض قائمة بجميع معرّفات الأوعية:

 
docker container ls -aq

ثم يمكنك التوقف وإزالة الأوعي التي لم تعد بحاجة إليها:

 

docker stop <ID_or_Name> 
docker rm <ID_or_Name>

فيما بعد، إذا كنت ترغب في إعادة التشغيل للمختبر، يمكنك اتباع الخطوات واستخدام docker-compose كما كان من قبل.

ملخص

باستخدام مولد المصادر CDC، إضافة Kafka متوافقة مع Kafka Connect، يمكنك التقاط جميع تغييرات ScyllaDB جدول الصفوف (INSERT، UPDATE، أو DELETE) وتحويل تلك الأحداث إلى رسائل Kafka. بعد ذلك يمكنك استهلاك البيانات من تطبيقات أخرى أو إجراء أي عمل آخر باستخدام Kafka.

Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb