Disruptor Notes (4): Key Classes and Code


AggregateEventHandler.java
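An encapsulation of a list of EventHandlers that works much like the list itself: each event is passed to every handler in turn. It also implements lifecycle management (onStart, onShutdown).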
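
To make the aggregation idea concrete, here is a minimal sketch that forwards every callback to a list of delegates. `Handler` and `AggregateHandler` are hypothetical stand-ins written for this note, not the Disruptor types, whose exact signatures differ between versions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an event handler with lifecycle callbacks.
interface Handler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
    default void onStart() {}
    default void onShutdown() {}
}

// Forwards each callback to every wrapped handler, in list order.
final class AggregateHandler<T> implements Handler<T> {
    private final List<Handler<T>> handlers;

    AggregateHandler(List<Handler<T>> handlers) {
        this.handlers = new ArrayList<>(handlers);
    }

    @Override
    public void onEvent(T event, long sequence, boolean endOfBatch) {
        for (Handler<T> h : handlers) {
            h.onEvent(event, sequence, endOfBatch);
        }
    }

    @Override
    public void onStart() {
        for (Handler<T> h : handlers) {
            h.onStart();
        }
    }

    @Override
    public void onShutdown() {
        for (Handler<T> h : handlers) {
            h.onShutdown();
        }
    }
}
```

Callers see a single handler, so a processor that accepts one handler can drive several without knowing it.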
 
Sequence.java
A cache-line-padded sequence counter: the counter is padded out to fill a cache line. RingBuffer and BatchEventProcessor both use it for their counts.
Padding approach:
 
    public long p1, p2, p3, p4, p5, p6, p7; // cache line padding, padding1

    private volatile long cursor = INITIAL_CURSOR_VALUE;

    public long p8, p9, p10, p11, p12, p13, p14; // cache line padding, padding2

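With this padding, the fields can fall across cache lines in two ways:

    1: object (0~8 bytes) + padding1,
       cursor + padding2

    2: padding1 + cursor,
       padding2 + object

Either way, the cursor of one Sequence instance does not end up on the same cache line as another instance's cursor.

Reference: http://mechanical-sympathy.blogspot.com/2011/07/false-sharing.html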

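For the 64-bit Hotspot JVM, every object has a two-word header. The first word, the mark word, holds 24 bits of hash code and 8 bits of flags such as the lock state; the second is a reference to the object's class. Arrays carry one extra "word" for the array size. For performance, every object is aligned to an 8-byte boundary. So, for efficient packing, fields are reordered from declaration order into the following order (sizes in bytes):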

1.doubles(8) and longs(8)

2.ints(4) and floats(4)

3.shorts(2) and chars(2)

4.booleans(1) and bytes(1)

5.references(4/8)

6.<repeat for sub-class fields>

With that in mind, a cache line can be padded by placing 7 longs (8 bytes each) between fields.
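
As a minimal sketch of that rule, the hypothetical `PaddedCounter` below surrounds one hot `volatile long` with 7 longs on each side. It assumes a 64-byte cache line and that the JVM keeps the long fields together as described above. Neither is guaranteed by the Java language, so treat it as an illustration rather than a portable recipe.

```java
// Hypothetical padded counter, modelled on the layout described above.
final class PaddedCounter {
    public long p1, p2, p3, p4, p5, p6, p7;        // 56 bytes of padding before the hot field

    private volatile long value = -1L;             // the only field that is actually contended

    public long p8, p9, p10, p11, p12, p13, p14;   // 56 bytes of padding after the hot field

    long get() {
        return value;
    }

    void set(long newValue) {
        value = newValue;
    }
}
```

Whichever side of a cache-line boundary `value` lands on, at least 56 bytes of padding separate it from the hot field of a neighbouring instance.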

BatchEventProcessor.java
Batch-processes events from the RingBuffer and hands the available events to an EventHandler.

public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();
 
        notifyStart();
 
        T event = null;
        long nextSequence = sequence.get() + 1L;
        while (true)
        {
            try
            {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                 // waitFor may return a sequence beyond nextSequence, so the loop below drains the whole batch
                while (nextSequence <= availableSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence,nextSequence == availableSequence);
                    nextSequence++;
                }
 
                sequence.set(nextSequence - 1L); // minus 1: every event up to (nextSequence - 1L) has been processed
            }
            catch (final AlertException ex)
            {
               if (!running.get())
               {
                   break;
               }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event); // delegate to the exception handler
                sequence.set(nextSequence); // mark the failed sequence as handled and move on
                nextSequence++;
            }
        }
 
        notifyShutdown();
 
        running.set(false);
    }
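
The inner batch loop can be isolated as a small sketch. `BatchLoopSketch`, `BatchHandler`, and the plain array standing in for the ring buffer are assumptions made for illustration; barriers, alerts, and exception handling are left out.

```java
final class BatchLoopSketch {
    // Hypothetical callback mirroring the (event, sequence, endOfBatch) shape used above.
    interface BatchHandler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch);
    }

    // Processes slots nextSequence..availableSequence and returns the last processed sequence.
    static <T> long processBatch(T[] ring, long nextSequence, long availableSequence,
                                 BatchHandler<T> handler) {
        while (nextSequence <= availableSequence) {
            T event = ring[(int) (nextSequence % ring.length)];
            // endOfBatch is true only for the last event of this batch.
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        // The loop leaves nextSequence one past the last processed event.
        return nextSequence - 1L;
    }
}
```

The returned value is what the processor would store in its own sequence, which is why run() sets `nextSequence - 1L` after the loop.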

 
                         
ClaimStrategy.java
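The strategy contract by which event publishers claim event sequences in the Sequencer.
There are three implementations, as follows.
SingleThreadedClaimStrategy.java: a claim strategy optimised for a single publisher thread; it can only be used where a single thread does the publishing.
 
Main code: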
// availableCapacity: the number of free slots required in the buffer
// dependentSequences: the consumer sequences to check against
public boolean hasAvailableCapacity(final int availableCapacity, final Sequence[] dependentSequences)
    {
        final long wrapPoint = (claimSequence.get() + availableCapacity) - bufferSize; // claimed sequence (producer position) + requested capacity - buffer size
        if (wrapPoint > minGatingSequence.get())
        {
            long minSequence = getMinimumSequence(dependentSequences);
// minimum of the dependent sequences (slowest consumer)
            minGatingSequence.set(minSequence);
 
            if (wrapPoint > minSequence)
            {
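                return false; // the wrap point is past the slowest dependent sequence (slowest consumer): not enough free capacity, so a claim would overwrite unprocessed events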
            }
        }
 
        return true;
    }
 
private void waitForFreeSlotAt(final long sequence, final Sequence[] dependentSequences)
    {
        final long wrapPoint = sequence - bufferSize;
        if (wrapPoint > minGatingSequence.get())
        {
            long minSequence;
            while (wrapPoint > (minSequence = getMinimumSequence(dependentSequences)))
            {
                LockSupport.parkNanos(1L); // park for 1 nanosecond
            }
 
            minGatingSequence.set(minSequence);
        }
}
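
The wrap-point arithmetic in both methods is easier to follow with numbers. The sketch below is a simplified, hypothetical helper (`WrapPointSketch.hasCapacity`) that drops the cached minGatingSequence and takes the slowest consumer's sequence directly.

```java
final class WrapPointSketch {
    // True when claiming `required` more slots would not overwrite unconsumed entries.
    static boolean hasCapacity(long claimed, int required, int bufferSize, long minConsumerSequence) {
        // The sequence whose slot would be reused by the furthest claim.
        long wrapPoint = (claimed + required) - bufferSize;
        // Safe only if the slowest consumer has already processed that sequence.
        return wrapPoint <= minConsumerSequence;
    }
}
```

With a buffer of 8 slots and sequence 10 already claimed, claiming one more slot (sequence 11) reuses the slot of sequence 3, so the wrap point is 3. If the slowest consumer has processed sequence 3 the claim is safe; if it has only reached sequence 2, the claim would overwrite an unprocessed event.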

 
 
 
MultiThreadedClaimStrategy.java
@Override
    public long incrementAndGet(final Sequence[] dependentSequences)
    {
        final MutableLong minGatingSequence = minGatingSequenceThreadLocal.get();
        waitForCapacity(dependentSequences, minGatingSequence); // wait for capacity before claiming; is this needed?
 
        final long nextSequence = claimSequence.incrementAndGet();
        waitForFreeSlotAt(nextSequence,dependentSequences, minGatingSequence);
 
        return nextSequence;
    }
 
    @Override
    public long incrementAndGet(final int delta, final Sequence[] dependentSequences)
    {
        final long nextSequence = claimSequence.addAndGet(delta);
        waitForFreeSlotAt(nextSequence,dependentSequences, minGatingSequenceThreadLocal.get());
 
        return nextSequence;
}
 
 
 
 
@Override
public void serialisePublishing(final long sequence, final Sequence cursor, final int batchSize)
    {
        int counter = RETRIES;
        while (sequence - cursor.get() > pendingPublication.length())
        {
            if (--counter == 0)
            {
                Thread.yield();
                counter = RETRIES;
            }
        }
 
        long expectedSequence = sequence - batchSize;
        for (long pendingSequence = expectedSequence + 1;pendingSequence <= sequence; pendingSequence++)
        {
            pendingPublication.set((int) pendingSequence& pendingMask, pendingSequence);
        }
 
        long cursorSequence = cursor.get();
        if (cursorSequence >= sequence)
        {
            return;
        }
 
        expectedSequence = Math.max(expectedSequence,cursorSequence);
        long nextSequence = expectedSequence + 1;
        while (cursor.compareAndSet(expectedSequence, nextSequence))
        {
            expectedSequence = nextSequence;
            nextSequence++;
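            if (pendingPublication.get((int) nextSequence & pendingMask) != nextSequence) // why this check? The slot index is nextSequence modulo the pending buffer size, so the slot holds nextSequence only once that sequence is pending publication.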
            {
                break;
            }
        }
    }
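
The pending-publication idea can be reduced to a small sketch: each publisher records its sequence in a slot, and whoever can move the cursor forward keeps advancing it through all consecutive recorded sequences. `PendingPublisher` below is a hypothetical simplification, not the library code. It handles one sequence at a time (no batches) and omits the spin that stops a publisher from running more than one buffer length ahead of the cursor.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

final class PendingPublisher {
    private final AtomicLongArray pending;
    private final int mask;
    final AtomicLong cursor = new AtomicLong(-1L);

    // size must be a power of two so that (sequence & mask) is the slot index.
    PendingPublisher(int size) {
        pending = new AtomicLongArray(size);
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            pending.set(i, -1L); // no sequence recorded yet
        }
    }

    void publish(long sequence) {
        // Record this sequence as ready to publish.
        pending.set((int) sequence & mask, sequence);

        long expected = cursor.get();
        if (expected >= sequence) {
            return; // another thread already advanced past this sequence
        }

        long next = expected + 1;
        // Advance the cursor while the next sequence in order is recorded as pending.
        while (pending.get((int) next & mask) == next && cursor.compareAndSet(expected, next)) {
            expected = next;
            next++;
        }
    }
}
```

Publishing sequences 1 and 2 before 0 leaves the cursor at -1; publishing 0 then carries the cursor straight to 2, because the later sequences were already recorded as pending.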

 
 
MultiThreadedLowContentionClaimStrategy.java
It differs from MultiThreadedClaimStrategy.java as follows:
@Override
    public void serialisePublishing(final long sequence, final Sequence cursor, final int batchSize)
    {
        final long expectedSequence = sequence - batchSize;
        while (expectedSequence != cursor.get()) // why spin here? The cursor must first reach the sequence just before this batch
        {
            // busy spin
        }
 
        cursor.set(sequence);
    }
 

 
EventPublisher.java
Event publisher; main code:
 
private void translateAndPublish(final EventTranslator<E> translator, final long sequence)
    {
        try
        {
            translator.translateTo(ringBuffer.get(sequence), sequence); // have the translator populate the event at this sequence; the event is then published.
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
}
 

WaitStrategy.java
The strategy that determines how an EventProcessor waits on the cursor sequence. There are four implementations, as follows.
/**
 * Blocking strategy that uses a lock and condition variable for {@link EventProcessor}s waiting on a barrier.
 *
 * This strategy can be used when throughput and low-latency are not as important as CPU resource.
 */
BlockingWaitStrategy.java: uses a lock, so it is only suitable where throughput and low latency are not a priority.
 
/**
 * Busy Spin strategy that uses a busy spin loop for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier.
 *
 * This strategy will use CPU resource to avoid syscalls which can introduce latency jitter. It is best
 * used when threads can be bound to specific CPU cores.
 */
BusySpinWaitStrategy.java: consumes CPU by spinning continuously and never calls yield().
 
/**
 * Sleeping strategy that initially spins, then uses a Thread.yield(), and eventually for the minimum number of nanos
 * the OS and JVM will allow while the {@link com.lmax.disruptor.EventProcessor}s are waiting on a barrier.
 *
 * This strategy is a good compromise between performance and CPU resource. Latency spikes can occur after quiet periods.
 */
SleepingWaitStrategy.java: counts down a counter; at 100 or below it calls yield(), and at 0 it calls LockSupport.parkNanos(1L).
/**
 * Yielding strategy that uses a Thread.yield() for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier
 * after an initially spinning.
 *
 * This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
 */
YieldingWaitStrategy.java: once the counter reaches 0, it calls yield().
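
The staged back-off described for SleepingWaitStrategy can be sketched as one step function. `BackoffSketch.applyWaitStep` is a hypothetical name; the thresholds 100 and 0 come from the description above, while the starting value of the counter is left to the caller.

```java
import java.util.concurrent.locks.LockSupport;

final class BackoffSketch {
    // Performs one wait step and returns the updated counter.
    static int applyWaitStep(int counter) {
        if (counter > 100) {
            return counter - 1;          // phase 1: pure spin, just count down
        }
        if (counter > 0) {
            Thread.yield();              // phase 2: give other threads a chance to run
            return counter - 1;
        }
        LockSupport.parkNanos(1L);       // phase 3: park for the shortest time the OS allows
        return counter;
    }
}
```

A waiting loop would call this repeatedly until the awaited sequence becomes available, so a short wait costs only spins while a long wait gradually frees the CPU. The yielding strategy follows the same pattern with only the spin and yield phases.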