Disruptor Notes (4): Key Classes and Code
AggregateEventHandler.java
Encapsulates a list of EventHandlers and behaves like the handlers it wraps, calling each one in turn for every event. It also implements lifecycle management (onStart and onShutdown).
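The forwarding idea can be sketched as follows. This is a minimal illustration, not the library's source: SimpleHandler is a hypothetical, simplified stand-in for the Disruptor handler interface, and AggregateHandlerSketch is a name made up for this note.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for the Disruptor handler interface.
interface SimpleHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
    default void onStart() {}
    default void onShutdown() {}
}

// Forwards every callback to each wrapped handler, in registration order.
final class AggregateHandlerSketch<T> implements SimpleHandler<T> {
    private final List<SimpleHandler<T>> handlers;

    AggregateHandlerSketch(List<SimpleHandler<T>> handlers) {
        this.handlers = new ArrayList<>(handlers);
    }

    @Override
    public void onEvent(T event, long sequence, boolean endOfBatch) {
        for (SimpleHandler<T> handler : handlers) {
            handler.onEvent(event, sequence, endOfBatch);
        }
    }

    @Override
    public void onStart() {
        for (SimpleHandler<T> handler : handlers) {
            handler.onStart();
        }
    }

    @Override
    public void onShutdown() {
        for (SimpleHandler<T> handler : handlers) {
            handler.onShutdown();
        }
    }

    // Small demo: two handlers record what they saw for one event.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SimpleHandler<String> a = (event, sequence, end) -> log.add("a:" + event);
        SimpleHandler<String> b = (event, sequence, end) -> log.add("b:" + event);
        List<SimpleHandler<String>> both = Arrays.asList(a, b);
        AggregateHandlerSketch<String> aggregate = new AggregateHandlerSketch<>(both);
        aggregate.onStart();
        aggregate.onEvent("x", 0L, true);
        aggregate.onShutdown();
        return log;
    }
}
```

Because the aggregate implements the same interface as the handlers it wraps, it can be passed anywhere a single handler is expected.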
Sequence.java
A cache-line-padded sequence counter: padding fields fill out the cache line around the counter. RingBuffer and BatchEventProcessor use it for their counters.
Padding method:
public long p1, p2, p3, p4, p5, p6, p7; // cache line padding, padding1
private volatile long cursor = INITIAL_CURSOR_VALUE;
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding, padding2
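The fields can fall across cache lines in two ways:
Case 1: object (0~8 bytes) + padding1 share one cache line, and cursor + padding2 share the next.
Case 2: padding1 + cursor share one cache line, and padding2 + object share the next.
In both cases, the cache line that holds cursor contains only fields of this Sequence instance.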
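Reference: http://mechanical-sympathy.blogspot.com/2011/07/false-sharing.html
A cache line is typically 64 bytes. In the Hotspot JVM, every object has a two-word header: the first word is made up of 24 bits for the hash code and 8 bits for flags such as the lock state, and the second is a reference to the object's class. Arrays have an additional "word" for the array size. Every object is aligned to an 8-byte boundary. For efficient packing, the fields are reordered from declaration order into the following order (by size in bytes):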
1. doubles (8) and longs (8)
2. ints (4) and floats (4)
3. shorts (2) and chars (2)
4. booleans (1) and bytes (1)
5. references (4/8)
6. <repeat for sub-class fields>
Padding a cache line: any field can be padded with 7 longs (8 bytes each).
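The arithmetic behind "7 longs" can be sketched as below, assuming a 64-byte cache line and 8-byte longs. PaddedCounterSketch and its method names are made up for this note, and the JVM remains free to lay fields out differently, so this is an illustration rather than a guarantee.

```java
// Sketch of a cache-line-padded counter, assuming 64-byte cache lines.
// The JVM may reorder or drop unused fields, so the layout is not guaranteed.
final class PaddedCounterSketch {
    public long p1, p2, p3, p4, p5, p6, p7;       // padding before the hot field
    private volatile long value = -1L;            // the field other threads contend on
    public long p8, p9, p10, p11, p12, p13, p14;  // padding after the hot field

    long get() {
        return value;
    }

    void set(long newValue) {
        value = newValue;
    }

    // Reads the padding so it is less likely to be treated as dead fields.
    long sumPadding() {
        return p1 + p2 + p3 + p4 + p5 + p6 + p7
             + p8 + p9 + p10 + p11 + p12 + p13 + p14;
    }

    // Bytes covered by one hot long plus the given number of padding longs.
    static int bytesCovered(int paddingLongs) {
        return (paddingLongs + 1) * Long.BYTES;
    }
}
```

With 7 padding longs, bytesCovered(7) is (7 + 1) * 8 = 64, exactly one assumed cache line, which is why the padding is declared on both sides of the hot field.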
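BatchEventProcessor.java
Handles the batching semantics of consuming events from the RingBuffer and delegates the available events to an EventHandler.
Main code: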
public void run()
{
    if (!running.compareAndSet(false, true))
    {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();
    notifyStart();
    T event = null;
    long nextSequence = sequence.get() + 1L;
    while (true)
    {
        try
        {
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            // availableSequence may be ahead of nextSequence, so handle the whole batch
            while (nextSequence <= availableSequence)
            {
                event = ringBuffer.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }
            // nextSequence is one past the last handled event, so (nextSequence - 1L) is the last processed event
            sequence.set(nextSequence - 1L);
        }
        catch (final AlertException ex)
        {
            if (!running.get())
            {
                break;
            }
        }
        catch (final Throwable ex)
        {
            exceptionHandler.handleEventException(ex, nextSequence, event); // delegate to the exception handler
            sequence.set(nextSequence); // mark the failed sequence as processed and move on
            nextSequence++;
        }
    }
    notifyShutdown();
    running.set(false);
}
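The inner batch loop can be tried in isolation with the sketch below. It is a simplified model, not the library code: the ring is a plain String array, the handler is replaced by appending to a list, and BatchLoopSketch, drain and demo are names made up for this note.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchLoopSketch {
    // Handles sequences nextSequence..availableSequence from a power-of-two ring
    // and returns the last sequence processed, as the listing does with
    // sequence.set(nextSequence - 1L).
    static long drain(String[] ring, long nextSequence, long availableSequence, List<String> out) {
        final int mask = ring.length - 1;
        while (nextSequence <= availableSequence) {
            String event = ring[(int) (nextSequence & mask)];
            boolean endOfBatch = nextSequence == availableSequence;
            // Stand-in for eventHandler.onEvent: mark the last event of the batch with "!".
            out.add(event + (endOfBatch ? "!" : ""));
            nextSequence++;
        }
        return nextSequence - 1L;
    }

    static List<String> demo() {
        String[] ring = {"e0", "e1", "e2", "e3"};
        List<String> out = new ArrayList<>();
        long processed = drain(ring, 0L, 2L, out);
        out.add("processed=" + processed);
        return out;
    }
}
```

A single waitFor call can therefore release several events at once, and only the last one is flagged as the end of the batch.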
ClaimStrategy.java
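Strategy contract used by the Sequencer for event publishers to claim event sequences.
It has the following three implementations.
SingleThreadedClaimStrategy.java: a claim strategy optimised for a single publisher thread. It can only be used when publishing happens from a single thread.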
Main code:
// availableCapacity: the capacity that should remain available in the buffer
// dependentSequences: the sequences to check the range against
public boolean hasAvailableCapacity(final int availableCapacity, final Sequence[] dependentSequences)
{
    // (current claimed sequence + requested capacity) - buffer size
    final long wrapPoint = (claimSequence.get() + availableCapacity) - bufferSize;
    if (wrapPoint > minGatingSequence.get())
    {
        long minSequence = getMinimumSequence(dependentSequences);
        // cache the minimum sequence of the dependents
        minGatingSequence.set(minSequence);
        if (wrapPoint > minSequence)
        {
            return false; // the claim would wrap past the slowest dependent, so there is no capacity
        }
    }
    return true;
}
private void waitForFreeSlotAt(final long sequence, final Sequence[] dependentSequences)
{
    final long wrapPoint = sequence - bufferSize;
    if (wrapPoint > minGatingSequence.get())
    {
        long minSequence;
        while (wrapPoint > (minSequence = getMinimumSequence(dependentSequences)))
        {
            LockSupport.parkNanos(1L); // park for 1 nanosecond, then check again
        }
        minGatingSequence.set(minSequence);
    }
}
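The wrap-point arithmetic used by both methods can be checked with a few numbers. The sketch below is a stand-alone helper written for this note (WrapPointSketch and hasCapacity are hypothetical names); it leaves out the cached minGatingSequence and compares directly against the slowest dependent sequence.

```java
final class WrapPointSketch {
    // True when claiming `requested` more slots after `claimed` would not
    // overwrite a slot that the slowest dependent has not yet processed.
    static boolean hasCapacity(long claimed, int requested, int bufferSize, long minDependent) {
        long wrapPoint = (claimed + requested) - bufferSize;
        return wrapPoint <= minDependent;
    }
}
```

For example, with bufferSize 8, sequences up to 9 claimed and the slowest dependent at 3, slots 4..9 are still in use, so 2 slots are free: hasCapacity(9, 2, 8, 3) gives wrapPoint 3 and returns true, while hasCapacity(9, 3, 8, 3) gives wrapPoint 4 and returns false.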
MultiThreadedClaimStrategy.java
@Override
public long incrementAndGet(final Sequence[] dependentSequences)
{
    final MutableLong minGatingSequence = minGatingSequenceThreadLocal.get();
    waitForCapacity(dependentSequences, minGatingSequence); // wait for capacity before claiming
    final long nextSequence = claimSequence.incrementAndGet();
    waitForFreeSlotAt(nextSequence, dependentSequences, minGatingSequence);
    return nextSequence;
}
@Override
public long incrementAndGet(final int delta, final Sequence[] dependentSequences)
{
    final long nextSequence = claimSequence.addAndGet(delta);
    waitForFreeSlotAt(nextSequence, dependentSequences, minGatingSequenceThreadLocal.get());
    return nextSequence;
}
@Override
public void serialisePublishing(final long sequence, final Sequence cursor, final int batchSize)
{
    // wait until the sequence fits inside the pendingPublication window
    int counter = RETRIES;
    while (sequence - cursor.get() > pendingPublication.length())
    {
        if (--counter == 0)
        {
            Thread.yield();
            counter = RETRIES;
        }
    }
    // record every sequence of this batch as pending
    long expectedSequence = sequence - batchSize;
    for (long pendingSequence = expectedSequence + 1; pendingSequence <= sequence; pendingSequence++)
    {
        pendingPublication.set((int) pendingSequence & pendingMask, pendingSequence);
    }
    long cursorSequence = cursor.get();
    if (cursorSequence >= sequence)
    {
        return;
    }
    expectedSequence = Math.max(expectedSequence, cursorSequence);
    long nextSequence = expectedSequence + 1;
    // advance the cursor one step at a time with CAS
    while (cursor.compareAndSet(expectedSequence, nextSequence))
    {
        expectedSequence = nextSequence;
        nextSequence++;
        // stop when the next sequence has not been recorded in pendingPublication yet
        if (pendingPublication.get((int) nextSequence & pendingMask) != nextSequence)
        {
            break;
        }
    }
}
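The core idea of serialisePublishing, recording a sequence as pending and then advancing the cursor across every contiguous pending sequence, can be sketched as below. This is a reduced model written for this note, not the library code: it publishes one sequence at a time, skips the wait for the pending window, and assumes the slots start at -1 so that nothing counts as published. PendingPublicationSketch and its members are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

final class PendingPublicationSketch {
    private final AtomicLong cursor = new AtomicLong(-1L);
    private final AtomicLongArray pending;
    private final int mask;

    // size is assumed to be a power of two so that masking works as a modulo.
    PendingPublicationSketch(int size) {
        pending = new AtomicLongArray(size);
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            pending.set(i, -1L); // nothing published yet
        }
    }

    void publish(long sequence) {
        pending.set((int) sequence & mask, sequence);
        long expected = cursor.get();
        long next = expected + 1;
        // Advance the cursor while the next sequence is already pending.
        while (pending.get((int) next & mask) == next
                && cursor.compareAndSet(expected, next)) {
            expected = next;
            next++;
        }
    }

    long cursor() {
        return cursor.get();
    }

    // Publishes sequence 1 before sequence 0 and reports the cursor after each call.
    static long[] demo() {
        PendingPublicationSketch sketch = new PendingPublicationSketch(8);
        sketch.publish(1L);
        long afterFirst = sketch.cursor();
        sketch.publish(0L);
        long afterSecond = sketch.cursor();
        return new long[] {afterFirst, afterSecond};
    }
}
```

In the demo the cursor stays at -1 after sequence 1 is published out of order, then jumps to 1 once sequence 0 arrives, so consumers never see a gap.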
MultiThreadedLowContentionClaimStrategy.java
Its difference from MultiThreadedClaimStrategy.java is as follows:
@Override
public void serialisePublishing(final long sequence, final Sequence cursor, final int batchSize)
{
    final long expectedSequence = sequence - batchSize;
    while (expectedSequence != cursor.get()) // wait until every earlier sequence has been published
    {
        // busy spin
    }
    cursor.set(sequence);
}
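The ordering this enforces can be observed with two threads. The sketch below is an illustration written for this note: InOrderPublishSketch is a hypothetical name, an AtomicLong stands in for the cursor Sequence, and Thread.yield() replaces the pure busy spin so the demo behaves well on a single core.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

final class InOrderPublishSketch {
    // Waits until the cursor sits just before this batch, then publishes it.
    static void publish(AtomicLong cursor, long sequence, int batchSize, List<Long> order) {
        final long expected = sequence - batchSize;
        while (cursor.get() != expected) {
            Thread.yield(); // the listing busy-spins here
        }
        order.add(sequence);  // recorded before the cursor moves, so the order is stable
        cursor.set(sequence);
    }

    // Starts the publisher of sequence 1 first; it must still publish second.
    static List<Long> demo() {
        AtomicLong cursor = new AtomicLong(-1L);
        List<Long> order = new CopyOnWriteArrayList<>();
        Thread late = new Thread(() -> publish(cursor, 1L, 1, order));
        late.start();
        publish(cursor, 0L, 1, order);
        try {
            late.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }
}
```

Each publisher waits only for its immediate predecessor, which is cheap when few publishers compete but leaves a thread spinning whenever an earlier claimant is slow.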
EventPublisher.java
Event publisher. Main code:
private void translateAndPublish(final EventTranslator<E> translator, final long sequence)
{
    try
    {
        // the translator fills the event stored at this sequence with data
        translator.translateTo(ringBuffer.get(sequence), sequence);
    }
    finally
    {
        // publish even if the translator throws, so the claimed sequence is not left unpublished
        ringBuffer.publish(sequence);
    }
}
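The try/finally shape matters because a claimed sequence that is never published would hold up everything behind it. The sketch below models that with a tiny array-backed ring; TranslatePublishSketch, Translator and ValueEvent are hypothetical names written for this note, and setting an AtomicLong stands in for ringBuffer.publish.

```java
import java.util.concurrent.atomic.AtomicLong;

final class TranslatePublishSketch {
    // Hypothetical, simplified translator interface.
    interface Translator<E> {
        void translateTo(E event, long sequence);
    }

    static final class ValueEvent {
        long value;
    }

    private final ValueEvent[] ring = new ValueEvent[4]; // power-of-two size
    private final AtomicLong cursor = new AtomicLong(-1L);

    TranslatePublishSketch() {
        for (int i = 0; i < ring.length; i++) {
            ring[i] = new ValueEvent(); // events are preallocated and reused
        }
    }

    void translateAndPublish(Translator<ValueEvent> translator, long sequence) {
        try {
            translator.translateTo(ring[(int) (sequence & (ring.length - 1))], sequence);
        } finally {
            cursor.set(sequence); // published even if the translator throws
        }
    }

    long valueAt(long sequence) {
        return ring[(int) (sequence & (ring.length - 1))].value;
    }

    long cursor() {
        return cursor.get();
    }

    // One successful translation, then one that throws.
    static long[] demo() {
        TranslatePublishSketch sketch = new TranslatePublishSketch();
        sketch.translateAndPublish((event, sequence) -> event.value = sequence * 10 + 7, 0L);
        try {
            sketch.translateAndPublish((event, sequence) -> {
                throw new IllegalStateException("translation failed");
            }, 1L);
        } catch (IllegalStateException expected) {
            // the sequence was still published by the finally block
        }
        return new long[] {sketch.valueAt(0L), sketch.cursor()};
    }
}
```

In the demo the first event holds 7 and the cursor ends at 1, even though the second translation failed.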
WaitStrategy.java
Strategy that determines how an EventProcessor waits for the cursor to reach a given sequence. It has the following four implementations.
/**
 * Blocking strategy that uses a lock and condition variable for {@link EventProcessor}s waiting on a barrier.
 *
 * This strategy can be used when throughput and low-latency are not as important as CPU resource.
 */
BlockingWaitStrategy.java: uses a lock, so it is suitable only when the requirements on throughput and low latency are not high.
/**
 * Busy Spin strategy that uses a busy spin loop for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier.
 *
 * This strategy will use CPU resource to avoid syscalls which can introduce latency jitter. It is best
 * used when threads can be bound to specific CPU cores.
 */
BusySpinWaitStrategy.java: this approach burns CPU and never calls yield().
/**
 * Sleeping strategy that initially spins, then uses a Thread.yield(), and eventually for the minimum number of nanos
 * the OS and JVM will allow while the {@link com.lmax.disruptor.EventProcessor}s are waiting on a barrier.
 *
 * This strategy is a good compromise between performance and CPU resource. Latency spikes can occur after quiet periods.
 */
SleepingWaitStrategy.java: checks a counter; once the counter has dropped to 100 it calls yield(), and once it reaches 0 it calls LockSupport.parkNanos(1L).
/**
 * Yielding strategy that uses a Thread.yield() for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier
 * after an initially spinning.
 *
 * This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
 */
YieldingWaitStrategy.java: once counter == 0, it calls yield().
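The three phases of the sleeping strategy (spin, yield, park) can be sketched as a single step function. This is an illustration built from the description above, not the library source: SleepingWaitSketch and its method names are hypothetical, and the starting value of 200 retries is an assumption.

```java
import java.util.concurrent.locks.LockSupport;

final class SleepingWaitSketch {
    static final int RETRIES = 200; // assumed starting value for the counter

    // Performs one wait step and returns the updated counter.
    static int applyWaitMethod(int counter) {
        if (counter > 100) {
            return counter - 1;          // phase 1: plain spin
        } else if (counter > 0) {
            Thread.yield();              // phase 2: give up the time slice
            return counter - 1;
        } else {
            LockSupport.parkNanos(1L);   // phase 3: shortest sleep the OS and JVM allow
            return counter;
        }
    }

    // Counts the wait steps taken before the strategy starts parking.
    static int stepsUntilParking() {
        int counter = RETRIES;
        int steps = 0;
        while (counter > 0) {
            counter = applyWaitMethod(counter);
            steps++;
        }
        return steps;
    }
}
```

A caller would invoke applyWaitMethod in a loop until the awaited sequence becomes available. The yielding strategy follows the same pattern but keeps calling yield() at the end instead of parking, which avoids the latency spike after a quiet period at the cost of more CPU.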