spring reactorマルチスレッド配置

11960 ワード

Reactor概要
Reactorは、リアルタイムのデータストリームアプリケーションを構築するためのベースライブラリであり、ミリ秒、ナノ秒、ピコ秒までのフォールトトレランスと低遅延が要求されるサービスです.
—PrefaceTLDR
Reactorとは何ですか
私達は大体においてリアクトについて調べてみます.好きな検索を使ってReactive、spring Reactive、Aynch ronous javaなどのキーワードを入力します.あるいは単に「What the heck is Reactor?」簡単に言えば、Reactorはライト級のJVMベースライブラリであり、これは私達が構築したサービスと応用の効率的で非同期の伝達メッセージを助けてくれます.
効率的な意味は何ですか?一つのメッセージをAからBに送る時にGCから発生したメモリはとても小さいです.あるいは全くないです.消費者のメッセージ処理速度が生産者のメッセージ生成速度より低い場合、オーバーフローが発生した場合、速やかに処理しなければならない.ロックなしの非同期ストリームを可能な限り提供する.以前の経験から見て、私たちは非同期プログラミングが困難であることを知っています.特に一つのプラットフォームがJVMのような多くのオプションを提供しています.
Reactorは、ほとんどのシーンにおいて本当のブロックなしを狙い、元のJdkのJava.util.co ncurrentライブラリよりも効率的なAPIを提供する.Reactorもオプションを提供しています.
ブロック待ち:Future.get()のようです.
 Unsafe    : ReentrantLock.lock()。
異常投げ:try.catch...finally
同期ブロック:sysschronizedのようです.
Wrapper配置(GC圧力):例えばnew Wrapper(event)
まず純粋なExectorの方法を使いましょう.
private ExecutorService  threadPool = Executors.newFixedThreadPool(8);

final List batches = new ArrayList();

Callable t = new Callable() {  //1

    public T run() {
        synchronized(batches) {  //2
            T result = callDatabase(msg); //3
            batches.add(result);
            return result;
        }
    }
};

Future f = threadPool.submit(t); //4
T result = f.get()  //5
1.フィードバック方法を割り当てる—gc圧力を引き起こす可能性があります.
2.Synch ronizationはスレッドごとに検査を強制する.
3.消費者の消費能力が生産者の生産能力を下回る潜在的なリスクがある.
4.スレッド池を使ってターゲットスレッドにtaskを送る–必ずFutureTaskを通してgcに圧力をかける.
5.calDatabaseに応答するまでブロックします.
上記の簡単な例からは、拡張性が重大な影響を受けることが分かりやすい.
連続して対象を割り振ると、gcが停止します.特に時間がかかります.一つのgcが停止すると、全体の性能が低下します.
キューのデフォルトの長さは制限されません.タスクはデータベースに蓄積されます.
バックグラウンドログはメモリが漏れているところではありませんが、副作用が煩雑です.損失データ重要bitのリスク;など
経典リンクQueはノードを割り当てる時に発生するメモリ圧力を割り当てます.
ブロック方式で要求に応答すると、悪循環が発生します.
閉塞方式の応答は生産者の効率を低下させる.実際には、より多くのタスクを提出する必要がありますので、応答待ちは、基本的な同期方式となります.
データ格納との通信異常は、生産者に不友好な形で伝達され、スレッド境界を通じて作業が分離されるため、許容範囲の交渉が容易になる.
完全で、本当の非渋滞は比較的に実現しにくいです.特に、流行の名前がある分散システムの中で、マイクロサービスアーキテクチャのようなものです.しかし、Reactorは妥協していません.最適なモデルを利用して、開発者に数学の論文を書いているような気がしなくてもいいです.マイクロサービスです.
spring reactorマルチスレッド構成は、まずspring mybatisプロジェクトを構築し、前のブログには既にステップフレームを立てています.spring reactorは、私たちの新しい非同期スレッドを開いてログを記録する機能などを処理してくれます.このように、バックグラウンドの対応する時間を節約できます.1:jarのカバンを導入します.ここで使うのはmavenです.jarのカバンを一つだけ引用すればいいです.

        <dependency>
            <groupId>org.projectreactorgroupId>
            <artifactId>reactor-springartifactId>
            <version>1.0.1.RELEASEversion>
        dependency>
2:reactorの配置を書くbean
package com.baobaotao.reactor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.spring.context.config.EnableReactor;

@Configuration
@EnableReactor
public class ReactorConfig {

    @Bean(name="rootReactor")
    public Reactor rootReactor(Environment env){
        return Reactors.reactor().env(env).get();
    }

    @Bean(name = "reportReactor")
    public Reactor reportReactor(Environment env) {
        return Reactors.reactor().env(env).get();
    }
}
3:事件の処理類は、一般的にHanderで終わるので、区別しやすいです.
package com.baobaotao.reactor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;


import reactor.core.Reactor;
import reactor.event.Event;
import reactor.spring.annotation.Selector;

@Component
public class IndexHandler {

    @Autowired
    @Qualifier("rootReactor")
    private Reactor reactor ;
    @Autowired
    @Qualifier("reportReactor")
    private Reactor reactorxx ;


    @Selector(value="hello",reactor="@rootReactor")
    public void handleTestTopic(Event evt)throws Exception{
        System.out.println("************");
    }

    @Selector(value="hellos",reactor="@reportReactor")
    public void handleTestTopics(Event evt)throws Exception{
        System.out.println("xxxxxx**********");
        String data = evt.getData();
        System.out.println(data);
    }

}
4:最後にcontrollerまたはserviceで新しいスレッドを開くことをお知らせしました.
package com.baobaotao.reactor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import reactor.core.Reactor;
import reactor.event.Event;

@Controller
@RequestMapping("/baobaotao/recator/")
public class IndexController {

    @Autowired
    @Qualifier("rootReactor")
    private Reactor r;

    @Autowired
    @Qualifier("reportReactor")
    private Reactor rx;
    @RequestMapping("/chen")
    @Transactional
    @ResponseBody
    public void chen() {
        r.notify("hello", Event.wrap("  "));
    }

    @RequestMapping("/chenzy")
    @Transactional
    @ResponseBody
    public void chenzy() {
        rx.notify("hellos", Event.wrap(" "));
    }
}
起動プログラム要求http://127.0.0.1/baobaotao/recator/chenzy ロゴ出力が見えます.
System.out.println("xxxxxx**********");
        String data = evt.getData();
        System.out.println(data);