No cenário atual de comunicações M2M (Machine to machine), há uma enorme necessidade de transmitir dados digitais de dispositivos IoT heterogêneos para vários RDBMS para análise adicional através do painel, acionando diferentes eventos para realizar inúmeras ações. Para suportar os cenários acima, o Apache Kafka atua como um sistema nervoso central onde os dados podem ser ingeridos de vários dispositivos IoT e persistidos em vários tipos de repositório, RDBMS, armazenamento em nuvem, etc. Além disso, vários tipos de pipelines de dados podem ser executados antes ou depois que os dados chegam ao tópico do Kafka. Ao usar o conector de sumidouro JDBC do Kafka, podemos transmitir dados continuamente do tópico do Kafka para os respectivos RDBMS.
A Maior Dificuldade do Conector de Sumidouro JDBC
A maior dificuldade com o conector de sumidouro JDBC é que ele requer conhecimento do esquema de dados que já foi depositado no tópico do Kafka. O Schema Registry deve, portanto, ser integrado como um componente separado com o cluster Kafka existente para transferir os dados para o RDBMS. Portanto, para afundar dados do tópico do Kafka para o RDBMS, os produtores devem publicar mensagens/dados contendo o esquema. O esquema define a estrutura do formato de dados. Se o esquema não for fornecido, o conector de sumidouro JDBC não será capaz de mapear as mensagens com as colunas da tabela do banco de dados após consumir mensagens do tópico.
Aproveitando o Schema Registry, podemos evitar enviar o esquema toda vez com mensagens/payloads dos produtores porque o Schema Registry armazena (ou registra) esquemas no tópico _schemas
e vincula de acordo com o nome de tópico configurado/mencionado como definido no arquivo de propriedades do conector de sumidouro JDBC.
O custo de licenciamento pode ser um obstáculo para pequenas ou médias empresas que desejam usar o Oracle ou o Confluent’s Schema Registry com o Apache Kafka de código aberto para coletar dados de dispositivos IoT em seus aspectos de negócios.
Neste artigo, vamos usar um trecho de código Java para ver como os dados podem ser transmitidos continuamente para um banco de dados MySQL a partir de um tópico do Apache Kafka usando o conector JDBC Sink sem o Schema Registry.
Apache Kafka e Conectores JDBC
O Apache Kafka não inclui conectores JDBC para RDBMS específicos do fornecedor, como os conectores de origem e de destino de arquivos. Cabe a nós implementar ou desenvolver o código para RDBMS específicos, implementando a API de Conectar do Apache Kafka. Mas o Confluent desenvolveu, testou e suporta o Conector JDBC Sink, e eventualmente foi open-sourcado sob a Licença Comunitária do Confluent, então integramos o Conector JDBC Sink com o Apache Kafka.
Não haverá exceção lançada pelo tópico, mesmo se enviarmos um esquema incorreto ou nenhum esquema, porque o tópico do Kafka aceita todos os mensagens ou registros como arrays de bytes em pares chave-valor. Antes de transmitir a mensagem inteira para o tópico, o produtor precisa converter a mensagem em um array de bytes usando serializadores.
Abaixo está o esquema de amostra que está vinculado com o payload ou dados reais que devem ser publicados pelos produtores de mensagens do Apache Kafka.
Também aqui está o trecho de código Java para o produtor de mensagens:
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}
}
Claro, com a abordagem acima, existem alguns gargalos, como:
- Acoplamento apertado entre mensagens e esquema.
- Toda vez que o esquema deve ser combinado com dados reais.
- Problemas com a evolução do esquema.
- Manutenção de código, etc.
Para mitigar ou resolver os problemas acima, foi introduzido o Schema Registry como um componente separado onde todos os esquemas seriam implantados/mantidos. Verificações de compatibilidade são necessárias durante a evolução do esquema para garantir que o contrato produtor-consumidor seja mantido e o Schema Registry pode ser utilizado para alcançar isso.
Você poderia assistir ao vídeo abaixo para ver como os dados estão fluindo continuamente de um tópico para uma tabela específica do MySQL usando o conector de sumidouro JDBC em um cluster Apache Kafka de nó único.
Conclusão
Até agora, você deve ter uma compreensão melhor da maior dificuldade com o conector JDBC e empacotar o Apache Kafka com conectores JDBC. Espero que tenha gostado desta leitura. Por favor, curta e compartilhe se sentir que esta composição é valiosa.
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink