Flinkタスクをyarnに送信

6219 ワード

flink on yarnモードでは、flink yarn-sessionの2つのコミット方式
1.yarn-sessionを共通に使用
yarnでflinkクラスタを初期化し、指定したリソースを開き、後でタスクをコミットするにはここにコミットします.このflinkクラスタは、手動で停止しない限りyarnクラスタに常駐します.
2.jobごとにyarn-sessionを1つ提供する
 
 
コミットするたびに新しいflinkクラスタが作成され、タスク間は互いに独立し、互いに影響を及ぼさず、管理が容易になります.タスクの実行が完了すると作成されたクラスタも消えます.
1つ目の方法:
1.まずyarnセッションを起動し、Flinkの2つの必要なサービス:JobManagerとTaskManagersを起動し、クラスタにジョブをコミットできます.同じセッションで複数のFlinkジョブをコミットできます.
まず、このスクリプトがサポートするパラメータを見てみましょう.. /bin/yarn-session .sh Usage:     Required       -n,--container    Number of YARN container to allocate (=Number of Task Managers)     Optional       -D                   Dynamic properties       -d,–detached              Start detached spark       -jm,--jobManagerMemory     Memory  for   JobManager Container [ in   MB]       -nm,--name             Set a custom name  for   the application on YARN       -q,–query                Display available YARN resources (memory, cores)       -qu,--queue             Specify YARN queue.       -s,--slots             Number of slots per TaskManager       -st,--streaming             Start Flink  in   streaming mode       -tm,--taskManagerMemory     Memory per TaskManager Container [ in   MB]
オンラインスクリプト:bin/yarn-session.sh -n 7 -s 8 -jm 3072 -tm 32768 -qu root.*.*-nm *-* -d
このうち、taskManagerを7個申請し、8コアごとにtaskmanagerごとに32768 Mメモリを有する
これでyarn-sessionを起動してflinkタスクをコミットできます.
2../bin/flinkスクリプトを使用してジョブを発行できます.このスクリプトがサポートするパラメータを見てみましょう.bin/flink ./flink [OPTIONS] [ARGUMENTS]   The following actions are available : Action  "run"   compiles and runs a program.    Syntax :   run [OPTIONS]    "run"   action options :       -c,-- class                  Class  with   the program entry point ( "main"   method or  "getPlan()"   method. Only needed  if   the JAR file does not  specify the  class   in its manifest.       -C,--classpath                  Adds a URL to each user code  classloader  on all nodes in the  cluster. The paths must specify a  protocol (e.g. file : //) and be  accessible on all nodes (e.g. by means  of a NFS share). You can use  this  ption multiple times  for   specifying  more than one URL. The protocol must  be supported by the { @ link  java.net.URLClassLoader}.       -d,--detached                     If present, runs the job in detached  mode       -m,--jobmanager : port>                  Address of the JobManager (master) to  which to connect. Specify  yarn-cluster'   as the JobManager to  deploy a YARN cluster  for   the job. Use  this   flag to connect to a different  JobManager than the one specified in  the configuration.       -p,--parallelism                 The parallelism  with   which to run the  program. Optional flag to  override   the  default value specified in the  configuration.       -q,--sysoutLogging                   If present, supress logging output to  standard out.       -s,--fromSavepoint             Path to a savepoint to reset the job  back to ( for   example  file : ///flink/savepoint-1537).
runオプションを してFlinkジョブを できます.このスクリプトはYARNセッションのアドレスを に できます
オンラインスクリプト:nohup bin/flink run-shdfs:///flink/savepoints/savepoint-bcabee-bf3f54a3b924 -c **** jars/**** test > Flink-RealtimeDAU.log 2>&1 &
-sでsavepointsアドレスを してjobを できます.
Savepointって ?
Flinkのsavepointは、グローバルで のあるスナップショットです. の2つの があります.
  • データソースのすべてのデータの .
  • 「グローバル 」とは、すべての ソースデータが された にあり、すべての の が にcheckpointされていることを します.
    アプリケーションが の でsavepointを した は、 のsavepointからパブリッシュアプリケーションをいつでも できます.この 、 しいアプリケーションはsavepointの から され、savepointのデータソース からすべてのデータが されます.
    3. に を する
    jobmanagerオンラインスクリプトを じる:bin/flink cancel-shdfs:///flink/savepoints /savepoints-* -yid application_1535964220442_0034
    キャンセルコマンドで
    またはyarnアプリケーション-killアプリケーションIdによってyarnセッションを する
    またはflink listでjobIdを する
    bin/flink cancel -s hdfs:///flink/savepoints/savepoint-* jobId 
    ここで-sはオプションです
     
    2の
    nohup bin/flink run -m yarn-cluster -yn 7  -s hdfs:///flink/savepoints/savepoint-* -c *.* jars/**** test > Flink-RealtimeDAU.log 2>&1 &
    このうち-ynはTaskManagerの を し、 する があります.