Передача данных в реляционную базу данных (RDBMS) через коннектор Kafka JDBC Sink без использования Schema Registry

В современном ландшафте M2M (Machine to machine) связи существует огромная потребность в потоковой передаче цифровых данных от гетерогенных устройств IoT в различные RDBMS для дальнейшего анализа через панель мониторинга, вызывая различные события для выполнения многочисленных действий. Для поддержки вышеуказанных сценариев Apache Kafka действует как центральная нервная система, где данные могут быть поглощены из различных устройств IoT и сохранены в различные типы репозитория, RDBMS, облачное хранилище и т. д. Кроме того, можно выполнять различные типы конвейеров данных до или после прибытия данных в тему Kafka. Используя соединитель сливной JDBC Kafka, мы можем непрерывно передавать данные из темы Kafka в соответствующие RDBMS.

Самая большая трудность соединителя сливного JDBC

Самая большая трудность с соединителем сливного JDBC заключается в том, что он требует знания схемы данных, которые уже приземлились на тему Kafka. Схема Реестр должен, следовательно, быть интегрирован как отдельный компонент с выходящим кластером Kafka для передачи данных в RDBMS. Следовательно, чтобы утопить данные из темы Kafka в RDBMS, производители должны публиковать сообщения/данные, содержащие схему. Схема определяет структуру формата данных. Если схема не предоставлена, соединитель сливного JDBC не сможет сопоставить сообщения с колонками таблицы базы данных после потребления сообщений из темы.

Используя Реестр схем, мы можем избежать отправки схемы каждый раз с сообщениями/нагрузками от производителей, потому что Реестр схем хранит (или регистрирует) схемы в _schemas теме и привязывает соответственно с настроенным/упомянутым именем темы, как определено в файле свойств соединителя сливного JDBC.

Стоимость лицензии может стать препятствием для малых и средних компаний, которые хотят использовать Oracle или Реестр схем Confluent с открытым исходным кодом Apache Kafka для сбора данных от устройств IoT с точки зрения бизнеса.

В этой статье мы рассмотрим фрагмент кода на Java, чтобы увидеть, как данные могут непрерывно передаваться в базу данных MySQL из топика Apache Kafka с использованием соединителя JDBC Sink без Реестра схем.

Apache Kafka и JDBC Connectors

Apache Kafka не поставляется с соединителями JDBC для поставщиков-специфичных СУБД, аналогично соединителям источника и приемника файлов. Нас вменяется в обязанность реализовать или разработать код для конкретной СУБД, реализуя API Connect Apache Kafka. Но Confluent разработал, протестировал и поддерживает JDBC Sink Connector, и в итоге открыл исходный код под лицензией Confluent Community, поэтому мы интегрировали JDBC Sink Connector с Apache Kafka.

Не будет выброшено никаких исключений из топика, даже если мы отправим неверную схему или вообще не отправим схему, потому что топик Apache Kafka принимает все сообщения или записи в виде массивов байтов в парах ключ-значение. Прежде чем передавать все сообщение в топик, производитель должен преобразовать сообщение в массив байтов с помощью сериализаторов.

Ниже приведен пример схемы, которая связана с полезной нагрузкой или фактическими данными, которые должны быть опубликованы от производителей сообщений Apache Kafka.

Также вот фрагмент кода на Java для производителя сообщений:

 

public class ProducerWithSchema {

private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";

public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}

public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);

producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();

}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}

return status;

}

public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}

}

Конечно, с вышеуказанным подходом существует несколько узких мест, таких как:

  • Жесткая связь между сообщениями и схемой.
  • Каждый раз схему необходимо объединять с фактическими данными.
  • Проблемы с эволюцией схемы.
  • Поддержка кода и т.д.

Чтобы смягчить или решить вышеуказанные проблемы, был введен Schema Registry в качестве отдельного компонента, где все схемы будут развернуты/поддерживаться. Необходимы проверки совместимости во время эволюции схемы, чтобы обеспечить соблюдение контракта между производителем и потребителем, и схемный реестр может быть использован для достижения этого.

Вы можете посмотреть ниже видео, чтобы увидеть, как данные непрерывно передаются от темы к конкретной таблице MySQL с использованием соединителя-приемника JDBC на одноузловом кластере Apache Kafka.

Заключение

К настоящему времени вы должны иметь более глубокое понимание наибольшей трудности с соединителем JDBC и объединением Apache Kafka с соединителями JDBC. Надеюсь, вам понравился этот текст. Пожалуйста, поставьте лайк и поделитесь, если считаете эту статью ценной.

Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink