Javaコンカレント——基石編(下)

11548 ワード

Object waitとnotifyの実現メカニズム
Java Objectクラスはnativeに基づいて実現されたwaitとnotifyスレッド間の通信方式を提供している.これはsynchronized以外の独立した同時基礎部分であり、waitとnotify・に関する部分はmonitorのexitを分析する際にすでにいくつかの関連があるが、あまり深くはなく、多くの疑問を残している.このセクションでは、詳細に分析します.
wait実装
ObjectMonitorクラスのwait関数コードは次のように実装されます.
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
  ...
  if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
    ...
    //     ,             THROW(vmSymbols::java_lang_InterruptedException());
    ...
  }
  ...
  ObjectWaiter node(Self);
  node.TState = ObjectWaiter::TS_WAIT;
  Self->_ParkEvent->reset();
  OrderAccess::fence();

  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add");
  AddWaiter(&node);
  Thread::SpinRelease(&_WaitSetLock);

  if ((SyncFlags & 4) == 0) {
    _Responsible = NULL;
  }

  ...
  // exit the monitor   exit(true, Self); 
  ...
  if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
        // Intentionally empty       } else if (node._notified == 0) {
        if (millis <= 0) {
          Self->_ParkEvent->park();
        } else {
          ret = Self->_ParkEvent->park(millis);
        }
  }
  //   notify             ...
}

例としてwait関数のコア機能部分のみをリストし、まず現在のスレッドが中断可能で中断されたかどうかを判断し、もしそうであればInterruptedException異常を直接投げ出し、wait待機に入ることなく、そうでなければ、次の待機プロセスを実行する必要があり、まずSelf現在のスレッドに基づいてObjectWaiterオブジェクトノードを新規作成し、この対象はmonitorのenterを前に分析して見たことがある.新しいノードを生成した後、このノードを待機キューに配置し、AddWaiter関数を呼び出すことでnodeのエンキュー操作を実現する必要がありますが、エンキュー操作の前に反発ロックを取得して同時セキュリティを保証する必要があります.
void Thread::SpinAcquire(volatile int * adr, const char * LockName) {
  if (Atomic::cmpxchg (1, adr, 0) == 0) {
    return;   // normal fast-path return   }

  // Slow-path : We've encountered contention -- Spin/Yield/Block strategy.   TEVENT(SpinAcquire - ctx);
  int ctr = 0;
  int Yields = 0;
  for (;;) {
    while (*adr != 0) {
      ++ctr;
      if ((ctr & 0xFFF) == 0 || !os::is_MP()) {
        if (Yields > 5) {
          os::naked_short_sleep(1);
        } else {
          os::naked_yield();
          ++Yields;
        }
      } else {
        SpinPause();
      }
    }
    if (Atomic::cmpxchg(1, adr, 0) == 0) return;
  }
}

SpinAcquireは、デッドサイクルを通じてcas検査を繰り返してロックを取得するかどうかを判断するスピンロック実装であり、ここではcas検査で成功するかどうかを確認し、成功すれば次の比較的重量級のspinプロセスを行う必要はなく、失敗を取得すると、次のspinプロセスに入る必要があります.ここのspin論理は比較的興味深いアルゴリズムである.ここではctr変数を定義しますが、counterカウンタの意味です.(ctr&0 xFFF)=0|!os::is_MP()という条件が面白いです.私が試した回数が0 xfffより大きい場合、または現在のシステムがシングルコアプロセッサシステムである場合、次の論理を実行します.ここでspinには限界があることがわかりますが、まず最初にマルチコアシステムであればSpinPauseを直接実行します.SpinPause関数の実装を見てみましょう.この関数はCPUを実現するための待ち時間です.そのため、異なるシステムとCPUアーキテクチャの対応が実現されます.SpinPause関数linuxプラットフォームコードは次のとおりです.
int SpinPause() {
    return 0;
}

すなわちSpinPause関数が直接0に戻ることは,SpinAcquireがCPUの待ち時間を実現する方式であり,また,SpinAcquireで試みた回数が0 xFFF回に達した場合,別の方式で待ち時間を実現する.
if (Yields > 5) {
   os::naked_short_sleep(1);
} else {
   os::naked_yield();
   ++Yields;
}

まず、現在のスレッドのCPU実行時間をyield関数で譲ってみますが、5回もロックを取得していない場合はnaked_short_sleepが実現するまで待っていた、ここのnaked_short_sleep関数は名前から短い休眠待機であることがわかり,休眠待機のたびに1 ms実現する.nakedを見てみましょうyieldの実現方式は、linuxプラットフォームの実現を同様に見ます.
void os::naked_yield() {
  sched_yield();
}

ここでの実装はpthreadを直接呼び出すsched_であることがわかる.yield関数はスレッドのタイムスライス譲渡を実現する.次にlinuxプラットフォームnaked_を見てみましょうshort_sleepの実装:
void os::naked_short_sleep(jlong ms) {
  struct timespec req;

  assert(ms < 1000, "Un-interruptable sleep, short time use only");
  req.tv_sec = 0;
  if (ms > 0) {
    req.tv_nsec = (ms % 1000) * 1000000;
  } else {
    req.tv_nsec = 1;
  }

  nanosleep(&req, NULL);

  return;
}

ここではnanosleepシステム呼び出しによりスレッドを実現するtimed waitingを呼び出す.
ここではSpinAcquireの実装ロジックを分析します.シングルコアプロセッサであればyieldまたはsleepで待機を実現し、マルチコアプロセッサであれば空の実装関数を呼び出すことで待機します.シングルコアCPUであれば、空実装関数を呼び出して待機を実現するのは科学的ではありません.1つのコアしかないので、このコアを通じて待機を実現すると、ロックを解放する必要があるスレッドが実行されず、飢餓待機を引き起こす可能性があります.私たちのCPUはずっと回転していますが、問題は解決されていません.したがって,シングルコアCPUシステムであれば,空関数を呼び出すことで待機を実現することはできない.逆にマルチコアであれば,別の空きCPU上でビジーウエイト増加システムのスループットを実現することができ,JVMではシステムの演算力の増加とシステムの互換性の保証のためにどれだけの努力と実現がなされているかを見ることができる.
上のSpinAcquire関数が返されると、ロックが取得されたことを示します.nodeを待機キューに配置できます.
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not add NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)   if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

ここでの実装は簡単です.nodeが双方向チェーンテーブルを挿入することです.WaitSetの末尾.チェーンテーブルの挿入が完了したら、SpinReleaseでロックを解除する必要があります.
新しいnodeノードをWaitSetキューに追加し、wait関数の次の論理を見て、次のようにします.
// exit the monitor exit(true, Self);

wait操作によるmonitorロックの解放はここで実現される.次にwait関数のpark待ちが続きます.
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
        // Intentionally empty } else if (node._notified == 0) {
    if (millis <= 0) {
        Self->_ParkEvent->park();
    } else {
        ret = Self->_ParkEvent->park(millis);
    }
}

正式なparkの前にinterruptdがあるかどうかをもう一度見て、もしあるならpark操作をスキップして、さもなくばparkブロックを行って、parkブロックの時間はwait関数呼び出しの時に入ってきた時間パラメータです.wait関数の次の操作はparkブロック起動後の善後論理であり,我々の解析には重要ではないが,ここではスキップする.
notify実装
notify関数の実装コードは次のとおりです.
void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
    TEVENT(Empty-Notify);
    return;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
  INotify(THREAD);
  OM_PERFDATA_OP(Notifications, inc(1));
}

ここでは主にWaitSetキューにスレッドがあるかどうかを判断することによってwaitが実行されたかどうかを判断し、ない場合は直接戻り、ある場合はスレッドを起動し、起動はINotify関数を呼び出すことによって実現される.
void ObjectMonitor::INotify(Thread * Self) {
  const int policy = Knob_MoveNotifyee;

  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");
  ObjectWaiter * iterator = DequeueWaiter();
  if (iterator != NULL) {
    ObjectWaiter * list = _EntryList;
    if (policy == 0) {
      // prepend to EntryList       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 1) {
      // append to EntryList       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 2) {
      // prepend to cxq       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 3) {
      // append to cxq       ...
    } else {
      ...
    }
    ...
  }
  Thread::SpinRelease(&_WaitSetLock);
}

ここでの操作はすべて_WaitSetLockの保護下では、まずWaitSetキューからノードがデキューされ、このノードに対してKnob_MoveNotifyeeは、異なるポリシーロジックを実行することを決定し、ポリシーの論理フレームワークは同じです.EntryListが空で異なる操作を実行しているかどうか.Knob_MoveNottifyeeのデフォルト値は2です.notifyの起動ポリシーには主に以下のものがあります.
  • ポリシー0:起動が必要なnodeをEntryListのヘッダ
  • に配置する
  • ポリシー1:起動が必要なnodeをEntryListの末尾
  • に配置する
  • ポリシー2:起動が必要なnodeをCXQのヘッダ
  • に配置する
  • ポリシー3:起動が必要なnodeをCXQの末尾
  • に配置する
    異なるポリシーの論理を分析する前に、INotify関数が実行し始めるWaitSetのデキュー論理実装を見てみましょう.
    inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
      // dequeue the very first waiter   ObjectWaiter* waiter = _WaitSet;
      if (waiter) {
        DequeueSpecificWaiter(waiter);
      }
      return waiter;
    }

    ここではWaitSetキューの最初のnodeをデキューし、次にWaitSetキューポインタであるキューヘッダを直接返してデキューノードを削除します.
    inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
      assert(node != NULL, "should not dequeue NULL node");
      assert(node->_prev != NULL, "node already removed from list");
      assert(node->_next != NULL, "node already removed from list");
      // when the waiter has woken up because of interrupt,   // timeout or other spurious wake-up, dequeue the   // waiter from waiting list   ObjectWaiter* next = node->_next;
      if (next == node) {
        assert(node->_prev == node, "invariant check");
        _WaitSet = NULL;
      } else {
        ObjectWaiter* prev = node->_prev;
        assert(prev->_next == node, "invariant check");
        assert(next->_prev == node, "invariant check");
        next->_prev = prev;
        prev->_next = next;
        if (_WaitSet == node) {
          _WaitSet = next;
        }
      }
      node->_next = NULL;
      node->_prev = NULL;
    }

    これにより、WaitSet双方向チェーンテーブルキューからのキューヘッダデキューロジックを完了します.
    起動ポリシー0
    if (list == NULL) {
        iterator->_next = iterator->_prev = NULL;
        _EntryList = iterator;
    } else {
        list->_prev = iterator;
        iterator->_next = list;
        iterator->_prev = NULL;
        _EntryList = iterator;
    }

    EntryListが空の場合は、スレッドがnotifyで起動されていないことを示し、現在のノードをEntryListに直接配置すればよい.そうでなければ、現在のノードをEntryListのヘッダに配置する.
    起動ポリシー1
    ポリシー1とポリシー0の論理は似ています.ここではノードを末尾に置くだけです.
    if (list == NULL) {
            iterator->_next = iterator->_prev = NULL;
            _EntryList = iterator;
    } else {
            // CONSIDER:  finding the tail currently requires a linear-time walk of         // the EntryList.  We can make tail access constant-time by converting to         // a CDLL instead of using our current DLL.         ObjectWaiter * tail;
            for (tail = list; tail->_next != NULL; tail = tail->_next) {}
            assert(tail != NULL && tail->_next == NULL, "invariant");
            tail->_next = iterator;
            iterator->_prev = tail;
            iterator->_next = NULL;
    }

    起動ポリシー2
    if (list == NULL) {
            iterator->_next = iterator->_prev = NULL;
            _EntryList = iterator;
    } else {
            iterator->TState = ObjectWaiter::TS_CXQ;
            for (;;) {
              ObjectWaiter * front = _cxq;
              iterator->_next = front;
              if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {
                break;
              }
            }
    }

    まず、EntryListが空であることを発見すると、すなわち、notifyによって最初に起動されたスレッドがEntryListに入り、WaitSetの残りのノードがcxqのヘッダに順次挿入され、cxqポインタが新しいヘッダノードを指すことを更新する.
    起動ポリシー3
    ポリシー3の論理はポリシー2と似ていますが、ポリシー3はcxqの末尾にノードを配置します.
    iterator->TState = ObjectWaiter::TS_CXQ;
          for (;;) {
            ObjectWaiter * tail = _cxq;
            if (tail == NULL) {
              iterator->_next = NULL;
              if (Atomic::replace_if_null(iterator, &_cxq)) {
                break;
              }
            } else {
              while (tail->_next != NULL) tail = tail->_next;
              tail->_next = iterator;
              iterator->_prev = tail;
              iterator->_next = NULL;
              break;
            }
    }

    ここではEntryListが空であるか否かを判断するのではなく,cxqの末尾にノードを直接置く点は,先のいくつかの戦略とは異なり,注意が必要である.
    notifyAll実装
    notifyAllの実装は、notifyの実装と大きく異なります.
    void ObjectMonitor::notifyAll(TRAPS) {
      CHECK_OWNER();
      if (_WaitSet == NULL) {
        TEVENT(Empty-NotifyAll);
        return;
      }
    
      DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
      int tally = 0;
      while (_WaitSet != NULL) {
        tally++;
        INotify(THREAD);
      }
    
      OM_PERFDATA_OP(Notifications, inc(tally));
    } 

    実際にはWaitSet長に基づいてINotify関数を繰り返し呼び出し,notifyを複数回呼び出すことに相当することがわかる.