TaskTrackerがmapまたはreduceタスクを取得して実行するプロセス(一)

7434 ワード

TaskTrackerはデフォルトで3秒ごとにJobTrackerがハートビートパケットを送信することを知っています.つまり、このハートビートパケットにはタスクに対する要求が含まれています.JobTrackerがTaskTrackerに返すハートビートパケットには様々なaction(タスク)が含まれており,このTaskTrackerで実行されるタスクを満たすものがあれば,そのタスクもハートビートパケットの応答に含まれる.TaskTracker側では、mapまたはreduceタスクを専用に待つスレッドがあり、キューから実行を取り出します.
1. TaskTrackerハートビートパケット送信
TaskTrackerは単独のJVMとして動作し、起動後はずっとofferService()関数の中にあり、3秒ごとにtransmitHeartBeat関数を実行します.以下に示します.
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
この関数の具体的なコードは次のとおりです.
  HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  ......
if (status == null) { synchronized (this) { status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses( sendCounters), failures, maxMapSlots, maxReduceSlots); } } // // // boolean askForNewTask; long localMinSpaceStart; synchronized (this) { askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks); localMinSpaceStart = minSpaceStart; }
......
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId); ...... return heartbeatResponse; }
TaskTrackerはまずTaskTrackerStatusオブジェクトを作成し、map slotの数、reducer slotスロットの数、TaskTrackerが存在するホスト名などの情報を含む.その後、TaskTrackerの空きslotおよびディスクスペースをチェックし、対応する条件が満たされると、最終的にJobClient(JobTrackerのエージェント)を介してJobTrackerに心拍情報を送信し、JobTrackerの応答HeartbeatResponseを得る.以下に示すように、JobClientはInterTrackerProtocolの一例であり、JobTrackerはInterTrackerProtocolというインタフェースを実現している.
    this.jobClient = (InterTrackerProtocol) 
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr, fConf);
      }
    });
では、TaskTrackerはどのようにJobTrackerのエージェントを通じてJobTrackerと通信するのでしょうか.RPCがJobTrackerのheartbeat(…)メソッドを呼び出すことによって実現される.
2. TaskTracker側取得タスク
TaskTrackerは、タスクを受信すると、対応するLinkedListに格納され、LinkedListは、チェーンテーブルに基づいて実装されるFIFOのキューであるListおよびQueueインタフェースを実現する.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();if (actions != null){ 
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
         ......
          }
        }
  ......
  private void addToTaskQueue(LaunchTaskAction action) {     if (action.getTask().isMapTask()) {       mapLauncher.addToTaskQueue(action);     } else {       reduceLauncher.addToTaskQueue(action);     }     }
 
TaskTrackerが起動すると、mapタスクとreduceタスクをそれぞれ処理するmapLauncherとreduceLauncherの2つのスレッドが作成され、mapタスクにはmapLauncherがLinkedListに格納し、reduceタスクにはreducerLauncherがメンテナンスするLinkedListに格納します.
  public void addToTaskQueue(LaunchTaskAction action) {
      synchronized (tasksToLaunch) {
        TaskInProgress tip = registerTask(action, this);
        tasksToLaunch.add(tip);
        tasksToLaunch.notifyAll();
      }
    }
mapLauncherまたはreducerLauncherは、受信したactionに基づいて、対応するTaskTracker.TaskInProgressオブジェクトを作成し、キューに入れ、待機しているスレッドを起動して処理します. 以下に示すように、スレッドはtaskToLaunchからtaskを取得し、空間的なslotがある場合、このtaskを実行する.
  synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();
            }
            //get the TIP
            tip = tasksToLaunch.remove(0);
            task = tip.getTask();
            LOG.info("Trying to launch : " + tip.getTask().getTaskID() + 
                     " which needs " + task.getNumSlotsRequired() + " slots");
          }
.....
          //     slot ,    task
          startNewTask(tip);
これでTaskTrackerは処理すべきタスクを得ましたが、具体的にどのように実行するかは次のブログを参照してください.