Transmitir Datos a RDBMS a través del Conector de Suministro JDBC de Kafka sin Utilizar el Registro de Esquemas

En el panorama actual de comunicaciones M2M (Máquina a máquina), existe una gran necesidad de transmitir datos digitales desde dispositivos IoT heterogéneos a diversos RDBMS para su análisis adicional a través del panel, activando diferentes eventos para realizar numerosas acciones. Para apoyar los escenarios mencionados, Apache Kafka actúa como un sistema nervioso central donde los datos pueden ser ingeridos desde varios dispositivos IoT y persistidos en varios tipos de repositorios, RDBMS, almacenamiento en la nube, etc. Además, se pueden ejecutar varios tipos de canalizaciones de datos antes o después de que los datos arriben al tema de Kafka. Utilizando el conector de sumidero JDBC de Kafka, podemos transmitir datos continuamente desde el tema de Kafka a los respectivos RDBMS.

La Mayor Dificultad del Conector de Sumidero JDBC

La mayor dificultad con el conector de sumidero JDBC es que requiere conocimiento del esquema de datos que ya ha llegado al tema de Kafka. Por lo tanto, Schema Registry debe integrarse como un componente separado con el clúster de Kafka existente para transferir los datos al RDBMS. Por lo tanto, para hundir datos desde el tema de Kafka al RDBMS, los productores deben publicar mensajes/datos que contengan el esquema. El esquema define la estructura del formato de datos. Si no se proporciona el esquema, el conector de sumidero JDBC no podría mapear los mensajes con las columnas de la tabla de base de datos después de consumir mensajes del tema.

Al aprovechar Schema Registry, podemos evitar enviar el esquema cada vez con mensajes/cargas útiles desde los productores, ya que Schema Registry almacena (o registra) esquemas en el tema _schemas y se vincula en consecuencia con el nombre de tema configurado/mencionado como se define en el archivo de propiedades del conector de sumidero JDBC.

El costo de licencia podría ser un obstáculo para las empresas pequeñas o medianas que desean utilizar Oracle o Confluent’s Schema Registry con Apache Kafka de código abierto para recopilar datos de dispositivos IoT desde su perspectiva de negocio.

En este artículo, vamos a utilizar un fragmento de código Java para ver cómo se puede transmitir continuamente datos a una base de datos MySQL desde un tema de Apache Kafka utilizando el conector JDBC Sink sin Schema Registry.

Apache Kafka y conectores JDBC

Apache Kafka no incluye conectores JDBC para RDBMS específicos del proveedor, similares a los conectores de origen y sumidero de archivos. Es responsabilidad nuestra implementar o desarrollar el código para RDBMS específicos mediante la implementación de la API de Connect de Apache Kafka. Pero Confluent ha desarrollado, probado y apoyado el conector JDBC Sink, y finalmente lo ha abierto bajo la Licencia Comunitaria de Confluent, por lo que hemos integrado el conector JDBC Sink con Apache Kafka.

No se lanzará ninguna excepción desde el tema incluso si enviamos un esquema incorrecto o ningún esquema en absoluto, porque el tema de Kafka acepta todos los mensajes o registros como matrices de bytes en pares clave-valor. Antes de transmitir todo el mensaje al tema, el productor debe convertir el mensaje en una matriz de bytes utilizando serializadores.

A continuación, se muestra el esquema de muestra que está vinculado con la carga útil o los datos reales que deben publicarse desde los productores de mensajes de Apache Kafka.

Además, aquí está el fragmento de código Java para el productor de mensajes:

 

public class ProducerWithSchema {

private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";

public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}

public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);

producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();

}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}

return status;

}

public static void main(String[] args) {
// TODO Método stub generado automáticamente
new ProducerWithSchema().sendMsgToTopic();
}

}

Por supuesto, con el enfoque anterior, hay un par de cuellos de botella, como:

  • Acoplamiento estrecho entre mensajes y esquema.
  • Cada vez que el esquema debe unirse con los datos reales.
  • Problemas con la evolución del esquema.
  • Mantenibilidad del código, etc.

Para mitigar o resolver los problemas anteriores, se ha introducido el Schema Registry como un componente separado donde todos los esquemas se implementarían/mantendrían. Las verificaciones de compatibilidad son necesarias durante la evolución del esquema para asegurarse de que el contrato productor-consumidor se respete y el Schema Registry puede ser utilizado para lograr esto.

Podrías ver el siguiente video para ver cómo los datos fluyen continuamente de un tema a una tabla específica de MySQL utilizando el conector de sumidero JDBC en un clúster Apache Kafka de un solo nodo.

Conclusión

A estas alturas, deberías tener una mejor comprensión de la mayor dificultad con el conector JDBC y el empaquetado de Apache Kafka con conectores JDBC. Espero que hayas disfrutado de esta lectura. Por favor, dale me gusta y comparte si crees que esta composición es valiosa.

Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink