HiveのOutputCommitter
3922 ワード
HiveのOutputCommitter
OutputCommitterは、ジョブの出力を制御するために使用され、MapTaskおよびReduceTaskで呼び出されます.http://hi.baidu.com/_kouu/blog/item/dd2f08fd25da09e0fc037f15.htmlHadoop OutputFormatこの中でははっきりと説明されていますが、Hiveの宿題ではこの機能は使用されていません.
Hiveの出力はすべて自分で制御され、具体的にはFileSinkOperatorクラスに反映され(reduceまたはmapの場合、MR jobで呼び出される)、jobCloseメソッドはMapReduceジョブの実行が終了した後に呼び出される(clientエンド呼び出し).
public class ExecDriver extends Task<MapredWork> implements Serializable {
public int execute(DriverContext driverContext) {
ShimLoader.getHadoopShims().setNullOutputFormat(job);
}
}
public class Hadoop20Shims implements HadoopShims {
public static class NullOutputCommitter extends OutputCommitter {
@Override
public void setupJob(JobContext jobContext) { }
@Override
public void cleanupJob(JobContext jobContext) { }
@Override
public void setupTask(TaskAttemptContext taskContext) { }
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
return false;
}
@Override
public void commitTask(TaskAttemptContext taskContext) { }
@Override
public void abortTask(TaskAttemptContext taskContext) { }
}
public void setNullOutputFormat(JobConf conf) {
conf.setOutputFormat(NullOutputFormat.class);
conf.setOutputCommitter(Hadoop20Shims.NullOutputCommitter.class);
// option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
// but can be backported. So we disable setup/cleanup in all versions >= 0.19
conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
}
}
public class JobConf extends Configuration {
/**
* Set the {@link OutputCommitter} implementation for the map-reduce job.
*
* @param theClass the {@link OutputCommitter} implementation for the map-reduce
* job.
*/
public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
}
/**
* Get the {@link OutputCommitter} implementation for the map-reduce job,
* defaults to {@link FileOutputCommitter} if not specified explicitly.
*
* @return the {@link OutputCommitter} implementation for the map-reduce job.
*/
public OutputCommitter getOutputCommitter() {
return (OutputCommitter)ReflectionUtils.newInstance(
getClass("mapred.output.committer.class", FileOutputCommitter.class,
OutputCommitter.class), this);
}
}
OutputCommitterは、ジョブの出力を制御するために使用され、MapTaskおよびReduceTaskで呼び出されます.http://hi.baidu.com/_kouu/blog/item/dd2f08fd25da09e0fc037f15.htmlHadoop OutputFormatこの中でははっきりと説明されていますが、Hiveの宿題ではこの機能は使用されていません.
Hiveの出力はすべて自分で制御され、具体的にはFileSinkOperatorクラスに反映され(reduceまたはmapの場合、MR jobで呼び出される)、jobCloseメソッドはMapReduceジョブの実行が終了した後に呼び出される(clientエンド呼び出し).
public class ExecDriver extends Task<MapredWork> implements Serializable {
public int execute(DriverContext driverContext) {
if (rj != null) {
JobCloseFeedBack feedBack = new JobCloseFeedBack();
if (work.getAliasToWork() != null) {
for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
op.jobClose(job, success, feedBack);
ArrayList<Object> dirs = feedBack.get(JobCloseFeedBack.FeedBackType.DYNAMIC_PARTITIONS);
if (dirs != null) {
for (Object o: dirs) {
if (o instanceof String) {
dpPaths.add((String)o);
}
}
}
}
}
if (work.getReducer() != null) {
work.getReducer().jobClose(job, success, feedBack);
}
}
}
}