Spring Webflux + Reactive Kafka (1) - Producer
1.概要
個人的なToyプロジェクトとしてSpringWebflows&Reactive Kafaを使って、APIを通じてプロデューサーとコンサルタントを組織してみましょう.
bloking IOを使用する場合の開発とは全く異なり、慣れるまでに時間がかかるかもしれません.Webトラフィックのルーティング方式ではなくRestControllerを使用して構成します.
基本構図は簡単です.これは、RestControllerを介して送信されたメッセージをカートに送信するだけで、消費者はカートからメッセージを取得するだけで、非常に簡単なプロセスです.コアは、これらすべてのプロセスを非ブロックI/Oとして処理することである.
完全なソース:Githubリンク
2.カフカ構成
ローカル環境ではなく、実際の作業環境のように外部ドッキングステーションを利用して構成されています.プライマリ・キー・マネージャとエージェントはそれぞれ1台が優先され、2台以上のクラスタは後で構成されます.
2-1. docker-compose.yml
パーソナルプライマリサーバに設定し、パーティションを1つに設定します.
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://deogicorgi.home:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3.モニタ設定(Producer)
これは、コントローラを介して伝達された情報をカフカのbean設定とオプション設定に伝達するためです.現在、プロデューサーと消費者は独立したプロジェクトで構成されているため、この設定はプロデューサー用の設定のみが設定されている.
3-1. KafkaConfig.java
Spring Kafkaの設置とはいろいろなところがあります.非常に基本的な設定のみが構成されています.また、リファレンスを参照して複雑な設定を見つけることもできます.
公式コメント:公式参照リンク
/**
* Kafka 설정
*/
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
private final KafkaProperties properties;
/******************************************************************
************************ Producer Options ************************
******************************************************************/
// 기본 설정들로 구성
@Bean("kafkaSender")
public KafkaSender<String, Object> kafkaSender() {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
senderOptions.scheduler(Schedulers.parallel());
senderOptions.closeTimeout(Duration.ofSeconds(5));
return KafkaSender.create(senderOptions);
}
// 프로듀서 옵션
private Map<String, Object> getProducerProps() {
return new HashMap<>() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); // 전송 시간 제한을 1000ms로 설정
}};
}
}
3-2. ProduceService.java
KafkaServiceクラスのAbstractKafkaMessageクラスは、Controllerによって要求された@RequestBodyデータです.内部5 XXサーバエラーを返したくないので、KafkaProduceResultというクラスを作成して、独自の処理を構成してみます.すなわち、要求側は、送信結果のStatusCodeを無条件に2 XXで受信し、内部ではメッセージ転送に失敗した場合、NOSQLまたは他の方法で再転送処理を行うことができる.
/**
* 프로듀싱 서비스
* Kafka 프로듀싱 전 로직 처리
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ProduceService {
private final KafkaService kafkaService;
private final FailureMessageService failureMessageService;
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
return kafkaService.send(message)
.map(produceResult -> {
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
if (produceResult.hasError()) {
failureMessageService.produceFailure();
// TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
}
return produceResult;
});
}
}
3-3. KafkaService.java
これは実際に要求された情報をカフカに送信するコードです.実行すると100件,1000件,10000件の間が単一スレッドで処理されているので,この部分をマルチスレッドに変換しようと努力してgooglingした結果,Senderは最初から単一スレッドとして体現された.オプションでも違うスケジュールを指定して、シャベルとか試してみましたが・・・
/**
* 카프카 서비스
* 실제 카프카로 메시지 프로듀싱
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaService {
private final KafkaSender<String, Object> producer;
public Mono<KafkaProduceResult> send(AbstractKafkaMessage message) {
return producer.createOutbound()
// 지정된 토픽으로 메시지 전송
.send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage())))
.then()
// 에러 없이 전송이 완료 되었을 경우
.thenReturn(new KafkaProduceResult(message))
// 에러가 발생했을 경우
.onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
}
}
4.その他のメッセージクラス
これらは、要求を返すメッセージマッピングおよび転送処理結果のモデルクラスである.上記Kafkaに関連するクラスは関連していません.トイプロジェクトの意図を示すためだろう.
4-1. AbstractKafkaMessage.java
ヒントをざっと見ると,@RequstBodyマッピングを用いたクラスにはKafkaUriMessageタイプとKafkaBodyMessageタイプがある.これは,伝送に失敗した場合に2種類のタイプを処理するために分離される.
/**
* 카프카 메시지 베이스
* 프로듀서 내 에러 발생시 처리를 쉽게하기 위해 URI 형태와 Message 형태로 나눔
*/
@Getter
@Setter
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "type",
defaultImpl = KafkaUriProduceMessage.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaUriMessage.class, names = {"uri", "Uri", "URI"}),
@JsonSubTypes.Type(value = KafkaBodyMessage.class, names = {"message", "Message", "MESSAGE"})
})
public abstract class AbstractKafkaMessage {
// 요청 토픽
protected String topic;
// 메시지 타입 (uri , message)
protected ProduceMessageType type;
// 요청 시간
protected LocalDateTime requestedAt;
public abstract String getRequestedMessage();
}
4-2. KafkaProduceResult.java
最後に、転送結果がマッピングされるクラス.リクエスト側は、クラスのコンテンツに基づいて、転送の成功と失敗を決定することができる.
/**
* 카프카 메시지 전송결과 클래스
*/
@Getter
public class KafkaProduceResult {
// 메시지 전송 상태 - true : 전송완료, false : 전송실패
private Boolean status = true;
// 메시지 전송 토픽
private String topic;
// 요청받은 메시지 타입 (uri, message)
private ProduceMessageType messageType;
// 요청받은 메시지 - URI 또는 JSON String
private String requestedMessage;
// 에러 - 전송과정 중 발생된 에러, 전송완료 일 경우 null
@JsonIgnore
private Throwable error = null;
// 에러 메시지 - 전송과정 중 발생된 에러, 전송완료 일 경우 null
private String errorMessage = null;
// 메시지를 요청받은 시간
private LocalDateTime requestedAt;
// 메시지를 처리한 시간
private LocalDateTime producedAt;
public KafkaProduceResult(AbstractKafkaMessage message) {
this.setRequestedMessage(message);
}
public KafkaProduceResult(AbstractKafkaMessage message, Throwable e) {
this.setRequestedMessage(message);
this.status = false;
this.error = e;
this.errorMessage = e.getMessage();
this.producedAt = null;
}
public Boolean hasError() {
return error != null;
}
private void setRequestedMessage(AbstractKafkaMessage requestedMessage) {
this.topic = requestedMessage.getTopic();
this.messageType = requestedMessage.getType();
this.requestedMessage = requestedMessage.getRequestedMessage();
this.producedAt = LocalDateTime.now();
this.requestedAt = requestedMessage.getRequestedAt();
}
}
Reference
この問題について(Spring Webflux + Reactive Kafka (1) - Producer), 我々は、より多くの情報をここで見つけました https://velog.io/@deogicorgi/Spring-Webflux-Reactive-Kafka-1テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol