スプリングカフカは、Kollin - IVで遊び場を流れます



文脈
このポストは、我々はスプリングブートとスプリングカフカを使用してKotlinとシンプルなカフカストリームアプリケーションを作成するシリーズの一部です.
始めるためにチュートリアルのチェックをして、我々が造っているものの更なる文脈を得てください.

If you want to start from here you can clone the source code for this project git clone [email protected]:mmaia/simple-spring-kafka-stream-kotlin.git and then checkout v7 git checkout v7 and follow from there continuing with this post.


このポストでは、私たちはquotestreamを作成するつもりです、我々は引用の話題からメッセージを処理して、我々が前のポストで作成したレバレッジのためにGlobalkTableとの結合をするつもりです、そして、我々は分岐をして、それのキーに基づく3つの異なる話題に引用を送ります.


引用ストリームの作成
今からストリームを作成しましょうstock-quotes-topic そして、我々が我々が我々が引用を豊かにして、それのキーに基づく1つの話題にそれを公開するその特定の引用のためにレバレッジがあるならば、我々が最後のポストにおいて作成したLoverage GlobalkTableに加わって、我々はストリーム結合を使用します.これは非常に一般的なユースケースである後処理のために別々のデータへの分岐の使用を示します.
我々は3つの新しいトピックのいずれかにデータを生成する我々のサンプルアプリケーションでは、私たちは、トピックには、Googleの引用符を別のトピックと他のすべての引用符を引用するには、第三のトピックを引用符を生成します.
  • AAPL ->apple-stocks-topic
  • グーグルgoogle-stocks-topic
  • その他all-other-stocks-topic
  • ために我々の最初のステップは、これらのトピックを管理するクライアントを使用して作成することです.以下の定数をKafkaCofiguration クラス
    const val AAPL_STOCKS_TOPIC = "apple-stocks-topic"
    const val GOOGL_STOCKS_TOPIC = "google-stocks-topic"
    const val ALL_OTHER_STOCKS_TOPIC = "all-other-stocks-topic"
    
    変更するappTopics 同じクラスの関数で、これら三つのトピックを作成します.
    @Bean
    fun appTopics(): NewTopics {
        return NewTopics(
            TopicBuilder.name(STOCK_QUOTES_TOPIC).build(),
            TopicBuilder.name(LEVERAGE_PRICES_TOPIC)
                .compact().build(),
            TopicBuilder.name(AAPL_STOCKS_TOPIC).build(),
            TopicBuilder.name(GOOGL_STOCKS_TOPIC).build(),
            TopicBuilder.name(ALL_OTHER_STOCKS_TOPIC).build(),
        )
    }
    
    次回は、これらの新しいトピックを管理するクライアントを使用して作成されるアプリケーションを実行します.
    新しいスキーマ定義を追加しましょうsrc > main > avro 呼ばれるprocessed-quote 次のコンテンツを使用します.
    {
      "namespace": "com.maia.springkafkastreamkotlin.repository",
      "type": "record",
      "name": "ProcessedQuote",
      "fields": [
        { "name": "symbol", "type": "string"},
        { "name": "tradeValue", "type": "double"},
        { "name": "tradeTime", "type": ["null", "long"], "default": null},
        { "name": "leverage", "type": ["null", "double"], "default":  null}
      ]
    }
    
    この場合の違いは、新しいレバレッジフィールドであることに注意してください.Javaコードがこの新しいAvroスキーマのために生成されるように、プロジェクトを構築してください.mvn clean package -DskipTests
    さあ、新しいクラスを作りましょうrepository パッケージQuoteStream , 我々のレバレッジGlobalkTableへのリファレンスが必要です.
    @Repository
    class QuoteStream(val leveragepriceGKTable: GlobalKTable<String, LeveragePrice>) {
    ...
    
    このクラスでは、引用符を処理して豊かにする関数を宣言します.
    @Bean
    fun quoteKStream(streamsBuilder: StreamsBuilder): KStream<String, ProcessedQuote> {
    
    
    この関数では、stock-quotes-topic , レバレッジのために作成したGlobalKTableとの結合を行い、新しいAVROProcessedQuote それが利用可能であるならば、レバレッジで引用を強化すること:
    val stream: KStream<String, StockQuote> = streamsBuilder.stream(STOCK_QUOTES_TOPIC)
    
    val resStream: KStream<String, ProcessedQuote> = stream
        .leftJoin(leveragepriceGKTable,
            { symbol, _ -> symbol },
            { stockQuote, leveragePrice ->
                ProcessedQuote(
                    stockQuote.symbol,
                    stockQuote.tradeValue,
                    stockQuote.tradeTime,
                    leveragePrice?.leverage
                )
            }
        )
    
    そして、この機能を包むために、我々は新しい流れでタップして、キーに基づいて、我々は特定の話題にメッセージを送って、新しい流れを返すでしょう.
    KafkaStreamBrancher<String, ProcessedQuote>()
                .branch({ symbolKey, _ -> symbolKey.equals("APPL", ignoreCase = true) }, { ks -> ks.to(AAPL_STOCKS_TOPIC) })
                .branch({ symbolKey, _ -> symbolKey.equals("GOOGL",ignoreCase = true) }, { ks -> ks.to(GOOGL_STOCKS_TOPIC) })
                .defaultBranch { ks -> ks.to(ALL_OTHER_STOCKS_TOPIC) }
                .onTopOf(resStream)
    
        return resStream
    }
    

    If you just want to get your local code to this point without using the presented code you can checkout v8: git checkout v8


    クールで、今すぐに少しでも再生できますし、アプリケーションをビルドして実行しましょう(ローカルなKafkaセットアップが実行されていることを確認してください).mvn clean package -DskipTests && mvn spring-boot:run次に、いくつかのレバレッジメッセージを送信し、このチュートリアルで前にしたようにAPIを使用してメッセージを引用することができます.
    そして、特定の引用符がレバレッジを持っていて、彼らのキーに基づく3つの異なる話題に流れているならば、あなたは強化されているメッセージをチェックすることができますConduktor .


    The messages seem duplicated on the screenshots but that's because I sent them multiple times with the same values while playing around. I also sent a few before sending the respective leverage so you can see what happens and check that the initial ones on the bottom have a null leverage.


    それは今のところです.明日は、このチュートリアルのパートVを公開します.ここで、カフカストリームDSLを使って、グループ化、カウント、計算を行います.調子を合わせなさい.
    写真でNubelson Fernandes on Unsplash