JAva非同期クエリー転送同期の多様な実現方式:ループ待ち、CountDownLatch、EventListener


ひどうきかいてんどうき
ビジネスニーズ
一部のインタフェースクエリのフィードバック結果は非同期で返され、すぐにクエリ結果を取得できません.
  • 通常処理ロジック
  • 非同期操作をトリガーし、一意のIDを渡します.
    非同期結果が返されるまで待って、入力された一意の識別子に基づいて、今回の結果を一致させます.
  • 同期への変換方法
  • 通常のアプリケーションシーンは多いが、データストレージをしたくない場合があり、呼び出し結果を簡単に取得したいだけだ.
    つまり、同期操作の結果を達成するには、どうすればいいのでしょうか.
    構想
  • 非同期操作開始
  • 非同期結果が戻るまで待機(タイムアウト設定可)
  • 結果が返された後、非同期操作結果が一括して返される
  • ループ待ち
  • LoopQuery.java
  • query()を使用して、非同期の操作remoteCallback()の実行が完了すると、同期して戻ります.
    public class LoopQuery implements Async {
    
        private String result;
    
        private static final Logger LOGGER = LogManager.getLogger(LoopQuery.class.getName());
    
        @Override
        public String query(String key) {
            startQuery(key);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    remoteCallback(key);
                }
            }).start();
    
            final String queryResult = endQuery();
            LOGGER.info("    : {}", queryResult);
            return queryResult;
        }
    
        /**
         *     
         * @param key     
         */
        private void startQuery(final String key) {
            LOGGER.info("    : {}", key);
        }
    
        /**
         *             
         *
         * @param key     
         */
        private void remoteCallback(final String key) {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.result = key + "-result";
            LOGGER.info("remoteCallback set result: {}", result);
        }
    
        /**
         *     
         * @return     
         */
        private String endQuery() {
            while (true) {
                if (null == result) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    return result;
                }
            }
        }
    }
  • main()
  • public static void main(String[] args) {
        new LoopQuery().query("12345");
    }
  • 試験結果
  • 18:14:16.491 [main] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery -     : 12345
    18:14:21.498 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery - remoteCallback set result: 12345-result
    18:14:21.548 [main] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery -     : 12345-result

    CountDownLatch
  • AsyncQuery.java
  • CountDownLatchクラスを使用して同期の効果を達成します.
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class AsyncQuery {
    
        private static final Logger LOGGER = LogManager.getLogger(AsyncQuery.class.getName());
    
        /**
         *   
         */
        private String result;
    
        /**
         *        
         * @param key
         */
        public void asyncQuery(final String key) {
            final CountDownLatch latch = new CountDownLatch(1);
            this.startQuery(key);
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("        ");
                    remoteCallback(key, latch);
                    LOGGER.info("        ");
                }
            }).start();
    
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            this.endQuery();
        }
    
        private void startQuery(final String key) {
            LOGGER.info("    : {}", key);
        }
    
        /**
         *             
         * @param key
         */
        private void remoteCallback(final String key, CountDownLatch latch) {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.result = key + "-result";
            latch.countDown();
        }
    
        private void endQuery() {
            LOGGER.info("    : {}", result);
        }
    
    }
  • main()
  • public static void main(String[] args) {
        AsyncQuery asyncQuery = new AsyncQuery();
        final String key = "123456";
        asyncQuery.asyncQuery(key);
    }
  • ログ
  • 18:19:12.714 [main] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery -     : 123456
    18:19:12.716 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery -         
    18:19:17.720 [main] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery -     : 123456-result
    18:19:17.720 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery -         

    Spring EventListener
    観察者モードを使ってもいいです.(シナリオ1の最適化)
    ここではspringと組み合わせて使用します.
  • BookingCreatedEvent.java

  • 転送プロパティのオブジェクトを定義します.
    public class BookingCreatedEvent extends ApplicationEvent {
    
        private static final long serialVersionUID = -1387078212317348344L;
    
        private String info;
    
        public BookingCreatedEvent(Object source) {
            super(source);
        }
    
        public BookingCreatedEvent(Object source, String info) {
            super(source);
            this.info = info;
        }
    
        public String getInfo() {
            return info;
        }
    }
  • BookingService.java

  • 説明:this.context.publishEvent(bookingCreatedEvent);がトリガーされると、@EventListenerで指定された方法で傍受されます.
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.event.EventListener;
    import org.springframework.stereotype.Service;
    
    import java.util.concurrent.TimeUnit;
    
    @Service
    public class BookingService {
    
        @Autowired
        private ApplicationContext context;
    
        private volatile BookingCreatedEvent bookingCreatedEvent;
    
        /**
         *        
         * @param info
         * @return
         */
        public String asyncQuery(final String info) {
            query(info);
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    remoteCallback(info);
                }
            }).start();
    
            while(bookingCreatedEvent == null) {
                //..    
                //     。
                try {
                    TimeUnit.MILLISECONDS.sleep(1);
                } catch (InterruptedException e) {
                    //...
                }
                //2.         event...
    
            }
    
            final String result = bookingCreatedEvent.getInfo();
            bookingCreatedEvent = null;
            return result;
        }
    
        @EventListener
        public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) {
            System.out.println("        : " + bookingCreatedEvent.getInfo());
            this.bookingCreatedEvent = bookingCreatedEvent;
            System.out.println("        : " + this.bookingCreatedEvent.getInfo());
        }
    
        /**
         *     
         * @param info
         */
        public void query(final String info) {
            System.out.println("    : " + info);
        }
    
        /**
         *     
         * @param info
         */
        public void remoteCallback(final String info) {
            System.out.println("      : " + info);
    
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            //       
            String result = info + "-result";
            BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result);
            //  event
            this.context.publishEvent(bookingCreatedEvent);
        }
    }
  • 試験方法
  • @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = SpringConfig.class)
    public class BookServiceTest {
    
        @Autowired
        private BookingService bookingService;
    
        @Test
        public void asyncQueryTest() {
            bookingService.asyncQuery("1234");
        }
    
    }
  • ログ
  • 2018-08-10 18:27:05.958  INFO  [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:84 -     :1234
    2018-08-10 18:27:05.959  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:93 -       :1234
         : 1234-result
    2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:73 -         : 1234-result
    2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:75 -         : 1234-result
    2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:106 -     event
    2018-08-10 18:27:07.964  INFO  [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:67 -     : 1234-result
    2018-08-10 18:27:07.968  INFO  [Thread-1] org.springframework.context.support.GenericApplicationContext:993 - Closing org.springframework.context.support.GenericApplicationContext@5cee5251: startup date [Fri Aug 10 18:27:05 CST 2018]; root of context hierarchy

    タイムアウトと空のサイクル
    くうサイクル
    空のサイクルはcpuを急上昇させます
    while(true) {
    }
  • 解決方法
  • while(true) {
        //     
        TimeUnit.sleep(1);
    }

    タイムアウト作成
    フィードバックをずっと待つことはできません.タイムアウト時間を設定できます.
    /**
     *           
     * @param key key
     * @param timeoutInSeconds     
     * @param    
     * @return   。         
     */
    public  T loopWaitForValue(final String key, long timeoutInSeconds) {
        long startTime = System.nanoTime();
        long deadline = startTime + TimeUnit.SECONDS.toNanos(timeoutInSeconds);
        //1.        ,   key        。     
        while(ObjectUtil.isNull(map.get(key))) {
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (InterruptedException e) {
                LOGGER.warn("Loop meet InterruptedException, just ignore it.", e);
            }
            //     
            long currentTime = System.nanoTime();
            if(currentTime >= deadline) {
                throw new BussinessException(ErrorCode.READ_TIME_OUT);
            }
        }
        final T target = (T) map.get(key);
        LOGGER.debug("loopWaitForValue get value:{} for key:{}", JSON.toJSON(target), key);
        //2.        ,         
        map.remove(key);
        return target;
    }

    コードアドレス
    loop
    countdownlatch
    spring-event-listener