Nel panorama delle comunicazioni M2M (Machine to machine) di oggi, c’è un’enorme necessità di trasmettere i dati digitali da dispositivi IoT eterogenei ai vari RDBMS per un’ulteriore analisi attraverso il dashboard, innescando diversi eventi per eseguire numerose azioni. Per supportare i suddetti scenari, Apache Kafka funge da sistema nervoso centrale dove i dati possono essere acquisiti da vari dispositivi IoT e persistenti in vari tipi di repository, RDBMS, cloud storage, ecc. Inoltre, possono essere eseguiti vari tipi di pipeline di dati prima o dopo l’arrivo dei dati al topic di Kafka. Utilizzando il connettore sink JDBC di Kafka, possiamo trasmettere i dati continuamente dal topic di Kafka nei rispettivi RDBMS.
La Difficoltà Maggiore del Connettore Sink JDBC
La maggiore difficoltà con il connettore sink JDBC è che richiede la conoscenza dello schema dei dati che è già arrivato sul topic di Kafka. Il Schema Registry deve quindi essere integrato come componente separato con il cluster Kafka esistente per trasferire i dati nel RDBMS. Pertanto, per affondare i dati dal topic di Kafka al RDBMS, i produttori devono pubblicare messaggi/dati contenenti lo schema. Lo schema definisce la struttura del formato dei dati. Se lo schema non è fornito, il connettore sink JDBC non sarebbe in grado di mappare i messaggi con le colonne della tabella del database dopo aver consumato i messaggi dal topic.
Sfruttando il Schema Registry, possiamo evitare di inviare lo schema ogni volta con messaggi/payload dai produttori perché il Schema Registry memorizza (o registra) gli schemi nel topic _schemas
e li lega di conseguenza con il nome del topic configurato/menzionato come definito nel file delle proprietà del connettore sink JDBC.
Il costo della licenza potrebbe essere un ostacolo per le piccole o medie imprese che desiderano utilizzare Oracle o Confluent’s Schema Registry con Apache Kafka open source per raccogliere dati da dispositivi IoT per le loro prospettive aziendali.
In questo articolo, utilizzeremo uno snippet di codice Java per vedere come i dati possano essere trasmessi in modo continuo nel database MySQL da un topic di Apache Kafka utilizzando il connettore Sink JDBC senza Schema Registry.
Apache Kafka e Connettori JDBC
Apache Kafka non fornisce connettori JDBC preconfigurati per RDBMS specifici del fornitore, analogamente ai connettori sorgente e sink per file. Spetta a noi implementare o sviluppare il codice per RDBMS specifici attraverso l’API Connect di Apache Kafka. Tuttavia, Confluent ha sviluppato, testato e supportato il Connettore Sink JDBC, che è stato infine reso open source sotto la Confluent Community License, quindi abbiamo integrato il Connettore Sink JDBC con Apache Kafka.
Non verrà lanciata alcuna eccezione dal topic anche se inviamo uno schema errato o nessun schema affatto, poiché il topic Kafka accetta tutti i messaggi o record come array di byte in coppie chiave-valore. Prima di trasmettere l’intero messaggio al topic, il produttore deve convertire il messaggio in un array di byte utilizzando i serializzatori.
Di seguito è riportato lo schema di esempio che è legato al payload o ai dati effettivi che devono essere pubblicati dai produttori di messaggi di Apache Kafka.
Ecco anche lo snippet di codice Java per il produttore di messaggi:
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}
}
Naturalmente, con l’approccio sopra descritto, ci sono un paio di colli di bottiglia, come:
- Forte accoppiamento tra messaggi e schema.
- Ogni volta che lo schema deve essere associato ai dati effettivi.
- Problemi con l’evoluzione dello schema.
- Manutenibilità del codice, ecc.
Per mitigare o risolvere i problemi sopra elencati, è stato introdotto il Schema Registry come componente separato in cui tutti gli schemi verranno distribuiti/mantenuti. Le verifiche di compatibilità sono necessarie durante l’evoluzione dello schema per garantire che il contratto produttore-consumatore sia rispettato e il Schema Registry può essere utilizzato per raggiungere questo obiettivo.
Potresti guardare il video sottostante per vedere come i dati scorrono continuamente da un topic a una tabella specifica di MySQL utilizzando il connettore sink JDBC in un cluster Apache Kafka a nodo singolo.
Conclusione
A questo punto, dovresti avere una migliore comprensione della più grande difficoltà con il connettore JDBC e l’integrazione di Apache Kafka con i connettori JDBC. Spero che ti sia piaciuto questa lettura. Per favore, metti mi piace e condividi se ritieni che questa composizione sia preziosa.
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink