Embedded KafkaでKafkaをテスト
アウトライン
マイクロサービスアーキテクチャ環境では、通常、Kafkaを使用して異なるドメイン間の分散イベントを処理しますが、実際に実行されているKafkaでテストを行う場合、よく見られます.
これは、試験が外部afkaに依存するため、試験の安定性を低下させる.
外部Kafkaの問題はテストの実行を制限する可能性があると思います.
外部Kafkaサーバに依存することなく、Spring Bootで信頼性の高い独立した統合テストを行う方法を説明します.
依存性🐭
まず、SpringにKafkaに関連する依存項目を追加してKafkaを使用します.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
次にKafka Testに関連する依存性を追加する.テスト環境でのみ依存性を維持するため、scopeをtestに設定します.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
適用例🐰
サンプルコードは[6]個.
テストコードの結果は成功しました.
テキストとして表示
application.properties
------------------------------------------------------
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
bootstrap-servers: localhost:9092
enable-auto-commit: false
listener:
ack-mode: manual
KafkaProducerConfig.class
------------------------------------------------------
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String BOOTSTRAP_SERVERS;
@Bean
public ProducerFactory<String, String> factory(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(factory());
}
}
KafkaConsumerConfig.class
------------------------------------------------------
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String BOOTSTRAP_ADDRESS;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String AUTO_OFFSET_RESET;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean AUTO_COMMIT;
@Bean
ConsumerFactory<String,String> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaProducer.class
------------------------------------------------------
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
kafkaTemplate.send(topic, payload);
}
}
KafkaConsumer.class
------------------------------------------------------
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {
private final ObjectMapper objectMapper;
private List<RegisteredPostEvent> eventRepo = new ArrayList<>();
@KafkaListener(topics = "testTopic", groupId = "testGroup")
protected void consume(@Payload String payload, Acknowledgment acknowledgment) throws Exception {
log.info("recive event : {}", payload);
RegisteredPostEvent event = objectMapper.readValue(payload, RegisteredPostEvent.class);
eventRepo.add(event);
// Process
acknowledgment.acknowledge();
}
public List<RegisteredPostEvent> getEventRepo() {
return eventRepo;
}
}
EmbeddedKafkaTest.class
------------------------------------------------------
@SpringBootTest
@EmbeddedKafka(partitions = 3,
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092"
},
ports = { 9092 })
class EmbeddedKafkaIntegrationTest {
@Autowired
KafkaProducer producer;
@Autowired
ObjectMapper objectMapper;
@Autowired
private KafkaConsumer kafkaConsumer;
@Test
void test() throws Exception {
// given
RegisteredPostEvent event = RegisteredPostEvent.idOf(1L);
String payload = objectMapper.writeValueAsString(event);
// when
producer.send("testTopic", payload);
Thread.sleep(2000);
// then
assertNotEquals(0, kafkaConsumer.getEventRepo().size());
}
}
コア🐭
上記のコードでは、トピックの最も重要な部分は、EmbeddedKafkaintegrationTestクラスで宣言されたEmbeddedKafka言語テストです.
上記の宣言はspring-Kafka-testに存在する宣言であり、これらの宣言をクラスレベルに宣言することで、テストはKafkaに埋め込まれた環境でテストされます.
注)https://www.baeldung.com/spring-boot-kafka-testing
Reference
この問題について(Embedded KafkaでKafkaをテスト), 我々は、より多くの情報をここで見つけました https://velog.io/@wodyd202/Embedded-Kafka를-통한-Kafka-테스트テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol