Flink-1.10.0 Native Kubernetesの導入方法の実践


最新リリースのFlinkではnative Kubernetesの導入がサポートされており、必要に応じてpodを動的に増やしたり減らしたりすることができ、テストに値する.現在はbeta段階にあり、Flink-Sessionの導入方式をサポートしている
前提条件
1、1.9バージョン以上のK 8 sクラスタで、DNS 2、pod、serviceを操作できるKubeConfig 3、RBAC権限のあるservice Accountが有効になり、podを作成、削除する
配置
flink tarパッケージを解凍したら、次のコマンドを実行します.
./bin/kubernetes-session.sh

起動に成功したらログ出力印刷のURLに従ってflinkのページにアクセスしたり、flink起動のClsuterIP/LoadBalancer serviceのCLUSTER-IP:8081に従ってアクセスしたりすることができます.
デフォルトflink native k 8 sで起動するjob master podログはpod/opt/flink/logディレクトリに印刷され、kubectl logでログを表示したい場合はconf/log 4 j.properties関連構成を追加するには、次の手順に従います.
log4j.rootLogger=INFO, file, console

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{
     yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

その後、起動時に-Dkubernetes.container-start-command-templateのパラメータを追加すると、kubectl logsでログを表示できます.
./bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"

その後、関連jobを提出することができます.
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar

jobをコミットし続け、cancelでjobを削除することで、native k 8 sの導入方式は必要に応じてpodを動的に追加/削除することができ、taskmanagerの数を事前に設定するよりも便利で、リソースをより効果的に利用することができることが分かった.
採坑
テスト中、jobmanager.heap.size: 600mが構成されていたが、podが正常に起動していないことが判明し、describe podとpodログを表示するとjvmの起動メモリが0に設定されていることが判明した.
Start command : /bin/bash -c $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xms0m -Xmx0m -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml -Dlog4j.cint.KubernetesSessionClusterEntrypoint
Invalid maximum heap size: -Xmx0m
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

関連するソースコードを確認したところ、Yarn、Mesos、K 8 sモードで起動したcontainerには、Flinkの関連プロセスに加えて他のjvmメモリが予約されていることが判明し、デフォルトでは最小600 mが予約されているため、上記構成のjobmanager.heap.size: 600mでは、以下の計算ロジックを経て残りのjobmanagerのjvmの値が0になるため、起動に失敗した.
	/**
	 * Calculate heap size after cut-off. The heap size after cut-off will be used to set -Xms and -Xmx for jobmanager
	 * start command.
	 * 
	 *    :org.apache.flink.runtime.clusterframework.BootstrapTools#calculateHeapSize
	 */
	public static int calculateHeapSize(int memory, Configuration conf) {
     

		final float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
		final int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);

		if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
     
			throw new IllegalArgumentException("The configuration value '"
				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
				+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
		}
		if (minCutoff > memory) {
     
			throw new IllegalArgumentException("The configuration value '"
				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
				+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
		}

		int heapLimit = (int) ((float) memory * memoryCutoffRatio);
		if (heapLimit < minCutoff) {
     
			heapLimit = minCutoff;
		}
		return memory - heapLimit;
	}

上記の計算論理コードには、2つの構成パラメータが含まれています.
  • containerized.heap-cutoff-ratio:安全のため、Job Masterコンテナで削減するメモリ
  • containerized.heap-cutoff-min:Job Masterコンテナ(YARN/Mesos/Kubernetes)から他のJVMメモリで使用されるヒープ領域の割合を削除します.

  • プロセス参照:公式サイト-Flink native k 8 s配置モード公式サイト-Flink関連パラメータ構成《Flink 1.10 Native Kubernetes原理と実践》