Dispruptor高性能同時フレーム二次パッキング
25966 ワード
Dispruptorはjavaの高性能を同時に処理するフレームです。JDKのBlockingQueと似たようなところがありますが、処理速度はとても速いです。「スレッド1秒で600万個の注文を処理できる」と言われています。
Dispruptorは非常に強力な機能を持っています。例えば、消費者が閉塞して待っています。生産者-消費者の一対一、一対多、多対一、多対多;消費者チェーン/並列処理チェーンなどを構築する。
具体的な概念モデルは参照できます。https://www.cnblogs.com/haiq/p/4112689.html
次は私がディスク・フレームに基づいてパッケージ化したツールです。fluntコードスタイルを採用して、Displaptorの呼び出しを簡略化しました。
上のコードから見て、私達が包装した工具類の入り口はPublisherです。彼はスレッド工場、イベント工場、待ち戦略(WaitStrategy)、メッセージ生産者(Producer)、消費者(Handler/Worker)などの一連のDiruptorに必要なパラメータを配置することができます。
この中で、ニュース生産者と消費者はPublisherのキーですので、後で重点的に説明します。まず他のPublisher Builder類を見てください。
この中のキーコードはdisruptor()方法であり、Diruptorの実用化の入り口である。eventHandlerとwork Handlerの指定はdisruptorのインスタンスの後でなければならないので、disruptor.start()が起動する前に。
したがって、私たちはツールを呼び出すには次のようなものが必要です。
Publisher BuilderのEvent Purlisher類を見に来ました。彼はPublisherインターフェースを実現しました。次の通りです。
Publisher.java
EventPublisher.java
その中で、重要なコードは生産者と消費者チェーンの指定です。動的パラメータは、生産者と消費者が対一、対一、多対一、多対多対多の関係であることができることを示しています。
まず、publisherの最小データ単位Eventについて説明します。
また、生産者のコードを見てください。EventProducerはProducerから引き継ぎます。
Producer.java
Event Producer.java
この中で、ringBufferはDiruptorのメンバーでなければなりません。ですから、プロデューサーを指定する時はリングバッファerを巡回しなければなりません。Translatorの対象も一例モードではスレッド間データの上書きが発生するかどうかは分かりません。スレッドが安全でない場合、各ProducerはTranslatorオブジェクトを初期化します。プロデュースから見られますが、disruptorはringBufferを通じてメッセージを発信しています。二つのリリース方法があります。一つはトランスポート方式を通じて、一つはsequence方式です。finallyの中のヒントに注意してください。
EventTranslatorのコードを見てみてください。EventTranslatoVargから引き継ぎます。tranlateTo()方法はEvent Factoryに作成されたインスタンスにデータを充填します。
そして消費者です。workとhandlerの結果を統一的に処理するためには、すべての消費者がDiruptorのEventhandler/WorkHandlerを実現しなければなりません。もう一つのカスタムConsmerインターフェースがあります。
コネクタ
EventConsmer.java
抽象的なEventConsmerはworkerとhandlerのオンイベントを統一的に処理する方法です。具体的なconsume操作はユーザーが自分で実現する必要があります。demoの中のDomain Consmerのようです。
ここに来てください。重要な生産者と消費者コードは全部包装しました。
コア以外のコードを確認します。
エラー処理クラス:Error Handler Exception Handlerを実現します。
EventFactoryイベント工場類、DiruptorのEeventFactoryを実現
リングバッファer送信方式
これで、Diruptorのパッケージは終了しました。デモを放してください
本体Domain
Domain Consmer
ConsmerrerError Handler
Limit ThreadFactory
デモテスト(消費データを一つだけ測定する)
単生産単消費(workerとhandlerコールの結果は一致している)
呼び出し結果
単一生産者の多消費者(handerタイプで、菱形呼び出しチェーンを設置する。)
呼び出し結果(イベントデータは各ハンドルで消費されます。)
単スレッド、単生産者多消費者(workタイプ、workタイプは直接afterを設定できません。)
呼び出しの結果(handlerと比較して、イベントデータは一つのworkerによって消費されるだけであることが分かります。)
多生産単消費(workとhandlerは一致)
運転結果(データが書き換えられました)
多生産多消費(workerとhandlerの混合用)
呼び出し結果
Dispruptorは非常に強力な機能を持っています。例えば、消費者が閉塞して待っています。生産者-消費者の一対一、一対多、多対一、多対多;消費者チェーン/並列処理チェーンなどを構築する。
具体的な概念モデルは参照できます。https://www.cnblogs.com/haiq/p/4112689.html
次は私がディスク・フレームに基づいてパッケージ化したツールです。fluntコードスタイルを採用して、Displaptorの呼び出しを簡略化しました。
package com.gravel.demo.test.disruptor;
import com.gravel.demo.test.disruptor.base.EventProducer;
import com.gravel.demo.test.disruptor.base.Publisher;
import com.gravel.demo.test.disruptor.base.PublisherBuilder;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description: Disruptor
*/
public class DisruptorTest {
public static void main(String[] args) throws Exception {
builderTest();
}
private static void builderTest() throws Exception {
//
EventProducer producer1 = new EventProducer<>("producer1");
EventProducer producer2 = new EventProducer<>("producer2");
//
DomainConsumer handler1 = new DomainConsumer("handler1");
DomainConsumer handler2 = new DomainConsumer("handler2");
DomainConsumer after1 = new DomainConsumer("after1");
DomainConsumer after2 = new DomainConsumer("after2");
DomainConsumer after3 = new DomainConsumer("after3");
DomainConsumer then = new DomainConsumer("then");
//
final Publisher publisher = PublisherBuilder.newBuilder()
//
// .threadFactory(r -> new Thread(r))
// .threadFactory(new LimitedThreadFactory())
// .threadFactory(Executors.defaultThreadFactory())
//
// .producerType(ProducerType.SINGLE)
// .producerType(ProducerType.MULTI)
//
// .eventFactory(new EventFactory())
//
// .waitStrategy(new SleepingWaitStrategy())
// .waitStrategy(new YieldingWaitStrategy())
//
// .publishStrategy(PublishStrategy.TRANSLATOR)
// .publishStrategy(PublishStrategy.NORMAL)
// ringBuffer
// .ringSize(1024 * 8)
//
.exceptionHandler(new DomainErrorHandler<>())
// Disruptor, 。
.disruptor()
//
// .producer(producer1)
//
// .handler(handler1)
//
.producer(producer1, producer2)
// ====== workers handlers start =======
// .worker(handler1)
.handler(handler1, handler2)
.after(handler1).handler(after1)
.after(handler2).handler(after2)
.after(after1, after2).handler(after3)
// .then(after3)
// ====== workers handlers end =======
//
.build();
long start = System.currentTimeMillis();
try {
for (int i = 0; i < 500; i++) {
publisher
//
//.publish(new Domain(String.valueOf("a" + i), "init"))
.publish(new Domain(String.valueOf(i), "init"));
}
} finally {
long sleep = 200;
Thread.sleep(sleep);
System.out.println("used time: " + (System.currentTimeMillis() - start - sleep) + "ms");
//
publisher.shutdown();
}
}
}
上のコードから見て、私達が包装した工具類の入り口はPublisherです。彼はスレッド工場、イベント工場、待ち戦略(WaitStrategy)、メッセージ生産者(Producer)、消費者(Handler/Worker)などの一連のDiruptorに必要なパラメータを配置することができます。
この中で、ニュース生産者と消費者はPublisherのキーですので、後で重点的に説明します。まず他のPublisher Builder類を見てください。
package com.gravel.demo.test.disruptor.base;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public class PublisherBuilder {
// Publish
private static final int RING_SIZE = 1024 * 8;
private static final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();
private static final WaitStrategy WAIT_STRATEGY = new SleepingWaitStrategy();
private static final com.lmax.disruptor.EventFactory EVENT_FACTORY = new EventFactory();
private static final ProducerType PRODUCER_TYPE = ProducerType.SINGLE;
private static final PublishStrategy PUBLISH_STRATEGY = PublishStrategy.TRANSLATOR;
private com.lmax.disruptor.EventFactory eventFactory;
private ThreadFactory threadFactory;
private WaitStrategy waitStrategy;
private ExceptionHandler exceptionHandler;
private ProducerType type;
private PublishStrategy publishStrategy;
private EventPublisher publisher;
private int ringSize;
public static PublisherBuilder newBuilder() {
return new PublisherBuilder();
}
/**
* ringBuffer size, 2 n 。 1024*8
*
* @param ringSize
* @return
*/
public PublisherBuilder ringSize(int ringSize) {
this.ringSize = ringSize;
return this;
}
/**
* eventFactory, EventFactory
*
* @param eventFactory
* @param
* @return
*/
public PublisherBuilder eventFactory(com.lmax.disruptor.EventFactory eventFactory) {
this.eventFactory = eventFactory;
return this;
}
/**
* ThreadFactory, Executors.defaultThreadFactory()
*
* @param threadFactory
* @return
*/
public PublisherBuilder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
/**
* , SleepingWaitStrategy
*
* @param waitStrategy
* @return
*/
public PublisherBuilder waitStrategy(WaitStrategy waitStrategy) {
this.waitStrategy = waitStrategy;
return this;
}
public PublisherBuilder publishStrategy(PublishStrategy publishStrategy) {
this.publishStrategy = publishStrategy;
return this;
}
/**
* disruptor
*
* @return
*/
public PublisherBuilder disruptor() {
this.eventFactory = this.eventFactory == null ? EVENT_FACTORY : this.eventFactory;
this.threadFactory = this.threadFactory == null ? THREAD_FACTORY : this.threadFactory;
this.waitStrategy = this.waitStrategy == null ? WAIT_STRATEGY : this.waitStrategy;
this.ringSize = this.ringSize <= 0 ? RING_SIZE : this.ringSize;
this.type = this.type == null ? PRODUCER_TYPE : this.type;
this.publishStrategy = this.publishStrategy == null ? PUBLISH_STRATEGY : this.publishStrategy;
publisher = new EventPublisher<>(eventFactory, ringSize, threadFactory, waitStrategy, exceptionHandler, type, publishStrategy);
return this;
}
/**
*
* @param producers
* @param
* @return
*/
public PublisherBuilder producer(EventProducer ...producers) {
if (isInit()) {
this.publisher.producer(producers);
}
return this;
}
/**
* eventHandler: event handler
*
* @param eventHandlers
* @param
* @return
*/
public PublisherBuilder handler(EventHandler>... eventHandlers) {
if (isInit()) {
this.publisher.eventHandler(eventHandlers);
}
return this;
}
/**
* workHandler: event work
*
* @param workHandlers
* @param
* @return
*/
public PublisherBuilder worker(WorkHandler>... workHandlers) {
if (isInit()) {
this.publisher.workHandler(workHandlers);
}
return this;
}
/**
* handler work
*
* @param thenEventHandlers
* @param
* @return
*/
public PublisherBuilder then(EventHandler>... thenEventHandlers) {
if (isInit()) {
this.publisher.thenHandler(thenEventHandlers);
}
return this;
}
/**
* handler, worker
*
* @param afterEventHandlers
* @param
* @return
*/
public PublisherBuilder after(EventHandler>... afterEventHandlers) {
if (isInit()) {
this.publisher.afterHandler(afterEventHandlers);
}
return this;
}
/**
* producerType
*
* @param type
* @param
* @return
*/
public PublisherBuilder producerType(ProducerType type) {
this.type = type;
return this;
}
/**
*
*
* @param exceptionHandler
* @param
* @return
*/
public PublisherBuilder exceptionHandler(ExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
return this;
}
private boolean isInit() {
if (this.publisher == null) {
throw new IllegalStateException("execute disruptor() function before set handlers or workers.");
}
return true;
}
public Publisher build() {
return this.publisher.start();
}
}
この中のキーコードはdisruptor()方法であり、Diruptorの実用化の入り口である。eventHandlerとwork Handlerの指定はdisruptorのインスタンスの後でなければならないので、disruptor.start()が起動する前に。
したがって、私たちはツールを呼び出すには次のようなものが必要です。
PublisherBuilder.newBuilder()/*. */.disruptor()/* producer handler/worker*/.build();
Publisher BuilderのEvent Purlisher類を見に来ました。彼はPublisherインターフェースを実現しました。次の通りです。
Publisher.java
package com.gravel.demo.test.disruptor.base;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public interface Publisher {
Publisher start();
Publisher publish(T t);
Publisher shutdown();
}
EventPublisher.java
package com.gravel.demo.test.disruptor.base;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public class EventPublisher implements Publisher {
private RingBuffer> ringBuffer;
private Disruptor> disruptor;
private EventHandlerGroup> handlerGroup;
private PublisherState state;
private PublishStrategy publishStrategy;
private EventTranslator translator;
private Producer[] producers;
private enum PublisherState {
START, SHUTDOWN
}
public EventPublisher(com.lmax.disruptor.EventFactory> factory,
int ringSize,
ThreadFactory threadFactory,
WaitStrategy waitStrategy,
ExceptionHandler> exceptionHandler,
ProducerType type,
PublishStrategy publishStrategy) {
this.disruptor = new Disruptor<>(factory, ringSize, threadFactory,
type, waitStrategy);
if (!Objects.isNull(exceptionHandler)) {
this.disruptor.setDefaultExceptionHandler(exceptionHandler);
}
this.ringBuffer = disruptor.getRingBuffer();
this.publishStrategy = publishStrategy;
if (publishStrategy == PublishStrategy.TRANSLATOR) {
translator = new EventTranslator<>();
}
this.state = PublisherState.SHUTDOWN;
}
public EventPublisher producer(EventProducer ...producers) {
if (!Objects.isNull(producers) && producers.length > 0) {
for (EventProducer producer : producers) {
producer.setRingBuffer(this.ringBuffer).setTranslator(this.translator);
}
this.producers = producers;
}
return this;
}
public EventPublisher eventHandler(EventHandler>... eventHandlers) {
if (this.handlerGroup != null) {
this.handlerGroup.handleEventsWith(eventHandlers);
} else {
this.handlerGroup = disruptor.handleEventsWith(eventHandlers);
}
return this;
}
public EventPublisher workHandler(WorkHandler>... workHandlers) {
if (this.handlerGroup != null) {
this.handlerGroup.handleEventsWithWorkerPool(workHandlers);
} else {
this.handlerGroup = disruptor.handleEventsWithWorkerPool(workHandlers);
}
return this;
}
public EventPublisher thenHandler(EventHandler>... thenHandlers) {
this.handlerGroup.then(thenHandlers);
return this;
}
public EventPublisher afterHandler(EventHandler>... afterHandlers) {
this.handlerGroup = this.disruptor.after(afterHandlers);
return this;
}
public Disruptor> getDisruptor() {
return disruptor;
}
@Override
public EventPublisher start() {
this.disruptor.start();
this.state = PublisherState.START;
return this;
}
@Override
public EventPublisher shutdown() {
this.disruptor.shutdown();
this.state = PublisherState.SHUTDOWN;
return this;
}
@Override
public EventPublisher publish(T t) {
if (!isStarted()) {
throw new IllegalStateException("publisher not start..");
}
if (producers == null || producers.length <= 0) {
throw new IllegalStateException("producer must be specify.");
}
for (Producer producer : producers) {
producer.produce(t);
}
return this;
}
private boolean isStarted() {
return this.state == PublisherState.START;
}
}
その中で、重要なコードは生産者と消費者チェーンの指定です。動的パラメータは、生産者と消費者が対一、対一、多対一、多対多対多の関係であることができることを示しています。
まず、publisherの最小データ単位Eventについて説明します。
package com.gravel.demo.test.disruptor.base;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public class Event {
private T data;
public void set(T data) {
this.data = data;
}
public T get() {
return data;
}
}
また、生産者のコードを見てください。EventProducerはProducerから引き継ぎます。
Producer.java
package com.gravel.demo.test.disruptor.base;
/**
* @Auther: syh
* @Date: 2020/7/9
* @Description:
*/
public interface Producer {
void produce(T t);
}
Event Producer.java
package com.gravel.demo.test.disruptor.base;
import com.lmax.disruptor.RingBuffer;
/**
* @Auther: syh
* @Date: 2020/7/9
* @Description:
*/
public class EventProducer implements Producer {
private String name;
private RingBuffer> ringBuffer;
private EventTranslator translator;
public EventProducer(String name) {
this.name = name;
}
public EventProducer setRingBuffer(RingBuffer> ringBuffer) {
this.ringBuffer = ringBuffer;
return this;
}
public EventProducer setTranslator(EventTranslator translator) {
this.translator = translator;
return this;
}
@Override
public void produce(T t) {
System.out.println(String.format("producer message by %s, data: %s", name, t));
if (translator != null) {
ringBuffer.publishEvent(translator, t);
} else {
long seq = ringBuffer.next();
try {
Event event = ringBuffer.get(seq);
event.set(t);
} finally {
// Disruptor RingBuffer.publish , publish
// sequence , producer
ringBuffer.publish(seq);
}
}
}
}
この中で、ringBufferはDiruptorのメンバーでなければなりません。ですから、プロデューサーを指定する時はリングバッファerを巡回しなければなりません。Translatorの対象も一例モードではスレッド間データの上書きが発生するかどうかは分かりません。スレッドが安全でない場合、各ProducerはTranslatorオブジェクトを初期化します。プロデュースから見られますが、disruptorはringBufferを通じてメッセージを発信しています。二つのリリース方法があります。一つはトランスポート方式を通じて、一つはsequence方式です。finallyの中のヒントに注意してください。
EventTranslatorのコードを見てみてください。EventTranslatoVargから引き継ぎます。tranlateTo()方法はEvent Factoryに作成されたインスタンスにデータを充填します。
package com.gravel.demo.test.disruptor.base;
import com.lmax.disruptor.EventTranslatorVararg;
/**
* @Auther: syh
* @Date: 2020/7/9
* @Description:
*/
public class EventTranslator implements EventTranslatorVararg> {
@Override
public void translateTo(Event event, long sequence, Object... args) {
event.set(((T)(args[0])));
}
}
そして消費者です。workとhandlerの結果を統一的に処理するためには、すべての消費者がDiruptorのEventhandler/WorkHandlerを実現しなければなりません。もう一つのカスタムConsmerインターフェースがあります。
コネクタ
package com.gravel.demo.test.disruptor.base;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public interface Consumer {
void consume(T data, Boolean over) throws Exception;
}
EventConsmer.java
package com.gravel.demo.test.disruptor.base;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public abstract class EventConsumer implements EventHandler>, Consumer, WorkHandler> {
String name;
public EventConsumer(String name) {
this.name = name;
}
@Override
public void onEvent(Event event, long seq, boolean over) throws Exception {
consume(event.get(), over);
}
@Override
public void onEvent(Event event) throws Exception {
consume(event.get(), null);
}
protected String getName() {
return name;
}
}
抽象的なEventConsmerはworkerとhandlerのオンイベントを統一的に処理する方法です。具体的なconsume操作はユーザーが自分で実現する必要があります。demoの中のDomain Consmerのようです。
ここに来てください。重要な生産者と消費者コードは全部包装しました。
コア以外のコードを確認します。
エラー処理クラス:Error Handler Exception Handlerを実現します。
package com.gravel.demo.test.disruptor.base;
import com.lmax.disruptor.ExceptionHandler;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public abstract class ErrorHandler implements ExceptionHandler> {
@Override
public void handleEventException(Throwable throwable, long l, Event t) {
handle(t.get(), throwable);
}
@Override
public void handleOnStartException(Throwable throwable) {
}
@Override
public void handleOnShutdownException(Throwable throwable) {
}
protected abstract void handle(T object, Throwable throwable);
}
EventFactoryイベント工場類、DiruptorのEeventFactoryを実現
package com.gravel.demo.test.disruptor.base;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public class EventFactory implements com.lmax.disruptor.EventFactory {
@Override
public Event newInstance() {
return new Event();
}
}
リングバッファer送信方式
package com.gravel.demo.test.disruptor.base;
/**
* @Auther: syh
* @Date: 2020/7/9
* @Description:
*/
public enum PublishStrategy {
NORMAL, TRANSLATOR
}
これで、Diruptorのパッケージは終了しました。デモを放してください
本体Domain
package com.gravel.demo.test.disruptor;
/**
* @Auther: syh
* @Date: 2020/7/9
* @Description:
*/
public class Domain {
private String id;
private String value;
public Domain(String id, String value) {
this.id = id;
this.value = value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public String toString() {
return "Domain{" +
"id='" + id + '\'' +
", value='" + value + '\'' +
'}';
}
}
Domain Consmer
package com.gravel.demo.test.disruptor;
import com.gravel.demo.test.disruptor.base.EventConsumer;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public class DomainConsumer extends EventConsumer {
public DomainConsumer() {
this("FirstDisruptorHandler" + (Math.random() * 100));
}
public DomainConsumer(String name) {
super(name);
}
@Override
public void consume(Domain data, Boolean over) throws Exception {
// errorHandler :worker ,handler
/*if (Objects.isNull(over)) {
throw new RuntimeException(getName() + " handle exception.");
}*/
System.out.println(String.format("received by %s, data: %s, is over?%s", getName(), data.toString(), over));
data.setValue(getName());
}
}
ConsmerrerError Handler
package com.gravel.demo.test.disruptor;
import com.gravel.demo.test.disruptor.base.ErrorHandler;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public class DomainErrorHandler extends ErrorHandler {
@Override
protected void handle(T object, Throwable throwable) {
System.err.println(String.format("received a error message: %s, data: %s, ", throwable.getMessage(), object));
// , handler
throw new IllegalStateException("interrupted.");
}
}
Limit ThreadFactory
package com.gravel.demo.test.disruptor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Auther: syh
* @Date: 2020/7/8
* @Description:
*/
public class LimitedThreadFactory implements ThreadFactory {
private final AtomicInteger count = new AtomicInteger(0);
public Thread newThread(Runnable r) {
if (count.compareAndSet(0, 1)) {
return new Thread(r);
} else {
throw new IllegalStateException("Created more that one thread");
}
}
}
デモテスト(消費データを一つだけ測定する)
単生産単消費(workerとhandlerコールの結果は一致している)
.producer(producer1)
.handler(handler1)
呼び出し結果
producer message by producer1, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?true
used time: 19ms
単一生産者の多消費者(handerタイプで、菱形呼び出しチェーンを設置する。)
.producer(producer1)
.handler(handler1, handler2)
.after(handler1).handler(after1)
.after(handler2).handler(after2)
.after(after1, after2).handler(after3)
呼び出し結果(イベントデータは各ハンドルで消費されます。)
producer message by producer1, data: Domain{id='0', value='init'}
received by handler2, data: Domain{id='0', value='init'}, is over?true
received by handler1, data: Domain{id='0', value='init'}, is over?true
received by after2, data: Domain{id='0', value='handler1'}, is over?true
received by after1, data: Domain{id='0', value='handler1'}, is over?true
received by after3, data: Domain{id='0', value='after1'}, is over?true
used time: 19ms
単スレッド、単生産者多消費者(workタイプ、workタイプは直接afterを設定できません。)
.producer(producer1)
.worker(handler1, handler2)
呼び出しの結果(handlerと比較して、イベントデータは一つのworkerによって消費されるだけであることが分かります。)
producer message by producer1, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?null
used time: 21ms
多生産単消費(workとhandlerは一致)
.handler(handler1)
.producer(producer1, producer2)
運転結果(データが書き換えられました)
producer message by producer1, data: Domain{id='0', value='init'}
producer message by producer2, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?true
received by handler1, data: Domain{id='0', value='handler1'}, is over?true
used time: 18ms
多生産多消費(workerとhandlerの混合用)
.producer(producer1, producer2)
.worker(after1, after2)
.handler(handler1, handler2)
呼び出し結果
producer message by producer1, data: Domain{id='0', value='init'}
producer message by producer2, data: Domain{id='0', value='init'}
received by after2, data: Domain{id='0', value='init'}, is over?null
received by after1, data: Domain{id='0', value='after2'}, is over?null
received by handler2, data: Domain{id='0', value='after1'}, is over?false
received by handler1, data: Domain{id='0', value='after1'}, is over?false
received by handler2, data: Domain{id='0', value='handler2'}, is over?true
received by handler1, data: Domain{id='0', value='handler1'}, is over?true
used time: 26ms