YARNでのSamzaの起動プロセス=』の1つ

7522 ワード


スクリプトの実行、jobのコミット
YARNにSamza jobを提出するにはrun-jobを使用します.shというスクリプト.
samza-example/target/bin/run-job.sh  --config-factory=samza.config.factories.PropertiesConfigFactory  --config-path=file://$PWD/config/hello-world.properties
このシナリオの内容は何ですか?
exec $(dirname $0)/run-class.sh org.apache.samza.job.JobRunner $@
run-classというスクリプトを呼び出します.
run-class.shは環境変数HADOOP_によりCONF_DIRとHADOOP_YARN_HOMEはYARNのプロファイルの場所を取得し、CLASSSPATHに追加します.同時にsamza jobルートディレクトリのlibフォルダのjarまたはwarをCLASSSPATHに追加します.ResourceManagerのアドレスを取得するためにyarnのプロファイルのみが必要です.libディレクトリの下のパッケージは、jobを実行するために必要です.
run-class.shも環境変数$SAMZA_を通過しますLOG_DIRはsamzaのlogが保存すべき場所を知り、$SAMZA_を通過CONTAINER_NAMEはcontainerの名前を決めて-DでJAVAに設置しますOPTS中です.次にlibディレクトリの下でlog 4 jを検索する.xmlファイル、存在するなら-Dlog 4 j.configurationをlog 4 jとする.xmlのパス.
以上の動作によりjavaを呼び出すために必要なclasspathとタスク実行時に必要な構成項目を構築し、呼び出す
exec $JAVA $JAVA_OPTS -cp $CLASSPATH $@
仮想マシンを起動します.
はい、run-jobを呼び出します.shの場合、orgが実行する.apache.samza.job.JobRunnerというクラス.このクラスの役割をsamzaの公式ガイドブックで簡単に紹介した.Samzaは2種類のStreamJobFactory:LocalJobFactoryとYarnJobFactoryを持っています.StreamJobFactoryの役割はJobRunnerに実行可能なjobを提供することです
public interface StreamJobFactory {

  StreamJob getJob(Config config);

}


StreamJobは実行可能なjobで、JobRunnerはsubmitメソッドを呼び出します.
public interface StreamJob {

  StreamJob submit();



  StreamJob kill();



  ApplicationStatus waitForFinish(long timeoutMs);



  ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs);



  ApplicationStatus getStatus();

}


 
次はJobRunnerというクラスを見てみましょう.プログラムエントリはJobRunnerというクラスの伴生オブジェクトにあります
object JobRunner extends Logging {

  def main(args: Array[String]) {

    val cmdline = new CommandLine

    val options = cmdline.parser.parse(args: _*)

    val config = cmdline.loadConfig(options)

    new JobRunner(config).run

  }

}


パラメータを設定すると、JobRunnerのrunメソッドに移動し、下が主な論理になります.
    val jobFactoryClass = conf.getStreamJobFactoryClass match {

      case Some(factoryClass) => factoryClass

      case _ => throw new SamzaException("no job factory class defined")

    }



    val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]



 // Create the actual job, and submit it.

    val job = jobFactory.getJob(conf).submit  //  job



    info("waiting for job to start")



    // Wait until the job has started, then exit.

    Option(job.waitForStatus(Running, 500)) match {

      case Some(appStatus) => {

        if (Running.equals(appStatus)) {

          info("job started successfully")

        } else {

          warn("unable to start job successfully. job has status %s" format (appStatus))

        }

      }

      case _ => warn("unable to start job successfully.")

    }


まずconfにjobが設定されているかどうかを見つけますfactoryとは、StreamJobFactoryを指定した実装があるかどうか、ない場合は例外を投げ出して終了することである.そうでなければ、このStreamJobFactoryを通じてjobを提出します.コミット後、500ミリ秒待ち、タスクのステータスがRunningでない場合は終了します.ここでのRunningはタスクがすでに走っているわけではありません.たとえばYARNを使用する場合、ResourceManagerに正常にコミットすればrunningになりますので、ここでのrunningは「タスクコミット成功」という意味です.
 
YarnJobの実装
 
YARNにコミットすると、YarnJobFactoryというStreamJobFactoryの実装を使用します.
class YarnJobFactory extends StreamJobFactory {

  def getJob(config: Config) = {

    // TODO fix this. needed to support http package locations.

    //    yarn-site.xml。   yarn-site.xml   classpath   run-class.sh ,HADOOP_CONF_DIR      classpath 

    val hConfig = new YarnConfiguration 

    hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)



