YARNでのSamzaの起動プロセス=』の1つ
7522 ワード
スクリプトの実行、jobのコミット
YARNにSamza jobを提出するにはrun-jobを使用します.shというスクリプト.
samza-example/target/bin/run-job.sh --config-factory=samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/config/hello-world.properties
このシナリオの内容は何ですか?
exec $(dirname $0)/run-class.sh org.apache.samza.job.JobRunner $@
run-classというスクリプトを呼び出します.
run-class.shは環境変数HADOOP_によりCONF_DIRとHADOOP_YARN_HOMEはYARNのプロファイルの場所を取得し、CLASSSPATHに追加します.同時にsamza jobルートディレクトリのlibフォルダのjarまたはwarをCLASSSPATHに追加します.ResourceManagerのアドレスを取得するためにyarnのプロファイルのみが必要です.libディレクトリの下のパッケージは、jobを実行するために必要です.
run-class.shも環境変数$SAMZA_を通過しますLOG_DIRはsamzaのlogが保存すべき場所を知り、$SAMZA_を通過CONTAINER_NAMEはcontainerの名前を決めて-DでJAVAに設置しますOPTS中です.次にlibディレクトリの下でlog 4 jを検索する.xmlファイル、存在するなら-Dlog 4 j.configurationをlog 4 jとする.xmlのパス.
以上の動作によりjavaを呼び出すために必要なclasspathとタスク実行時に必要な構成項目を構築し、呼び出す
exec $JAVA $JAVA_OPTS -cp $CLASSPATH $@
仮想マシンを起動します.
はい、run-jobを呼び出します.shの場合、orgが実行する.apache.samza.job.JobRunnerというクラス.このクラスの役割をsamzaの公式ガイドブックで簡単に紹介した.Samzaは2種類のStreamJobFactory:LocalJobFactoryとYarnJobFactoryを持っています.StreamJobFactoryの役割はJobRunnerに実行可能なjobを提供することです
public interface StreamJobFactory {
StreamJob getJob(Config config);
}
StreamJobは実行可能なjobで、JobRunnerはsubmitメソッドを呼び出します.
public interface StreamJob {
StreamJob submit();
StreamJob kill();
ApplicationStatus waitForFinish(long timeoutMs);
ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs);
ApplicationStatus getStatus();
}
次はJobRunnerというクラスを見てみましょう.プログラムエントリはJobRunnerというクラスの伴生オブジェクトにあります
object JobRunner extends Logging {
def main(args: Array[String]) {
val cmdline = new CommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
new JobRunner(config).run
}
}
パラメータを設定すると、JobRunnerのrunメソッドに移動し、下が主な論理になります.
val jobFactoryClass = conf.getStreamJobFactoryClass match {
case Some(factoryClass) => factoryClass
case _ => throw new SamzaException("no job factory class defined")
}
val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
// Create the actual job, and submit it.
val job = jobFactory.getJob(conf).submit // job
info("waiting for job to start")
// Wait until the job has started, then exit.
Option(job.waitForStatus(Running, 500)) match {
case Some(appStatus) => {
if (Running.equals(appStatus)) {
info("job started successfully")
} else {
warn("unable to start job successfully. job has status %s" format (appStatus))
}
}
case _ => warn("unable to start job successfully.")
}
まずconfにjobが設定されているかどうかを見つけますfactoryとは、StreamJobFactoryを指定した実装があるかどうか、ない場合は例外を投げ出して終了することである.そうでなければ、このStreamJobFactoryを通じてjobを提出します.コミット後、500ミリ秒待ち、タスクのステータスがRunningでない場合は終了します.ここでのRunningはタスクがすでに走っているわけではありません.たとえばYARNを使用する場合、ResourceManagerに正常にコミットすればrunningになりますので、ここでのrunningは「タスクコミット成功」という意味です.
YarnJobの実装
YARNにコミットすると、YarnJobFactoryというStreamJobFactoryの実装を使用します.
class YarnJobFactory extends StreamJobFactory {
def getJob(config: Config) = {
// TODO fix this. needed to support http package locations.
// yarn-site.xml。 yarn-site.xml classpath run-class.sh ,HADOOP_CONF_DIR classpath
val hConfig = new YarnConfiguration
hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)
new YarnJob(config, hConfig)
}
}
” hConfig.set("fs.http.impl",classOf[HttpFileSystem].getName)"この文はjobでhttpファイルシステムを呼び出すことができて、例えばファイルパスを"http://xxx.xx.xx.xx:8080/xx/xx「そうです.SamzaはHTTP filsystemの実装を持っています.LinkedInの人がそう使う必要があるのかもしれません.
YarnJobFactoryは、主にYarnConfigurationを構築し、以前のcommandLineパラメータとともにconfigとしてYarnJobを構築する.YarnConfigurationはYARN自身のクラスで、classpathからyarn-siteを読みます.xmlというプロファイル.
YarnJobこれこそYarnにコミットされるタスクであり、StreamJobというインタフェースを実現しています.ここでは主にそのsubmit法に関心を持っている.
val client = new ClientHelper(hadoopConfig) var appId: Option[ApplicationId] = None
// job, AppMaster cpu 。 container
def submit: YarnJob = {
appId = client.submitApplication( // submitApplication appId
new Path(config.getPackagePath.getOrElse(throw new SamzaException("No YARN package path defined in config."))),
config.getAMContainerMaxMemoryMb.getOrElse(DEFAULT_AM_CONTAINER_MEM),
1,
List(
"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"
format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
Some(Map(
ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)),
ShellCommandConfig.ENV_CONTAINER_NAME -> Util.envVarEscape("application-master"),
ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse("")))),
Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))))
this
}
submitメソッドがタスクをコミットするプロセスはClientHelperのsubmitAppliationに渡されて実現される.
この方法こそYARNタスクを提出する鍵です.
まず、YARNにタスクを提出するには、必ずYARNが指定したインタフェースを実現することを知っています.では、YARNがjobの実行をスケジュールするために何を提供する必要があるかを推測してみましょう.
当ててみましょう(実際にはいくつか知っていますが、もっと考えられるかどうかは、考えを整理することが重要です)
まずサインを見て
def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId]
次は各パラメータの意味を紹介し、この方法が何をしているのかを理解するのに役立ちます.
戻り値はApplicationIdです.これはOptionなので、コミットに失敗した場合、戻り値はNoneです.
さらにsubmitApplicationという方法の実現を書くと、ちょっと長すぎて、次の編に変えます.