Kafka Spring[1]Producer、Consumerを利用
📘Kafka Springで使用
Kafkaの動作構造を理解している以上、SpringCloudを直接作成して使用しましょう.
これまで作成したcatalog-serviceとorder-serviceを使用して、注文が発生したときにcatalog-service内の在庫qtyの値を減らすKafkaのメッセージで処理される論理を作成しようとしました.
🔨ディレクトリサービスの変更(Consumer) <!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
カフカ依存性の追加@EnableKafka //kafka 설정 추가
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); //kafka 실행 서버 ip
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); //Consumer들을 그룹핑 할수 있다.
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //KEY 값을 String de serializer로 지정
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //VALUE 값을 String de serializer로 지정
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); //위에서 설정한 consumerFactory를 설정해줌
return kafkaListenerContainerFactory;
}
}
srcの下にmessagequeueパッケージを生成し、KafkaConsumerConfig
ファイルを生成し、Kafka設定値を追加します.Catalogは、Orderからメッセージを読み出して使用する側なので、DESERIALIZERに設定します.@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository repository;
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage){
log.info("kafka message = {}", kafkaMessage);
//kafka 메세지 역 직렬화
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
CatalogEntity entity = repository.findByProductId(map.get("productId").toString());
if(entity != null){
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
//update
repository.save(entity);
}
}
}
その後、@KafkaListener
言語を使用してコードを記述し、example-calog-toctopトポロジーマップを待つ間に、ObjectMapperを使用してjsonデータをグループ化し、論理を実行する.
🔨オーダーサービスの変更 <!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
同じカフカ依存性を追加@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); //kafka 실행 서버 ip
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //KEY 값을 String serializer로 지정
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //VALUE 값을 String serializer로 지정
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
これに対し、Producerの設定として追加します.ほぼ同じですが、すべての設定がConsumerではないことを確認し、コードを書きます!@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderDto orderSend(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
//json format으로 변경
String json = "";
try {
json = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//kafka 메세지 전송
kafkaTemplate.send(topic, json);
log.info("Kafka Producer send data from the order service = {}",orderDto);
return orderDto;
}
}
次に、主題およびOrderDtoオブジェクトをパラメータとして渡し、OrderDtoはそれをjson形式に変換し、KafkaTemplate
のsend()
を介してメッセージを伝えるためにOrderSendを記述する.@RestController
@RequestMapping("/order-service")
@RequiredArgsConstructor
public class OrderController {
private final Environment env;
private final OrderService orderService;
private final KafkaProducer kafkaProducer; //kafka producer 주입
...
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
//기존의 jpa 로직
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
//kafka 로직 추가
kafkaProducer.orderSend("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
}
コントローラで既存の内部ロジックのみを実行する部分に、カフカメッセージングロジックを追加します.
👏テスト
実行する前に、Config Server、Eureka Server、Kafka Server、Kafka zookeaper Server、Gateway Serverを実行してください../bin/windows/kafka-server-start.bat ./config/server.properties
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
そしてCatalogサービスでDBをチェックすると、初期データに設定されていると判断できます.
Order Serviceを実行し、正常に戻りました.
user-service正常ログインを実行すると、アイデンティティ値は受け入れられませんが、受注のみをテストし、受注時にuserの論理をチェックしていないため、正常に実行されます.
order-serviceは正常にメッセージを送信しました.
catalog-サービスも正常にメッセージを受信します.
注文できる数量も90に減ったことが確認できます.
Reference
この問題について(Kafka Spring[1]Producer、Consumerを利用), 我々は、より多くの情報をここで見つけました
https://velog.io/@ililil9482/Kafka-활용해보기-1-Producer-Consumer
テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
カフカ依存性の追加@EnableKafka //kafka 설정 추가
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); //kafka 실행 서버 ip
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); //Consumer들을 그룹핑 할수 있다.
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //KEY 값을 String de serializer로 지정
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //VALUE 값을 String de serializer로 지정
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); //위에서 설정한 consumerFactory를 설정해줌
return kafkaListenerContainerFactory;
}
}
srcの下にmessagequeueパッケージを生成し、KafkaConsumerConfig
ファイルを生成し、Kafka設定値を追加します.Catalogは、Orderからメッセージを読み出して使用する側なので、DESERIALIZERに設定します.@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository repository;
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage){
log.info("kafka message = {}", kafkaMessage);
//kafka 메세지 역 직렬화
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
CatalogEntity entity = repository.findByProductId(map.get("productId").toString());
if(entity != null){
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
//update
repository.save(entity);
}
}
}
その後、@KafkaListener
言語を使用してコードを記述し、example-calog-toctopトポロジーマップを待つ間に、ObjectMapperを使用してjsonデータをグループ化し、論理を実行する.🔨オーダーサービスの変更 <!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
同じカフカ依存性を追加@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); //kafka 실행 서버 ip
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //KEY 값을 String serializer로 지정
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //VALUE 값을 String serializer로 지정
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
これに対し、Producerの設定として追加します.ほぼ同じですが、すべての設定がConsumerではないことを確認し、コードを書きます!@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderDto orderSend(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
//json format으로 변경
String json = "";
try {
json = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//kafka 메세지 전송
kafkaTemplate.send(topic, json);
log.info("Kafka Producer send data from the order service = {}",orderDto);
return orderDto;
}
}
次に、主題およびOrderDtoオブジェクトをパラメータとして渡し、OrderDtoはそれをjson形式に変換し、KafkaTemplate
のsend()
を介してメッセージを伝えるためにOrderSendを記述する.@RestController
@RequestMapping("/order-service")
@RequiredArgsConstructor
public class OrderController {
private final Environment env;
private final OrderService orderService;
private final KafkaProducer kafkaProducer; //kafka producer 주입
...
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
//기존의 jpa 로직
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
//kafka 로직 추가
kafkaProducer.orderSend("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
}
コントローラで既存の内部ロジックのみを実行する部分に、カフカメッセージングロジックを追加します.
👏テスト
実行する前に、Config Server、Eureka Server、Kafka Server、Kafka zookeaper Server、Gateway Serverを実行してください../bin/windows/kafka-server-start.bat ./config/server.properties
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
そしてCatalogサービスでDBをチェックすると、初期データに設定されていると判断できます.
Order Serviceを実行し、正常に戻りました.
user-service正常ログインを実行すると、アイデンティティ値は受け入れられませんが、受注のみをテストし、受注時にuserの論理をチェックしていないため、正常に実行されます.
order-serviceは正常にメッセージを送信しました.
catalog-サービスも正常にメッセージを受信します.
注文できる数量も90に減ったことが確認できます.
Reference
この問題について(Kafka Spring[1]Producer、Consumerを利用), 我々は、より多くの情報をここで見つけました
https://velog.io/@ililil9482/Kafka-활용해보기-1-Producer-Consumer
テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); //kafka 실행 서버 ip
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //KEY 값을 String serializer로 지정
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //VALUE 값을 String serializer로 지정
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderDto orderSend(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
//json format으로 변경
String json = "";
try {
json = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//kafka 메세지 전송
kafkaTemplate.send(topic, json);
log.info("Kafka Producer send data from the order service = {}",orderDto);
return orderDto;
}
}
@RestController
@RequestMapping("/order-service")
@RequiredArgsConstructor
public class OrderController {
private final Environment env;
private final OrderService orderService;
private final KafkaProducer kafkaProducer; //kafka producer 주입
...
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
//기존의 jpa 로직
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
//kafka 로직 추가
kafkaProducer.orderSend("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
}
実行する前に、Config Server、Eureka Server、Kafka Server、Kafka zookeaper Server、Gateway Serverを実行してください.
./bin/windows/kafka-server-start.bat ./config/server.properties
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
そしてCatalogサービスでDBをチェックすると、初期データに設定されていると判断できます.
Order Serviceを実行し、正常に戻りました.
user-service正常ログインを実行すると、アイデンティティ値は受け入れられませんが、受注のみをテストし、受注時にuserの論理をチェックしていないため、正常に実行されます.
order-serviceは正常にメッセージを送信しました.
catalog-サービスも正常にメッセージを受信します.
注文できる数量も90に減ったことが確認できます.
Reference
この問題について(Kafka Spring[1]Producer、Consumerを利用), 我々は、より多くの情報をここで見つけました https://velog.io/@ililil9482/Kafka-활용해보기-1-Producer-Consumerテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol