في هذا المختبر العملي من جامعة ScyllaDB، ستتعلم كيفية استخدام مولد الاتصال ScyllaDB CDC source connector لدفع أحداث التغييرات على مستوى الصف في جداول المجموعة ScyllaDB إلى خادم Kafka.
ما هو ScyllaDB CDC؟
لتلخيص، تم استخدام تتبع البيانات المتغيرة (CDC) كميزة تسمح لك ليس فقط بالاستعلام عن الحالة الحالية لجدول قاعدة البيانات ولكن أيضًا الاستعلام عن تاريخ جميع التغييرات التي تم إجراؤها على الجدول. تم إصدار CDC للإنتاج (GA) بدءًا من ScyllaDB Enterprise 2021.1.1 و ScyllaDB Open Source 4.3.
في ScyllaDB، يعتبر CDC اختياريًا ويتم تمكينه على أساس لكل جدول. يتم تخزين تاريخ التغييرات التي تم إجراؤها على جدول تم تمكين CDC له في جدول مستقل مرتبط.
يمكنك تمكين CDC عند إنشاء جدول أو تعديله باستخدام خيار CDC، مثل:
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
مولد الاتصال ScyllaDB CDC Source Connector
موصل مصدر ScyllaDB CDC هو موصل مصدر يلتقط تغييرات الصفوف على مستوى الصفوف في جداول مجموعة ScyllaDB. إنه موصل Debezium، متوافق مع Kafka Connect (مع Kafka 2.6.0+). يقرأ الموصل سجل CDC للجداول المحددة وينتج رسائل Kafka لكل عملية INSERT
، UPDATE
، أو DELETE
على مستوى الصف. الموصل مرن للغاية ويعيد قراءة البيانات من Scylla في حالة الفشل. يقوم بتوقيع الموقع الحالي في سجل ScyllaDB CDC بشكل دوري باستخدام تتبع الإزاحة في Kafka Connect. تحتوي كل رسالة Kafka المولدة على معلومات عن المصدر، مثل التوقيت واسم الجدول.
ملاحظة: في وقت الكتابة، لا يوجد دعم لأنواع المجموعة (LIST
، SET
، MAP
) وUDTs – الأعمدة ذات تلك الأنواع تُهمل من الرسائل المولدة. تابع التحديث على طلب التحسين هذا والتطورات الأخرى في مشروع GitHub.
Confluent وKafka Connect
Confluent هو منصة تدفق البيانات بالكامل تمكنك من الوصول بسهولة إلى وتخزين وإدارة البيانات كتيارات مستمرة وفي الوقت الفعلي. فهو يوسع مزايا Apache Kafka بميزات تصنيف المؤسسات. يجعل Confluent بسهولة البناء في تطبيقات الحدث الحديثة والحصول على خطوط عادلة للبيانات، مؤازرة للقابلية للتوسع، الأداء والموثوقية.
Kafka Connect هو أداة لتدفق البيانات بشكل موسع وموثوق بين Apache Kafka ونظم البيانات الأخرى. يجعل التعريف بسهولة موصلات تنقل مجموعات كبيرة من البيانات في وخارج Kafka. يمكن أن تبطئ القواعد بأكملها أو جمع الإحصائيات من خوادم تطبيقات البيانات في مواضع Kafka، مما يجعل البيانات متاحة لمعالجة التيار بتأخير قصير.
Kafka Connect يشمل نوعين من الموصلات:
- موصل المصدر: تأخذ موصلات المصدر القواعد بأكملها وتدفق تحديثات الجدول إلى مواضع Kafka. يمكن أن تجمع موصلات المصدر أيضًا الإحصائيات من خوادم تطبيقات البيانات وتخزين البيانات في مواضع Kafka، مما يجعل البيانات متاحة لمعالجة التيار بتأخير قصير.
- موصل الميناء: توصيلات الميناء تقدم البيانات من مواضع Kafka إلى الفهارس الثانوية، مثل Elasticsearch، أو الأنظمة النصفية، مثل Hadoop، للتحليل خارج الخط.
إعداد الخدمة باستخدام Docker
في هذا المختبر، سوف تستخدم Docker.
يرجى التأكد من أن بيئتك تلبي الشروط الأساسية التالية:
- Docker لـ Linux، Mac، أو Windows.
- ملاحظة: تشغيل ScyllaDB في Docker يوصى فقط لتقييم وتجربة ScyllaDB.
- ScyllaDB مفتوح المصدر. للحصول على أفضل أداء، يوصى بالتثبيت العادي.
- 8 جيجابايت من الذاكرة أو أكثر لخدمات Kafka وScyllaDB.
- docker-compose
- Git
تثبيت ScyllaDB وتهيئة الجدول
أولاً، ستقوم بإطلاق مجموعة ScyllaDB ثلاثية العقد وإنشاء جدول مع تمكين CDC.
إذا لم تقم بذلك بعد، قم بتنزيل المثال من git:
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab
هذا هو ملف docker-compose الذي ستستخدمه. يبدأ مجموعة ScyllaDB ثلاثية العقد:
version: "3"
services:
scylla-node1:
container_name: scylla-node1
image: scylladb/scylla:5.0.0
ports:
- 9042:9042
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node2:
container_name: scylla-node2
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node3:
container_name: scylla-node3
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
اطلق مجموعة ScyllaDB:
docker-compose -f docker-compose-scylladb.yml up -d
انتظر دقيقة أو نحو ذلك، وتحقق من أن مجموعة ScyllaDB متصلة وفي حالة طبيعية:
docker exec scylla-node1 nodetool status
بعد ذلك، ستستخدم cqlsh للتفاعل مع ScyllaDB. قم بإنشاء مساحة مفتاح، وجدول مع تمكين CDC، وإدراج سجل في الجدول:
docker exec -ti scylla-node1 cqlsh
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
exit
[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d
Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- عنوان الحمل العدد المحدد الملكية معرف المضيف الراك
UN 172.19.0.3 ? 256 ? 4d4eaad4-62a4-485b-9a05-61432516a737 rack1
UN 172.19.0.2 496 KB 256 ? bec834b5-b0de-4d55-b13d-a8aa6800f0b9 rack1
UN 172.19.0.4 ? 256 ? 2788324e-548a-49e2-8337-976897c61238 rack1
Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$
إعداد Confluent وتكوين الواجهة
لتشغيل خادم Kafka، ستستخدم منصة Confluent، التي توفر واجهة ويب صديقة للمستخدم لتتبع المواضيع والرسائل. توفر منصة Confluent ملف docker-compose.yml
لإعداد الخدمات.
ملاحظة: هذه ليست الطريقة التي يمكنك استخدام Apache Kafka في الإنتاج. المثال مفيد فقط للتدريب والتطوير. احصل على الملف:
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml
بعد ذلك، قم بتنزيل موصل ScyllaDB CDC:
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar
أضف موصِل ScyllaDB CDC إلى دليل مكتبات خدمة Confluent connect باستخدام حجرة Docker عن طريق تحرير ملف docker-compose-confluent.ym
l لإضافة السطرين التاليين، واستبدال الدليل بالدليل الخاص بملف scylla-cdc-plugin.jar
الخاص بك.
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
+ volumes:
+ - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
depends_on:
- broker
- schema-registry
قم بتشغيل خدمات Confluent:
docker-compose -f docker-compose-confluent.yml up -d
انتظر دقيقة أو نحو ذلك، ثم قم بالوصول إلى http://localhost:9021
لواجهة الويب Confluent.
أضف ScyllaConnector باستخدام tableau Confluent:
أضف موصِل Scylla بالنقر على المكون:
قم بملء “المضيفون” بعنوان IP لأحد عقد Scylla (يمكنك رؤيته في ناتج أمر nodetool status) ومنفذ 9042، الذي يستمع إليه خدمة ScyllaDB.
الـ “فضاء الاسم” هو المساحة الرئيسية التي أنشأتها من قبل في ScyllaDB.
لاحظ أنه قد يستغرق دقيقة أو نحو ذلك حتى يظهر ks.my_table
:
اختبر رسائل Kafka
يمكنك رؤية أن MyScyllaCluster.ks.my_table
هو الموضوع الذي أنشأه موصِل ScyllaDB CDC.
الآن، تحقق من رسائل Kafka من لوحة المواضيع:
اختر الموضوع، وهو نفس اسم المساحة الرئيسية واسم الجدول الذي أنشأته في ScyllaDB:
من علامة تبويب “نظرة عامة“، يمكنك رؤية معلومات الموضوع. في الأسفل، يظهر أن هذا الموضوع على القسمة 0.
A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.
كما تعلم بالفعل، تتم إرسال رسائل ScyllaDB CDC إلى موضوع ks.my_table
، ومعرّف الفرع من الموضوع هو 0. بعد ذلك، انتقل إلى علامة التبويب “Messages” وأدخل معرّف الفرع 0 في حقل “offset”:
يمكنك ملاحظة من خروج مواضع رسائل Kafka أن جدول ScyllaDB INSERT
الحدث والبيانات تم نقلها إلى رسائل Kafka من قبل مولد Scylla CDC Source Connector. انقر على الرسالة لعرض معلومات الرسالة الكاملة:
الرسالة تحتوي على اسم جدول ScyllaDB واسم keyspace مع الوقت، وكذلك حالة البيانات قبل العمل وبعده. نظرًا لأن هذه عملية إدخال، فإن البيانات قبل الإدخال هي فارغة.
بعد ذلك، أدخل صفًا آخر في جدول ScyllaDB:
docker exec -ti scylla-node1 cqlsh
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);
الآن، في Kafka، انتظر بضع ثوانٍ ويمكنك رؤية تفاصيل الرسالة الجديدة:
تنظيف
بمجرد الانتهاء من العمل في هذا المختبر، يمكنك التوقف وإزالة الأوعية والصور Docker.
لعرض قائمة بجميع معرّفات الأوعية:
docker container ls -aq
ثم يمكنك التوقف وإزالة الأوعي التي لم تعد بحاجة إليها:
docker stop <ID_or_Name>
docker rm <ID_or_Name>
فيما بعد، إذا كنت ترغب في إعادة التشغيل للمختبر، يمكنك اتباع الخطوات واستخدام docker-compose كما كان من قبل.
ملخص
باستخدام مولد المصادر CDC، إضافة Kafka متوافقة مع Kafka Connect، يمكنك التقاط جميع تغييرات ScyllaDB جدول الصفوف (INSERT
، UPDATE
، أو DELETE
) وتحويل تلك الأحداث إلى رسائل Kafka. بعد ذلك يمكنك استهلاك البيانات من تطبيقات أخرى أو إجراء أي عمل آخر باستخدام Kafka.
Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb