JobInProgress TaskInProgress TaskAttempt
6322 ワード
Job対task 1対多
task対taskAttempt 1対多。1つのtaskが同時に複数の試行(投機的実行)を実行する可能性がある.
/**
 * Field declarations of JobInProgress: per-job bookkeeping that holds the
 * job's TaskInProgress (TIP) arrays, task counters, failure limits and the
 * caches used to pick map/reduce TIPs.
 *
 * NOTE(review): this is an excerpt -- the constructor(s) that assign the
 * final fields, all methods, and the closing brace are not visible here.
 */
public class JobInProgress {
// Job profile and status objects, plus the job file name and its local path.
JobProfile profile;
JobStatus status;
String jobFile = null;
Path localJobFile = null;
// TIP arrays, one per task category; each starts out as an empty array.
TaskInProgress maps[] = new TaskInProgress[0];
TaskInProgress reduces[] = new TaskInProgress[0];
TaskInProgress cleanup[] = new TaskInProgress[0];
TaskInProgress setup[] = new TaskInProgress[0];
int numMapTasks = 0;
int numReduceTasks = 0;
// Final, so assigned outside this excerpt (presumably in a constructor).
// NOTE(review): units of the memory values are not shown here -- confirm.
final long memoryPerMap;
final long memoryPerReduce;
// Slots per map/reduce task; both default to 1.
volatile int numSlotsPerMap = 1;
volatile int numSlotsPerReduce = 1;
final int maxTaskFailuresPerTracker;
// Counters to track currently running/finished/failed Map/Reduce task-attempts
int runningMapTasks = 0;
int runningReduceTasks = 0;
int finishedMapTasks = 0;
int finishedReduceTasks = 0;
int failedMapTasks = 0;
int failedReduceTasks = 0;
// Reduce input limit; both the default and the instance value are -1.
// NOTE(review): presumably -1 means "no limit" -- confirm against the code
// that reads reduce_input_limit.
private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L;
long reduce_input_limit = -1L;
// NOTE(review): despite "PERCENT" in the name, 0.05f looks like a fraction
// (i.e. 5%) of completed maps -- confirm where
// completedMapsForReduceSlowstart is computed.
private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
int completedMapsForReduceSlowstart = 0;
// runningMapTasks include speculative tasks, so we need to capture
// speculative tasks separately
int speculativeMapTasks = 0;
int speculativeReduceTasks = 0;
// Final, so assigned outside this excerpt.
// NOTE(review): presumably the tolerated percentage of failed map/reduce
// TIPs -- confirm.
final int mapFailuresPercent;
final int reduceFailuresPercent;
// Failed-TIP counters, kept separately from the failed task-attempt counters above.
int failedMapTIPs = 0;
int failedReduceTIPs = 0;
// Volatile job-level flags; all start as false.
private volatile boolean launchedCleanup = false;
private volatile boolean launchedSetup = false;
private volatile boolean jobKilled = false;
private volatile boolean jobFailed = false;
// Job priority; defaults to NORMAL.
JobPriority priority = JobPriority.NORMAL;
final JobTracker jobtracker;
protected Credentials tokenStorage;
// Map of NetworkTopology Node to the list of TIPs
// (non-running maps, going by the field name)
Map<Node, List<TaskInProgress>> nonRunningMapCache;
// Map of NetworkTopology Node to set of running TIPs
Map<Node, Set<TaskInProgress>> runningMapCache;
// A list of non-local, non-running maps
final List<TaskInProgress> nonLocalMaps;
// Set of failed, non-running maps sorted by #failures
final SortedSet<TaskInProgress> failedMaps;
// A set of non-local running maps
Set<TaskInProgress> nonLocalRunningMaps;
// A set of non-running reduce TIPs
Set<TaskInProgress> nonRunningReduces;
// A set of running reduce TIPs
Set<TaskInProgress> runningReduces;
// A list of cleanup tasks for the map task attempts, to be launched
List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
// A list of cleanup tasks for the reduce task attempts, to be launched
List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
/**
 * Field declarations of TaskInProgress (TIP): the bookkeeping for one task
 * of a job, including the set of task attempts (TaskAttemptID) created for
 * it, their statuses, and which attempt succeeded.
 *
 * NOTE(review): this is an excerpt -- the constructor(s) that assign the
 * final fields, all methods, and the closing brace are not visible here.
 */
public class TaskInProgress {
// Tuning constants.
// NOTE(review): their exact meaning and units are not established in this
// excerpt; SPECULATIVE_LAG (60 * 1000) is presumably milliseconds -- confirm.
static final int MAX_TASK_EXECS = 1;
int maxTaskAttempts = 4;
static final double SPECULATIVE_GAP = 0.2;
static final long SPECULATIVE_LAG = 60 * 1000;
private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
// Defines the TIP
private String jobFile = null;
// Split metadata, identity and owning-job references of the TIP
private final TaskSplitMetaInfo splitInfo;
private int numMaps;
private int partition;
private JobTracker jobtracker;
private TaskID id;
private JobInProgress job;
private final int numSlotsRequired;
// Status of the TIP
private int successEventNumber = -1;
private int numTaskFailures = 0;
private int numKilledTasks = 0;
private double progress = 0;
private String state = "";
private long startTime = 0;
private long execStartTime = 0;
private long execFinishTime = 0;
private int completes = 0;
private boolean failed = false;
private boolean killed = false;
// Record-skipping state.
// NOTE(review): how these are used is not visible in this excerpt.
private long maxSkipRecords = 0;
private FailedRanges failedRanges = new FailedRanges();
private volatile boolean skipping = false;
// Flags marking this TIP as a job-cleanup or job-setup task; both default to false.
private boolean jobCleanup = false;
private boolean jobSetup = false;
// The 'next' usable taskid of this tip (taskAttemptId)
int nextTaskId = 0;
// The taskid that took this TIP to SUCCESS
private TaskAttemptID successfulTaskId;
// The first taskid of this tip
private TaskAttemptID firstTaskId;
// Map from task Id -> TaskTracker Id, contains tasks that are
// currently running
private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
// All attempt Ids of this TIP
private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
private JobConf conf;
// Map from taskId -> list of diagnostic strings for that attempt
private Map<TaskAttemptID,List<String>> taskDiagnosticData = new TreeMap<TaskAttemptID,List<String>>();
/**
* Map from taskId -> TaskStatus
*/
private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = new TreeMap<TaskAttemptID,TaskStatus>();
// Map from taskId -> TaskTracker Id,
// contains cleanup attempts and where they ran, if any
private TreeMap<TaskAttemptID, String> cleanupTasks = new TreeMap<TaskAttemptID, String>();
// NOTE(review): presumably the hosts on which an attempt of this TIP has
// failed, and the attempts already reported closed -- confirm against the
// methods that update these sets.
private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
//list of tasks to kill, <taskid> -> <shouldFail>
private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
//task to commit, <taskattemptid>
private TaskAttemptID taskToCommit;
private Counters counters = new Counters();
private String user;