כיצד להשתמש בשינוי נתוני כליאה עם אפטשי קפקא וסקילהDB

במעבדה מעשית זו מאוניברסיטת ScyllaDB, תלמדו להשתמש ב-מקור קישור ScyllaDB CDC כדי לדחוף אירועי שינויים ברמה של שורה בטבלאות של קלאסטר ScyllaDB לשרת Kafka.

מה זה ScyllaDB CDC?

לסיכום, תיעוד מגדיר שינויים (CDC) הוא תכונה המאפשרת לא רק לשאול את המצב הנוכחי של טבלת מסד הנתונים אלא גם לשאול את ההיסטוריה של כל השינויים שנעשו בטבלה. CDC מוגמר ומוכן לשימוש בפרודקציה (GA) מתוך ScyllaDB Enterprise 2021.1.1 ו-ScyllaDB Open Source 4.3.

ב-ScyllaDB, CDC הוא אופציונלי ומופעל על פי טבלה. ההיסטוריה של השינויים שנעשו בטבלה עם CDC מופעלת מאוחסנת בטבלה קשורה נפרדת.

אפשר להפעיל את CDC כאשר יוצרים או משנים טבלה באמצעות האפשרות CDC, למשל:

 
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};

מקור קישור ScyllaDB CDC

מחבר המקור של ScyllaDB CDC הוא מחבר המקלט שינויים ברמת השורה בטבלאות של קבוצת ScyllaDB. זהו מחבר Debezium, תואם ל-Kafka Connect (עם Kafka 2.6.0+). המחבר קורא את יומני ה-CDC עבור הטבלאות המופיעות ומייצר מסרי Kafka עבור כל פעולת INSERT, UPDATE או DELETE ברמת השורה. המחבר חסין לכשלים, מחזיר קריאת נתונים מ-Scylla במקרה של כשל. הוא שומר באופן תכוף את המיקום הנוכחי ביומן ה-CDC של ScyllaDB באמצעות מעקב אחסון תזכורת של Kafka Connect. כל מסר Kafka שנוצר מכיל מידע על המקור, כגון הזמן ושם השורה.

הערה: בזמן כתיבת הטקסט, אין תמיכה בסוגי קבוצות (LIST, SET, MAP) ו-UDTs – עמודות עם סוגים אלה מושמטות מהודעות שנוצרות. הישאר מעודכן בקשר ל-בקשת השיפור הזו ולפיתוחים אחרים ב-פרויקט GitHub.

Confluent ו-Kafka Connect

Confluent היא פלטפורמת זרימת נתונים מקיפה המאפשרת לך לגשת, לאחסן ולנהל נתונים כזרימות רציפות ובזמן אמת. היא מרחיבה את היתרונות של אפאזרה קפקא עם תכונות בינלאומיות ברמת העסקים. Confluent מקל על בניית יישומים מודרניים מונחה על אירועים, ומשיג זרם נתונים אוניברסלי, התומך בהיבטים כמו קיימות, ביצועים ואמינות.

קונקט קפקא הוא כלי לזרימת נתונים בין אפאזרה קפקא ומערכות נתונים אחרות. הוא מאפשר להגדיר חיבורים המעבירים קבצי נתונים גדולים לתוך ומחוץ לקפקא ביעילות ובאמינות. הוא יכול להכניס מסדי נתונים שלמים או לאסוף מדדים משרתי יישומים לנושאי קפקא, מה שמאפשר עיבוד זרם נתונים בעלות עמידות נמוכה.

קונקט קפקא כולל שני סוגים של חיבורים:

  1. חיבור מקור: חיבורי מקור מכניסים מסדי נתונים שלמים וזורמים עדכונים טבלאיים לנושאי קפקא. חיבורי מקור יכולים גם לאסוף מדדים משרתי יישומים ולאחסן את הנתונים בנושאי קפקא, מה שמאפשר עיבוד זרם נתונים בעלות עמידות נמוכה.
  2. חיבור מקבל: חיבורי מקבל מעבירים נתונים מנושאי קפקא למדדים משניים, כגון אלכסטיק מיינס, או למערכות בטוחות, כגון האפון, לניתוח ברירת מחדל.

הקמת שירות עם דוקר

במעבדה זו, תשתמש בדוקר.

אנא ודא שהסביבה שלך מספקת את הדרישות הבאות:

  • דוקר ללינוקס, מק, או ווינדוס.
    • הערה: הפעלת ScyllaDB בדוקר מומלץ רק להערכה וניסיון של ScyllaDB.
  • ScyllaDB שיתופי. לקבלת הביצועים הטובים ביותר, מומלץ להתקין רגיל.
  • 8 GB זיכרון RAM או יותר לשירותים של Kafka ו-ScyllaDB.
  • docker-compose
  • Git

התקנת ScyllaDB ואתחול טבלה

ראשית, תפעיל קבוצת ScyllaDB בן שלושה צמתים ותיצור טבלה עם הפעלת CDC.

אם עוד לא עשית זאת, הורד את הדוגמא מ-git:

 
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab

זהו קובץ docker-compose שתשתמש בו. הוא מתחיל קבוצת ScyllaDB בן שלושה צמתים:

 

version: "3"

services:
  scylla-node1:
    container_name: scylla-node1
    image: scylladb/scylla:5.0.0
    ports:
      - 9042:9042
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node2:
    container_name: scylla-node2
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node3:
    container_name: scylla-node3
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

השגרת קבוצת ScyllaDB:

 
docker-compose -f docker-compose-scylladb.yml up -d

חכה דקה או יותר, ובדוק שקבוצת ScyllaDB עוברת ובמצב רגיל:

 
docker exec scylla-node1 nodetool status

לאחר מכן, תשתמש ב-cqlsh כדי לתקשר עם ScyllaDB. צור מרחב מפתחות, וטבלה עם הפעלת CDC, והכנס שורה לטבלה:

 

docker exec -ti scylla-node1 cqlsh 
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}; 
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true}; 
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20); 
exit

 

[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d

Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec  scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  כתובת     עומס      זכותות      בעלות   זהות מארח                               רגיר
UN  172.19.0.3  ?          256          ?       4d4eaad4-62a4-485b-9a05-61432516a737  rack1
UN  172.19.0.2  496 KB     256          ?       bec834b5-b0de-4d55-b13d-a8aa6800f0b9  rack1
UN  172.19.0.4  ?          256          ?       2788324e-548a-49e2-8337-976897c61238  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to  at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$ 

הגדרת Confluent והתקנת חיבור

כדי להשיק שרת Kafka, תשתמש בפלטפורמת Confluent, שמספקת GUI אינטרנט ידידותי לעקיבה אחרי נושאים והודעות. פלטפורמת Confluent מספקת קובץ docker-compose.yml להגדרת השירותים.

הערה: זו אינה הדרך שבה תשתמש ב-Apache Kafka בייצור. הדוגמא שימושית רק למטרות שיעור ופיתוח. הורד את הקובץ:

 
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml

לאחר מכן, הורד את חיבור CDC של ScyllaDB:

 
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar

הוסף את מחברת ה-CDC של ScyllaDB לספריית תוספי השירות של Confluent באמצעות תיבת מיכל Docker על ידי עריכת docker-compose-confluent.yml כדי להוסיף את שתי השורות כפי שמוצג להלן, תחתון הספרייה עם ספריית הקובץ scylla-cdc-plugin.jar שלך.

 

 image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
     hostname: connect
     container_name: connect
+    volumes:
+      - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
     depends_on:
       - broker
       - schema-registry

השגת השירותים של Confluent:

 
docker-compose -f docker-compose-confluent.yml up -d

חכה דקה או שנייה, ואז גש http://localhost:9021 לממשק האינטרנט של Confluent.

הוסף את ScyllaConnector באמצעות לוח הבקרה של Confluent:

הוסף את מחברת Scylla על ידי לחיצה על התוסף:

מלא את "אורחים" עם כתובת ה-IP של אחד מצמדי Scylla (אפשר לראות אותו בפלט של פקודת nodetool status) ומספר הרכבת 9042, שמקשבת לשירות ScyllaDB.

ה"מרחב" הוא המרחב המפתח שיצרת לפני כן ב-ScyllaDB.

שים לב שעלול לקחת דקה או שנייה עד שks.my_table יופיע:

 

בדוק מסרים Kafka

אפשר לראות שMyScyllaCluster.ks.my_table הוא הנושא שנוצר על ידי מחברת CDC של ScyllaDB.

עכשיו, בדוק מסרים Kafka מלוח הנושאים:

בחר את הנושא, שהוא זהה לשם המרחב המפתח ושם השולחן שיצרת ב-ScyllaDB:

מהלשונית "סקירה" אפשר לראות את פרטי הנושא. בתחתית מצוין שהנושא זה על חלקית 0.

A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.

כפי שאתה כבר יודע, ההודעות של ScyllaDB CDC נשלחות לנושא ks.my_table, ומספר החלון של הנושא הוא 0. לאחר מכן, עבור אל תפריט ה"מסרים" והכנס את מספר החלון 0 לשדה "סיבת טעימה":

ניתן לראות מתוך תיעוד הודעות הנושא של Kafka כי אירוע ה-INSERT של הטבלה של ScyllaDB והנתונים הועברו להודעות Kafka על ידי מקור ה-Scylla CDC Source Connector. לחץ על ההודעה כדי להציג את כל פרטי ההודעה:

ההודעה מכילה את שם הטבלה ושם המרחב המספרי של ScyllaDB יחד עם הזמן, כמו גם את מצב הנתונים לפני הפעולה ואחריה. מאחר ומדובר בפעולת הכנסה, הנתונים לפני ההכנסה הם null.

לאחר מכן, הכנס שורה נוספת לטבלת ScyllaDB:

 

docker exec -ti scylla-node1 cqlsh 
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);

כעת, ב-Kafka, המתן כמה שניות ותוכל לראות את הפרטים של ההודעה החדשה:

נקיון

ברגע שאתה גמור עם החדר הזה, תוכל לעצור ולהסיר את מיכלי ה-Docker והתמונות.

כדי להציג רשימה של כל מספרי המיכלים:

 
docker container ls -aq

לאחר מכן תוכל לעצור ולהסיר את המיכלים שאינם נמצאים בשימוש:

 

docker stop <ID_or_Name> 
docker rm <ID_or_Name>

מאוחר יותר, אם תרצה להפעיל מחדש את החדר, תוכל לעקוב אחרי השלבים ולהשתמש ב-docker-compose כמו קודם.

סיכום

באמצעות מקור ה-CDC המקושר, תוסף Kafka מתאים ל-Kafka Connect, תוכל ללכוד את כל שינויי השורות בטבלה של ScyllaDB (INSERT, UPDATE, או DELETE) ולהמיר את האירועים הללו להודעות Kafka. לאחר מכן תוכל לצרוך את הנתונים מיישומים אחרים או לבצע כל פעולה אחרת עם Kafka.

Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb