初認識Disruptorコンカレントフレームワーク


一、Disruptorとは
Martin Fowlerは自分のウェブサイトにLMAXアーキテクチャの文章を書いて、文章の中で彼はLMAXが新型小売金融取引プラットフォームであることを紹介して、それは低い遅延で大量の取引を生むことができます.このシステムはJVMプラットフォームに構築され、その核心はビジネスロジックプロセッサであり、1つのスレッドで毎秒6百万の注文を処理することができる.ビジネスロジックプロセッサは完全にメモリで実行され、イベントソースで駆動されます.ビジネスロジックプロセッサの核心はDisruptorです.Disruptorはオープンソースの同時フレームワークであり、2011 Duke’sプログラムフレームワーク革新賞を受賞し、ロックなしでネットワークのQueue同時操作を実現することができる.Disruptorは高性能の非同期処理フレームワークであるか,最も速いメッセージフレームワーク(軽量のJMS)と考えられるか,オブザーバーモードの実装であるか,イベントリスニングモードの実装であると考えられる.使用する前に、disruptorの主な機能について説明します.彼は効率的な「生産者-消費者」モデルであることが理解できます.従来のBlockingQueue容器よりもはるかに高い性能を有している.JDKのマルチスレッドとコンカレントライブラリにおいて、BlockingQueueが生産者-消費者モデルBlockingQueueを実現することはロックに基づいて実現するが、ロックの効率は通常低い.CASメカニズムを用いて実現する生産者-消費者Disruptorがあるかどうかはこうである.Disruptorはオブザーバーモードを使用して、受信者がキューから取り出すのを待つのではなく、受信者(消費者に相当)に積極的にメッセージを送信する.ロックなしではqueue(リング,RingBuffer)の同時動作を実現し,BlockingQueueよりはるかに高い性能を示した.
二、Disruptor実現特徴
低遅延を実現する詳細は、Disruptorでロックされていないアルゴリズムを利用し、すべてのメモリの可視性と正確性はメモリバリアまたはCAS操作を利用することです.CASを使用してマルチスレッドのセキュリティを保証するには、ほとんどの同時キューで使用されるロックに比べて、CASは明らかに速いです.CASはCPUレベルの命令で、より軽量で、ロックのようにオペレーティングシステムのサポートを必要としないため、呼び出すたびにユーザー状態とカーネル状態を切り替える必要はなく、コンテキストの切り替えも必要ありません.1つの例ではロックが必要です.それはBlockingWaitStrategy(ブロック待機ポリシー)です.唯一の実装方法は、Conditionを使用して消費者が新しいイベントが来る前に待機することです.多くの低遅延システムは、Conditionのジッタを回避するために待機を使用するが、システムの待機動作において、特にCPUリソースが著しく制限されている場合、例えば仮想環境下のWEBサーバのような性能が著しく低下する可能性がある.
三、Disruptorの核心概念
3.1、RingBuffer
名前の通り、環状のバッファです.かつてRingBufferはDisruptorの中で最も主要なオブジェクトであったが、3.0バージョンからはDisruptorによって交換されたデータ(イベント)の格納と更新のみを担当する役割に簡略化された.いくつかのより高度なアプリケーションシーンでは、Ring Bufferは、ユーザのカスタムインプリメンテーションによって完全に代替することができる.
1.利点:Ringbufferは、信頼性の高いメッセージングにおいて優れたパフォーマンスを有するため、このようなデータ構造を採用している.これで十分ですが、他にもいくつかの利点があります.まず、配列であるため、チェーンテーブルよりも速く、予測しやすいアクセスモードがあります.(注:配列内の要素のメモリアドレスの連続性が格納されています).これはCPUキャッシュに友好的である.すなわち、ハードウェアレベルでは、配列内の要素がプリロードされるため、ringbufferでは、cpuが配列内の次の要素を時々ホストする必要はない.(校正注:1つの要素がキャッシュ行にロードされる限り、他の隣接するいくつかの要素も同じキャッシュ行にロードされるため).次に、プログラムが終了しない限り、配列オブジェクトが常に存在するように、配列にメモリを割り当てることができます.これは、ごみの回収に多くの時間を費やす必要がないことを意味します.また、チェーンテーブルのように、その上に追加されたオブジェクトごとにノードオブジェクトを作成する必要はありません.ノードを削除するときは、対応するメモリクリーンアップ操作を実行する必要があります.
2.RingBufferの下位実装:RingBufferは、先頭と末尾が接続された環状配列であり、いわゆる先頭と末尾が接続されている.これは、RingBuffer上のポインタが配列を越えて上界である場合、配列ヘッダから遍歴し続けることを意味する.したがって、RingBufferには少なくとも1つのポインタがあり、RingBufferにおける動作位置を表す.また,ポインタの自増操作には同時制御が必要であり,Disruptorと本稿のOptimizedQueueはともにCASの楽観的同時制御を用いてポインタの自増原子性を保証しているが,楽観的同時制御については後で重点的に紹介する.DisruptorのRingBufferには、現在のRingBuffer上のメッセージがどこに書かれているかを示すポインタが1つしかありません.また、各消費者は、RingBuffer上でどこまで読んだかを示すsequenceを維持します.この観点から、DisruptorのRingBufferには実際に消費者数+1個のポインタがあります.単一メッセージ・シート消費のブロック・キューを実現するには、1つのリード・ポインタ(対応消費者)と1つのライト・ポインタ(対応生産者)を維持すればよいため、どのポインタも、リード・ライト操作のたびに1回増加し、境界を越えると、配列ヘッダから読み書きを継続することができる.
3.2、SequenceDisruptor
順次インクリメントされたシーケンス番号によって交換されたデータ(イベント)を番号付けして管理し、データ(イベント)の処理過程は常にシーケンス番号に沿ってインクリメントされる.1つのSequenceは、特定のイベントハンドラ(RingBuffer/Consumer)を識別する処理の進捗を追跡するために使用される.1つのAtomicLongは進捗を識別するためにも使用できるが、この問題を担当するためにSequenceを定義するもう1つの目的は、異なるSequence間のCPUキャッシュ偽共有(Flase Sharing)問題を防止することである.(注:これはDisruptorが高性能を実現する鍵の一つであり、ネット上では偽共有問題についての紹介はすでに汗をかいており、ここでは後述しない).
Sequencer
SequencerはDisruptorの真のコアです.このインタフェースには、生産者と消費者の間でデータを迅速かつ正確に伝達する同時アルゴリズムを定義する2つの実装クラスSingleProducerSequencer、MultiproducerSequencerがあります.
Sequence Barrier
RingBufferのmain published SequenceおよびConsumerに依存する他のConsumerのSequenceへの参照を保持するために使用されます.Sequence Barrierはまた、Consumerに処理可能なイベントがあるかどうかを決定する論理を定義する.
Wait Strategy
Consumerが次のイベントを待つポリシーを定義します.(注:Disruptorは、さまざまなポリシーを定義し、異なるシーンに対して異なるパフォーマンスを提供します)
Event
Disruptorの意味では,生産者と消費者の間で交換されるデータをイベント(Event)と呼ぶ.これは、Disruptorによって定義される特定のタイプではなく、Disruptorのユーザによって定義され、指定されます.
EventProcessor
EventProcessorは、特定消費者(Consumer)のSequenceを保有し、イベント処理実装を呼び出すイベントループ(Event Loop)を提供する.
EventHandler
Disruptorが定義したイベント処理インタフェースは,ユーザによって実現され,イベントを処理するために用いられ,Consumerの真の実装である.
Producer
すなわち、生産者は、一般的には、特定のインタフェースまたはタイプを定義していないDisruptorパブリッシュイベントを呼び出すユーザコードを指す.
四:インスタンスDisruptorによる読み書き操作
4.1.PomはMaven依存情報を導入

		
			com.lmax
			disruptor
			3.2.1
		
	

4.2、伝達するデータを含むイベントを宣言する
public class DisruptorEvent {

    private String name;

    private String value;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

4.3、EventFactoryを宣言してEventオブジェクトをインスタンス化する
public class DisruptorEventFactory implements EventFactory {

    public DisruptorEvent newInstance() {
        return new DisruptorEvent();
    }
}

4.4、メッセージ送信者の作成
public class DisruptorEventProducer {

    public final RingBuffer ringBuffer;

    public DisruptorEventProducer(RingBuffer ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(DisruptorEvent producerEvent) {
        // 1.ringBuffer          
        long sequence = ringBuffer.next();

        try {
            //2.        
            DisruptorEvent stringEvent = ringBuffer.get(sequence);
            //3.           
            stringEvent.setName(producerEvent.getName());
            stringEvent.setValue(producerEvent.getValue());
        } catch (Exception e) {

        } finally {
            System.out.println("         ");
            //4.    
            ringBuffer.publish(sequence);

        }

    }
}

4.5、メッセージ受信者の作成
public class DisruptorEventHandler implements EventHandler {

    public void onEvent(DisruptorEvent stringEvent, long l, boolean b) throws Exception {
        System.out.println("     name:"+stringEvent.getName()+"----value:"+stringEvent.getValue());
    }
}

4.6、テストクラスの作成
public class DisruptorApp {

    public static void main(String[] args) {
        // 1.                            
        ExecutorService executor = Executors.newCachedThreadPool();
        // 2.  Factory      Event
        DisruptorEventFactory factory = new DisruptorEventFactory();
        // 3.  ringBuffer   
        int ringBufferSize = 1024 * 1024; // ringBufferSize      2 N  
        // 4.  Disruptor
        Disruptor disruptor = new Disruptor(factory, ringBufferSize, executor, ProducerType.MULTI, new YieldingWaitStrategy());
        // 5.                  。         
        EventHandlerGroup handlerGroup = disruptor.handleEventsWith(new DisruptorEventHandler());
        // handlerGroup.then(new DisruptorEventOneHandler());// then               
        disruptor.start();
        // 7.  RingBuffer  
        RingBuffer ringBuffer = disruptor.getRingBuffer();
        // 8.      
        DisruptorEventProducer eventProducer = new DisruptorEventProducer(ringBuffer);
        for (int i = 0; i < 10; i++) {
            DisruptorEvent producerEvent=new DisruptorEvent();
            producerEvent.setName("   "+i);
            producerEvent.setValue("  "+i+" ");
            eventProducer.onData(producerEvent);
        }
        executor.shutdown();
        disruptor.shutdown();
    }
}

より多くの学習資料がプログラミングネットワークを並行している-ifeve.com/disruptor/