في مشهد الاتصالات M2M (من آلة إلى آلة) اليوم، هناك متطلبات كبيرة لتدفق البيانات الرقمية من أجهزة الإنترنت المتنوعة (IoT) إلى مختلف قواعد البيانات العلائقية (RDBMS) للتحليل المستمر عبر اللوحة الشريطية، وتشغيل أحداث مختلفة لإجراء إجراءات عديدة. لدعم السيناريوهات المذكورة، يعمل أباتشي كافكا مثل الجهاز العصبي المركزي حيث يمكن إدخال البيانات من أجهزة IoT متنوعة والاحتفاظ بها في مختلف أنواع المستودعات، مثل قواعد البيانات العلائقية (RDBMS)، وتخزين السحابة، وغيرها. علاوة على ذلك، يمكن تنفيذ أنواع مختلفة من خطوط البيانات قبل أو بعد وصول البيانات إلى موضوعات كافكا. باستخدام موصل الـ JDBC لكافكا، يمكننا تدفق البيانات باستمرار من موضوعات كافكا إلى قواعد البيانات العلائقية المقابلة.
أصعب مشكلة في موصل JDBC Sink
المشكلة الأكبر في موصل JDBC sink هي أنه يتطلب معرفة مخطط البيانات الذي وصل بالفعل إلى موضوع كافكا. يجب دمج Schema Registry كمكون منفصل مع الكومة الحالية من كافكا من أجل نقل البيانات إلى قواعد البيانات العلائقية. ولذا، لنقل البيانات من موضوع كافكا إلى قاعدة البيانات العلائقية، يجب على المنتجين نشر الرسائل/البيانات التي تحتوي على المخطط. يحدد المخطط بنية تنسيق البيانات. إذا لم يتم توفير المخطط، لن يتمكن موصل JDBC sink من تعيين الرسائل بعد استهلاكها من الموضوع.
باستغلال Schema Registry، يمكننا تجنب إرسال المخطط في كل مرة مع الرسائل/البروتوكولات من المنتجين لأن Schema Registry يخزن (أو يسجل) المخططات في موضوع _schemas
ويربطها بشكل صحيح مع اسم الموضوع المصاحب/الذي تم ذكره في ملف خصائص موصل JDBC sink.
تكلفة الترخيص قد تكون العقبة لدى الشركات الصغيرة أو المتوسطة التي ترغب في استخدام Oracle أو موجّه Confluent للنموذج مع موقع Apache Kafka المفتوح المصدر لجمع بيانات أجهزة الإنترنت من الأشياء (IoT) من أجل منظورها التجاري.
في هذا المقال، سنستخدم مقتطف للكود الإصدار العربي لفهم كيف يمكن تيار البيانات باستمرار إلى قاعدة بيانات MySQL من موقع Apache Kafka موضوع المواضيع باستخدام موصل مصد JDBC بدون موجّه النموذج.
Apache Kafka وموصلات JDBC
لم يكن Apache Kafka يحزم موصلات JDBC لقواعد البيانات المستندة على العلامة التجارية المحددة مثل موصلات مصد ومصدر الملف. من واجبنا تنفيذ أو تطوير الكود لقواعد البيانات المحددة من العلامة التجارية عن طريق تنفيذ واجهة توصيل Apache Kafka. ولكن Confluent قد طورت واختبرت ودعمت موصل مصد JDBC وفي النهاية المفتوحة المصدر تحت ترخيص المجتمع Confluent، لذا قمنا بدمج موصل مصد JDBC مع Apache Kafka.
لن يتم طرح أي استثناء من الموضوع حتى لو أرسلنا النموذج غير الصحيح أو عدم وجود نموذج على الإطلاق لأن الموضوع Kafka يقبل جميع الرسائل أو السجلات كصفائف بايت في أزواج مفتاح-قيمة. قبل نقل الرسالة بالكامل إلى الموضوع، يجب على المنتج لتحويل الرسالة إلى صفيف بايت باستخدام المعالجين.
الآتي هو النموذج العينة المرتبطة بالحمل أو البيانات الفعلية التي يجب نشرها من منتجات رسائل Apache Kafka.
أيضاً، إليك مقتطف الكود الإصدار العربي لمنتج الرسالة:
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}
}
بالطبع، مع الأسلوب المذكور أعلاه، هناك عدد قليل من العقبات، مثل:
- الترابط الوثيق بين الرسائل والشيمات.
- في كل مرة يجب توصيل الشيمات مع البيانات الفعلية.
- مشاكل مع تطور الشيمات.
- الصيانة البرمجية، إلخ.
للتخفيف من أو حل المشكلات المذكورة أعلاه، تم تقديم ملجأ الشيمات كمكون منفصل حيث يتم نشر/الاحتفاظ بجميع الشيمات. من الضروري إجراء فحوصات التوافق خلال تطور الشيمات للتأكد من أن عقدة المنتج-المستهلك معلقة ويمكن استخدام ملجأ الشيمات لتحقيق ذلك.
يمكنك مشاهدة الفيديو أدناه لمعرفة كيفية تدفق البيانات باستمرار من الموضوع إلى جدول محدد في MySQL باستخدام عاكس موصل JDBC على مجمع Apache Kafka من عقدة واحدة.
الخاتمة
الآن، يجب أن يكون لديك فهم أفضل لأصعب صعوبة مع موصل JDBC وربط Apache Kafka مع موصلات JDBC. آمل أن تكون قد استمتعت بهذه القراءة. يرجى الإعجاب والمشاركة إذا كنت تشعر أن هذا التكوين ذو قيمة.
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink