חלק בשני: למדוד שינויי נתונים (CDC) ב-MongoDB באמצעות Debezium ו-Memphis.dev

זהו החלק השני מסדרת פוסטים בלוג על בניית מערכת מודרנית מבוססת אירועים באמצעות Memphis.dev.

הפוסט האחרון שלנו בבלוג הציג יישום עצום ללכידת אירועי לכידת נתונים משתנה (CDC) ממסד נתונים PostgreSQL באמצעות Debezium Server ו-Memphis.dev. על ידי החלפת Apache Kafka ב-Memphis.dev, הפתרון צמצם באופן משמעותי את המשאבים התפעוליים והעלויות הנלוות – חוסך כסף ומשחרר מפתחים להתמקד בבניית תכונות חדשות.

PostgreSQL הוא המסד נתונים הנפוץ ביותר בלבד, עם זאת. Debezium מספק חיבורים למסדי נתונים שונים, כולל המסד הלא רלוונטי של נתוני המסד MongoDB. MongoDB הוא פופולרי בקרב מפתחים, במיוחד אלו העובדים בשפות תכנות דינאמיות, מאחר שהוא מקפיד על הפרעת ההתנגשות בין אובייקט לרלוונטיות. מפתחים יכולים לאחסן, לשאול ולעדכן אובייקטים ישירות במסד הנתונים.

בפוסט הבלוג הזה, אנו מדגים כיצד להתאים את פתרון ה-CDC ל-MongoDB.

סקירת הפתרון

כאן, אנו מתארים את הארכיטקטורה של הפתרון העצום להבאת אירועי לכידת נתונים משתנה עם Memphis.dev. הארכיטקטורה לא השתנתה מ-הפוסט הקודם שלנו בבלוג למעט החלפת PostgreSQL ב-MongoDB.

A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.

  • מחולל פריטי עבודה: מכניס פריט עבודה אקראי שנוצר כל 0.5 שניות לקולקציית MongoDB. כל פריט עבודה כולל תיאור, תאריך ושעה של יצירה, תאריך עבודה אופציונלי וסטטוס של סיום.
  • MongoDB: מוגדר עם מסד נתונים יחידה המכילה קולקציה יחידה (todo_items).
  • שרת Debezium: מוצע של שרת Debezium המוגדר עם מקור MongoDB וקשרי כניסה של HTTP Client.
  • שער REST של Memphis.dev: משתמש בהתאמה מוכנה.
  • Memphis.dev: מוגדר עם תחנה יחידה (todo-cdc-events) ומשתמש יחיד (todocdcservice).
  • לקוח הדפסה: תכנית המשתמשת במספרה של Memphis.dev בשפת Python לצריכת הודעות והדפסתם לתוך התוכנית.

תחילת העבודה

מדריך היישום זמין במדריך mongodb-debezium-cdc-example של המאגר של פתרונות הדוגמא Memphis. Docker Compose יהיה נדרש להפעלתו.

הפעלת היישום

לבנות את התמונות של דוקר עבור שרת Debezium, הצרכן המדפיס וההגדרה של המסד הנתונים (יצירת טבלה ומשתמש).

כרגע, היישום תלוי בגרסה מוקדמת של שרת Debezium לתמיכה באימות JWT. תמונת דוקר תיבנה ישירות מהענף הראשי של המאגרים של Debezium ושרת Debezium. שים לב ששלב זה יכול לקחת זמן רב (~20 דקות) לרוץ. כאשר שרת Debezium 2.3.0 יופק, נחליף לשימוש בתמונת דוקר העליונה.

שלב 1: לבנות את התמונות

Shell

 

$ docker compose build --pull --no-cache

שלב 2: להתחיל את ברוקר Memphis.dev ושער REST

התחל את ברוקר Memphis.dev ואת שער REST. שים לב ששירות Memphis-rest-gateway תלוי בשירות ברוקר Memphis, ולכן שירות הברוקר יתחיל גם כן.

Shell

 

