JavaコンカレントフレームワークDisruptorソース分析:RingBuffer
34900 ワード
JavaコンカレントフレームワークDisruptorソース分析:RingBuffer
Disruptorの紹介
公式文書によると、Disruptorは高性能なスレッド間通信ライブラリである.それはLMAXの同時、性能と非ブロックアルゴリズムの研究から来て、今取引システムのインフラストラクチャの核心部分です.
The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX’s research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange’s infrastructure.
Disruptorの高性能の原因は以下の点です:1.ロックされていないデータ構造RingBuffer 2.擬似共有&キャッシュライン充填
この記事では、まずリングバッファのRingBufferを紹介し、ソースコードに深く入り込んでDisruptorがどのようにロックなしでRingBufferを操作するかを分析します.
RingBufferの紹介
リングバッファ(ring buffer)は、固定サイズでヘッドとテールが接続されたバッファを表すデータ構造であり、データストリームをキャッシュするのに適している.RingBufferは通常配列実装を採用し、CPUキャッシュに友好的で、チェーンテーブルより性能が良い.
1つの円形バッファには、4つのキーパラメータがあります.1.メモリアドレスです.2.バッファ長.3.バッファに格納された有効データの開始位置:リードポインタ.4.バッファに格納されている有効なデータの末尾:ライトポインタ.
次に、DisruptorのRingBufferの実装を深く研究し、ロックなしで読み書きする方法を見てみましょう.
RingBufferソース分析
ソースコードを見る前に、Disruptorがどのように使用されているかを知る必要があります:Disruptor入門.
初期化
まずRingBufferクラスの構造方法を見てみましょう.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
RingBuffer(EventFactory eventFactory, Sequencer sequencer)
{
super(eventFactory, sequencer);
}
}
abstract class RingBufferFields extends RingBufferPad
{
private static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();
static
{
// ,
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
REF_ELEMENT_SHIFT = 3;
}
else
{
throw new IllegalStateException("Unknown pointer size");
}
BUFFER_PAD = 128 / scale;
// Including the buffer pad in the array base offset
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}
RingBufferFields(EventFactory eventFactory, Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
// bufferSize 2 N
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
// bufferSize
private void fill(EventFactory eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
}
まず、構築方法の2つのパラメータを簡単に説明します.1.Sequencer:生産者がキャッシュにアクセスするためのコントローラで、消費者シーケンス番号の参照を持っています.新しいイベントがリリースされた後、WaitStrategyを介して待機中のSequenceBarrierに通知されます.2.EventFactory:RingBufferに格納されている元素の初期化学工場類.
構築方法から「bufferSize must be a power of 2」を見ると、ビット操作を容易に使用してメモリ内のリードライト要素の位置を取得することが要求され、残りの%操作よりも効率が高い.RingBufferという点はLinuxカーネルのkfifoと一致している.
申請した配列entriesの実際のサイズはbufferSize+2*BUFFER_PAD,BUFFER_PAD個の配列要素は128バイトを占有し,すなわち配列前後に128バイトずつ充填されているが,これは主に擬似共有を防止するためである.
書き込み操作
公式文書に書かれているデータの例は次のとおりです.
// RingBuffer
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
}
public class LongEventProducer {
private final RingBuffer ringBuffer;
public LongEventProducer(RingBuffer ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb) {
long sequence = ringBuffer.next(); //
try {
LongEvent event = ringBuffer.get(sequence); //
event.set(bb.getLong(0)); //
} finally {
ringBuffer.publish(sequence); //
}
}
}
コードから,RingBufferの書き込み操作は3つのステップに分けられる:1.次のノードを申請する.2.データを書き込みます.3.提出します.
次の書き込み可能なノードシーケンス番号を申請するには、Sequencerの同名メソッドに委任されたRingBufferのnextメソッドが呼び出されます.Sequencerには、単一生産者バージョンSingleProducerSequencer、マルチ生産者バージョンMultiproducerSequencerの2つの実装があります.両者の違いは、マルチプロバイダが次のライトノードを競合して取得する必要があり、シングルプロバイダバージョンでは競合していないことです.まず、マルチプロダクションバージョンのコードを見てみましょう.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
@Override
public long next()
{
return sequencer.next();
}
}
public final class MultiProducerSequencer extends AbstractSequencer
{
@Override
public long next()
{
return next(1);
}
//
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
// cursor
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
// cachedGatingSequence ( )
long cachedGatingSequence = gatingSequenceCache.get();
//
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
// CAS cursor
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
}
MultiproducerSequencerはCAS操作を使用してライトポインタの位置を更新します.これはSingleProducerSequencerとの主な違いで、シングルプロダクションモードはライト競合がないため、直接設定されています.単生産者と多生産者をわざわざ区別するのは、CAS操作は結局いくつかの性能を損なうため、競争がない場合、直接付与効率が高いからだ.
リードオペレーション
データを消費する必要がある場合は、EventHandlerインタフェースを実装し、disruptorに入れる必要があります.
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println(Thread.currentThread().getName() + " Event: " + event);
}
}
Disruptor disruptor = new Disruptor<>(factory, bufferSize, Executors.newFixedThreadPool(3));
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
Disruptor関連EventHandlerのコードは次のとおりです.
public class Disruptor<T>
{
public EventHandlerGroup handleEventsWith(final EventHandler super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
EventHandlerGroup createEventProcessors( final Sequence[] barrierSequences,
final EventHandler super T>[] eventHandlers)
{
checkNotStarted();
// Sequence ,
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
// SequenceBarrier , SequenceBarrier
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
// eventHandler , eventHandler
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler super T> eventHandler = eventHandlers[i];
final BatchEventProcessor batchEventProcessor =
new BatchEventProcessor(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup(this, consumerRepository, processorSequences);
}
}
本当にデータのポーリング処理を担当するのはBatchEventProcessorクラスで、大まかな手順は以下の通りです:1.読み取り可能なデータ番号を取得します.2.データを個別に処理する.3.既読データの場所を更新します.
public final class BatchEventProcessor<T> implements EventProcessor {
@Override
public void run() {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try {
while (true) {
try {
//
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null) {
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
//
while (nextSequence <= availableSequence) {
// ,
event = dataProvider.get(nextSequence);
// eventHandler
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
//
sequence.set(availableSequence);
} catch (final TimeoutException e) {
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
if (!running.get()) {
break;
}
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
} finally {
notifyShutdown();
running.set(false);
}
}
}
ここで重要なのは、読み取り可能なデータ番号を取得することです.ProcessingSequenceBarrierのwaitForメソッドを詳しく見てみましょう.
final class ProcessingSequenceBarrier implements SequenceBarrier
{
@Override
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// waitStrategy BlockingWaitStrategy
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
}
このメソッドは主にwaitStrategyのwaitForメソッドを呼び出し、デフォルトのwaitStrategyを例にコードを見ます.
public final class BlockingWaitStrategy implements WaitStrategy
{
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition();
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
// cursorSequence ,sequence , , RingBuffer ,
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// ,dependentSequence cursorSequence
// ,dependentSequence Sequence,get
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
}
BatchEventProcessorは、1組の消費者のうち1人しか消費者がいない場合に適用されますが、同じ組の消費者のうち複数の消費者がいる場合はどうすればいいのでしょうか.WorkerPoolを使用しています.1つのWorkerPoolには複数のWorkProcessor消費者が含まれており、WorkProcessorは消費データのポーリングを担当しています.対応するDisruptorのコンシューマグループ作成方法は次のとおりです.
public class Disruptor<T>
{
public EventHandlerGroup handleEventsWithWorkerPool(final WorkHandler... workHandlers)
{
return createWorkerPool(new Sequence[0], workHandlers);
}
EventHandlerGroup createWorkerPool(
final Sequence[] barrierSequences, final WorkHandler super T>[] workHandlers)
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPool workerPool = new WorkerPool(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
final Sequence[] workerSequences = workerPool.getWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new EventHandlerGroup(this, consumerRepository, workerSequences);
}
}
public final class WorkerPool<T>
{
private final AtomicBoolean started = new AtomicBoolean(false);
private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer ringBuffer;
// WorkProcessors are created to wrap each of the provided WorkHandlers
private final WorkProcessor>[] workProcessors;
public WorkerPool(
final RingBuffer ringBuffer,
final SequenceBarrier sequenceBarrier,
final ExceptionHandler super T> exceptionHandler,
final WorkHandler super T>... workHandlers)
{
this.ringBuffer = ringBuffer;
final int numWorkers = workHandlers.length;
workProcessors = new WorkProcessor[numWorkers];
for (int i = 0; i < numWorkers; i++)
{
workProcessors[i] = new WorkProcessor(
ringBuffer,
sequenceBarrier,
workHandlers[i],
exceptionHandler,
workSequence);
}
}
}
データのポーリング処理を担当するWorkProcessorクラスを見てみましょう.
public final class WorkProcessor<T>
implements EventProcessor
{
private final AtomicBoolean running = new AtomicBoolean(false);
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer ringBuffer;
private final SequenceBarrier sequenceBarrier;
private final WorkHandler super T> workHandler;
private final ExceptionHandler super T> exceptionHandler;
private final Sequence workSequence;
private final TimeoutHandler timeoutHandler;
public WorkProcessor(
final RingBuffer ringBuffer,
final SequenceBarrier sequenceBarrier,
final WorkHandler super T> workHandler,
final ExceptionHandler super T> exceptionHandler,
final Sequence workSequence)
{
this.ringBuffer = ringBuffer;
this.sequenceBarrier = sequenceBarrier;
this.workHandler = workHandler;
this.exceptionHandler = exceptionHandler;
this.workSequence = workSequence;
if (this.workHandler instanceof EventReleaseAware)
{
((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser);
}
timeoutHandler = (workHandler instanceof TimeoutHandler) ? (TimeoutHandler) workHandler : null;
}
@Override
public void run()
{
// Processor
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get();
T event = null;
while (true)
{
try
{
if (processedSequence)
{
processedSequence = false;
do
{
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
}
// workSequence, CAS
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
// cachedAvailableSequence nextSequence ,
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
}
else
{
//
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
// handle, mark as processed, unless the exception handler threw an exception
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}
}
notifyShutdown();
running.set(false);
}
}
単一消費者のBatchEventProcessorとは異なり、1.sequenceBarrierに読み取り可能なデータ番号を申請する以外は、同じグループの消費者間で反発アクセスを保証する(workSequenceによって保証される).2.BatchEventProcessorで申請すると、一度にデータを処理できますが、ここでは一度に1つのデータしか処理できません.
まとめ
生産者側でライトポインタロールを担当するのはSequencerオブジェクトであり、消費者側でリードポインタロールを担当するのはSequenceオブジェクトであり、SequenceBarrierは消費者間および消費者とRingBuffer間で依存関係を確立するために使用される:生産者のライトポインタ、依存する他の消費者のリードポインタに基づいて次の消費可能なデータの位置を計算する.
マルチプロダクションでスレッドセキュリティの確保を担当しているのはMultiproducerSequencerで、マルチコンシューマでスレッドセキュリティを確保しているのはWorkProcessorで、読み書きノードの競合に対してCAS操作を採用しており、ヘビー級ロックよりも効率が高い.
異なるWaitStrategyはRingBufferが空または満の場合、消費者と生産者の待機戦略を決定した.
生産者側と消費者側は特に、SingleProducerSequencerとMultiproducerSequencer、BatchEventProcessor、WorkProcessorを区別しています.これは主に競合のない状況を最適化するためであり,競合がある場合はCASを用い,競合がない場合はCASさえ必要とせず,より性能が高い.
参考資料