Flink-1.10.0 Native Kubernetesの導入方法の実践
最新リリースのFlinkではnative Kubernetesの導入がサポートされており、必要に応じてpodを動的に増やしたり減らしたりすることができ、テストに値する.現在はbeta段階にあり、Flink-Sessionの導入方式をサポートしている
前提条件
1、1.9バージョン以上のK 8 sクラスタで、DNS 2、pod、serviceを操作できるKubeConfig 3、RBAC権限のあるservice Accountが有効になり、podを作成、削除する
配置
flink tarパッケージを解凍したら、次のコマンドを実行します.
起動に成功したらログ出力印刷のURLに従ってflinkのページにアクセスしたり、flink起動のClsuterIP/LoadBalancer serviceのCLUSTER-IP:8081に従ってアクセスしたりすることができます.
デフォルトflink native k 8 sで起動するjob master podログはpod/opt/flink/logディレクトリに印刷され、kubectl logでログを表示したい場合はconf/log 4 j.properties関連構成を追加するには、次の手順に従います.
その後、起動時に
その後、関連jobを提出することができます.
jobをコミットし続け、cancelでjobを削除することで、native k 8 sの導入方式は必要に応じてpodを動的に追加/削除することができ、taskmanagerの数を事前に設定するよりも便利で、リソースをより効果的に利用することができることが分かった.
採坑
テスト中、
関連するソースコードを確認したところ、Yarn、Mesos、K 8 sモードで起動したcontainerには、Flinkの関連プロセスに加えて他のjvmメモリが予約されていることが判明し、デフォルトでは最小600 mが予約されているため、上記構成の
上記の計算論理コードには、2つの構成パラメータが含まれています.
プロセス参照:公式サイト-Flink native k 8 s配置モード公式サイト-Flink関連パラメータ構成《Flink 1.10 Native Kubernetes原理と実践》
前提条件
1、1.9バージョン以上のK 8 sクラスタで、DNS 2、pod、serviceを操作できるKubeConfig 3、RBAC権限のあるservice Accountが有効になり、podを作成、削除する
配置
flink tarパッケージを解凍したら、次のコマンドを実行します.
./bin/kubernetes-session.sh
起動に成功したらログ出力印刷のURLに従ってflinkのページにアクセスしたり、flink起動のClsuterIP/LoadBalancer serviceのCLUSTER-IP:8081に従ってアクセスしたりすることができます.
デフォルトflink native k 8 sで起動するjob master podログはpod/opt/flink/logディレクトリに印刷され、kubectl logでログを表示したい場合はconf/log 4 j.properties関連構成を追加するには、次の手順に従います.
log4j.rootLogger=INFO, file, console
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{
yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
その後、起動時に
-Dkubernetes.container-start-command-template
のパラメータを追加すると、kubectl logsでログを表示できます../bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
その後、関連jobを提出することができます.
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
jobをコミットし続け、cancelでjobを削除することで、native k 8 sの導入方式は必要に応じてpodを動的に追加/削除することができ、taskmanagerの数を事前に設定するよりも便利で、リソースをより効果的に利用することができることが分かった.
採坑
テスト中、
jobmanager.heap.size: 600m
が構成されていたが、podが正常に起動していないことが判明し、describe podとpodログを表示するとjvmの起動メモリが0に設定されていることが判明した.Start command : /bin/bash -c $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xms0m -Xmx0m -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml -Dlog4j.cint.KubernetesSessionClusterEntrypoint
Invalid maximum heap size: -Xmx0m
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
関連するソースコードを確認したところ、Yarn、Mesos、K 8 sモードで起動したcontainerには、Flinkの関連プロセスに加えて他のjvmメモリが予約されていることが判明し、デフォルトでは最小600 mが予約されているため、上記構成の
jobmanager.heap.size: 600m
では、以下の計算ロジックを経て残りのjobmanagerのjvmの値が0になるため、起動に失敗した. /**
* Calculate heap size after cut-off. The heap size after cut-off will be used to set -Xms and -Xmx for jobmanager
* start command.
*
* :org.apache.flink.runtime.clusterframework.BootstrapTools#calculateHeapSize
*/
public static int calculateHeapSize(int memory, Configuration conf) {
final float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
final int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
}
if (minCutoff > memory) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
}
int heapLimit = (int) ((float) memory * memoryCutoffRatio);
if (heapLimit < minCutoff) {
heapLimit = minCutoff;
}
return memory - heapLimit;
}
上記の計算論理コードには、2つの構成パラメータが含まれています.
containerized.heap-cutoff-ratio
:安全のため、Job Masterコンテナで削減するメモリcontainerized.heap-cutoff-min
:Job Masterコンテナ(YARN/Mesos/Kubernetes)から他のJVMメモリで使用されるヒープ領域の割合を削除します.プロセス参照:公式サイト-Flink native k 8 s配置モード公式サイト-Flink関連パラメータ構成《Flink 1.10 Native Kubernetes原理と実践》