Reactor Java 1. MonoとFluxを作成します.


ReactorはJAVAライブラリであり、Reactive Streamsの説明に従ってJVMにブロックされていない反応性アプリケーションを作成するために使用されます.
このライブラリを最初に使用するのは難しいかもしれません.このシリーズでは、MonoクラスとFluxクラスを介してReactorを提供するReactive Streamsの実行を作成、操作、管理する方法について説明します.

MonoとFluxはいずれも反応性流であるが,それらの表現は異なる.
  • Mono:0または1要素のStream
  • Flux:0からN個の要素のStream
  • 例えば、HTTPサーバにリクエストを送信する場合、応答がないか1つしか受信しないため、FluxよりもMonoの方が適切である.逆に、ある区間で数学関数の結果を計算すると、Fluxはこの場合に適しています.この区間では、数値ごとに結果が得られるからです.

    Mono And Flux are Lazy


    遅延はReactive Streamのプロパティの1つです.これは、Streamを関数として呼び出す関数がどれだけあるかにかかわらず、それを使用する前に実行されないことを意味します.
    MonoとFluxの使用方法は次のとおりです.subscribe().これを呼び出す前に、MonoとFluxは何も実行しません.

    Fluxを作成する一般的な方法


    Fluxを作成する方法はいくつかあります.次のコードはFluxを使用する一般的な方法です.
    //1, 2, 3의 값을 가지는 Flux를 생성합니다.
    Flux<Integer> integerFlux = Flux.just(1, 2, 3);
    //"Hello", "foo", "bar"값을 가지는 Flux를 생성합니다.
    Flux<String> stringFlux = Flux.just("Hello", "foo", "bar");
    //Iterable한 요소로부터 Flux를 생성합니다. 여기서는 List가 사용되었습니다.
    List<String> stringList = Arrays.asList("Hello", "foo", "bar");
    Flux<String> fluxFromList = Flux.fromIterable(stringList);
    
    //Java Stream에서도 동일하게 동작합니다.
    Stream<String> stringStream = stringList.stream();
    Flux<String> fluxFromStream = Flux.fromStream(stringStream);
    //범위로 Flux 만들기
    Flux<Integer> rangeFlux = Flux.range(1, 5); // -> Flux(1, 2, 3, 4, 5)
    //100ms 마다 새로운 값을 만들어서 Flux를 생성한다. 값은 1부터 시작하여 증가한다.
    Flux<Integer> intervalFlux = Flux.interval(Duration.ofMillis(100));
    //다른 Flux또는 Mono로부터 생성
    Flux<String> fluxCopy = Flux.from(fluxFromList);

    Monoを作成する一般的な方法


    justを除いて、MonoとFluxには異なる方法があります.
    //"Hello World!"를 가지는 Mono생성
    Mono<String> helloWorld = Mono.just("Hello World");
    //빈 Mono 생성
    Mono<T> empty = Mono.empty();
    //Callable로 부터 Mono 생성
    Mono<String> helloWorldCallable = Mono.fromCallable(() -> "Hello World!");
    Mono<User> user = Mono.fromCallable(UserService::fetchAnyUser);
    //Future로 Mono 생성
    CompletableFuture<String> helloWorldFuture = MyApi.getHelloWorldAsync();
    Mono<String> monoFromFuture = Mono.fromFuture(helloWorldFuture);
    //supplier로 부터 Mono생성
    Ramdom rand = new Ramdom();
    Mono<Double> MonoFromSupplier = mono.fromSupplier(rand::nextDouble);
    //다른 Mono또는 Flux로 생성
    Mono<Double> monoCopy = Mono.from(monoFromSupplier);
    Mono<Integer> monoFromFlux = Mono.from(Flux.range(1, 10));

    FluxまたはMonoを作成する一般的な方法


    MonoとFluxには3つの作成方法があります.

    1) Error


    Mono.Error(Throwable T)とFlux.Error(Throwable T)はアクティブなStreamを用いてエラーを処理するのに非常に有用である.

    2) Defer


    Mono.defer(Supplier)はMonoです.fromCallableとよく似ています.ただし、fromCallableはTタイプを返し、deferはMonoを返します.
    また,異常コードを呼び出す場合,deferは開発者自身がこれらの異常をキャプチャするコードを記述する必要があるが,fromCallableでは内部はMonoである.errorが呼び出されます.
  • の2つの方法の結果は同じである.
  • 遅延メソッドはより冗長ですが、例外を他のタイプのMonoに再マッピングしたり、新しい例外を作成したりして、コードをより理解しやすく、柔軟にすることができます.
  • //두 예제는 같은 결과를 가져다줍니다.
    Integer getAnyInteger() throws Exception {
    	throw new RuntimeException("An error as occured for no reason");
    }
    
    
    //두 메서드의 비교
    Mono<Integer> fromCallable = Mono.fromCallable(this::getAnyInteger);
    // result -> Mono.error(RuntimeException("An error as occured for no reason."))
    
    Mono<Integer> defer = Mono.defer(() -> {
    	try {
    		Integer res = this.getAnyInteger();
    		return Mono.just(res);
    	} catch (Exception e) {
    		return Mono.error(e);
    	}
    })
    // result -> Mono.error(RuntimeException("An error as occured for no reason"))

    3) Create


    create(Consumercallback)メソッドのレベルは、上で見たよりも低いです.これにはMonoとFluxの内部信号が含まれます.

    Mono

    Integer getAnyInteger() throws Exception {
    	throw new RuntimeException("An error as occured for no reason.");
    }
    
    Mono<Integer> = Mono.create(callback -> {
    	try { callback.success(this.getAnyInteger()); }
    	catch (Exception e) { callback.error(e); }
    });

    Flux

    Flux<Double> flux = Flux.create(emitter -> {
    	Random rnd = new Random();
    	for(int i = 0; i <= 10; i++) emitter.next(rnd.nextDouble());
    
    	int random = rnd.nextInt(2);
    
    	if(random < 1) emitter.complete();
    	else emitter.error(new RuntimeException("Bad luck, you had one change out of 2 to complete Flux));
    });

    MonoとFluxの使い方


    FluxとMonoの作成方法を上に示したので、保存した値をどのように使用するかを見てみましょう.この操作はストリームを消費します.

    1) Mono

    Mono.just("Hello World.").subscribe(
    	successValue -> System.out.println(successValue),
    	error -> System.error.println(error.getMessage()),
    	() -> System.out.println("Mono consumed.")
    );

    結果

  • 成功時
  • Hello World!
    Mono consumed.
  • エラー時は
  • **the Error Message**
    Mono consumed.

    2) Flux


    Fluxでは、データとその処理は順番に行われます.これは、以下の場合、1、2、3、4、5の順に処理することを意味する.したがって、値3は2より先に表示されません.
    Flux.range(1, 5).subscribe(
    	successValue -> System.out.println(successValue),
    	error -> System.out.println(error.getMessage()),
    	() -> System.out.println("Flux consumed.")
    )

    結果

  • 成功時
  • 1
    2
    3
    4
    5
    Flux comsumed.
  • 失敗時
  • 1
    2
    3
    // 이번에는 Flux comsumed. 가 출력되지 않는다.
    // Flux는 에러가 발생하여 미래의 값을 다루는 것이 멈추면(모든 작업이 완전히 소비되지 않으면) Flux comsumed. 가 출력되지 않는다.

    悪い例


    FluxとMonoが呼び出すべきでない方法について説明します.
    Monoにはblock()メソッドがあります.呼び出すと、メソッドは現在実行中のスレッドをブロックします.したがって、以降のプログラムは応答しません.そのため、問題を解決する方法を探すことをお勧めします.それを使用しないでください.
    FluxにはblockFirst()とblockFirst(Duration timeout)とblockLast()とblockLast(Duration timeout)があります

    n/a.結論


    ここでは、FluxまたはMonoの作成方法と、最も簡単な形式で使用する方法について学習しました.私たちは次の授業で、彼らが持っているデータをどのように処理するかについて議論します.
    出典:Reactor Java #1 How to create Mono and Flux ? | by Antoine Cheron | Medium
    この文章は上の出典資料を翻訳したものだ.