Spring Webflux + Reactive Kafka (1) - Producer


1.概要


個人的なToyプロジェクトとしてSpringWebflows&Reactive Kafaを使って、APIを通じてプロデューサーとコンサルタントを組織してみましょう.
bloking IOを使用する場合の開発とは全く異なり、慣れるまでに時間がかかるかもしれません.Webトラフィックのルーティング方式ではなくRestControllerを使用して構成します.
基本構図は簡単です.これは、RestControllerを介して送信されたメッセージをカートに送信するだけで、消費者はカートからメッセージを取得するだけで、非常に簡単なプロセスです.コアは、これらすべてのプロセスを非ブロックI/Oとして処理することである.
完全なソース:Githubリンク

2.カフカ構成


ローカル環境ではなく、実際の作業環境のように外部ドッキングステーションを利用して構成されています.プライマリ・キー・マネージャとエージェントはそれぞれ1台が優先され、2台以上のクラスタは後で構成されます.

2-1. docker-compose.yml


パーソナルプライマリサーバに設定し、パーティションを1つに設定します.
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://deogicorgi.home:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

3.モニタ設定(Producer)


これは、コントローラを介して伝達された情報をカフカのbean設定とオプション設定に伝達するためです.現在、プロデューサーと消費者は独立したプロジェクトで構成されているため、この設定はプロデューサー用の設定のみが設定されている.

3-1. KafkaConfig.java


Spring Kafkaの設置とはいろいろなところがあります.非常に基本的な設定のみが構成されています.また、リファレンスを参照して複雑な設定を見つけることもできます.
公式コメント:公式参照リンク
/**
 * Kafka 설정
 */
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    private final KafkaProperties properties;

    /******************************************************************
     ************************ Producer Options ************************
     ******************************************************************/
    
    // 기본 설정들로 구성
    @Bean("kafkaSender")
    public KafkaSender<String, Object> kafkaSender() {
        SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
        senderOptions.scheduler(Schedulers.parallel());
        senderOptions.closeTimeout(Duration.ofSeconds(5));
        return KafkaSender.create(senderOptions);
    }
    
    // 프로듀서 옵션
    private Map<String, Object> getProducerProps() {
        return new HashMap<>() {{
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); // 전송 시간 제한을 1000ms로 설정
        }};
    }
}

3-2. ProduceService.java


KafkaServiceクラスのAbstractKafkaMessageクラスは、Controllerによって要求された@RequestBodyデータです.内部5 XXサーバエラーを返したくないので、KafkaProduceResultというクラスを作成して、独自の処理を構成してみます.すなわち、要求側は、送信結果のStatusCodeを無条件に2 XXで受信し、内部ではメッセージ転送に失敗した場合、NOSQLまたは他の方法で再転送処理を行うことができる.
/**
 * 프로듀싱 서비스
 * Kafka 프로듀싱 전 로직 처리
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ProduceService {

    private final KafkaService kafkaService;
    private final FailureMessageService failureMessageService;

    public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
        return kafkaService.send(message)
                .map(produceResult -> {
                    log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
                    if (produceResult.hasError()) {
                    	failureMessageService.produceFailure();
                        // TODO 카프카 프로듀싱 실패일 경우 처리
                        // ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
                        log.error("Kafka produce error : {}", produceResult.getErrorMessage());
                    }
                    return produceResult;
                });
    }
}

3-3. KafkaService.java


これは実際に要求された情報をカフカに送信するコードです.実行すると100件,1000件,10000件の間が単一スレッドで処理されているので,この部分をマルチスレッドに変換しようと努力してgooglingした結果,Senderは最初から単一スレッドとして体現された.オプションでも違うスケジュールを指定して、シャベルとか試してみましたが・・・
/**
 * 카프카 서비스
 * 실제 카프카로 메시지 프로듀싱
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaService {

    private final KafkaSender<String, Object> producer;

    public Mono<KafkaProduceResult> send(AbstractKafkaMessage message) {

        return producer.createOutbound()
                // 지정된 토픽으로 메시지 전송
                .send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage())))
                .then()
                // 에러 없이 전송이 완료 되었을 경우
                .thenReturn(new KafkaProduceResult(message))
                // 에러가 발생했을 경우
                .onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
    }
}

4.その他のメッセージクラス


これらは、要求を返すメッセージマッピングおよび転送処理結果のモデルクラスである.上記Kafkaに関連するクラスは関連していません.トイプロジェクトの意図を示すためだろう.

4-1. AbstractKafkaMessage.java


ヒントをざっと見ると,@RequstBodyマッピングを用いたクラスにはKafkaUriMessageタイプとKafkaBodyMessageタイプがある.これは,伝送に失敗した場合に2種類のタイプを処理するために分離される.
/**
 * 카프카 메시지 베이스
 * 프로듀서 내 에러 발생시 처리를 쉽게하기 위해 URI 형태와 Message 형태로 나눔
 */
@Getter
@Setter
@JsonTypeInfo(
        use = JsonTypeInfo.Id.NAME,
        property = "type",
        defaultImpl = KafkaUriProduceMessage.class)
@JsonSubTypes({
        @JsonSubTypes.Type(value = KafkaUriMessage.class, names = {"uri", "Uri", "URI"}),
        @JsonSubTypes.Type(value = KafkaBodyMessage.class, names = {"message", "Message", "MESSAGE"})
})
public abstract class AbstractKafkaMessage {

    // 요청 토픽
    protected String topic;

    // 메시지 타입 (uri , message)
    protected ProduceMessageType type;

    // 요청 시간
    protected LocalDateTime requestedAt;

    public abstract String getRequestedMessage();

}

4-2. KafkaProduceResult.java


最後に、転送結果がマッピングされるクラス.リクエスト側は、クラスのコンテンツに基づいて、転送の成功と失敗を決定することができる.
/**
 * 카프카 메시지 전송결과 클래스
 */
@Getter
public class KafkaProduceResult {

    // 메시지 전송 상태 - true : 전송완료, false : 전송실패
    private Boolean status = true;

    // 메시지 전송 토픽
    private String topic;

    // 요청받은 메시지 타입 (uri, message)
    private ProduceMessageType messageType;

    // 요청받은 메시지 - URI 또는 JSON String
    private String requestedMessage;

    // 에러 - 전송과정 중 발생된 에러, 전송완료 일 경우 null
    @JsonIgnore
    private Throwable error = null;

    // 에러 메시지 - 전송과정 중 발생된 에러, 전송완료 일 경우 null
    private String errorMessage = null;

    // 메시지를 요청받은 시간
    private LocalDateTime requestedAt;

    // 메시지를 처리한 시간
    private LocalDateTime producedAt;

    public KafkaProduceResult(AbstractKafkaMessage message) {
        this.setRequestedMessage(message);
    }

    public KafkaProduceResult(AbstractKafkaMessage message, Throwable e) {
        this.setRequestedMessage(message);
        this.status = false;
        this.error = e;
        this.errorMessage = e.getMessage();
        this.producedAt = null;
    }

    public Boolean hasError() {
        return error != null;
    }

    private void setRequestedMessage(AbstractKafkaMessage requestedMessage) {
        this.topic = requestedMessage.getTopic();
        this.messageType = requestedMessage.getType();
        this.requestedMessage = requestedMessage.getRequestedMessage();
        this.producedAt = LocalDateTime.now();
        this.requestedAt = requestedMessage.getRequestedAt();
    }
}