Spring-Kafka略記-ベース使用

5373 ワード

new无言転載オリジナル出典を明記してください.ありがとうございます.
に頼る
MAVEN

  org.springframework.kafka
  spring-kafka
  2.1.10.RELEASE


Gradle
compile 'org.springframework.kafka:spring-kafka:2.1.10.RELEASE'

互換性
  • Apache Kafkaクライアント1.0.x以降
  • Spring Framework 5.0.x
  • 最低Javaバージョン:8
  • インフラストラクチャの実装
    /**
     * Demonstrates wiring a {@code KafkaMessageListenerContainer} and a
     * {@code KafkaTemplate} by hand: the test starts a listener container,
     * publishes a fixed number of records and waits until the listener has
     * counted all of them.
     *
     * <p>The broker address is hard-coded to {@code localhost:9092} in
     * {@link #consumerProps()} and {@link #senderProps()}.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DemoTest {

        private static final Logger logger = LoggerFactory.getLogger(DemoTest.class);

        /** Topic used by BOTH the listener container and the template. */
        private static final String TOPIC = "test";

        /** Number of records the test publishes; the latch waits for exactly this many. */
        private static final int MESSAGE_COUNT = 4;

        /**
         * Publishes {@link #MESSAGE_COUNT} records and asserts that the listener
         * receives all of them within 60 seconds.
         *
         * @throws Exception if the thread is interrupted while sleeping or waiting
         */
        @Test
        public void testAutoCommit() throws Exception {
            logger.info("Start auto");
            ContainerProperties containerProps = new ContainerProperties(TOPIC);
            // The latch must match the number of records sent below, otherwise
            // await() can only time out.
            final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
            containerProps.setMessageListener(new MessageListener<Integer, String>() {

                @Override
                public void onMessage(ConsumerRecord<Integer, String> message) {
                    logger.info("received: {}", message);
                    latch.countDown();
                }

            });
            KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
            container.setBeanName("testAuto");
            container.start();
            try {
                Thread.sleep(1000); // wait a bit for the container to start
                KafkaTemplate<Integer, String> template = createTemplate();
                // Publish to the same topic the container listens on.
                template.setDefaultTopic(TOPIC);
                template.sendDefault(0, "foo");
                template.sendDefault(2, "bar");
                template.sendDefault(0, "baz");
                template.sendDefault(2, "qux");
                template.flush();
                assertTrue(latch.await(60, TimeUnit.SECONDS));
            } finally {
                // Stop the container even when the assertion above fails.
                container.stop();
            }
            logger.info("Stop auto");
        }

        /**
         * Builds a listener container from {@link #consumerProps()}.
         *
         * @param containerProps topics and message listener for the container
         * @return a new, not yet started container
         */
        private KafkaMessageListenerContainer<Integer, String> createContainer(
                ContainerProperties containerProps) {
            Map<String, Object> props = consumerProps();
            DefaultKafkaConsumerFactory<Integer, String> cf =
                    new DefaultKafkaConsumerFactory<>(props);
            return new KafkaMessageListenerContainer<>(cf, containerProps);
        }

        /**
         * Builds a template from {@link #senderProps()}.
         *
         * @return a new template with no default topic set
         */
        private KafkaTemplate<Integer, String> createTemplate() {
            Map<String, Object> senderProps = senderProps();
            ProducerFactory<Integer, String> pf =
                    new DefaultKafkaProducerFactory<>(senderProps);
            return new KafkaTemplate<>(pf);
        }

        /**
         * Consumer configuration: auto-commit enabled, Integer keys, String values.
         *
         * @return a mutable map of consumer properties
         */
        private Map<String, Object> consumerProps() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }

        /**
         * Producer configuration: no retries, Integer keys, String values.
         *
         * @return a mutable map of producer properties
         */
        private Map<String, Object> senderProps() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }

    }
    

    Spring Boot構成実装アプリケーション.yml
    # Spring Boot Kafka consumer settings (application.yml).
    spring:
      kafka:
        consumer:
          # Consumer group id.
          group-id: foo
          # Kafka's auto.offset.reset policy; "earliest" presumably makes a group
          # with no committed offset start from the beginning of the topic.
          auto-offset-reset: earliest
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DemoSpringBootTest {
    
        private static Logger logger = LoggerFactory.getLogger(DemoSpringBootTest.class);
    
    
        @Autowired
        private KafkaTemplate template;
    
        private final CountDownLatch latch = new CountDownLatch(3);
    
        @Test
        public void test(){
            try {
                this.template.send("test", "foo1");
                this.template.send("test", "foo2");
                this.template.send("test", "foo3");
                latch.await(60, TimeUnit.SECONDS);
                logger.info("All received");
            } catch (Exception e) {
                logger.error("", e);
            }
        }
    
        @KafkaListener(topics = "test")
        public void listen(ConsumerRecord, ?> cr) throws Exception {
            logger.info(cr.toString());
            latch.countDown();
        }
    }