独自のJavaロックをカスタマイズし、不安定なサードパーティを効率的に回避

21337 ワード

JAva jucパッケージの下には、多くの同時ロックツールが用意されていますが、日常開発では、さまざまな理由でマルチスレッドを使用して問題を同時に処理しています.しかし、すべてのシーンでjucやjava自体が提供するロックを使用して、マルチスレッドがもたらす同時問題を制御できるわけではありません.この时、私たちは自分のビジネスシーンに基づいて、私たち自身の専用のロックをカスタマイズして、私たちのニーズを満たす必要があります.
システムが多くのサードパーティ企業と連携して業務を完了するのに役立つと仮定しますが、これらのサードパーティのサービスインタフェースの安定性はバラツキがあり、従来の過程でインタフェースの安定性を監視するための監視措置を行う可能性がありますが、これは問題があります.つまり、私たちが操作に失敗したことを監視すると、実際にはユーザーが操作に失敗した結果を生み出すことがあります.これはユーザー体験を重視するインターネット会社には耐えられないに違いありません.そのため、ユーザー一人一人がアクセスするときに複数の第三者を同時に呼び出すことができます.戻り結果があれば、ユーザーに相応の展示をすることができます.これで1、2人のサードパーティが故障してもユーザーには感知されませんが、もう1つの問題が来て、同時にサードパーティを呼び出してどの結果を選ぶか、簡単です.もちろん、最も速く戻ります.具体的には、最も速いリターン結果をどのように選択するかは、今日のトピックに使用され、自分のロックをカスタマイズします.
上記の大まかな流れは、ユーザ要求を受け付ける→マルチスレッド組立メッセージ呼び出しサードパーティ→ブロック待ち→任意の結果が戻ってメインスレッドを起動して処理を継続すると記述することができる.このプロセスに基づいて、このブロックは実際にはマルチスレッドのロックで実現できると自然に考えられています.メインスレッドは、タスクをスレッドプールのマルチスレッド処理に提出した後、ロックを取得します.このロックは、スレッドの最初の第三者が結果を返すときに取得する必要があります.これにより、メインスレッドを実行し続けます.大まかな考え方があります.具体的にどのように実現するかを見てみましょう.
Javaのソースコードを見たことがある学生はAbstractQueuedSynchronizerに慣れていないに違いない.javaの多くのロックReentrantLock、ReadWriteLock、ReentrantReadWriteLock、その他のコンカレントツールCountDownLatch、Semaphoreなどはこの抽象クラスに基づいて実現され、AbstractQuedSynchronizerではFIFOキューを通じてロック待ちのスレッドを管理している.スレッドのロック状態をstateのint変数で制御することで、その内部でもスレッドがロックされ、ロックされる方法を実現してくれました.ここではCountDownLatchを参照して実現します.私たちのニーズはCountDownLatchとは正反対です.CountDownLatchは複数のスレッドが処理されてから継続することができますが、私たちは1つの処理があれば継続することができます.簡単に言えば,主スレッド起動の判断条件が一致しない.まずCountDownLatchの使用を見てみましょう.
  
public static void main(String[] args) throws InterruptedException {
        CountDownLatch await = new CountDownLatch(5);

        //          
        for (int i = 0; i < 5; ++i) {
            new Thread(new MyRunnable(await)).start();
        }
        await.await();
        System.out.println("over!");
    }

class MyRunnable implements Runnable {

    private final CountDownLatch await;

    public MyRunnable(CountDownLatch await) {
        this.await = await;
    }

    public void run() {
        try {
            //    
            await.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ソケットを熟知した後,より良い説明のために,その一部のソースコード(jdk 1.8の一部の方法)を導入した.
 private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

  
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

  
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

  
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

  
    public void countDown() {
        sync.releaseShared(1);
    }

 }

コードから分かるように、CountDownLatchは主に1つの内部クラスSyncに依存して実現され、SyncはAbstractQueuedSynchronizerのtryAcquireSharedとtryReleaseSharedメソッドを実現し、この2つのメソッドの主な目的は、tryAcquireSharedはawaitメソッドを呼び出した後にブロックする必要があるか実行する必要があるかを判断することである.tryReleaseSharedはstateを解放するための状態であり、stateの状態はtryAcquireSharedの戻り結果にも影響し、スレッドがブロックされているか、または呼び出しられて実行を継続するかを決定し、具体的な判断ロジックはAbstractQueuedSynchronizerにおけるacquireSharedInterruptiblyメソッドにある
  
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

これに基づいて、独自のクラスを使用してtryAcquireSharedとtryReleaseSharedメソッドを実装することで、stateステータスを管理し、プライマリ・スレッドがロックを取得して実行を継続し、いつブロックする必要があるかを決定することができます.AbstractQueuedSynchronizerの内部メカニズムによって、サブスレッドの処理情報をタイムリーに取得し、最も速くメインスレッドに戻ってビジネスロジックを処理することができます.実装コードは次のとおりです.
package com.chengxiansheng.common;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class ReqBraker {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
    
    
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { int c = getState(); c = 0; return true; } } private final Sync sync; private ResultDto resultDto;// public ReqBraker(){ this.sync = new Sync(1); } /** * * , */ public void reqReuturn(ResultDto resultDto){ this.resultDto = resultDto; sync.tryReleaseShared(1); } /** * * @param timeout * @param unit * @return * @throws InterruptedException */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * * @return */ public ResultDto getResultDto(){ return resultDto; }
  

}
package com.chengxiansheng.common;

public class RequsetWorker implements Runnable  {
    
    private RequestDto requestDto;//    
    
    private ReqBraker  reqBraker;
    
    public RequsetWorker(RequestDto requestDto, ReqBraker  reqBraker){
        this.requestDto = requestDto;
        this.reqBraker = reqBraker;
    }
    
    public void run() {
        //     
        //      
        if(requestDto.isSeccess()){
            reqBraker.reqReuturn(resultDto);
        }
    }
}

具体的な使い方は以下の通りです.
public void requestService(RequestDto request, List serviceList) {
    try {
            ReqBraker braker = new ReqBraker();
            for (Service service : serviceList) {
                executorService.submit(new RequsetWorker(braker, request);
            }
            braker.await(5, TimeUnit.SECONDS); //           5S

            ResultDto result = braker.getResultDto();
            if(result == null){ //           
                //      ;
            }
            //      ;
  } catch (Exception e) {
    //    
}

もちろん実際の使用はこれより複雑かもしれません.私たちはいろいろな業務処理状況を考慮しなければならないからです.本文はただ一つの例で、Javaのツールを合理的に利用する同僚を紹介して、私たちが使用するシーンに合った方法をカスタマイズしてこそ、より効率的なコードを書くことができます.より効率的なシステムを実現します.AbstractQueuedSynchronizerの役割もそれだけではありませんが、マルチスレッドをよりよく回転させ、同時に回転させ、さまざまな複雑な処理と論理を革新的に実現することができることを把握しています.