    new YarnJob(config, hConfig)

  }

}


  
” hConfig.set("fs.http.impl",classOf[HttpFileSystem].getName)"この文はjobでhttpファイルシステムを呼び出すことができて、例えばファイルパスを"http://xxx.xx.xx.xx:8080/xx/xx「そうです.SamzaはHTTP filsystemの実装を持っています.LinkedInの人がそう使う必要があるのかもしれません.
YarnJobFactoryは、主にYarnConfigurationを構築し、以前のcommandLineパラメータとともにconfigとしてYarnJobを構築する.YarnConfigurationはYARN自身のクラスで、classpathからyarn-siteを読みます.xmlというプロファイル.
YarnJobこれこそYarnにコミットされるタスクであり、StreamJobというインタフェースを実現しています.ここでは主にそのsubmit法に関心を持っている.
  val client = new ClientHelper(hadoopConfig)   var appId: Option[ApplicationId] = None
 //  job,    AppMaster      cpu  。    container  

  def submit: YarnJob = {

    appId = client.submitApplication( //  submitApplication     appId

      new Path(config.getPackagePath.getOrElse(throw new SamzaException("No YARN package path defined in config."))),

      config.getAMContainerMaxMemoryMb.getOrElse(DEFAULT_AM_CONTAINER_MEM),

      1,

      List(

        "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"

          format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),

      Some(Map(

        ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)),

        ShellCommandConfig.ENV_CONTAINER_NAME -> Util.envVarEscape("application-master"),

        ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse("")))),

      Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))))



    this

  }


submitメソッドがタスクをコミットするプロセスはClientHelperのsubmitAppliationに渡されて実現される.
この方法こそYARNタスクを提出する鍵です.
まず、YARNにタスクを提出するには、必ずYARNが指定したインタフェースを実現することを知っています.では、YARNがjobの実行をスケジュールするために何を提供する必要があるかを推測してみましょう.
当ててみましょう(実際にはいくつか知っていますが、もっと考えられるかどうかは、考えを整理することが重要です)
  • まず、提出されたのは何ですか.YARNプログラムの実行過程は、まずYARNにリソースを申請してアプリケーションマスターを実行し、アプリケーションマスターから後続のリソースを申請することであることを知っています.だから、ここで提出したのは要求で、要求の内容は:RM、私はAMを起動して、私にcontainerを分けてください
  • RMはいつアプリケーションマスターの申請に同意しますか?システム内のリソースが不足し、多くのタスクが実行されている場合は、拒否されますか?
  • ,RMがリクエストを承認した場合、containerが割り当てられます.では、それは...containerにすぎません.結局、プログラムを実行する人はいません.では、誰がこのcontainerを使ってプログラムを走りますか?それがNodeManagerです.では、AMへの申請を提出するときは、Yarnに「NodeManagerがこのcontainerでそうする必要がある」と伝える必要があります.
  • はまた、このAMに必要なリソースの数をYarnに伝えるべきかもしれません.結局、このAMを実行するにはcontainerも必要ですが、containerの核心はいくつかのリソースに対応しています.実際、下の方法のサインを見ると、cpuとmemoryという大きな字が見えます.

  • まずサインを見て
       def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] 
    次は各パラメータの意味を紹介し、この方法が何をしているのかを理解するのに役立ちます.
  • packagePathというメカニズムは以前は考えられなかった.私たちが実際にタスクをコミットするときは、yarnにこのタスクに必要なリソース(ファイルリソース)のアドレスを教えて、NodeManagerでdownしてこのリソースをローカリゼーションするだけです.このjob package全体をyarnに転送するのではなく.このように設計することで、job packageはhttpファイルシステムでSamzaの例のようにHDFSでも入手できます(これは、Samzaエンジニアリングで.mdで終わるすべてのファイルでhdfsを検索する必要があります.HDFSを使用してjob packageを保存するガイドを見つける必要があります).もちろんローカルでも可能です(httpとhdfsの2つの方法に成功する前に、このように試したことがあります......).まあ、これで少なくともRMのストレージにプレッシャーをかけません.
  • memoryMbこれはAMを実行するcontainerに割り当てたいメモリサイズ
  • です.
  • cpuCoreにはいくつかのcoreが必要です.これは仮想的で、具体的にどのように実現するかは、研究が必要です.
  • cmdsこれがNodeManage実行命令でしょ
  • env環境変数、これはAMを実行する仮想マシンに割り当てられた
  • です.
  • nameこれはこのjobの名前で、YARN jobの名前で、RMのweb UIで見た名前です.同時に、Samzaはbinディレクトリの下にスクリプトがあり、samza jobを現在実行しています.

  • 戻り値はApplicationIdです.これはOptionなので、コミットに失敗した場合、戻り値はNoneです.
    さらにsubmitApplicationという方法の実現を書くと、ちょっと長すぎて、次の編に変えます.