Dans le paysage actuel des communications M2M (Machine to machine), il existe une grande exigence pour le streaming des données numériques provenant de dispositifs IoT hétérogènes vers les différents SGBD pour une analyse ultérieure via le tableau de bord, déclenchant différents événements pour effectuer de nombreuses actions. Pour soutenir ces scénarios, Apache Kafka joue le rôle d’un système nerveux central où les données peuvent être ingérées à partir de divers dispositifs IoT et persistées dans différents types de référentiel, SGBD, stockage cloud, etc. De plus, différents types de pipelines de données peuvent être exécutés avant ou après l’arrivée des données dans le sujet Kafka. En utilisant le connecteur de récepteur JDBC Kafka, nous pouvons diffuser des données en continu à partir du sujet Kafka vers les SGBD respectifs.
La Plus Grande Difficulté du Connecteur de Récepteur JDBC
La plus grande difficulté avec le connecteur de récepteur JDBC est qu’il nécessite la connaissance du schéma des données qui sont déjà arrivées sur le sujet Kafka. Le Schema Registry doit donc être intégré en tant que composant séparé avec le cluster Kafka existant afin de transférer les données vers le SGBD. Par conséquent, pour envoyer des données du sujet Kafka vers le SGBD, les producteurs doivent publier des messages/données contenant le schéma. Le schéma définit la structure du format de données. Si le schéma n’est pas fourni, le connecteur de récepteur JDBC ne pourrait pas mapper les messages avec les colonnes de la table de la base de données après avoir consommé les messages du sujet.
En tirant parti du Schema Registry, nous pouvons éviter d’envoyer le schéma à chaque fois avec les messages/payloads des producteurs car le Schema Registry stocke (ou enregistre) les schémas dans le sujet _schemas
et se lie en conséquence avec le nom de sujet configuré/mentionné tel qu’il est défini dans le fichier de propriétés du connecteur de récepteur JDBC.
Le coût de licence pourrait constituer un obstacle pour les petites et moyennes entreprises souhaitant utiliser Oracle ou Confluent’s Schema Registry avec Apache Kafka open source pour collecter des données de dispositifs IoT à des fins commerciales.
Dans cet article, nous allons utiliser un extrait de code Java pour voir comment les données peuvent être diffusées en continu dans une base de données MySQL à partir d’un sujet Apache Kafka en utilisant le connecteur de récepteur JDBC sans Schema Registry.
Apache Kafka et Connecteurs JDBC
Apache Kafka n’inclut pas les connecteurs JDBC pour les SGBD spécifiques aux fournisseurs, de manière similaire aux connecteurs source et récepteur de fichiers. Il revient à nous de mettre en œuvre ou de développer le code pour un SGBD spécifique en mettant en œuvre l’API Connect d’Apache Kafka. Mais Confluent a développé, testé et soutenu le connecteur de récepteur JDBC, qui a finalement été open-sourcé sous licence Confluent Community, nous avons donc intégré le connecteur de récepteur JDBC avec Apache Kafka.
Il n’y aura aucune exception lancée à partir du sujet même si nous envoyons un schéma incorrect ou aucun schéma du tout, car le sujet Kafka accepte tous les messages ou enregistrements en tant que tableaux d’octets dans des paires clé-valeur. Avant de transmettre le message complet au sujet, le producteur doit convertir le message en tableau d’octets à l’aide de sérialiseurs.
Voici un exemple de schéma lié au payload ou aux données réelles qui doivent être publiées par les producteurs de messages Apache Kafka.
Voici également un extrait de code Java pour le producteur de messages :
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}
}
Bien sûr, avec l’approche ci-dessus, il y a quelques goulots d’étranglement, tels que :
- Couplage serré entre les messages et le schéma.
- À chaque fois, le schéma doit être associé aux données réelles.
- Problèmes liés à l’évolution du schéma.
- Maintenabilité du code, etc.
Pour atténuer ou résoudre les problèmes ci-dessus, le Schema Registry a été introduit en tant que composant distinct où tous les schémas seraient déployés/maintenus. Les vérifications de compatibilité sont nécessaires lors de l’évolution du schéma pour s’assurer que le contrat producteur-consommateur est respecté et le Schema Registry peut être utilisé pour y parvenir.
Vous pourriez regarder la vidéo ci-dessous pour voir comment les données sont diffusées en continu d’un sujet à une table spécifique de MySQL à l’aide du connecteur de récepteur JDBC sur un cluster Apache Kafka à nœud unique.
Conclusion
À présent, vous devriez avoir une meilleure compréhension de la plus grande difficulté avec le connecteur JDBC et l’intégration d’Apache Kafka avec les connecteurs JDBC. J’espère que vous avez apprécié cette lecture. S’il vous plaît aimez et partagez si vous pensez que cette composition est précieuse.
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink