【Netty】NioEventLoopの起動(四):runAllTasks


レビュー
方法を忘れた場合は、newChild章に戻ってくださいio.netty.channel.nio.NioEventLoop#run
...
//   I/O  
select(wakenUp.getAndSet(false));
...
//     select   I/O  
processSelectedKeys();
...
//           
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
...

前2節ではselectメソッドとprocessSelectedKeysメソッドの大まかな実行ロジックを記録したが,この2つのメソッドは本節のrunAlltTasksメソッドとは大きく関係なく,第1節【起動前概要】と関係があり,覚えていなければ思い出したほうがよい.
Netty Version:4.1.6
任務はどこにあるの?
通常タスク
runAlltTasksは、その名の通りタスクキュー内のタスクを処理することであり、selectメソッドを説明する節では、通常のタスクのほかに、タイミングタスクも言及されています.では、これまで話していなかったものを全部補充します.
まず、視点をio.netty.util.concurrent.SingleThreadEventExecutor#executeに戻し、どうやって入ってきたのか忘れたら、newChildの節に戻って振り返ることができます.
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

addTaskメソッドへのアクセス:io.netty.util.concurrent.SingleThreadEventExecutor#addTask
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

さらにofferTaskメソッドに入ります.
    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }
  • ここのtaskQueue(mpscQueue)もnewChildの節で話したことがある.
  • でofferメソッドはタスクをtaskQueueに追加し、taskQueueは通常のタスクを格納するために使用されます.

  • タイミングタスク
    Nettyは、NioEventLoopの親であるA b s t r a c t S h e d u l edEventExecutorのscheduleメソッドを追加するインタフェースを提供しています.
        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            ObjectUtil.checkNotNull(callable, "callable");
            ObjectUtil.checkNotNull(unit, "unit");
            if (delay < 0) {
                throw new IllegalArgumentException(
                        String.format("delay: %d (expected: >= 0)", delay));
            }
            return schedule(new ScheduledFutureTask<V>(
                    this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
        }
    
  • ScheduledFutureTaskはタイミングタスクキューの要素のタイプで、前にselectメソッドを話したときに話しましたが、忘れたものは帰ってみてください.

  • 次にscheduleメソッドに入ります:io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(java.util.concurrent.Callable, long, java.util.concurrent.TimeUnit)
        <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
            if (inEventLoop()) {
                scheduledTaskQueue().add(task);
            } else {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        scheduledTaskQueue().add(task);
                    }
                });
            }
    
            return task;
        }
    
  • inEventLoop()は、第1節【起動前の概要】で述べた.
  • executeメソッドの目的は、スレッドのセキュリティ(NioEventLoopのスレッドでなければ)を保証することであり、「ThreadPerTaskExcutor」と述べている.
  • いずれにしても、最終タスクはio.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask)に追加されました.

  • 以上、runAllTasksの前に通常タスクがmpscQueueに格納され、タイミングタスクがPriorityQueueに格納されることが知られている.次にメインラインに戻りましょう.
    追跡runAllTasksメソッド
    まず、視点をPriorityQueue()メソッドに戻し(上を振り返るのを忘れた)、次のコードを見つけます:io.netty.channel.nio.NioEventLoop#run
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
    
  • ioRatioのデフォルト値は50なのでelseコードブロックに入ります.
  • ioTimeはprocessSelectedKeys()を実行する.かかった時間.
  • runAllTasks(ioTime * (100 - ioRatio)/ioRatio);コードの意味:I/Oイベントの処理時間とタスクの実行時間を1:1にする.
  • これも、前のselectがI/Oイベントを検出したときに、タイミングタスク距離が実行された時間<0.5 msであることが判明すると、非ブロックメソッドになる理由です.

  • runAllTasksメソッドに入ります.ここで【座標1】:io.netty.channel.nio.NioEventLoop#run
        protected boolean runAllTasks(long timeoutNanos) {
            //     
            fetchFromScheduledTaskQueue();
            //       
            Runnable task = pollTask();
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                safeExecute(task);
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    

    fetchFromScheduledTaskQueueメソッドに入ります.これはタスクを集約する方法です.io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
        private boolean fetchFromScheduledTaskQueue() {
            long nanoTime = AbstractScheduledEventExecutor.nanoTime();
            Runnable scheduledTask  = pollScheduledTask(nanoTime);
            while (scheduledTask != null) {
                if (!taskQueue.offer(scheduledTask)) {
                    // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                    scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                    return false;
                }
                scheduledTask  = pollScheduledTask(nanoTime);
            }
            return true;
        }
    

    コードの簡単な分析:
  • nanoTimeは、締め切り時間と見なすことができます.
  • pollScheduledTask(nanoTime);つまり、タイムアウトイベントに最も近いタイムアウトタスクをタイムアウトタスクキューから取り出し、それがなければnullを返します.(タイミングタスクキューは、selectメソッドの章で説明した締め切り時間に従って並べ替えられていることを忘れないでください.)
  • taskQueue.offer(scheduledTask):取り出したタイミングタスクを通常のタスクキューtaskQueue(mpscQueue)に追加しようとします.taskQueueのスペースが十分であれば、trueを正常に返します.そうでなければ、追加に失敗してfasleを返し、ifのコードブロックでタスクをタイミングタスクキューに返します.

  • 簡単にfetchFromScheduledTaskQueueメソッドを解析した結果、このメソッドを実行すると、処理されるすべての通常のタスク、実行されるタイミングタスクが、同じキュー、すなわちmpscQueue(TaskQueue)に格納されることが分かった.
    次に、[座標1]に視点を戻して下を見続けます(コードを読み返さないので、便宜上、コンパイラでコードを開いてください):io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue
  • deadline:計算された締め切り時間、すなわちforループタスクはdeadline時間までしか実行できません.
  • runTasksがタスクを実行するカウンタは、タスクを実行するたびに1.
  • 増加する.
  • safeExecute(task):シリアル(スレッドセキュリティ)でタスクを実行します.
  • if((runTasks&0 x 3 F)==0):64回のタスクが実行されると、ifコードブロックに進み続けます.ifコードブロックでは、カットオフ時間に達したか否かを判断し、現在時刻>=deadlineであればbreakはforループタスク実行を終了する.
  • afterRunningAllTasks():最終的なタスクを実行します.

  • これでrunAllTasksメソッドの大まかな論理理が完了する.
    小結
  • runAllTasksメソッドが実行されると、まずタスクの集約が行われます.
  • タスクの集約方法は、実行するタイミングタスクを取得し、通常のタスクのキューに追加することです.
  • タスクキューを処理するとき、タスクは1つずつ実行される.
  • runAllTasksタスクの実行にも時間があり、runAllTasksメソッドがタスクを実行する時間は、デフォルトでは前のprocessSelectedKeysがI/Oイベントを処理する時間と等しい.
  • runAllTasks実行中、64回のタスクを実行するたびに締め切り時間を超えたかどうかをチェックします.なぜなら、チェック時間の操作に時間がかかり、64回のこの回数はNettyが経験に基づいてハードコーディングしたからです.