$ docker compose up -d memphis-rest-gateway
Shell

 

 
[+] Running 4/4
 ⠿ Network mongodb-debezium-cdc-example_default                   Created                                                        0.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        6.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                       16.8s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Started

שלב 3: ליצור תחנה ומשתמש מתאים ב-Memphis.dev

הודעות מועברות ל"תחנות" ב-Memphis.dev; הן שקולות ל"נושאים" המשמשים על ידי בורקרים של הודעות. הפניית דפדפן ל-localhost:9000. לחץ על הקישור "לח到 עם ראש" בתחתית הדף.

התחבר עם ראש (שם משתמש) ו-memphis (סיסמה). 

עקוב אחרי הוויזרד ליצירת תחנה בשם todo-cdc-events.

צור משתמש בשם todocdcservice עם אותו ערך לסיסמה.

לחץ על "הבא" עד שהושלם החינמי:

לחץ על "עבור לסקירת התחנה" כדי לעבור לדף סקירת התחנה.

שלב 4: התחל את צורך ההדפסה

השתמשנו ב-Memphis.dev Python SDK כדי ליצור תסריט צורך שמחפש את התחנה todo-cdc-events ומדפיס את ההודעות לתוך התפריט.

Shell

 

$ docker compose up -d printing-consumer
Shell

 

 
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container printing-consumer                                Started     

שלב 5: התחלת והגדרת MongoDB

כדי ללכוד שינויים, חייבת להיות מופעלת פונקציית ה-שכפול של MongoDB. יש כמה שלבים:

  • יש לקבוע את שם קבוצת השכפול. זה יכול להיעשות על ידי העברת שם של קבוצת שכפול בשורת הפקודה או בקובץ ההגדרות. ב-קובץ Docker Compose, אנו מפעילים את MongoDB עם טיעון שורת הפקודה –replSet rs0 כדי לקבוע את שם קבוצת השכפול.
  • כאשר משתמשים בשידור ומופעלת האפשרות להזדהות, על כל מופע תיקונית לכלול קובץ מפתח משותף. יצרנו קובץ מפתח בהתאם להוראות בתיעוד של MongoDB. לאחר מכן בנינו תמונה המשתרעת על פני התמונה הרשמית של MongoDB באמצעות הכללת הקובץ המפתח.
  • הקבוצת השידור צריכה להתארגן פעם אחת שמופעל MongoDB. אנו משתמשים בתסריט המכוון על המופע בהתחלה. התסריט קורא לreplSetInitiate עם רשימת כתובות ה-IP והיציאות של כל מופע MongoDB בקבוצת השידור. ביצוע הפקודה גורם למופעי MongoDB לתקשר זה עם זה ולבחור מנהיג.

באופן כללי, קבוצות שידור משמשות להגדלת האמינות (זמינות גבוהה). רוב התיעוד שתמצאו מתאר כיצד להגדיר שידור עם מספר מופעי MongoDB. במקרה שלנו, חיבור MongoDB של Debezium משתמש בתכונת השידור כדי ללכוד אירועי שינוי בנתונים. למרות שאנו עוברים את השלבים להגדרת קבוצת שידור, אנו משתמשים רק במופע אחד של MongoDB.

התכנית גנרטור פריטי עבודה יוצרת פריט עבודה חדש כל חצי שנייה. ערכי השדות מוגררים באופן אקראי. הפריטים מוספים לקולקציית MongoDB הנקראת "todo_items".

בקובץ Docker Compose, התכנית גנרטור פריטי עבודה מותאמת להיתלות במצב בריא של מצביע MongoDB והשלמת תכנית ההקמה של מסד הנתונים. על ידי התחלת התכנית גנרטור פריטי עבודה, Docker Compose יתחיל גם את MongoDB וירוץ את תכנית ההקמה של מסד הנתונים.

Shell

 

$ docker compose up -d todo-generator
Shell

 

[+] Running 3/3
 ⠿ Container mongodb                 Healthy                                                                                     8.4s
 ⠿ Container mongodb-database-setup  Exited                                                                                      8.8s
 ⠿ Container mongodb-todo-generator  Started   

שלב 6: התחלת שרת Debezium

השירות האחרון שצריך להתחיל הוא שרת Debezium. השרת מותאם עם מקשר מקור עבור MongoDB ומקשר צינור של HTTP Client דרך קובץ תכונות Java:

Shell

 

 
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false

רוב האפשרויות ברורות מעצמן. כתובת צינור HTTP של הלקוח שווה הסבר מפורט. שער Memphis.dev של REST מצפה לקבל בקשות POST עם מסר בפורמט הבא:

<קוד>/stations/{station}/produce/{quantity}

המיקום {station} הממוקם מוחלף עם שם התחנה לשליחת הודעה אליה. המיקום {quantity} הממוקם מוחלף עם הערך יחיד (עבור הודעה יחידה) או באטך (עבור הודעות מרובות).

המסר(ים) מועבר(ים) כתוכן של בקשת POST. שער REST תומך בשלושה פורמטים של הודעות (טקסט פשוט, JSON או פרוטוקול בלוק). הערך (text/, application/json, application/x-protobuf) של שדה כותרת content-type קובע איך מתובן התוכן.

לקבלת בקשות REST שמתואמות לדפוסים אלה, קונסומציה של לקוח HTTP של שרת Debezium מייצרת. בקשות משתמשות בפעולת POST; כל בקשה מכילה הודעה מקודדת ב-JSON כתוכן, והכותרת content-type מוגדרת ל-application/JSON. אנו משתמשות ב-todo-CDC-events כשם התחנה ובערך של כמות יחידה בכתובת הסנכרון לניתוח הודעות ולהצגת איך שער REST צריך להבין את הבקשות:

http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single

התכונה debezium.sink.http.authentication.type=jwt מציינת שלקוח HTTP צריך להשתמש באימות JWT. שדות השם משתמש והסיסמה הם ברורים, אך התכונה debezium.sink.http.authentication.jwt.The URL ראוי להסבר מסוים. זכות מקודמת מתקבלת באמצעות נקודת קצה /auth/authenticate, והאימות מתעדכן באמצעות הנקודה הנפרדת /auth/refreshToken. האימות JWT בלקוח HTTP מוסיף את הנקודת הקצה המתאימה לכתובת הבסיס הנתונה.

שרת Debezium יכול להתחיל עם הפקודה הבאה:

Shell

 

$ docker compose up -d debezium-server
Shell

 

[+] Running 5/5
 ⠿ Container mongodb                                              Healthy                                                        1.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                        1.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Running                                                        0.0s
 ⠿ Container debezium-server                                      Started

שלב 7: אימות שהמערכת עובדת

בדוק את מסך סקירת התחנה todo-cdc-events בממדפסת Memphis.dev כדי לאמת שהיצרן והצרכן מחוברים, והודעות נמסרות.

וכן, הדפסת הלוגים עבור המכונית המוצרת של ההדפסה:

Shell

 

$ docker logs --tail 2 printing-consumer

הודעה:

Shell

 

bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')

פורמט הודעות ה-CDC

ההודעות הנכנסות מעוצבות כ-JSON. להודעות יש שני שדות ראשיים (שם התבנית והטבלה). התבנית מתארת את תבנית הרשומה (שמות השדות וסוגיהם), והטבלה מתארת את השינוי ברשומה. אובייקט הטבלה עצמו מכיל שני שדות (לפני ואחרי) המציינים את ערכי הרשומה לפני ואחרי השינוי.

עבור MongoDB, שרת Debezium מקודד את הרשומה כמחרוזת JSON מועברת:

Shell

 

 
{
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}

זה יהיה בעל השלכות לעיבוד ההודעות במובאים, אשר נתאר בפוסט בלוג עתידי בסדרה זו. 

מזל טוב! כעת יש לך דוגמה מעשית לאיך ללכוד אירועי שינוי בנתונים ממסד נתונים MongoDB באמצעות שרת Debezium ולהעביר את האירועים ל-Memphis.dev לעיבוד מובא.

חלק 3 יצא בקרוב!

Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de