זרימת נתונים למערכת ניהול מבני נתונים רבת ממשיות (RDBMS) דרך קוננטור ה-JDBC של Kafka מבלי להשתמש במארׂ הסכימות (Schema Registry)

בנוף התקשורת M2M (מכונה למכונה) של היום, יש צורך גדול בזרימת הנתונים הדיגיטליים מהתקןות IoT שונות למסדי נתונים רבים לניתוח נוסף באמצעות מסך הבקרה, מה שמפעיל מאות מאורעות שונים לביצוע פעולות רבות. כדי לתמוך בתרחישים שלעיל, Apache Kafka מתנהל כמערכת העצבים המרכזית שבה ניתן לספק נתונים מהתקןות IoT רבות ולאחסן אותם במאגרים שונים של האחסון, מסדי נתונים, מאוחסון ענן וכד'. בנוסף, ניתן לבצע מערכות זרימת נתונים שונות לפני או אחרי שהנתונים מגיעים לנושא של Kafka. באמצעות ממשק הורדת JDBC של Kafka, ניתן לזרום נתונים ברציפות מנושא Kafka למסדי נתונים המתאימים.

הקושי הגדול ביותר בממשק הורדת JDBC

הקושי הגדול ביותר עם ממשק הורדת JDBC הוא שהוא דורש ידע על השמה של הנתונים שכבר נכנסו לנושא של Kafka. ריפרסטרי השמה חייבים להיות משולבים כמרכיב נפרד עם השרת Kafka הקיים כדי להעביר את הנתונים למסדי נתונים. לכן, כדי לשבת נתונים מהנושא של Kafka למסדי נתונים, היצרנים חייבים לפרסם מסרים / נתונים המכילים את השמה. השמה מגדיר את מבנה פורמט הנתונים. אם השמה אינה מוצגת, ממשק הורדת JDBC לא יוכל למפות את ההודעות עם עמודות הטבלה של המסד הנתונים לאחר שיאסוף הודעות מהנושא.

על ידי שימוש בריפרסטרי השמה, אנו יכולים להימנע משליחת השמה בכל פעם עם הודעות / משאבות מהיצרנים מכיוון שריפרסטרי השמה מאחסנים (או רושמים) שמות בנושא _schemas ומקשרים בהתאם לשם הנושא המותאם / המוזכר כפי שמוגדר בקובץ התכונות של ממשק הורדת JDBC.

עלות הרישיון עשויה להיות מכשיר עבור חברות קטנות או בינוניות שמעוניינות להשתמש ב-Oracle או סטור השם של קונפלואנט עם Apache Kafka פתוח המקור כדי לאסוף נתונים מהמכשירים של IoT עבור הנקודות העניינית שלהם.

במאמר זה, אנו הולכים להשתמש בקטע קוד Java כדי לראות איך ניתן לזרום נתונים ברציפות למסד הנתונים MySQL מנושא Apache Kafka באמצעות מחבר סינק JDBC מבלי להשתמש במסד הנתונים של סטור השם.

Apache Kafka ו-JDBC Connectors

ל-Apache Kafka אין מחברים JDBC מערכת הנתונים מסוג RDBMS ספציפיים של היצרן כמו מקורות קבצים ומחברים סינק. זה עלינו ליישם או לפתח את הקוד למערכת נתונים RDBMS ספציפית על ידי יישום ממשק ה-Connect של Apache Kafka. אך קונפלואנט פיתחה, בדקה ותמכה במחבר סינק JDBC ולבסוף פתחה מקור תחת רישיון קהילתי קונפלואנט, כך שאנו משלבים את מחבר ה-JDBC סינק עם Apache Kafka.

לא ייזרקו שגיאות מהנושא גם אם נשלח שם הארכימדי הלא נכון או אף שום שם ארכימדי כי הנושא של Kafka מקבל את כל ההודעות או הרשומות כמערכות בתים בזוגות מפתח-ערך. לפני שמשדר היצרן את כל הודעתו לנושא, עליו להמיר את הודעתו למערך בתים באמצעות ממירים.

להלן הדוגמה לשם הארכימדי המחובר למטעם הנתונים האמיתיים שצריך לפרסם מיצרן הודעות Apache Kafka.

כמו כן, הנה קטע הקוד ה-Java עבור יצרן הודעות:

 

public class ProducerWithSchema {

private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";

public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}

public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);

producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();

}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}

return status;

}

public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}

}

כמובן, עם הגישה הנ"ל, ישנם כמה נקודות סתירה, כגון:

  • קשירה הדוקה בין הודעות לשם.
  • בכל פעם יש לחבר את השם עם הנתונים האמיתיים.
  • בעיות עם התפתחות השם.
  • תחזוקת הקוד וכו'.

כדי לפצל או לפתור את הבעיות הנ"ל, הוצע ריפרסטור השם כרכיב נפרד שבו כל השמות יופקדו/יתומקדמים. בדיקת תאימות היא הכרחית במהלך התפתחות השם כדי לוודא שהחוזה בין היצרן לצרכן נשמר וניתן להשתמש בריפרסטור השם לשם כך.

אפשר לצפות בסרטון הבא כדי לראות איך נתונים זורמים ברציפות מנושא לטבלה ספציפית של MySQL באמצעות מתורגמן סינק של JDBC על ערימת Apache Kafka בצומת יחיד.

מסקנה

עד כה, אנא להיות יותר מודע לבעיה הגדולה ביותר עם מתורגמן JDBC ולארוג Apache Kafka עם מתורגמנים JDBC. אני מקווה שנהניתם לקרוא את זה. אנא הערכו ושתפו אם אתם מרגישים שהיצירה זה שווה.

Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink