HiveのOutputCommitter

3922 ワード

HiveのOutputCommitter
// NOTE(review): blog excerpt — other members of ExecDriver are elided, and the
// method body is truncated (no return statement), so this does not compile as-is.
public class ExecDriver extends Task<MapredWork> implements Serializable {

  // Entry point for launching a Hive MapReduce job. The single line shown here
  // installs a no-op OutputFormat/OutputCommitter on the job conf through the
  // Hadoop-version-specific shim layer, because Hive manages its own output
  // files instead of relying on Hadoop's commit protocol.
  // ("job" is a JobConf field declared elsewhere in this class — not visible here.)
  public int execute(DriverContext driverContext) {
      
    ShimLoader.getHadoopShims().setNullOutputFormat(job);

  }

}



public class Hadoop20Shims implements HadoopShims {

  /**
   * An {@code OutputCommitter} whose every lifecycle callback is a no-op and
   * which reports that no task output ever needs committing. Installing this
   * committer effectively disables Hadoop's output-commit protocol, since Hive
   * writes and promotes its own output files itself.
   */
  public static class NullOutputCommitter extends OutputCommitter {
    @Override
    public void setupJob(JobContext jobContext) { }
    @Override
    public void cleanupJob(JobContext jobContext) { }

    @Override
    public void setupTask(TaskAttemptContext taskContext) { }
    // Returning false tells the framework there is no task output to promote,
    // so commitTask() below is effectively never invoked.
    @Override
    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
      return false;
    }
    @Override
    public void commitTask(TaskAttemptContext taskContext) { }
    @Override
    public void abortTask(TaskAttemptContext taskContext) { }
  }

  /**
   * Configures {@code conf} so the framework produces no managed output:
   * a {@code NullOutputFormat}, the no-op committer above, and — where the
   * option is supported — job-level setup/cleanup tasks switched off.
   *
   * @param conf the job configuration to neutralize output handling on
   */
  public void setNullOutputFormat(JobConf conf) {
    conf.setOutputFormat(NullOutputFormat.class);
    conf.setOutputCommitter(Hadoop20Shims.NullOutputCommitter.class);

    // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
  }

}


public class JobConf extends Configuration {

  /**
   * Registers the {@link OutputCommitter} implementation this map-reduce job
   * should use, by recording it under the
   * {@code mapred.output.committer.class} configuration key.
   *
   * @param theClass the committer class to instantiate for this job
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }

  /**
   * Builds the {@link OutputCommitter} configured for this map-reduce job.
   * When no class has been set explicitly, {@link FileOutputCommitter} is
   * used as the default.
   *
   * @return a newly constructed committer, initialized with this configuration
   */
  public OutputCommitter getOutputCommitter() {
    Class<? extends OutputCommitter> committerClass =
        getClass("mapred.output.committer.class", FileOutputCommitter.class,
                 OutputCommitter.class);
    return (OutputCommitter) ReflectionUtils.newInstance(committerClass, this);
  }

}

OutputCommitterは、ジョブの出力を制御するために使用され、MapTaskおよびReduceTaskから呼び出されます。詳しくは「Hadoop OutputFormat」（http://hi.baidu.com/_kouu/blog/item/dd2f08fd25da09e0fc037f15.html ）で明確に説明されていますが、Hiveのジョブではこの仕組みは使用されていません。
Hiveの出力はすべてHive自身が制御しています。具体的にはFileSinkOperatorクラスで行われ（map側またはreduce側のオペレータとして、MRジョブ内で呼び出されます）、そのjobCloseメソッドはMapReduceジョブの実行が終了した後にクライアント側で呼び出されます。
 
// NOTE(review): blog excerpt — the fields used below (rj, work, job, success,
// dpPaths) are declared elsewhere in ExecDriver, and the method body/return
// are truncated; this fragment does not compile in isolation.
public class ExecDriver extends Task<MapredWork> implements Serializable {

  // Client-side post-job hook: once the MapReduce job (rj) has finished, give
  // every operator a chance to finalize its output by calling jobClose() on
  // each map-side operator tree and on the reducer — this replaces the work an
  // OutputCommitter would normally perform. Any dynamic-partition directories
  // the operators report back via the feedback object are collected into dpPaths.
  public int execute(DriverContext driverContext) {
       if (rj != null) {
        JobCloseFeedBack feedBack = new JobCloseFeedBack();
        if (work.getAliasToWork() != null) {
          for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
            op.jobClose(job, success, feedBack);
            // Harvest dynamic-partition output directories announced by this operator.
            ArrayList<Object> dirs = feedBack.get(JobCloseFeedBack.FeedBackType.DYNAMIC_PARTITIONS);
            if (dirs != null) {
              for (Object o: dirs) {
                if (o instanceof String) {
                  dpPaths.add((String)o);
                }
              }
            }
          }
        }
        // The reducer's operator tree gets the same close notification.
        if (work.getReducer() != null) {
          work.getReducer().jobClose(job, success, feedBack);
        }
      }
  }

}