오늘날 M2M(Machine to machine) 통신 환경에서는 다양한 IoT 장치의 디지털 데이터를 대시보드를 통한 추가 분석 및 다양한 작업을 수행하기 위한 다양한 이벤트를 트리거하기 위해 여러 RDBMS에 스트리밍하는 데 막대한 요구가 있습니다. 위의 시나리오를 지원하기 위해 Apache Kafka는 다양한 IoT 장치에서 데이터를 수집하고 다양한 유형의 저장소, RDBMS, 클라우드 스토리지 등에 지속될 수 있는 중앙 신경계처럼 작동합니다. 또한 다양한 유형의 데이터 파이프라인을 실행할 수 있습니다. Kafka 토픽에 도착하기 전후에. Kafka JDBC 싱크 커넥터를 사용하면 Kafka 토픽에서 지속적으로 데이터를 스트리밍하여 해당 RDBMS에 저장할 수 있습니다.
JDBC 싱크 커넥터의 가장 큰 어려움
JDBC 싱크 커넥터의 가장 큰 어려움은 이미 Kafka 토픽에 도착한 데이터의 스키마에 대한 지식이 필요하다는 것입니다. 따라서 스키마 레지스트리는 RDBMS로 데이터를 전송하기 위해 기존 Kafka 클러스터와 별도로 통합되어야 합니다. 따라서 Kafka 토픽에서 RDBMS로 데이터를 싱크하려면 프로듀서는 스키마를 포함하는 메시지/데이터를 게시해야 합니다. 스키마는 데이터 형식의 구조를 정의합니다. 스키마가 제공되지 않으면 JDBC 싱크 커넥터는 토픽에서 메시지를 소비한 후 데이터베이스 테이블의 열과 메시지를 매핑할 수 없습니다.
스키마 레지스트리를 활용하면 프로듀서가 메시지/페이로드에 매번 스키마를 보내지 않아도 됩니다. 왜냐하면 스키마 레지스트리는 _schemas
토픽에 스키마를 저장(또는 등록)하고 JDBC 싱크 커넥터의 속성 파일에 정의된 대로 구성/언급된 토픽 이름과 함께 바인딩하기 때문입니다.
라이선스 비용은 오라클이나 Confluent의 Schema Registry와 오픈 소스 Apache Kafka를 사용하여 IoT 장치 데이터를 수집하려는 중소기업에게 장벽이 될 수 있습니다.
이 기사에서는 Java 코드 스니펫을 사용하여 Schema Registry 없이 JDBC Sink 커넥터를 사용하여 Apache Kafka 토픽에서 MySQL 데이터베이스로 데이터를 지속적으로 스트리밍하는 방법을 살펴보겠습니다.
Apache Kafka와 JDBC 커넥터
Apache Kafka는 파일 소스 및 싱크 커넥터와 유사하게 공급업체별 RDBMS용 JDBC 커넥터를 포함하지 않습니다. Apache Kafka의 Connect API를 구현하여 특정 RDBMS용 코드를 구현하거나 개발할 책임이 있습니다. 그러나 Confluent는 JDBC Sink 커넥터를 개발, 테스트하고 Confluent Community License 하에 오픈 소스화하여 이를 Apache Kafka와 통합했습니다.
Kafka 토픽에서 잘못된 스키마나 전혀 스키마가 없더라도 예외가 발생하지 않습니다. 토픽은 모든 메시지 또는 레코드를 키-값 쌍으로 바이트 배열로 수락합니다. 전체 메시지를 토픽으로 전송하기 전에 프로듀서는 시리얼라이저를 사용하여 메시지를 바이트 배열로 변환해야 합니다.
다음은 Apache Kafka 메시지 프로듀서에서 게시해야 하는 페이로드 또는 실제 데이터와 결합된 샘플 스키마입니다.
또한 여기 메시지 프로듀서용 Java 코드 스니펫이 있습니다.
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO Auto-generated method stub
new ProducerWithSchema().sendMsgToTopic();
}
}
물론, 위의 방식으로 몇 가지 병목 현상이 있습니다. 예를 들면:
- 메시지와 스키마 사이의 긴밀한 결합.
- 스키마는 실제 데이터와 함께 제공되어야 합니다.
- 스키마 진화와 관련된 문제.
- 코드 유지 보수 등.
위의 문제를 완화하거나 해결하기 위해 Schema Registry가 별도의 구성 요소로 도입되어 모든 스키마가 배포/유지됩니다. 스키마 진화 중에 호환성 검사가 필수적이며, 이를 통해 생산자-소비자 계약이 지켜지는지 확인하고 Schema Registry를 활용하여 이를 달성할 수 있습니다.
단일 노드 Apache Kafka 클러스터에서 JDBC sink connector를 사용하여 데이터가 어떻게 지속적으로 토픽에서 MySQL의 특정 테이블로 스트리밍되는지 아래 동영상을 시청하실 수 있습니다.
결론
지금까지 JDBC 커넥터와 Apache Kafka와 JDBC 커넥터를 묶는 것과 관련된 가장 큰 어려움에 대해 더 잘 이해하셨기를 바랍니다. 이 글을 읽는 것을 즐기셨기를 바라며, 이 글이 가치 있다고 생각되시면 좋아요와 공유를 부탁드립니다.
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink