No cenário atual de comunicações M2M (Máquina para máquina), há uma enorme necessidade de transmitir dados digitais de dispositivos IoT heterogêneos para vários RDBMS para análise adicional através do painel, acionando diferentes eventos para realizar inúmeras ações. Para suportar os cenários acima, o Apache Kafka atua como um sistema nervoso central onde os dados podem ser ingeridos de vários dispositivos IoT e persistidos em vários tipos de repositório, RDBMS, armazenamento em nuvem, etc. Além disso, vários tipos de pipelines de dados podem ser executados antes ou depois que os dados chegam ao tópico do Kafka. Ao usar o conector de sumidouro JDBC do Kafka, podemos transmitir dados continuamente do tópico do Kafka para os respectivos RDBMS.
A Maior Dificuldade do Conector de Sumidouro JDBC
A maior dificuldade com o conector de sumidouro JDBC é que ele requer conhecimento do esquema de dados que já foi enviado para o tópico do Kafka. O Schema Registry deve, portanto, ser integrado como um componente separado com o cluster Kafka existente para transferir os dados para o RDBMS. Portanto, para afundar dados do tópico do Kafka para o RDBMS, os produtores devem publicar mensagens/dados contendo o esquema. O esquema define a estrutura do formato de dados. Se o esquema não for fornecido, o conector de sumidouro JDBC não poderá mapear as mensagens com as colunas da tabela do banco de dados após consumir mensagens do tópico.
Aproveitando o Schema Registry, podemos evitar enviar o esquema sempre com mensagens/payloads dos produtores porque o Schema Registry armazena (ou registra) esquemas no tópico _schemas
e vincula conforme necessário com o nome de tópico configurado/mencionado conforme definido no arquivo de propriedades do conector de sumidouro JDBC.
O custo de licenciamento pode ser um obstáculo para pequenas ou médias empresas que desejam usar o Oracle ou o Confluent’s Schema Registry com o Apache Kafka de código aberto para coletar dados de dispositivos IoT para suas perspectivas de negócios.
Neste artigo, vamos usar um trecho de código Java para ver como os dados podem ser transmitidos continuamente para um banco de dados MySQL a partir de um tópico do Apache Kafka usando o conector JDBC Sink sem o Schema Registry.
Apache Kafka e Conectores JDBC
O Apache Kafka não vem com conectores JDBC para RDBMS específicos do fornecedor, assim como os conectores de origem e de destino de arquivos. Cabe a nós implementar ou desenvolver o código para RDBMS específicos implementando a API de Connect do Apache Kafka. Mas o Confluent desenvolveu, testou e suporta o Conector JDBC Sink, e eventualmente abriu o código sob a Licença Comunitária do Confluent, então integramos o Conector JDBC Sink com o Apache Kafka.
Não haverá exceção lançada pelo tópico mesmo que enviemos o esquema incorreto ou nenhum esquema, porque o tópico do Kafka aceita todos os mensagens ou registros como arrays de bytes em pares de chave-valor. Antes de transmitir a mensagem inteira para o tópico, o produtor precisa converter a mensagem em um array de bytes usando serializadores.
Abaixo está o esquema de amostra que está vinculado ao payload ou dados reais que devem ser publicados pelos produtores de mensagens do Apache Kafka.
Também segue o trecho de código Java para o produtor de mensagens:
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}
}
Claro, com a abordagem acima, existem alguns gargalos, como:
- Acoplamento rígido entre mensagens e esquema.
- Toda vez que o esquema deve ser combinado com os dados reais.
- Problemas com a evolução do esquema.
- Manutenção de código, etc.
Para mitigar ou resolver os problemas acima, foi introduzido o Schema Registry como um componente separado onde todos os esquemas seriam implantados/mantidos. Verificações de compatibilidade são necessárias durante a evolução do esquema para garantir que o contrato produtor-consumidor seja mantido e o Schema Registry pode ser usado para alcançar isso.
Você poderia assistir ao vídeo abaixo para ver como os dados estão fluindo continuamente de um tópico para uma tabela específica do MySQL usando o conector de sumidouro JDBC em um cluster Apache Kafka de nó único.
Conclusão
Até agora, você deve ter uma compreensão melhor da maior dificuldade com o conector JDBC e o empacotamento do Apache Kafka com conectores JDBC. Espero que tenha gostado desta leitura. Por favor, curta e compartilhe se sentir que esta composição é valiosa.
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink