Dispruptor高性能同時フレーム二次パッキング

25966 ワード

Dispruptorはjavaの高性能を同時に処理するフレームです。JDKのBlockingQueと似たようなところがありますが、処理速度はとても速いです。「スレッド1秒で600万個の注文を処理できる」と言われています。
Dispruptorは非常に強力な機能を持っています。例えば、消費者が閉塞して待っています。生産者-消費者の一対一、一対多、多対一、多対多;消費者チェーン/並列処理チェーンなどを構築する。
具体的な概念モデルは参照できます。https://www.cnblogs.com/haiq/p/4112689.html
 
次は私がディスク・フレームに基づいてパッケージ化したツールです。fluntコードスタイルを採用して、Displaptorの呼び出しを簡略化しました。
package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.EventProducer;
import com.gravel.demo.test.disruptor.base.Publisher;
import com.gravel.demo.test.disruptor.base.PublisherBuilder;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description: Disruptor          
 */
public class DisruptorTest {

    public static void main(String[] args) throws Exception {
        builderTest();
    }

    private static void builderTest() throws Exception {
        //          
        EventProducer producer1 = new EventProducer<>("producer1");
        EventProducer producer2 = new EventProducer<>("producer2");

        //          
        DomainConsumer handler1 = new DomainConsumer("handler1");
        DomainConsumer handler2 = new DomainConsumer("handler2");
        DomainConsumer after1 = new DomainConsumer("after1");
        DomainConsumer after2 = new DomainConsumer("after2");
        DomainConsumer after3 = new DomainConsumer("after3");
        DomainConsumer then = new DomainConsumer("then");

        //        
        final Publisher publisher = PublisherBuilder.newBuilder()
                //       
                // .threadFactory(r -> new Thread(r))
                // .threadFactory(new LimitedThreadFactory())
                // .threadFactory(Executors.defaultThreadFactory())
                //       
                // .producerType(ProducerType.SINGLE)
                // .producerType(ProducerType.MULTI)
                //       
                // .eventFactory(new EventFactory())
                //       
                // .waitStrategy(new SleepingWaitStrategy())
                // .waitStrategy(new YieldingWaitStrategy())
                //       
                // .publishStrategy(PublishStrategy.TRANSLATOR)
                // .publishStrategy(PublishStrategy.NORMAL)
                //   ringBuffer  
                // .ringSize(1024 * 8)
                //        
                .exceptionHandler(new DomainErrorHandler<>())
                //    Disruptor,                    。
                .disruptor()
                //       
                // .producer(producer1)
                //        
                // .handler(handler1)
                //       
                .producer(producer1, producer2)
                //  ======      workers  handlers    start  =======
                // .worker(handler1)
                .handler(handler1, handler2)
                .after(handler1).handler(after1)
                .after(handler2).handler(after2)
                .after(after1, after2).handler(after3)
                // .then(after3)
                //  ======      workers  handlers    end  =======
                //   
                .build();

        long start = System.currentTimeMillis();
        try {

            for (int i = 0; i < 500; i++) {
                publisher
                        //      
                        //.publish(new Domain(String.valueOf("a" + i), "init"))
                        .publish(new Domain(String.valueOf(i), "init"));
            }
        } finally {
            long sleep = 200;
            Thread.sleep(sleep);
            System.out.println("used time: " + (System.currentTimeMillis() - start - sleep) + "ms");
        //           
            publisher.shutdown();
        }
    }
}
  
上のコードから見て、私達が包装した工具類の入り口はPublisherです。彼はスレッド工場、イベント工場、待ち戦略(WaitStrategy)、メッセージ生産者(Producer)、消費者(Handler/Worker)などの一連のDiruptorに必要なパラメータを配置することができます。
この中で、ニュース生産者と消費者はPublisherのキーですので、後で重点的に説明します。まず他のPublisher Builder類を見てください。
package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class PublisherBuilder {
    // Publish     
    private static final int RING_SIZE = 1024 * 8;
    private static final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();
    private static final WaitStrategy WAIT_STRATEGY = new SleepingWaitStrategy();
    private static final com.lmax.disruptor.EventFactory EVENT_FACTORY = new EventFactory();
    private static final ProducerType PRODUCER_TYPE = ProducerType.SINGLE;
    private static final PublishStrategy PUBLISH_STRATEGY = PublishStrategy.TRANSLATOR;

    private com.lmax.disruptor.EventFactory eventFactory;
    private ThreadFactory threadFactory;
    private WaitStrategy waitStrategy;
    private ExceptionHandler exceptionHandler;
    private ProducerType type;
    private PublishStrategy publishStrategy;
    private EventPublisher publisher;
    private int ringSize;

    public static PublisherBuilder newBuilder() {
        return new PublisherBuilder();
    }
    
    /**
     *   ringBuffer size,   2 n  。  1024*8
     *
     * @param ringSize
     * @return
     */
    public PublisherBuilder ringSize(int ringSize) {
        this.ringSize = ringSize;
        return this;
    }

    /**
     *   eventFactory,   EventFactory
     *
     * @param eventFactory
     * @param 
     * @return
     */
    public  PublisherBuilder eventFactory(com.lmax.disruptor.EventFactory eventFactory) {
        this.eventFactory = eventFactory;
        return this;
    }

    /**
     *   ThreadFactory,   Executors.defaultThreadFactory()
     *
     * @param threadFactory
     * @return
     */
    public PublisherBuilder threadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /**
     *       ,   SleepingWaitStrategy
     *
     * @param waitStrategy
     * @return
     */
    public PublisherBuilder waitStrategy(WaitStrategy waitStrategy) {
        this.waitStrategy = waitStrategy;
        return this;
    }

    public PublisherBuilder publishStrategy(PublishStrategy publishStrategy) {
        this.publishStrategy = publishStrategy;
        return this;
    }

    /**
     *    disruptor
     *
     * @return
     */
    public PublisherBuilder disruptor() {
        this.eventFactory = this.eventFactory == null ? EVENT_FACTORY : this.eventFactory;
        this.threadFactory = this.threadFactory == null ? THREAD_FACTORY : this.threadFactory;
        this.waitStrategy = this.waitStrategy == null ? WAIT_STRATEGY : this.waitStrategy;
        this.ringSize = this.ringSize <= 0 ? RING_SIZE : this.ringSize;
        this.type = this.type == null ? PRODUCER_TYPE : this.type;
        this.publishStrategy = this.publishStrategy == null ? PUBLISH_STRATEGY : this.publishStrategy;

        publisher = new EventPublisher<>(eventFactory, ringSize, threadFactory, waitStrategy, exceptionHandler, type, publishStrategy);
        return this;
    }

    /**
     *      
     * @param producers
     * @param 
     * @return
     */
    public  PublisherBuilder producer(EventProducer ...producers) {
        if (isInit()) {
            this.publisher.producer(producers);
        }
        return this;
    }

    /**
     * eventHandler:  event       handler  
     *
     * @param eventHandlers
     * @param 
     * @return
     */
    public  PublisherBuilder handler(EventHandler>... eventHandlers) {
        if (isInit()) {
            this.publisher.eventHandler(eventHandlers);
        }
        return this;
    }

    /**
     * workHandler:  event       work  
     *
     * @param workHandlers
     * @param 
     * @return
     */
    public  PublisherBuilder worker(WorkHandler>... workHandlers) {
        if (isInit()) {
            this.publisher.workHandler(workHandlers);
        }
        return this;
    }

    /**
     * handler work    
     *
     * @param thenEventHandlers
     * @param 
     * @return
     */
    public  PublisherBuilder then(EventHandler>... thenEventHandlers) {
        if (isInit()) {
            this.publisher.thenHandler(thenEventHandlers);
        }
        return this;
    }

    /**
     *     handler,      worker      
     *
     * @param afterEventHandlers
     * @param 
     * @return
     */
    public  PublisherBuilder after(EventHandler>... afterEventHandlers) {
        if (isInit()) {
            this.publisher.afterHandler(afterEventHandlers);
        }
        return this;
    }

    /**
     *   producerType
     *
     * @param type
     * @param 
     * @return
     */
    public  PublisherBuilder producerType(ProducerType type) {
        this.type = type;
        return this;
    }

    /**
     *      
     *
     * @param exceptionHandler
     * @param 
     * @return
     */
    public  PublisherBuilder exceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    private boolean isInit() {
        if (this.publisher == null) {
            throw new IllegalStateException("execute disruptor() function before set handlers or workers.");
        }
        return true;
    }

    public  Publisher build() {
        return this.publisher.start();
    }
}
  
この中のキーコードはdisruptor()方法であり、Diruptorの実用化の入り口である。eventHandlerとwork Handlerの指定はdisruptorのインスタンスの後でなければならないので、disruptor.start()が起動する前に。
したがって、私たちはツールを呼び出すには次のようなものが必要です。
PublisherBuilder.newBuilder()/*.    */.disruptor()/*  producer handler/worker*/.build();
  
Publisher BuilderのEvent Purlisher類を見に来ました。彼はPublisherインターフェースを実現しました。次の通りです。
Publisher.java
package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public interface Publisher {

    Publisher start();

    Publisher publish(T t);

    Publisher shutdown();
}
  
EventPublisher.java
package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class EventPublisher implements Publisher {

    private RingBuffer> ringBuffer;
    private Disruptor> disruptor;
    private EventHandlerGroup> handlerGroup;
    private PublisherState state;
    private PublishStrategy publishStrategy;
    private EventTranslator translator;
    private Producer[] producers;

    private enum PublisherState {
        START, SHUTDOWN
    }

    public EventPublisher(com.lmax.disruptor.EventFactory> factory,
                          int ringSize,
                          ThreadFactory threadFactory,
                          WaitStrategy waitStrategy,
                          ExceptionHandler> exceptionHandler,
                          ProducerType type,
                          PublishStrategy publishStrategy) {

        this.disruptor = new Disruptor<>(factory, ringSize, threadFactory,
                type, waitStrategy);

        if (!Objects.isNull(exceptionHandler)) {
            this.disruptor.setDefaultExceptionHandler(exceptionHandler);
        }

        this.ringBuffer = disruptor.getRingBuffer();

        this.publishStrategy = publishStrategy;
        if (publishStrategy == PublishStrategy.TRANSLATOR) {
            translator = new EventTranslator<>();
        }

        this.state = PublisherState.SHUTDOWN;
    }

    public EventPublisher producer(EventProducer ...producers) {
        if (!Objects.isNull(producers) && producers.length > 0) {
            for (EventProducer producer : producers) {
                producer.setRingBuffer(this.ringBuffer).setTranslator(this.translator);
            }
            this.producers = producers;
        }
        return this;
    }

    public EventPublisher eventHandler(EventHandler>... eventHandlers) {
        if (this.handlerGroup != null) {
            this.handlerGroup.handleEventsWith(eventHandlers);
        } else {
            this.handlerGroup = disruptor.handleEventsWith(eventHandlers);
        }
        return this;
    }

    public EventPublisher workHandler(WorkHandler>... workHandlers) {
        if (this.handlerGroup != null) {
            this.handlerGroup.handleEventsWithWorkerPool(workHandlers);
        } else {
            this.handlerGroup = disruptor.handleEventsWithWorkerPool(workHandlers);
        }
        return this;
    }

    public EventPublisher thenHandler(EventHandler>... thenHandlers) {
        this.handlerGroup.then(thenHandlers);
        return this;
    }

    public EventPublisher afterHandler(EventHandler>... afterHandlers) {
        this.handlerGroup = this.disruptor.after(afterHandlers);
        return this;
    }

    public Disruptor> getDisruptor() {
        return disruptor;
    }

    @Override
    public EventPublisher start() {
        this.disruptor.start();
        this.state = PublisherState.START;
        return this;
    }

    @Override
    public EventPublisher shutdown() {
        this.disruptor.shutdown();
        this.state = PublisherState.SHUTDOWN;
        return this;
    }

    @Override
    public EventPublisher publish(T t) {
        if (!isStarted()) {
            throw new IllegalStateException("publisher not start..");
        }
        if (producers == null || producers.length <= 0) {
            throw new IllegalStateException("producer must be specify.");
        }

        for (Producer producer : producers) {
            producer.produce(t);
        }
        return this;
    }

    private boolean isStarted() {
        return this.state == PublisherState.START;
    }
}
  
その中で、重要なコードは生産者と消費者チェーンの指定です。動的パラメータは、生産者と消費者が対一、対一、多対一、多対多対多の関係であることができることを示しています。
まず、publisherの最小データ単位Eventについて説明します。
package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class Event {

    private T data;

    public void set(T data) {
        this.data = data;
    }

    public T get() {
        return data;
    }
}
  
また、生産者のコードを見てください。EventProducerはProducerから引き継ぎます。
Producer.java
package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public interface Producer {
    void produce(T t);
}
  
Event Producer.java
package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.RingBuffer;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class EventProducer implements Producer {
    private String name;
    private RingBuffer> ringBuffer;
    private EventTranslator translator;
    public EventProducer(String name) {
        this.name = name;
    }

    public EventProducer setRingBuffer(RingBuffer> ringBuffer) {
        this.ringBuffer = ringBuffer;
        return this;
    }

    public EventProducer setTranslator(EventTranslator translator) {
        this.translator = translator;
        return this;
    }

    @Override
    public void produce(T t) {
        System.out.println(String.format("producer message by %s, data: %s", name, t));
        if (translator != null) {
            ringBuffer.publishEvent(translator, t);
        } else {
            long seq = ringBuffer.next();
            try {
                Event event = ringBuffer.get(seq);
                event.set(t);
            } finally {
                // Disruptor    RingBuffer.publish             ,             publish
                //         sequence     ,                 producer
                ringBuffer.publish(seq);
            }
        }
    }
}
  
この中で、ringBufferはDiruptorのメンバーでなければなりません。ですから、プロデューサーを指定する時はリングバッファerを巡回しなければなりません。Translatorの対象も一例モードではスレッド間データの上書きが発生するかどうかは分かりません。スレッドが安全でない場合、各ProducerはTranslatorオブジェクトを初期化します。プロデュースから見られますが、disruptorはringBufferを通じてメッセージを発信しています。二つのリリース方法があります。一つはトランスポート方式を通じて、一つはsequence方式です。finallyの中のヒントに注意してください。
EventTranslatorのコードを見てみてください。EventTranslatoVargから引き継ぎます。tranlateTo()方法はEvent Factoryに作成されたインスタンスにデータを充填します。
package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.EventTranslatorVararg;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class EventTranslator implements EventTranslatorVararg> {

    @Override
    public void translateTo(Event event, long sequence, Object... args) {
        event.set(((T)(args[0])));
    }
}
  
そして消費者です。workとhandlerの結果を統一的に処理するためには、すべての消費者がDiruptorのEventhandler/WorkHandlerを実現しなければなりません。もう一つのカスタムConsmerインターフェースがあります。
コネクタ
package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public interface Consumer {
    void consume(T data, Boolean over) throws Exception;
}
  
EventConsmer.java
package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public abstract class EventConsumer implements EventHandler>, Consumer, WorkHandler> {
    String name;

    public EventConsumer(String name) {
        this.name = name;
    }

    @Override
    public void onEvent(Event event, long seq, boolean over) throws Exception {
        consume(event.get(), over);
    }

    @Override
    public void onEvent(Event event) throws Exception {
        consume(event.get(), null);
    }

    protected String getName() {
        return name;
    }
}
  
抽象的なEventConsmerはworkerとhandlerのオンイベントを統一的に処理する方法です。具体的なconsume操作はユーザーが自分で実現する必要があります。demoの中のDomain Consmerのようです。
ここに来てください。重要な生産者と消費者コードは全部包装しました。
コア以外のコードを確認します。
エラー処理クラス:Error Handler Exception Handlerを実現します。
package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.ExceptionHandler;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public abstract class ErrorHandler implements ExceptionHandler> {
    @Override
    public void handleEventException(Throwable throwable, long l, Event t) {
        handle(t.get(), throwable);
    }

    @Override
    public void handleOnStartException(Throwable throwable) {
    }

    @Override
    public void handleOnShutdownException(Throwable throwable) {
    }

    protected abstract void handle(T object, Throwable throwable);
}
  
EventFactoryイベント工場類、DiruptorのEeventFactoryを実現
package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class EventFactory implements com.lmax.disruptor.EventFactory {

    @Override
    public Event newInstance() {
        return new Event();
    }
}
  
リングバッファer送信方式
package com.gravel.demo.test.disruptor.base;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public enum  PublishStrategy {
    NORMAL, TRANSLATOR
}
  
これで、Diruptorのパッケージは終了しました。デモを放してください
本体Domain
package com.gravel.demo.test.disruptor;

/**
 * @Auther: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class Domain {

    private String id;
    private String value;

    public Domain(String id, String value) {
        this.id = id;
        this.value = value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Domain{" +
                "id='" + id + '\'' +
                ", value='" + value + '\'' +
                '}';
    }
}
  
Domain Consmer
package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.EventConsumer;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class DomainConsumer extends EventConsumer {

    public DomainConsumer() {
        this("FirstDisruptorHandler" + (Math.random() * 100));
    }

    public DomainConsumer(String name) {
        super(name);
    }

    @Override
    public void consume(Domain data, Boolean over) throws Exception {
        // errorHandler   :worker   ,handler    
        /*if (Objects.isNull(over)) {
            throw new RuntimeException(getName() + " handle exception.");
        }*/

        System.out.println(String.format("received by %s, data: %s, is over?%s", getName(), data.toString(), over));
        data.setValue(getName());
    }
}
  
ConsmerrerError Handler
package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.ErrorHandler;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class DomainErrorHandler extends ErrorHandler {
    @Override
    protected void handle(T object, Throwable throwable) {
        System.err.println(String.format("received a error message: %s, data: %s, ", throwable.getMessage(), object));
        //     ,            handler
        throw new IllegalStateException("interrupted.");
    }
}
  
Limit ThreadFactory
package com.gravel.demo.test.disruptor;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Auther: syh
 * @Date: 2020/7/8
 * @Description:    
 */
public class LimitedThreadFactory implements ThreadFactory {
    private final AtomicInteger count = new AtomicInteger(0);

    public Thread newThread(Runnable r) {
        if (count.compareAndSet(0, 1)) {
            return new Thread(r);
        } else {
            throw new IllegalStateException("Created more that one thread");
        }
    }
}
 
デモテスト(消費データを一つだけ測定する)
単生産単消費(workerとhandlerコールの結果は一致している)
.producer(producer1)
.handler(handler1)
  
呼び出し結果
producer message by producer1, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?true
used time: 19ms
  
単一生産者の多消費者(handerタイプで、菱形呼び出しチェーンを設置する。)
.producer(producer1)
.handler(handler1, handler2)
.after(handler1).handler(after1)
.after(handler2).handler(after2)
.after(after1, after2).handler(after3)
  
呼び出し結果(イベントデータは各ハンドルで消費されます。)
producer message by producer1, data: Domain{id='0', value='init'}
received by handler2, data: Domain{id='0', value='init'}, is over?true
received by handler1, data: Domain{id='0', value='init'}, is over?true
received by after2, data: Domain{id='0', value='handler1'}, is over?true
received by after1, data: Domain{id='0', value='handler1'}, is over?true
received by after3, data: Domain{id='0', value='after1'}, is over?true
used time: 19ms
  
単スレッド、単生産者多消費者(workタイプ、workタイプは直接afterを設定できません。)
.producer(producer1)
.worker(handler1, handler2)
   
呼び出しの結果(handlerと比較して、イベントデータは一つのworkerによって消費されるだけであることが分かります。)
producer message by producer1, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?null
used time: 21ms
  
多生産単消費(workとhandlerは一致)
.handler(handler1)
.producer(producer1, producer2)
  
運転結果(データが書き換えられました)
producer message by producer1, data: Domain{id='0', value='init'}
producer message by producer2, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?true
received by handler1, data: Domain{id='0', value='handler1'}, is over?true
used time: 18ms
  
多生産多消費(workerとhandlerの混合用)
.producer(producer1, producer2)
.worker(after1, after2)
.handler(handler1, handler2)
   
呼び出し結果
producer message by producer1, data: Domain{id='0', value='init'}
producer message by producer2, data: Domain{id='0', value='init'}
received by after2, data: Domain{id='0', value='init'}, is over?null
received by after1, data: Domain{id='0', value='after2'}, is over?null
received by handler2, data: Domain{id='0', value='after1'}, is over?false
received by handler1, data: Domain{id='0', value='after1'}, is over?false
received by handler2, data: Domain{id='0', value='handler2'}, is over?true
received by handler1, data: Domain{id='0', value='handler1'}, is over?true
used time: 26ms