Streaming-Daten in RDBMS über den Kafka JDBC Sink Connector ohne Nutzung des Schema Registry

Im heutigen M2M (Machine to machine)-Kommunikationsumfeld besteht ein großer Bedarf an der Streamung von digitalen Daten von heterogenen IoT-Geräten zu verschiedenen RDBMS für weitere Analyse über das Dashboard, wodurch verschiedene Ereignisse ausgelöst werden, um zahlreiche Aktionen durchzuführen. Um die oben genannten Szenarien zu unterstützen, fungiert Apache Kafka als zentrales Nervensystem, in dem Daten von verschiedenen IoT-Geräten aufgenommen und in verschiedene Arten von Repositorys, RDBMS, Cloud-Speicher usw. persistiert werden können. Darüber hinaus können verschiedene Arten von Datenpipelines vor oder nach der Ankunft der Daten im Kafka-Thema ausgeführt werden. Mit dem Kafka JDBC Sink Connector können wir Daten kontinuierlich aus dem Kafka-Thema in das entsprechende RDBMS streamen.

Die größte Schwierigkeit des JDBC Sink Connectors

Die größte Schwierigkeit beim JDBC Sink Connector besteht darin, dass er Kenntnisse über das Schema der Daten erfordert, die bereits im Kafka-Thema gelandet sind. Der Schema Registry muss daher als separates Komponente mit dem bestehenden Kafka-Cluster integriert werden, um die Daten in das RDBMS zu übertragen. Daher müssen die Produzenten, um Daten vom Kafka-Thema in das RDBMS zu sinken, Nachrichten/Daten mit dem Schema veröffentlichen. Das Schema definiert die Struktur des Datenformats. Wenn das Schema nicht bereitgestellt wird, kann der JDBC Sink Connector die Nachrichten nicht mit den Spalten der Datenbanktabelle nach dem Verbrauch von Nachrichten aus dem Thema zusammenführen.

Durch die Nutzung des Schema Registry können wir die Übermittlung des Schemas bei jeder Nachricht/Payload von den Produzenten vermeiden, da der Schema Registry Schemas im _schemas-Thema speichert (oder registriert) und entsprechend mit dem konfigurierten/genannten Thema-Namen als in der Eigenschaftsdatei des JDBC Sink Connectors definiert gebunden wird.

Die Lizenzierungskosten könnten ein Hindernis für kleine oder mittelgroße Unternehmen sein, die Oracle oder Confluent’s Schema Registry mit dem Open-Source-Apache Kafka verwenden möchten, um IoT-Gerätedaten für ihre Geschäftsperspektiven zu erfassen.

In diesem Artikel werden wir einen Java-Codeausschnitt verwenden, um zu sehen, wie Daten kontinuierlich in eine MySQL-Datenbank aus einem Apache Kafka-Thema übertragen werden können, indem der JDBC Sink Connector ohne Schema Registry verwendet wird.

Apache Kafka und JDBC-Connectors

Apache Kafka bündelt die JDBC-Connectors für herstellerspezifische RDBMS nicht, ähnlich wie Dateisource- und Sink-Connectors. Es liegt in unserer Verantwortung, den Code für ein spezifisches RDBMS durch Implementierung der Apache Kafka Connect API zu implementieren oder zu entwickeln. Aber Confluent hat den JDBC Sink Connector entwickelt, getestet und unterstützt und schließlich unter der Confluent Community Lizenz als Open Source freigegeben, also haben wir den JDBC Sink Connector mit Apache Kafka integriert.

Es wird keine Ausnahme aus dem Thema geworfen, selbst wenn wir das falsche Schema oder überhaupt kein Schema senden, weil das Kafka-Thema alle Nachrichten oder Datensätze als Byte-Arrays in Schlüssel-Wert-Paaren akzeptiert. Bevor die gesamte Nachricht an das Thema übertragen wird, muss der Produzent die Nachricht mithilfe von Serialisierern in ein Byte-Array konvertieren.

Unten ist das Beispielschema, das mit dem Payload oder der tatsächlichen Daten verbunden ist, die von den Apache Kafka-Nachrichtenproduzenten veröffentlicht werden sollen.

Hier ist auch der Java-Codeausschnitt für den Nachrichtenproduzenten:

 

public class ProducerWithSchema {

private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";

public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}

public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);

producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();

}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}

return status;

}

public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}

}

Natürlich gibt es mit dem oben beschriebenen Ansatz einige Engpässe, wie:

  • Enge Kopplung zwischen Nachrichten und Schema.
  • Bei jeder Nachricht muss das Schema mit dem tatsächlichen Dateninhalt kombiniert werden.
  • Probleme bei der Schemevolatilität.
  • Pflege der Codebasis usw.

Um die oben genannten Probleme zu mildern oder zu lösen, wurde der Schema-Registry als separates Komponente eingeführt, in der alle Schemata bereitgestellt und gewartet werden. Kompatibilitätsprüfungen sind bei der Schemevolatilität notwendig, um sicherzustellen, dass der Produzent-Verbraucher-Vertrag eingehalten wird, und die Schema-Registry kann dazu genutzt werden.

Sie können das unten stehende Video ansehen, um zu sehen, wie Daten kontinuierlich von einem Thema in eine spezifische MySQL-Tabelle mithilfe des JDBC-Sink-Connectors in einer Einzelknoten-Apache-Kafka-Cluster fließen.

Schlussfolgerung

Mittlerweile sollten Sie einen besseren Verständnis davon haben, was die größte Herausforderung beim JDBC-Connector und dem Zusammenpacken von Apache Kafka mit JDBC-Connectors ist. Ich hoffe, Sie fanden diesen Artikel informativ. Bitte liken und teilen Sie, wenn Sie denken, dass diese Zusammenstellung wertvoll ist.

Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink