現代のM2M(マシンツーマシン)通信の状況では、異種のIoTデバイスからデジタルデータをストリーミングしてダッシュボード経由でさらなる分析を行い、様々なアクションを実行するためのさまざまなイベントをトリガーするために、RDBMSに大きな要件があります。上記のシナリオをサポートするために、Apache Kafkaは、様々なIoTデバイスからデータを取り込み、RDBMS、クラウドストレージなどのさまざまなタイプのリポジトリに永続化できる中枢神経系のように機能します。さらに、さまざまな種類のデータパイプラインを、Kafkaのトピックにデータが到着する前後に実行できます。KafkaのJDBCシンクコネクタを使用することで、Kafkaのトピックから連続してデータをストリーミングして対応するRDBMSに流すことができます。
JDBCシンクコネクタの最大の困難
JDBCシンクコネクタの最大の困難は、Kafkaトピックに既に到着したデータのスキーマに関する知識が必要であることです。したがって、スキーマレジストリは、RDBMSにデータを転送するために、既存のKafkaクラスターと別のコンポーネントとして統合されなければなりません。したがって、KafkaトピックからRDBMSにデータをシンクするために、プロデューサーはスキーマを含むメッセージ/データを公開する必要があります。スキーマはデータ形式の構造を定義します。スキーマが提供されていない場合、JDBCシンクコネクタは、トピックからメッセージを消費した後にデータベーステーブルの列にメッセージをマッピングできません。
スキーマレジストリを活用することで、プロデューサーからメッセージ/ペイロードとともにスキーマを毎回送信する必要がなくなります。これは、スキーマレジストリがスキーマを_schemas
トピックに登録し、JDBCシンクコネクタのプロパティファイルで定義されたトピック名と適切にバインドするからです。
ライセンスコストは、OracleやConfluentのスキーマレジストリを使ってオープンソースのApache KafkaでIoTデバイスデータを収集し、ビジネス視点を強化したい中小企業にとっての障壁になるかもしれません。
この記事では、Javaのコードスニペットを使って、スキーマレジストリを使用せずにJDBCシンクコネクタを使って、Apache KafkaトピックからMySQLデータベースにデータを連続的にストリーミングする方法を見ていきます。
Apache KafkaとJDBCコネクタ
Apache Kafkaは、ファイルソースおよびシンクコネクタと同様に、ベンダー固有のRDBMS用のJDBCコネクタをバンドルしていません。特定のRDBMS用のコードを実装または開発するためにApache KafkaのConnect APIを実装する責任は私たちにあります。しかし、ConfluentはJDBCシンクコネクタを開発、テスト、サポートし、最終的にConfluentコミュニティライセンスの下でオープンソース化されたため、JDBCシンクコネクタをApache Kafkaと統合しました。
トピックから不正なスキーマやスキーマなしでも例外がスローされないことはありません。なぜなら、Kafkaトピックはキーバリューペアとしてバイト配列のメッセージやレコードをすべて受け入れるからです。メッセージ全体をトピックに送信する前に、プロデューサーはシリアライザを使用してメッセージをバイト配列に変換する必要があります。
以下は、Apache Kafkaメッセージプロデューサーから公開されるペイロードまたは実際のデータにバインドされたサンプルスキーマです。
また、こちらはメッセージプロデューサーのJavaコードスニペットです。
public class ProducerWithSchema {
private String status = "Failed";
private String paylaodWithSchema = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"int32\", \"optional\": false, \"field\": \"deviceId\" }, { \"type\": \"string\", \"optional\": false, \"field\": \"deviceData\" }, { \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Timestamp\", \"version\": 1, \"field\": \"generatedTime\" } ] }, \"payload\": { \"deviceId\": 3000, \"deviceData\": \"PPPPPwfgjijk\", \"generatedTime\": 1401984166000} }";
private String key = "first";
public Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
System.setProperty("org.apache.logging.log4j.level", "INFO");
return new KafkaProducer(props);
}
public String sendMsgToTopic(){
Producer producer = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(paylaodWithSchema);
ProducerRecord record = new ProducerRecord(IKafkaConstants.TOPIC_NAME,jsonNode);
producer = this.createProducer();
producer.send(record);
producer.flush();
producer.close();
}catch (Exception e) {
System.out.println("Error in sending record");
System.out.println(e.getMessage());
}
return status;
}
public static void main(String[] args) {
// TODO 自動生成されたメソッドスタブ
new ProducerWithSchema().sendMsgToTopic();
}
}
もちろん、上記のアプローチでは、いくつかのボトルネックがあります。例えば:
- メッセージとスキーマの密結合。
- スキーマは常に実際のデータと組み合わされる必要があります。
- スキーマの進化に関する問題。
- コードの保守性など。
上記の問題を緩和または解決するために、スキーマレジストリがすべてのスキーマが展開/維持される別のコンポーネントとして導入されました。スキーマの進化中に互換性チェックが必要であり、プロデューサーとコンシューマーの契約を維持し、スキーマレジストリを利用してこれを達成できることを確認する必要があります。
以下のビデオを視聴して、トピックからMySQLの特定のテーブルにJDBCシンクコネクタを使用してシングルノードApache Kafkaクラスターからデータが連続的にストリーミングされる様子を確認できます。
結論
これまでのところ、JDBCコネクタの最大の困難とApache KafkaとJDBCコネクタのバンドルについてより良い理解を得たことでしょう。この記事を楽しんでいただけたでしょうか。この記事が価値があると感じたら、いいねと共有してください。
Source:
https://dzone.com/articles/streaming-data-to-rdbms-via-kafka-jdbc-sink