Spring WebSocket with Kafka


計画されたプロジェクトアーキテクチャは次のとおりです.
基本的にMVC構造と似ており,DTOを用いてクライアントと通信する.
チャットルーム情報などはデバイスに格納され、チャット履歴はまずカフカに格納される方式…?として現れる.
MVCモードに関する問題は抜きにして、カフカの設定だけに集中しているので、理解してほしい.

build.gradle

// build.gradle
// Kafka
implementation("org.springframework.kafka:spring-kafka")

// 1. Use Guava in your implementation only
implementation("com.google.guava:guava:31.1-jre")
build.gradle設定にこの項目を追加します.

Configurations

// Kafka 에서 통신할 내용
public class KafkaConstants {
    public static final String KAFKA_TOPIC = "test";
    public static final String GROUP_ID = "tt";
    public static final String KAFKA_BROKER = "localhost:9092";
}
トピック、グループ名、ブローカ・アドレスの変更を容易にするために、それらをクラスとして個別に保持します.

Producer Config

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, MessageDto> producerFactory() {
        return new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration());
    }

    @Bean
    public Map<String, Object> kafkaProducerConfiguration() {
        return ImmutableMap.<String, Object>builder()
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
                .put("group.id", KafkaConstants.GROUP_ID)
                .build();
    }

    @Bean
    public KafkaTemplate<String, MessageDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
重要なのは中央がkafkaProducerConfigurationのようです.ここにはいくつかの設定があります.HashMapを使っても大丈夫ですが、この設定は変わらないのでGoogle番ザクロを導入してImmutableMapを使いました.
内容的には、
  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG:エージェントアドレスを設定します.
  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG:どのシリアル設定キー
  • を使用するか
  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG:Valueは?
  • "group.id":指定グループid
  • 文字列値uuidであり、チャット履歴jsonの値であるため、KeyをJSONに設定します.したがって、これに基づいてProduceFactoryを作成し、KafkaでKafkaTemplateとして使用します.
    この設定は消費者も同じです.

    ConsumerConfig

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, MessageDto> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, MessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, MessageDto> consumerFactory() {
            JsonDeserializer<MessageDto> deserializer = new JsonDeserializer<>(MessageDto.class);
            deserializer.setRemoveTypeHeaders(false);
            deserializer.addTrustedPackages("*");
            deserializer.setUseTypeMapperForKey(true);
    
            ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
                    .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER)
                    .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
                    .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                    .put("group.id", KafkaConstants.GROUP_ID)
                    .build();
    
            return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
        }
    
    }
    Producerとあまり差がありません
    ここでもFactoryを作成して使用します.違いは、私が作成した任意のオブジェクトであるため、カフカでの復号を支援する必要がある点です.
    したがって、MessageDtoでは、上に4行を追加する必要があります.カフカではエラーは発生しません.

    WebSocketconfig

    @Configuration
    // @EnableWebSocket
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
        // 메시지 발행 요청 : /topic (Application Destination Prefix)
        // 메시지 구독 요청 : /kafka (enable Simple Broker)
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.enableSimpleBroker("/topic");
            registry.setApplicationDestinationPrefixes("/kafka");
        }
    
        // Stomp WebSocket Endpoint : /ws-chat
        // Unity 에서 접속하려 하니 SockJS 를 빼야 했다.
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/ws-chat")
                    .setAllowedOrigins("*");
        }
    WebSocketプリファレンスです.不思議なことにconsumerFactoryUnityでは接続できません.

    Service


    KafkaProducer

    @Service
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaProducer {
    
        private final KafkaTemplate<String, MessageDto> kafkaTemplate;
    
        public void send(String topic, MessageDto messageDto) {
            log.info("topic : " + topic);
            log.info("send Message : " + messageDto.getMessage());
            kafkaTemplate.send(topic, messageDto);
        }
    }
    上記SockJSに登録されているカフカテンプレートを使用して、指定されたトピックに情報を送信する方法.

    KafkaConsumuer

    @Service
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaConsumer {
    
        private final SimpMessagingTemplate template;
    
        @KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
        public void consume(MessageDto message) throws IOException {
            log.info("Consumed Message : " + message.getMessage());
            HashMap<String, String> msg = new HashMap<>();
            msg.put("roomId", message.getRoomId());
            msg.put("message", message.getMessage());
            msg.put("writer", message.getWriter());
    
            ObjectMapper mapper = new ObjectMapper();
            template.convertAndSend("/topic/tt", mapper.writeValueAsString(msg));
        }
    }
    producerConfigarnotationで、購読するトピックとグループ名を設定します.
    そこからメッセージを読み出し、KafkaListenerを使用してSTOMP WebSocketにメッセージを送信する.

    Controller


    ChatController

    @RestController
    @RequiredArgsConstructor
    @Slf4j
    public class ChatController {
    
        private final ChatServiceImpl chatService;
        private final KafkaTemplate<String, MessageDto> kafkaTemplate;
        private final KafkaProducer kafkaProducer;
        private final ChatMessageHistoryRepository chatMessageHistoryRepository;
    
        @PostMapping("/publish")
        public void sendMessage(@RequestBody MessageDto messageDto) {
            log.info("ChatController -> sendMessage : " + messageDto.getMessage());
            try {
                kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, messageDto);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        @PostMapping("/message")
        @MessageMapping("/message")
        public void message(@RequestBody MessageDto message) {
            log.info(message.getMessage());
            chatMessageHistoryRepository.save(message);
            kafkaProducer.send(KafkaConstants.KAFKA_TOPIC, message);
        }
    
        @GetMapping("/history")
        public List<MessageDto> getMessageHistory() {
            log.info("history 호출");
            return chatMessageHistoryRepository.get();
        }
    }
    個別のクライアントウィンドウがないため、StompWebsocketが接続されているかどうかを判断できないため、SimpMessagingTemplateが追加されました.PostMappingにメッセージを送信すると、プロデューサーサービスが呼び出され、破棄されます.kafka/messageは、コントローラ上で擬似動作を直接処理する動作である.
    追いかけながら理解してみましたが、関数の定理が必要だと思います.
    簡単にコードを書いて説明しただけで、少し理解できましたが、初めて会った人は理解しにくいようです.
    もう一度触ってから整理して