1.flinkソース分析:flinkクラスタ起動エントリ分析
19581 ワード
Flinkクラスタ起動エントリ分析
最初に表示
start-cluster.shファイル:
最終的にjobmanagerが呼び出されたことがわかりますshこのスクリプトはstart[オプションパラメータ1][オプションパラメータ2]に渡され,jobmanager.を解析する.shというスクリプトは、ここでは、2つのオプションパラメータがhostとportであることがわかります.
上のスクリプトは最終的にflink-daemonを呼び出しました.sh、このスクリプトでは、異なるパラメータに基づいてflinkプロジェクトの異なる起動クラスを選択します.
standaloneクラスタを起動するとorgが起動しますapache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypointというjavaクラスは、ここでshell起動エントリが見つかります.
最初に表示
start-cluster.shファイル:
bin=`dirname "$0"
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# Start the JobManager instance(s)
#
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
# HA Mode
readMasters
# ha , master , jobmanager.sh
echo "Starting HA cluster with ${#MASTERS[@]} masters."
for ((i=0;i<${#MASTERS[@]};++i)); do
master=${MASTERS[i]}
webuiport=${WEBUIPORTS[i]}
if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
"${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
else
ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
fi
done
else
echo "Starting cluster."
# Start single JobManager on this machine
"$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch
# Start TaskManager instance(s)
TMSlaves start
最終的にjobmanagerが呼び出されたことがわかりますshこのスクリプトはstart[オプションパラメータ1][オプションパラメータ2]に渡され,jobmanager.を解析する.shというスクリプトは、ここでは、2つのオプションパラメータがhostとportであることがわかります.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances
# , start,start-foregroud,stop,stop-all
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
exit 1
fi
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
ENTRYPOINT=standalonesession
# , jvm
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
echo "used deprecated key \`${
KEY_JOBM_MEM_MB}\`, please replace with key \`${
KEY_JOBM_MEM_SIZE}\`"
else
flink_jm_heap_bytes=$(parseBytes ${
FLINK_JM_HEAP})
FLINK_JM_HEAP_MB=$(getMebiBytes ${
flink_jm_heap_bytes})
fi
if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
exit 1
fi
if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
fi
# Add JobManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
# Startup parameters
args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
if [ ! -z $HOST ]; then
args+=("--host")
args+=("${HOST}")
fi
if [ ! -z $WEBUIPORT ]; then
args+=("--webui-port")
args+=("${WEBUIPORT}")
fi
fi
# , flink-console.sh
if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
# , flink-daemon.sh , STARTSTOP start,
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
上のスクリプトは最終的にflink-daemonを呼び出しました.sh、このスクリプトでは、異なるパラメータに基づいてflinkプロジェクトの異なる起動クラスを選択します.
...
case $DAEMON in
(taskexecutor)
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
;;
(zookeeper)
CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
;;
(historyserver)
CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
;;
# standalone
(standalonesession)
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
;;
(standalonejob)
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
;;
(*)
echo "Unknown daemon '${DAEMON}'. $USAGE."
exit 1
;;
esac
...
...
...
echo "Starting $DAEMON daemon on host $HOSTNAME."
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
mypid=$!
...
standaloneクラスタを起動するとorgが起動しますapache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypointというjavaクラスは、ここでshell起動エントリが見つかります.