במעבדה מעשית זו מאוניברסיטת ScyllaDB, תלמדו להשתמש ב-מקור קישור ScyllaDB CDC כדי לדחוף אירועי שינויים ברמה של שורה בטבלאות של קלאסטר ScyllaDB לשרת Kafka.
מה זה ScyllaDB CDC?
לסיכום, תיעוד מגדיר שינויים (CDC) הוא תכונה המאפשרת לא רק לשאול את המצב הנוכחי של טבלת מסד הנתונים אלא גם לשאול את ההיסטוריה של כל השינויים שנעשו בטבלה. CDC מוגמר ומוכן לשימוש בפרודקציה (GA) מתוך ScyllaDB Enterprise 2021.1.1 ו-ScyllaDB Open Source 4.3.
ב-ScyllaDB, CDC הוא אופציונלי ומופעל על פי טבלה. ההיסטוריה של השינויים שנעשו בטבלה עם CDC מופעלת מאוחסנת בטבלה קשורה נפרדת.
אפשר להפעיל את CDC כאשר יוצרים או משנים טבלה באמצעות האפשרות CDC, למשל:
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
מקור קישור ScyllaDB CDC
מחבר המקור של ScyllaDB CDC הוא מחבר המקלט שינויים ברמת השורה בטבלאות של קבוצת ScyllaDB. זהו מחבר Debezium, תואם ל-Kafka Connect (עם Kafka 2.6.0+). המחבר קורא את יומני ה-CDC עבור הטבלאות המופיעות ומייצר מסרי Kafka עבור כל פעולת INSERT
, UPDATE
או DELETE
ברמת השורה. המחבר חסין לכשלים, מחזיר קריאת נתונים מ-Scylla במקרה של כשל. הוא שומר באופן תכוף את המיקום הנוכחי ביומן ה-CDC של ScyllaDB באמצעות מעקב אחסון תזכורת של Kafka Connect. כל מסר Kafka שנוצר מכיל מידע על המקור, כגון הזמן ושם השורה.
הערה: בזמן כתיבת הטקסט, אין תמיכה בסוגי קבוצות (LIST
, SET
, MAP
) ו-UDTs – עמודות עם סוגים אלה מושמטות מהודעות שנוצרות. הישאר מעודכן בקשר ל-בקשת השיפור הזו ולפיתוחים אחרים ב-פרויקט GitHub.
Confluent ו-Kafka Connect
Confluent היא פלטפורמת זרימת נתונים מקיפה המאפשרת לך לגשת, לאחסן ולנהל נתונים כזרימות רציפות ובזמן אמת. היא מרחיבה את היתרונות של אפאזרה קפקא עם תכונות בינלאומיות ברמת העסקים. Confluent מקל על בניית יישומים מודרניים מונחה על אירועים, ומשיג זרם נתונים אוניברסלי, התומך בהיבטים כמו קיימות, ביצועים ואמינות.
קונקט קפקא הוא כלי לזרימת נתונים בין אפאזרה קפקא ומערכות נתונים אחרות. הוא מאפשר להגדיר חיבורים המעבירים קבצי נתונים גדולים לתוך ומחוץ לקפקא ביעילות ובאמינות. הוא יכול להכניס מסדי נתונים שלמים או לאסוף מדדים משרתי יישומים לנושאי קפקא, מה שמאפשר עיבוד זרם נתונים בעלות עמידות נמוכה.
קונקט קפקא כולל שני סוגים של חיבורים:
- חיבור מקור: חיבורי מקור מכניסים מסדי נתונים שלמים וזורמים עדכונים טבלאיים לנושאי קפקא. חיבורי מקור יכולים גם לאסוף מדדים משרתי יישומים ולאחסן את הנתונים בנושאי קפקא, מה שמאפשר עיבוד זרם נתונים בעלות עמידות נמוכה.
- חיבור מקבל: חיבורי מקבל מעבירים נתונים מנושאי קפקא למדדים משניים, כגון אלכסטיק מיינס, או למערכות בטוחות, כגון האפון, לניתוח ברירת מחדל.
הקמת שירות עם דוקר
במעבדה זו, תשתמש בדוקר.
אנא ודא שהסביבה שלך מספקת את הדרישות הבאות:
- דוקר ללינוקס, מק, או ווינדוס.
- הערה: הפעלת ScyllaDB בדוקר מומלץ רק להערכה וניסיון של ScyllaDB.
- ScyllaDB שיתופי. לקבלת הביצועים הטובים ביותר, מומלץ להתקין רגיל.
- 8 GB זיכרון RAM או יותר לשירותים של Kafka ו-ScyllaDB.
- docker-compose
- Git
התקנת ScyllaDB ואתחול טבלה
ראשית, תפעיל קבוצת ScyllaDB בן שלושה צמתים ותיצור טבלה עם הפעלת CDC.
אם עוד לא עשית זאת, הורד את הדוגמא מ-git:
git clone https://github.com/scylladb/scylla-code-samples.git cd scylla-code-samples/CDC_Kafka_Lab
זהו קובץ docker-compose שתשתמש בו. הוא מתחיל קבוצת ScyllaDB בן שלושה צמתים:
version: "3"
services:
scylla-node1:
container_name: scylla-node1
image: scylladb/scylla:5.0.0
ports:
- 9042:9042
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node2:
container_name: scylla-node2
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
scylla-node3:
container_name: scylla-node3
image: scylladb/scylla:5.0.0
restart: always
command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0
השגרת קבוצת ScyllaDB:
docker-compose -f docker-compose-scylladb.yml up -d
חכה דקה או יותר, ובדוק שקבוצת ScyllaDB עוברת ובמצב רגיל:
docker exec scylla-node1 nodetool status
לאחר מכן, תשתמש ב-cqlsh כדי לתקשר עם ScyllaDB. צור מרחב מפתחות, וטבלה עם הפעלת CDC, והכנס שורה לטבלה:
docker exec -ti scylla-node1 cqlsh
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
exit
[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d
Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- כתובת עומס זכותות בעלות זהות מארח רגיר
UN 172.19.0.3 ? 256 ? 4d4eaad4-62a4-485b-9a05-61432516a737 rack1
UN 172.19.0.2 496 KB 256 ? bec834b5-b0de-4d55-b13d-a8aa6800f0b9 rack1
UN 172.19.0.4 ? 256 ? 2788324e-548a-49e2-8337-976897c61238 rack1
Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$
הגדרת Confluent והתקנת חיבור
כדי להשיק שרת Kafka, תשתמש בפלטפורמת Confluent, שמספקת GUI אינטרנט ידידותי לעקיבה אחרי נושאים והודעות. פלטפורמת Confluent מספקת קובץ docker-compose.yml
להגדרת השירותים.
הערה: זו אינה הדרך שבה תשתמש ב-Apache Kafka בייצור. הדוגמא שימושית רק למטרות שיעור ופיתוח. הורד את הקובץ:
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml
לאחר מכן, הורד את חיבור CDC של ScyllaDB:
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar
הוסף את מחברת ה-CDC של ScyllaDB לספריית תוספי השירות של Confluent באמצעות תיבת מיכל Docker על ידי עריכת docker-compose-confluent.ym
l כדי להוסיף את שתי השורות כפי שמוצג להלן, תחתון הספרייה עם ספריית הקובץ scylla-cdc-plugin.jar
שלך.
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
+ volumes:
+ - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
depends_on:
- broker
- schema-registry
השגת השירותים של Confluent:
docker-compose -f docker-compose-confluent.yml up -d
חכה דקה או שנייה, ואז גש http://localhost:9021
לממשק האינטרנט של Confluent.
הוסף את ScyllaConnector באמצעות לוח הבקרה של Confluent:
הוסף את מחברת Scylla על ידי לחיצה על התוסף:
מלא את "אורחים" עם כתובת ה-IP של אחד מצמדי Scylla (אפשר לראות אותו בפלט של פקודת nodetool status) ומספר הרכבת 9042, שמקשבת לשירות ScyllaDB.
ה"מרחב" הוא המרחב המפתח שיצרת לפני כן ב-ScyllaDB.
שים לב שעלול לקחת דקה או שנייה עד שks.my_table
יופיע:
בדוק מסרים Kafka
אפשר לראות שMyScyllaCluster.ks.my_table
הוא הנושא שנוצר על ידי מחברת CDC של ScyllaDB.
עכשיו, בדוק מסרים Kafka מלוח הנושאים:
בחר את הנושא, שהוא זהה לשם המרחב המפתח ושם השולחן שיצרת ב-ScyllaDB:
מהלשונית "סקירה" אפשר לראות את פרטי הנושא. בתחתית מצוין שהנושא זה על חלקית 0.
A partition is the smallest storage unit that holds a subset of records owned by a topic. Each partition is a single log file where records are written to it in an append-only fashion. The records in the partitions are each assigned a sequential identifier called the offset, which is unique for each record within the partition. The offset is an incremental and immutable number maintained by Kafka.
כפי שאתה כבר יודע, ההודעות של ScyllaDB CDC נשלחות לנושא ks.my_table
, ומספר החלון של הנושא הוא 0. לאחר מכן, עבור אל תפריט ה"מסרים" והכנס את מספר החלון 0 לשדה "סיבת טעימה":
ניתן לראות מתוך תיעוד הודעות הנושא של Kafka כי אירוע ה-INSERT
של הטבלה של ScyllaDB והנתונים הועברו להודעות Kafka על ידי מקור ה-Scylla CDC Source Connector. לחץ על ההודעה כדי להציג את כל פרטי ההודעה:
ההודעה מכילה את שם הטבלה ושם המרחב המספרי של ScyllaDB יחד עם הזמן, כמו גם את מצב הנתונים לפני הפעולה ואחריה. מאחר ומדובר בפעולת הכנסה, הנתונים לפני ההכנסה הם null.
לאחר מכן, הכנס שורה נוספת לטבלת ScyllaDB:
docker exec -ti scylla-node1 cqlsh
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);
כעת, ב-Kafka, המתן כמה שניות ותוכל לראות את הפרטים של ההודעה החדשה:
נקיון
ברגע שאתה גמור עם החדר הזה, תוכל לעצור ולהסיר את מיכלי ה-Docker והתמונות.
כדי להציג רשימה של כל מספרי המיכלים:
docker container ls -aq
לאחר מכן תוכל לעצור ולהסיר את המיכלים שאינם נמצאים בשימוש:
docker stop <ID_or_Name>
docker rm <ID_or_Name>
מאוחר יותר, אם תרצה להפעיל מחדש את החדר, תוכל לעקוב אחרי השלבים ולהשתמש ב-docker-compose כמו קודם.
סיכום
באמצעות מקור ה-CDC המקושר, תוסף Kafka מתאים ל-Kafka Connect, תוכל ללכוד את כל שינויי השורות בטבלה של ScyllaDB (INSERT
, UPDATE
, או DELETE
) ולהמיר את האירועים הללו להודעות Kafka. לאחר מכן תוכל לצרוך את הנתונים מיישומים אחרים או לבצע כל פעולה אחרת עם Kafka.
Source:
https://dzone.com/articles/change-data-capture-apache-kafka-and-scylladb