TaskTrackerがmapまたはreduceタスクを取得して実行するプロセス(一)
7434 ワード
TaskTrackerはデフォルトで3秒ごとにJobTrackerがハートビートパケットを送信することを知っています.つまり、このハートビートパケットにはタスクに対する要求が含まれています.JobTrackerがTaskTrackerに返すハートビートパケットには様々なaction(タスク)が含まれており,このTaskTrackerで実行されるタスクを満たすものがあれば,そのタスクもハートビートパケットの応答に含まれる.TaskTracker側では、mapまたはreduceタスクを専用に待つスレッドがあり、キューから実行を取り出します.
1. TaskTrackerハートビートパケット送信
TaskTrackerは単独のJVMとして動作し、起動後はずっとofferService()関数の中にあり、3秒ごとにtransmitHeartBeat関数を実行します.以下に示します.
2. TaskTracker側取得タスク
TaskTrackerは、タスクを受信すると、対応するLinkedListに格納され、LinkedListは、チェーンテーブルに基づいて実装されるFIFOのキューであるListおよびQueueインタフェースを実現する.
1. TaskTrackerハートビートパケット送信
TaskTrackerは単独のJVMとして動作し、起動後はずっとofferService()関数の中にあり、3秒ごとにtransmitHeartBeat関数を実行します.以下に示します.
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
この関数の具体的なコードは次のとおりです. HeartbeatResponse transmitHeartBeat(long now) throws IOException {
......
if (status == null) {
synchronized (this) {
status = new TaskTrackerStatus(taskTrackerName, localHostname,
httpPort,
cloneAndResetRunningTaskStatuses(
sendCounters),
failures,
maxMapSlots,
maxReduceSlots);
}
} //
//
//
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
askForNewTask =
((status.countOccupiedMapSlots() < maxMapSlots ||
status.countOccupiedReduceSlots() < maxReduceSlots) &&
acceptNewTasks);
localMinSpaceStart = minSpaceStart;
}
......
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted,
justInited,
askForNewTask,
heartbeatResponseId);
......
return heartbeatResponse;
}
TaskTrackerはまずTaskTrackerStatusオブジェクトを作成し、map slotの数、reducer slotスロットの数、TaskTrackerが存在するホスト名などの情報を含む.その後、TaskTrackerの空きslotおよびディスクスペースをチェックし、対応する条件が満たされると、最終的にJobClient(JobTrackerのエージェント)を介してJobTrackerに心拍情報を送信し、JobTrackerの応答HeartbeatResponseを得る.以下に示すように、JobClientはInterTrackerProtocolの一例であり、JobTrackerはInterTrackerProtocolというインタフェースを実現している. this.jobClient = (InterTrackerProtocol)
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException {
return RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
jobTrackAddr, fConf);
}
});
では、TaskTrackerはどのようにJobTrackerのエージェントを通じてJobTrackerと通信するのでしょうか.RPCがJobTrackerのheartbeat(…)メソッドを呼び出すことによって実現される.2. TaskTracker側取得タスク
TaskTrackerは、タスクを受信すると、対応するLinkedListに格納され、LinkedListは、チェーンテーブルに基づいて実装されるFIFOのキューであるListおよびQueueインタフェースを実現する.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();if (actions != null){
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
addToTaskQueue((LaunchTaskAction)action);
......
}
}
......
private void addToTaskQueue(LaunchTaskAction action) { if (action.getTask().isMapTask()) { mapLauncher.addToTaskQueue(action); } else { reduceLauncher.addToTaskQueue(action); } }
TaskTrackerが起動すると、mapタスクとreduceタスクをそれぞれ処理するmapLauncherとreduceLauncherの2つのスレッドが作成され、mapタスクにはmapLauncherがLinkedListに格納し、reduceタスクにはreducerLauncherがメンテナンスするLinkedListに格納します. public void addToTaskQueue(LaunchTaskAction action) {
synchronized (tasksToLaunch) {
TaskInProgress tip = registerTask(action, this);
tasksToLaunch.add(tip);
tasksToLaunch.notifyAll();
}
}
mapLauncherまたはreducerLauncherは、受信したactionに基づいて、対応するTaskTracker.TaskInProgressオブジェクトを作成し、キューに入れ、待機しているスレッドを起動して処理します. 以下に示すように、スレッドはtaskToLaunchからtaskを取得し、空間的なslotがある場合、このtaskを実行する. synchronized (tasksToLaunch) {
while (tasksToLaunch.isEmpty()) {
tasksToLaunch.wait();
}
//get the TIP
tip = tasksToLaunch.remove(0);
task = tip.getTask();
LOG.info("Trying to launch : " + tip.getTask().getTaskID() +
" which needs " + task.getNumSlotsRequired() + " slots");
}
.....
// slot , task
startNewTask(tip);
これでTaskTrackerは処理すべきタスクを得ましたが、具体的にどのように実行するかは次のブログを参照してください.