spring整合kafkaモニター消費の配置過程


前言
最近のプロジェクトで需要があります。kafkaのデータを消費したいです。前にも手動でコードを書いて、kafkaデータを買いに行きました。でも考えてみます。springはkafkaを消費する方法を提供しました。車輪を作る必要はない。そこで、springのAPIを使ってみました。
プロジェクトの技術背景には、springMVC、XML配置と注釈を使って相互に使用します。kafkaの配置は全部XML方式です。
統合プロセス
1.スプリング-kafkaの依存バッグを導入する

 <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.0.RELEASE</version>
    </dependency>
2.springのxmlファイルに配置項目を追加しても、単独でspring-context-X.xmlファイルを作成することができます。

<!-- consumer configuration                        -->
  <bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
      <map>
        <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
        <entry key="group.id" value="group" />
        <entry key="enable.auto.commit" value="true" />
        <entry key="auto.commit.interval.ms" value="3000" />
        <entry key="session.timeout.ms" value="10000" />
        <entry key="key.deserializer"
            value="org.apache.kafka.common.serialization.StringDeserializer" />
        <entry key="value.deserializer"
            value="org.apache.kafka.common.serialization.StringDeserializer" />
      </map>
    </constructor-arg>
  </bean>

  <!-- create factory    spring jar    ,     -->
  <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
      <ref bean="consumerProperties" />
    </constructor-arg>
  </bean>

  <!--        ,    spring    -->
  <bean id="payPalConsumer"
     class="com.chao.service.consumer.PayPalConsumer" />

  <!--     jar     ,            ,topic          -->
  <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
    <constructor-arg name="topics" value="${kafka.paypal.topic.name}"/>
    <property name="messageListener" ref="payPalConsumer" />
  </bean>

  <!--     jar    ,   containerProperties consumerfactory    -->
  <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
     init-method="doStart">
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
  </bean>
2.カスタム消費者類、消費者類は依然として注釈を使用することができます。

/**
 * get msg from kafka
 */
@Component 
public class PayPalConsumer implements MessageListener<String, String> {

  private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class);
  @Autowired
  private XXService XXService;
  @Override
  public void onMessage(ConsumerRecord<String, String> authorizeRecord) {
    String value = authorizeRecord.value();
    if (StringUtils.isEmpty(value)){
      logger.warn("receive message from kafka is null");
      return;
    }
    logger.info("receive message from kafka is {}",value);
  }
}
このステップを使って配置し、使い捨てにします。とても順調です。
以上で、springがkafkaを統合して消費の配置過程を監督することについての文章を紹介しました。spring整合kafkaの内容については、以前の文章を検索したり、次の関連記事を見たりしてください。これからもよろしくお願いします。