Spring WebSocket with Kafka
計画されたプロジェクトアーキテクチャは次のとおりです.
基本的にMVC構造と似ており,DTOを用いてクライアントと通信する.
チャットルーム情報などはデバイスに格納され、チャット履歴はまずカフカに格納される方式…?として現れる.
MVCモードに関する問題は抜きにして、カフカの設定だけに集中しているので、理解してほしい.
build.gradle
を使用するか 文字列値uuidであり、チャット履歴jsonの値であるため、KeyをJSONに設定します.したがって、これに基づいてProduceFactoryを作成し、Kafkaで
この設定は消費者も同じです.
ここでもFactoryを作成して使用します.違いは、私が作成した任意のオブジェクトであるため、カフカでの復号を支援する必要がある点です.
したがって、
Service
基本的にMVC構造と似ており,DTOを用いてクライアントと通信する.
チャットルーム情報などはデバイスに格納され、チャット履歴はまずカフカに格納される方式…?として現れる.
MVCモードに関する問題は抜きにして、カフカの設定だけに集中しているので、理解してほしい.
build.gradle // build.gradle
// Kafka
implementation("org.springframework.kafka:spring-kafka")
// 1. Use Guava in your implementation only
implementation("com.google.guava:guava:31.1-jre")
build.gradle設定にこの項目を追加します.
Configurations // Kafka 에서 통신할 내용
public class KafkaConstants {
public static final String KAFKA_TOPIC = "test";
public static final String GROUP_ID = "tt";
public static final String KAFKA_BROKER = "localhost:9092";
}
トピック、グループ名、ブローカ・アドレスの変更を容易にするために、それらをクラスとして個別に保持します.
Producer Config
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, MessageDto> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration());
}
@Bean
public Map<String, Object> kafkaProducerConfiguration() {
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
.put("group.id", KafkaConstants.GROUP_ID)
.build();
}
@Bean
public KafkaTemplate<String, MessageDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
重要なのは中央がkafkaProducerConfiguration
のようです.ここにはいくつかの設定があります.HashMap
を使っても大丈夫ですが、この設定は変わらないのでGoogle番ザクロを導入してImmutableMap
を使いました.
内容的には、
// build.gradle
// Kafka
implementation("org.springframework.kafka:spring-kafka")
// 1. Use Guava in your implementation only
implementation("com.google.guava:guava:31.1-jre")
// Kafka 에서 통신할 내용
public class KafkaConstants {
public static final String KAFKA_TOPIC = "test";
public static final String GROUP_ID = "tt";
public static final String KAFKA_BROKER = "localhost:9092";
}
トピック、グループ名、ブローカ・アドレスの変更を容易にするために、それらをクラスとして個別に保持します.Producer Config
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, MessageDto> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration());
}
@Bean
public Map<String, Object> kafkaProducerConfiguration() {
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
.put("group.id", KafkaConstants.GROUP_ID)
.build();
}
@Bean
public KafkaTemplate<String, MessageDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
重要なのは中央がkafkaProducerConfiguration
のようです.ここにはいくつかの設定があります.HashMap
を使っても大丈夫ですが、この設定は変わらないのでGoogle番ザクロを導入してImmutableMap
を使いました.内容的には、
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
:エージェントアドレスを設定します.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
:どのシリアル設定キーProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
:Valueは?"group.id"
:指定グループidKafkaTemplate
として使用します.この設定は消費者も同じです.
ConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, MessageDto> consumerFactory() {
JsonDeserializer<MessageDto> deserializer = new JsonDeserializer<>(MessageDto.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.put("group.id", KafkaConstants.GROUP_ID)
.build();
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}
}
Producerとあまり差がありませんここでもFactoryを作成して使用します.違いは、私が作成した任意のオブジェクトであるため、カフカでの復号を支援する必要がある点です.
したがって、
MessageDto
では、上に4行を追加する必要があります.カフカではエラーは発生しません.WebSocketconfig
@Configuration
// @EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// 메시지 발행 요청 : /topic (Application Destination Prefix)
// 메시지 구독 요청 : /kafka (enable Simple Broker)
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/kafka");
}
// Stomp WebSocket Endpoint : /ws-chat
// Unity 에서 접속하려 하니 SockJS 를 빼야 했다.
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-chat")
.setAllowedOrigins("*");
}
WebSocketプリファレンスです.不思議なことにconsumerFactory
Unityでは接続できません.Service
KafkaProducer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
public void send(String topic, MessageDto messageDto) {
log.info("topic : " + topic);
log.info("send Message : " + messageDto.getMessage());
kafkaTemplate.send(topic, messageDto);
}
}
上記SockJS
に登録されているカフカテンプレートを使用して、指定されたトピックに情報を送信する方法.
KafkaConsumuer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {
private final SimpMessagingTemplate template;
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
public void consume(MessageDto message) throws IOException {
log.info("Consumed Message : " + message.getMessage());
HashMap<String, String> msg = new HashMap<>();
msg.put("roomId", message.getRoomId());
msg.put("message", message.getMessage());
msg.put("writer", message.getWriter());
ObjectMapper mapper = new ObjectMapper();
template.convertAndSend("/topic/tt", mapper.writeValueAsString(msg));
}
}
producerConfig
arnotationで、購読するトピックとグループ名を設定します.
そこからメッセージを読み出し、KafkaListener
を使用してSTOMP WebSocketにメッセージを送信する.
Controller
ChatController
@RestController
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatServiceImpl chatService;
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
private final KafkaProducer kafkaProducer;
private final ChatMessageHistoryRepository chatMessageHistoryRepository;
@PostMapping("/publish")
public void sendMessage(@RequestBody MessageDto messageDto) {
log.info("ChatController -> sendMessage : " + messageDto.getMessage());
try {
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, messageDto);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@PostMapping("/message")
@MessageMapping("/message")
public void message(@RequestBody MessageDto message) {
log.info(message.getMessage());
chatMessageHistoryRepository.save(message);
kafkaProducer.send(KafkaConstants.KAFKA_TOPIC, message);
}
@GetMapping("/history")
public List<MessageDto> getMessageHistory() {
log.info("history 호출");
return chatMessageHistoryRepository.get();
}
}
個別のクライアントウィンドウがないため、StompWebsocketが接続されているかどうかを判断できないため、SimpMessagingTemplate
が追加されました.PostMapping
にメッセージを送信すると、プロデューサーサービスが呼び出され、破棄されます.kafka/message
は、コントローラ上で擬似動作を直接処理する動作である.
追いかけながら理解してみましたが、関数の定理が必要だと思います.
簡単にコードを書いて説明しただけで、少し理解できましたが、初めて会った人は理解しにくいようです.
もう一度触ってから整理して
Reference
この問題について(Spring WebSocket with Kafka), 我々は、より多くの情報をここで見つけました
https://velog.io/@sossont/Spring-WebSocket-with-Kafka
テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
public void send(String topic, MessageDto messageDto) {
log.info("topic : " + topic);
log.info("send Message : " + messageDto.getMessage());
kafkaTemplate.send(topic, messageDto);
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {
private final SimpMessagingTemplate template;
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
public void consume(MessageDto message) throws IOException {
log.info("Consumed Message : " + message.getMessage());
HashMap<String, String> msg = new HashMap<>();
msg.put("roomId", message.getRoomId());
msg.put("message", message.getMessage());
msg.put("writer", message.getWriter());
ObjectMapper mapper = new ObjectMapper();
template.convertAndSend("/topic/tt", mapper.writeValueAsString(msg));
}
}
ChatController
@RestController
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatServiceImpl chatService;
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
private final KafkaProducer kafkaProducer;
private final ChatMessageHistoryRepository chatMessageHistoryRepository;
@PostMapping("/publish")
public void sendMessage(@RequestBody MessageDto messageDto) {
log.info("ChatController -> sendMessage : " + messageDto.getMessage());
try {
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, messageDto);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@PostMapping("/message")
@MessageMapping("/message")
public void message(@RequestBody MessageDto message) {
log.info(message.getMessage());
chatMessageHistoryRepository.save(message);
kafkaProducer.send(KafkaConstants.KAFKA_TOPIC, message);
}
@GetMapping("/history")
public List<MessageDto> getMessageHistory() {
log.info("history 호출");
return chatMessageHistoryRepository.get();
}
}
個別のクライアントウィンドウがないため、StompWebsocketが接続されているかどうかを判断できないため、SimpMessagingTemplate
が追加されました.PostMapping
にメッセージを送信すると、プロデューサーサービスが呼び出され、破棄されます.kafka/message
は、コントローラ上で擬似動作を直接処理する動作である.追いかけながら理解してみましたが、関数の定理が必要だと思います.
簡単にコードを書いて説明しただけで、少し理解できましたが、初めて会った人は理解しにくいようです.
もう一度触ってから整理して
Reference
この問題について(Spring WebSocket with Kafka), 我々は、より多くの情報をここで見つけました https://velog.io/@sossont/Spring-WebSocket-with-Kafkaテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol