Embedded KafkaでKafkaをテスト


アウトライン


マイクロサービスアーキテクチャ環境では、通常、Kafkaを使用して異なるドメイン間の分散イベントを処理しますが、実際に実行されているKafkaでテストを行う場合、よく見られます.
これは、試験が外部afkaに依存するため、試験の安定性を低下させる.
外部Kafkaの問題はテストの実行を制限する可能性があると思います.
外部Kafkaサーバに依存することなく、Spring Bootで信頼性の高い独立した統合テストを行う方法を説明します.

依存性🐭


まず、SpringにKafkaに関連する依存項目を追加してKafkaを使用します.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>
次にKafka Testに関連する依存性を追加する.
テスト環境でのみ依存性を維持するため、scopeをtestに設定します.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.6.3.RELEASE</version>
    <scope>test</scope>
</dependency>

適用例🐰


サンプルコードは[6]個.
  • は、まずKafkaを使用するために属性を設定する.
  • Producer設定を設定します.
  • Consumerも設定します.
  • Consumerを作成します.
  • Producerを作成します.
  • 上記のコードに基づいてテストコードを作成しました.
    テストコードの結果は成功しました.

    テキストとして表示
    application.properties
    ------------------------------------------------------
    
    spring:
        kafka:
            producer:
                bootstrap-servers: localhost:9092
            consumer:
                auto-offset-reset: earliest
                bootstrap-servers: localhost:9092
                enable-auto-commit: false
            listener:
                ack-mode: manual
    
    
    
    KafkaProducerConfig.class
    ------------------------------------------------------
    
    @Configuration
    public class KafkaProducerConfig {
        @Value("${spring.kafka.producer.bootstrap-servers}")
        private String BOOTSTRAP_SERVERS;
    
        @Bean
        public ProducerFactory<String, String> factory(){
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(props);
        }
    
        @Bean
        KafkaTemplate<String, String> kafkaTemplate(){
            return new KafkaTemplate<>(factory());
        }
    }
    
    
    
    KafkaConsumerConfig.class
    ------------------------------------------------------
    
    @Configuration
    public class KafkaConsumerConfig {
        @Value("${spring.kafka.consumer.bootstrap-servers}")
        private String BOOTSTRAP_ADDRESS;
    
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String AUTO_OFFSET_RESET;
    
        @Value("${spring.kafka.consumer.enable-auto-commit}")
        private boolean AUTO_COMMIT;
    
        @Bean
        ConsumerFactory<String,String> consumerFactory(){
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT);
            return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
        }
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(){
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
    
    
    
    KafkaProducer.class
    ------------------------------------------------------
    
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class KafkaProducer {
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        public void send(String topic, String payload) {
            kafkaTemplate.send(topic, payload);
        }
    }
    
    
    
    KafkaConsumer.class
    ------------------------------------------------------
    
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class KafkaConsumer {
        private final ObjectMapper objectMapper;
        private List<RegisteredPostEvent> eventRepo = new ArrayList<>();
    
        @KafkaListener(topics = "testTopic", groupId = "testGroup")
        protected void consume(@Payload String payload, Acknowledgment acknowledgment) throws Exception {
            log.info("recive event : {}", payload);
            RegisteredPostEvent event = objectMapper.readValue(payload, RegisteredPostEvent.class);
            eventRepo.add(event);
    
            // Process
            acknowledgment.acknowledge();
        }
    
        public List<RegisteredPostEvent> getEventRepo() {
            return eventRepo;
        }
    }
    
    
    
    EmbeddedKafkaTest.class
    ------------------------------------------------------
    
    @SpringBootTest
    @EmbeddedKafka(partitions = 3,
                   brokerProperties = {
                        "listeners=PLAINTEXT://localhost:9092"
                   },
                   ports = { 9092 })
    class EmbeddedKafkaIntegrationTest {
    
        @Autowired
        KafkaProducer producer;
    
        @Autowired
        ObjectMapper objectMapper;
    
        @Autowired
        private KafkaConsumer kafkaConsumer;
    
        @Test
        void test() throws Exception {
            // given
            RegisteredPostEvent event = RegisteredPostEvent.idOf(1L);
            String payload = objectMapper.writeValueAsString(event);
    
            // when
            producer.send("testTopic", payload);
            Thread.sleep(2000);
    
            // then
            assertNotEquals(0, kafkaConsumer.getEventRepo().size());
        }
    }
    

    コア🐭


    上記のコードでは、トピックの最も重要な部分は、EmbeddedKafkaintegrationTestクラスで宣言されたEmbeddedKafka言語テストです.
    上記の宣言はspring-Kafka-testに存在する宣言であり、これらの宣言をクラスレベルに宣言することで、テストはKafkaに埋め込まれた環境でテストされます.
    注)https://www.baeldung.com/spring-boot-kafka-testing