In het huidige M2M (Machine to machine) communicatie landschap, is er een enorme behoefte aan het streamen van digitale gegevens van heterogene IoT-apparaten naar de verschillende RDBMS voor verdere analyse via het dashboard, waardoor verschillende gebeurtenissen worden geactiveerd om tal van acties uit te voeren. Om de bovenstaande scenario’s te ondersteunen, fungeert Apache Kafka als het centrale zenuwstelsel waar gegevens uit verschillende IoT-apparaten kunnen worden opgenomen en in verschillende soorten repository, RDBMS, cloudopslag, enz. kunnen worden bewaard. Bovendien kunnen verschillende soorten datapiplines worden uitgevoerd vóór of na aankomst van gegevens in Kafka’s onderwerp. Door gebruik te maken van de Kafka JDBC sink connector, kunnen we gegevens continu van Kafka’s onderwerp naar de respectievelijke RDBMS streamen.
De grootste moeilijkheid van de JDBC Sink Connector
De grootste moeilijkheid met de JDBC sink connector is dat het kennis vereist van het schema van gegevens die al op het Kafka-onderwerp is beland. Schema Registry moet daarom als een apart onderdeel worden geïntegreerd met de bestaande Kafka-cluster om de gegevens in de RDBMS over te brengen. Daarom moeten producenten berichten/gegevens met het schema publiceren om gegevens van het Kafka-onderwerp naar de RDBMS te laten zinken. Het schema definieert de structuur van het gegevensformaat. Als het schema niet wordt verstrekt, kan de JDBC sink connector de berichten niet afzetten bij de kolommen van de database tabel na het consumeren van berichten van het onderwerp.
Door Schema Registry te gebruiken, kunnen we voorkomen dat we elke keer met berichten/payloads van de producenten schema’s verzenden, omdat Schema Registry schema’s opslaat (of registreert) in _schemas
onderwerp en dienovereenkomstig bindt met de geconfigureerde/genoemde onderwerpnaam zoals gedefinieerd in de eigenschappenbestand van de JDBC sink connector.
De licentiekosten kunnen een obstakel zijn voor kleine of middelgrote bedrijven die Oracle of Confluent’s Schema Registry willen gebruiken met open source Apache Kafka om IoT-apparaatgegevens te verzamelen voor hun zakelijke perspectieven.
In dit artikel gaan we Java-codefragmenten gebruiken om te zien hoe gegevens continu kunnen worden gestreamd naar een MySQL-database vanuit een Apache Kafka-onderwerp met behulp van de JDBC Sink connector zonder Schema Registry.
Apache Kafka en JDBC Connectors
Apache Kafka is niet gebundeld met JDBC connectors voor leverancierspecifieke RDBMS, vergelijkbaar met bestandssource- en sinkconnectors. Het is onze verantwoordelijkheid om de code voor specifieke RDBMS te implementeren of te ontwikkelen door de Connect API van Apache Kafka te implementeren. Maar Confluent heeft de JDBC Sink Connector ontwikkeld, getest en ondersteund en uiteindelijk onder een Confluent Community License open source gemaakt, dus hebben we de JDBC Sink Connector geïntegreerd met Apache Kafka.
Er wordt geen uitzondering gegenereerd door het onderwerp, zelfs niet als we een incorrecte schema of helemaal geen schema verzenden, omdat het Kafka-onderwerp alle berichten of records accepteert als byte arrays in sleutel-waardeparen. Voordat het volledige bericht naar het onderwerp wordt verzonden, moet de producent het bericht converteren naar een byte array met behulp van serializers.
Hieronder is het voorbeeldschema dat is gebonden aan de payload of de werkelijke gegevens die moeten worden gepubliceerd door de Apache Kafka-berichtproducenten.
Ook hier is het Java-codefragment voor de berichtproducent:
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}
}
Natuurlijk, met de bovenstaande aanpak zijn er een paar knelpunten, zoals:
- Sterk gekoppeld tussen berichten en schema.
- Elke keer dat het schema moet worden gecombineerd met de werkelijke gegevens.
- Problemen met schema-evolutie.
- Onderhoud van code, enz.
Om de bovenstaande problemen te verlichten of op te lossen, is de Schema Registry geïntroduceerd als een aparte component waar alle schema’s zouden worden geïmplementeerd/onderhouden. Compatibiliteitscontroles zijn noodzakelijk tijdens schema-evolutie om ervoor te zorgen dat het contract tussen producent en consument wordt gehandhaafd en de Schema Registry kan worden gebruikt om dit te bereiken.
Je kunt het onderstaande video bekijken om te zien hoe gegevens continu stromen van het onderwerp naar een specifieke tabel in MySQL met behulp van de JDBC sink connector op een enkelnode Apache Kafka cluster.
Conclusie
Op dit moment zou je een beter begrip moeten hebben van de grootste moeilijkheid met de JDBC connector en het bundelen van Apache Kafka met JDBC connectors. Ik hoop dat je van deze lezing hebt genoten. Like en deel als je vindt dat dit stuk waardevol is.
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink