Springboot 2(30)統合kafka--詳細説明@KafkaListener

66990 ワード

ソースアドレス
Springboot 2チュートリアルシリーズ
書き込み性能が非常に高いため、Kafkaメッセージキューが混雑している場合にテストされることがよくあります.このtopicがパーティションが1つしかない場合、実際に新しい消費者を再起動しても効果はありません.
C o n c u r r e n t KafkaListenerContainerFactoryとfactoryが設定.setConcurrency(4); (私のtopicには4つのパーティションがあり、消費を加速させるために同時設定を4、つまり4つのKafkaMessageListenerContainerがあります)

操作Topic


コンフィギュレーション

@Component
public class PrividerKafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapServer;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        // Kafka 
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        KafkaAdmin admin = new KafkaAdmin(props);
        return admin;
    }

    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }

}

Controllerレイヤ

@RestController
@Slf4j
public class TopicController {

    @Autowired
    private AdminClient adminClient;

    @ApiOperation(value = " topic")
    @ApiImplicitParams({
           @ApiImplicitParam(name = "topicName", value = "topic ",defaultValue = "first_top",
                    required = true, dataType = "string", paramType = "query"),
            @ApiImplicitParam(name = "partitions", value = " ", defaultValue = "4",
                    required = true, dataType = "int", paramType = "query"),
            @ApiImplicitParam(name = "replicationFactor", value = " ", defaultValue = "1",
                    required = true, dataType = "int", paramType = "query")
    })
    @GetMapping("/createTopic")
    public String createTopic(String topicName,int partitions,int replicationFactor){
        adminClient.createTopics(Arrays.asList(new NewTopic(topicName,partitions,(short)replicationFactor)));
        return "create success";
    }

    @ApiOperation(value = " topic")
    @GetMapping("/findAllTopic")
    public String findAllTopic() throws ExecutionException, InterruptedException {
        ListTopicsResult result = adminClient.listTopics();
        Collection<TopicListing> list = result.listings().get();
        List<String> resultList = new ArrayList<>();
        for(TopicListing topicListing : list){
            resultList.add(topicListing.name());
        }
        return JSON.toJSONString(resultList);
    }

    @ApiOperation(value = " topic ")
    @ApiImplicitParams({
           @ApiImplicitParam(name = "topicName", value = "topic ",defaultValue = "first_top",
                    required = true, dataType = "string", paramType = "query")
    })
    @GetMapping("/info")
    public String topicInfo(String topicName) throws ExecutionException, InterruptedException {
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));
        Map<String,String> resultMap = new HashMap<>();
        result.all().get().forEach((k,v)->{
            log.info("k: "+k+" ,v: "+v.toString());
            resultMap.put(k,v.toString());
        });

        return JSON.toJSONString(resultMap);
    }

    @ApiOperation(value = " topic")
    @ApiImplicitParams({
           @ApiImplicitParam(name = "topicName", value = "topic ",defaultValue = "first_top",
                    required = true, dataType = "string", paramType = "query")
    })
    @GetMapping("/delete")
    public String deleteTopic(String topicName){
        DeleteTopicsResult  result = adminClient.deleteTopics(Arrays.asList(topicName));
        return  JSON.toJSONString(result.values());
    }

}

AdminClientの一般的な方法は
  • 作成Topic:createTopics(Collection newTopics)
  • 削除Topic:deleteTopics(Collection topics)
  • すべてのTopic:listTopics()
  • を羅列
  • パーティションの追加:createPartitions(MapnewPartitions)
  • クエリーTopic:describeTopics(Collection topicName)
  • クエリークラスタ情報:describeCluster()
  • クエリーACL情報:describeAcls(AclBindingFilter)
  • ACL情報の作成:createAcls(Collection acls)
  • ACL情報の削除:deleteAcls(Collection filters)
  • クエリー構成情報:describeConfigs(Collection resources)
  • 構成情報の変更:alterConfigs(Mapconfigs)
  • レプリカの変更ログディレクトリ:alterReplicaLogDirs(MapreplicaAssignment)
  • クエリーノードのログディレクトリ情報:describeLogDirs(Collection brokers)
  • クエリー・レプリカのログ・ディレクトリ情報:describeReplicaLogDirs(Collection replicas)
  • メッセージの送信


    KafkaTemplate送信メッセージは非同期で送信されます

    メッセージを送信する3つの方法

    // 
    template.send(topic, 0, System.currentTimeMillis(), "0", msg);
    
    // ProducerRecord 
    ProducerRecord record = new ProducerRecord(topic, msg);
    template.send(record);
    
    // Message 
    Map map = new HashMap();
    map.put(KafkaHeaders.TOPIC, topic);
    map.put(KafkaHeaders.PARTITION_ID, 0);
    map.put(KafkaHeaders.MESSAGE_KEY, "0");
    GenericMessage message = new GenericMessage(msg,new MessageHeaders(map));
    template.send(message);
    

    メッセージ結果コールバック

    @Component
    @Slf4j
    public class KafkaSendResultHandler implements ProducerListener {
        @Override
        public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
            log.info("Message send success : " + producerRecord.toString());
        }
    
        @Override
        public void onError(ProducerRecord producerRecord, Exception exception) {
            log.info("Message send error : " + producerRecord.toString());
        }
    
    }
    

    同期メッセージの送信

    @GetMapping("/syncMsg")
    public String syncMsg(@RequestParam String topic, @RequestParam String msg){
        try {
            template.send(topic, msg).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }
    

    消費情報


    Spring-Kafkaにおけるメッセージ傍受は大きく2つのタイプに分けられ、1つは単一のデータ消費であり、1つは大量消費である.

    GenericMessageListener

    @Bean
    public KafkaMessageListenerContainer demoListenerContainer(ConsumerFactory consumerFactory) {
        ContainerProperties properties = new ContainerProperties("topic3");
    
        properties.setGroupId("group1");
    
        /*
          // 
          properties.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(ConsumerRecord record) {
                    log.info("topic3: " + record.toString());
                }
            });*/
    
        // 
        properties.setMessageListener(
            new BatchAcknowledgingConsumerAwareMessageListener<String,String>(){
                @Override
                public void onMessage(List<ConsumerRecord<String, String>> list,
                                      Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
                    log.info("size:{}",list.size());
                }
            });
        return new KafkaMessageListenerContainer(consumerFactory, properties);
    }
    

    他のMessageListener,B a t c h A k o w l e d g g i n C o n s u m e r A wareMessageListenerはGenericMessageListenerの実装クラスである

    @KafkaListener

    @Component
    @Slf4j
    public class KafkaConsumer {
        // 
          @KafkaListener(topics = {"first_top2"})
          public void consumer(ConsumerRecord<?, ?> record){
              Optional<?> kafkaMessage = Optional.ofNullable(record.value());
              if (kafkaMessage.isPresent()) {
                  Object message = kafkaMessage.get();
                  log.info("record =" + record);
                  log.info(" message =" + message);
              }
          }
    
          // 
        @KafkaListener(topics = {"first_top"},containerFactory="batchFactory")
        public void consumerBatch(List<ConsumerRecord<?, ?>> record){
            log.info(" :{}",record.size());
        }
    }
    
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
        ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true);// , Kafka 
        return factory;
    }
    

    application.yml
     messages:
        basename: i18n/Messages,i18n/Pages
      kafka:
        #bootstrap-servers: 10.10.2.138:9092,10.10.2.138:9093,10.10.2.138:9094
        bootstrap-servers: 47.106.106.53:9092
        template:
          default-topic: self-topic0
        consumer:
              key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              group-id: myGroup998
              #  offset
              auto-offset-reset: earliest
              #  
              max-poll-records: 1000
              #  
              enable-auto-commit: true
        consumer-extra:
               #  
               batch-listener: true
    

    @KafkaListenerプロパティ
  • id:消費者のidであり、GroupIdが構成されていない場合、デフォルトidはGroupId
  • である.
  • containerFactory:上記@KafkaListenerは単一データかマルチデータ消費かを区別する注釈のcontainerFactory属性を構成するだけでよい.この中にはリスニングコンテナ工場、つまりC o n c u r r e n t KafkaListenerContainerFactoryが配置され、BeanName
  • が配置されている.
  • topics:リスニングが必要なTopic、複数の
  • をリスニング可能
  • topicPartitions:より詳細なリスニング情報を構成できます.Topicの指定されたパーティションをリスニングするか、offsetが200のオフセットから
  • をリスニングする必要があります.
  • errorHandler:例外プロセッサの傍受、BeanName
  • の構成
  • groupId:消費グループID
  • idIsGroup:idがGroupIdであるかどうか
  • clientIdPrefix:消費者Idプレフィックス
  • beanRef:コンテナのBeanNameを実際にリスニングするには、BeanNameの前に「」を付ける必要があります.

  • Topicで指定されたパーティションのリスニング

    @KafkaListener(id = "id0", containerFactory="batchFactory",
                   topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());
    
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }
    

    注記方式メッセージヘッダおよびメッセージボディの取得

    @KafkaListener(id = "group3", topics = "first_top3")
        public void annoListener(@Payload String data,
                                 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.RECEIVED_TIMESTAMP) String ts) {
            log.info(" receive : 
    "
    + "data : "+data+"
    "
    + "key : "+key+"
    "
    + "partitionId : "+partition+"
    "
    + "topic : "+topic+"
    "
    + "timestamp : "+ts+"
    "
    ); }

    テスト方法:http://127.0.0.1:8080/send?topic=first_top3&msg=message

    Ackメカニズムを用いて消費を確認する


    RabbitMQの消費は使い捨てといえます.つまり、消費を確認するとすぐにハードディスクやメモリから削除されます.また、RabbitMQの粗さは順番消費で、列に並ぶように順番消費され、確認されていないメッセージは再び列に戻り、リスナーが再び消費されるのを待っています.
    しかし、Kafkaとは異なり、Kafkaは最新のオフセット量を保存することによってメッセージ消費を行い、消費を確認するメッセージはすぐに削除されないので、削除されていないデータを繰り返し消費することができ、第1のメッセージが確認されていない場合、第2のメッセージが確認された場合、Kafkaは第2のメッセージのオフセット量、すなわち第1のメッセージがリスナーによって取得されない場合、最初のメッセージのオフセット量に基づいて手動で取得しない限り
    アプリケーションをymlのenable-auto-commitfalseに設定され、自動的にコミットされないように設定されています.
    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory(
        ConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = 
            new ConcurrentKafkaListenerContainerFactory();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory);
        return factory;
    } 
    
    @KafkaListener(id = "ack", topics = "ack",containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        log.info("receive : " + record.value());
        // 
        //  ack.acknowledge();
    }
    

    メッセージ転送の実装

    @KafkaListener(id = "forward", topics = "first_top4")
    @SendTo("first_top2")
    public String forward(String data) {
        log.info(" :{}",data);
        return "send msg : " + data;
    }
    

    リスニングの停止を開始

    @RestController
    public class ConsumerContoller {
    
        @Autowired
        private KafkaListenerEndpointRegistry registry;
    
        @Autowired
        private ConsumerFactory consumerFactory;
    
        @GetMapping("/stop")
        public String stop(){
            registry.getListenerContainer("forward").pause();
            return "success";
        }
    
        @GetMapping("/start")
        public String start(){
           // , 
            if (!registry.getListenerContainer("forward").isRunning()) {
                registry.getListenerContainer("forward").start();
            }
            registry.getListenerContainer("forward").resume();
            return "success";
        }
    
    }
    

    起動クラス@EnableKafkaを追加するには

    メッセージフィルタの設定


    メッセージフィルタは、メッセージがリスニングコンテナに到達する前にブロックされ、フィルタは、システムトラフィックロジックに従って必要なデータをフィルタし、KafkaListenerによって処理される.
    /**
         *  
         * @return
         */
        @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory(
        ConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = 
            new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // RecordFilterStrategy , 
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                String msg = (String) consumerRecord.value();
                if(msg.contains("abc")){
                    return false;
                }
                log.info("filterContainerFactory filter : "+msg);
                // true 
                return true;
            }
        });
        return factory;
    }
    
    public class FilterListener {
    
        @KafkaListener(topics = {"filter_topic"},containerFactory="filterContainerFactory")
        public void consumerBatch(ConsumerRecord<?, ?> record){
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                log.info("record =" + record);
                log.info(" :{}",message);
            }
    
        }
    }
    

    テスト


    生産者住所:http://127.0.0.1:8080/swagger-ui.html