MapReduceが起動したMap/Reduceサブタスクの簡単な分析



Hadoopでは、DataNodeでMap/Reduce javaプロセスを開始することで分散計算処理を実現しています.では、hadoopでMap/Reduceタスクを開始するプロセスをソースレイヤから簡単に分析します.
 
まず、Map/Reduce側が起動するタスクについては、いくつかのパラメータによってjava optsを制御します.mapreduce.map.java.opts、mapreduce.reduce.java.optsです.これらのパラメータはMRJobConfigクラスで使用されます.map.java.optsの例として、org.apache.hadoop.mapred.MapReduceChildJVMクラスでは、このパラメータが使用され、MapまたはReduce側のJVMを構築するために使用されます.次の方法でChildJavaOptsを入手しました.
 
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask)
 
 
MapTaskの場合、JavaOptsの取得方法の具体的な内容は以下の通りです(Reduce側の論理はほぼ一致しています).
 
if (isMapTask) {
      userClasspath =
          jobConf.get(
              JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
              jobConf.get(
                  JobConf.MAPRED_TASK_JAVA_OPTS,
                  JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
          );
      adminClasspath =
          jobConf.get(
              MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
              MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);

    // Add admin classpath first so it can be overridden by user.
    return adminClasspath + " " + userClasspath;
 
 
userClassPathは、次のパラメータの順序で取得されます.
mapreduce.map.java.opts, 
mapred.child.java.opts, 
DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m”;
 
 
adminClassPathが取得するパラメータの基本順序:
mapreduce.admin.map.child.java.opts,
DEFAULT_MAPRED_ADMIN_JAVA_OPTS ="-Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN ";
 
 
最後に、JVMが起動したパラメータでは、後から前を上書きできるため、userClassPathの同じオプションの設定はadminClassPathを上書きすることができ、adminClassPathはすべてのパラメータが上書きされないことを保証できない.
 
ソース側から上へ移動し、方法にジャンプします.
 
public static List<String> getVMCommand(InetSocketAddress taskAttemptListenerAddr, Task task, ID jvmID)
 
 
この方法では、オプションで@taskid@を使用すると、特定のattemptIDに置き換えることができる非表示の設定が表示されます.
 
String javaOpts = getChildJavaOpts(conf, task.isMapTask());
javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
 
 
コードでもGCを例に、注釈で使い方を例に挙げて説明します.
 
   
 // Add child (task) java-vm options.
    //
    // The following symbols if present in mapred.{map|reduce}.child.java.opts
    // value are replaced:
    // + @taskid@ is interpolated with value of TaskID.
    // Other occurrences of @ will not be altered.
    //
    // Example with multiple arguments and substitutions, showing
    // jvm GC logging, and start of a passwordless JVM JMX agent so can
    // connect with jconsole and the likes to watch child memory, threads
    // and get thread dumps.
    //
    //  <property>
    //    <name>mapred.map.child.java.opts</name>
    //    <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@[email protected] \
    //           -Dcom.sun.management.jmxremote.authenticate=false \
    //           -Dcom.sun.management.jmxremote.ssl=false \
    //    </value>
    //  </property>
    //
    //  <property>
    //    <name>mapred.reduce.child.java.opts</name>
    //    <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@[email protected] \
    //           -Dcom.sun.management.jmxremote.authenticate=false \
    //           -Dcom.sun.management.jmxremote.ssl=false \
    //    </value>
    //  </property>
    //
 
一般的に、gcログを印刷する場合は、-Xlogc:で特定のディレクトリを指定する必要がありますが、MapReduceタスクでは指定できません.2つのMapが同じマシンで実行される可能性があるため、gcファイルの上書きが発生するに違いありません.実用@taskid@では、この問題を回避できます.OOMによるスタック印刷にも適用され、ファイルが誤って上書きされないようにします.
 
デフォルトでは、javaの一時フォルダを設定するためのパラメータも追加されます(File.createTempFileなど、すべての一時ファイルの作成はこのフォルダにあります):
 
    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
 
 
主クラスは次のとおりです.
 
org.apache.hadoop.mapred.YarnChild
 
 
YarnChildプロセスの最終的な完全なコマンドは次のとおりです.
/usr/java/jdk1.7.0_11//bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx2048M -Djava.io.tmpdir=/home/data5/hdfsdir/nm-local-dir/usercache/xxx/appcache/application_1413206225298_36914/container_1413206225298_36914_01_000098/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/home/workspace/hadoop/logs/userlogs/application_1413206225298_36914/container_1413206225298_36914_01_000098 -Dyarn.app.container.log.filesize=209715200 -Dhadoop.root.logger=INFO,CLA org.apache.hadoop.mapred.YarnChild 192.168.7.26 21298 attempt_1413206225298_36914_r_000001_0 98
 
 
このコマンドがどのように生成されたかをソース側から分析します.
 
  • $JAVA_HOME: /usr/java/jdk1.7.0_11/
  • ソースコードには、/bin/java
  • と書かれています.
  • mapreduce.admin.map.child.java.opts:設定しない場合は、-Djava.net.preferIPv 4 Stack=true-Dhadoop.metrics.log.level=WARNを使用します.
  • mapreduce.map.java.opts: Xmx2048M;
  • ソースコードに追加:-Djava.io.tmpdir=/home/data 5/hdfsdir/nm-local-dir/usercache/tong/appcache/application_1413206225298_36914/container_1413206225298_36914_01_000098/tmp;
  • org.apache.hadoop.mapred.MapReduceChildJVM.setLog 4 jPropertiesでログ・レベル、ログ・サイズなどを含むlog 4 jを設定します:-Dlog 4 j.container-log 4 j.properties-Dyarn.app.container.workspace/hadoop/logs/userlogs/application_1413206225298_36914/container_1413206225298_36914_01_000098 -Dyarn.app.container.log.filesize=209715200 -Dhadoop.root.logger=INFO,CLA;
  • 主クラス:org.apache.hadoop.mapred.YarnChild;
  • TaskAttemptのホストアドレス:192.168.7.xx;
  • TaskAttemptのホストポート:212 xx;
  • TaskAttempt ID:attempt_1413206225298_36914_r_000001_0;
  • JVMD:98(何をしているのかよく分からない);

  •  
    最後に、プロセスの通常出力と例外出力をリダイレクトします.
     
    // Finally add the jvmID
        vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
        vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));