RustでKafkaのSchema Registryを利用する
RustでConfluentのKSQLの結果をconsumeするコードを作ってみた。
コード見ればわかる人は以下参照。
kakfa と schema_registry_converter のサンプルを組み合わせたものになる。
schema_registry_converterのサンプルだとrdkafkaを使っていたのでをそこの部分を置き換えてみた。
前提としてconfluentのチュートリアルをやってデータが投入されているものとする。
以下ざっと解説。
brokerとschema registryを指定する。
let registry_url = "http://localhost:8081";
let broker_url = "localhost:29092";
購読するトピックの指定。
let topic = "clicks";
// let topic = "IP_SESSIONS";
実験目的だとgroup_idを変更しないと実行しても前見たところからの続きになっ
て確認が面倒くさいので毎度変えるためのおまじない。
let group_id = Uuid::new_v4().to_simple().to_string();
このあとschema registryを利用するデコーダの定義とconsumerの定義が続く。
consumerの作成にはbuilderパターンを使っているが詳しいオプションの意味はKafkaのAPIのオプションなのでKafka自体のドキュメントを参照。他の言語でやったことがあれば理解も早いはず。
let mut decoder = AvroDecoder::new(SrSettings::new(String::from(registry_url)));
let mut consumer = Consumer::from_hosts(vec![broker_url.to_owned()])
.with_topic_partitions(topic.to_owned(), &[0])
.with_fallback_offset(FetchOffset::Earliest)
.with_group(group_id)
.with_offset_storage(GroupOffsetStorage::Kafka)
.create()
.unwrap();
あとはデータが送られてきたらデコードを行う。
if let Ok(result) = decoder.decode(Some(m.value)) {
// println!("{:?}", m);
println!("{:?}", result.value);
}
実行すると以下のような出力が得られるはずである。
Record([("IP", Union(String("51.56.119.117"))), ("URL", Union(String("/etiam/justo/etiam/pretium/iaculis.xml"))), ("TIMESTAMP", Union(String("2019-07-18T10:00:00Z")))])
Record([("IP", Union(String("51.56.119.117"))), ("URL", Union(String("/nullam/orci/pede/venenatis.json"))), ("TIMESTAMP", Union(String("2019-07-18T10:01:00Z")))])
Record([("IP", Union(String("53.170.33.192"))), ("URL", Union(String("/mauris/morbi/non.jpg"))), ("TIMESTAMP", Union(String("2019-07-18T10:01:31Z")))])
Record([("IP", Union(String("51.56.119.117"))), ("URL", Union(String("/convallis/nunc/proin.jsp"))), ("TIMESTAMP", Union(String("2019-07-18T10:01:36Z")))])
...略
文字列としてはちゃんと取れていそうではあるがでは個々の要素を取りたい…となったときには別の工夫が必要になる。
result.value
はavro_rs::types::Valueというenumになっている。地道にenumを分解してね…ってことになるがこれは別の話なのでやらない(がよくある話だとは思う)。
Author And Source
この問題について(RustでKafkaのSchema Registryを利用する), 我々は、より多くの情報をここで見つけました https://zenn.dev/takaki/articles/rust-kafka-schema-registry著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Collection and Share based on the CC protocol