Flink Java ExampleのAsyncIOExampleの詳細
47721 ワード
私たちはFlinkに関する概念を勉強した後、Flinkのプログラミングとプログラムについて何も知らないことを発見しました.この場合,公式のコードexampleの学習と研究が必要であり,本稿では公式サイトgithubのAsyncIOExampleの例について詳細なコード注釈を行う.(ps:実はみんな読めるはずですよ)
Flink Exampleバージョン:1.8
FlinkすべてのjavaのexampleコードのGithubアドレス:github
AsyncIOの原理:Flink原理と実現:Aysnc I/O
AsyncIOExampleの例は簡単です.Flinkコードの組み込みFlinkミニクラスタでジョブをシミュレートします.
プロシージャ:データ・ソースから送信されたデータ・ストリーム要素をAsyncFunction(非同期関数)処理に渡し、各エレメント・シミュレーションに対して外部リクエストを送信します.AsyncFunction処理中は、送信リクエスト処理に失敗して異常が発生した場合をシミュレートし、checkpointメカニズムでjobのリカバリを行います.フロー・タスクを処理するプロセス全体は、ログとコードを組み合わせてよく理解できます.
mainから見ると、はっきりしているので、コードの順序を調整して、mainを一番前に置きました.
mainメソッドを実行すると、完全なログは以下の通りです(psは実はログを見るのが面白いですが、flinkの多くのプロセスを見ることができます~):
Flink Exampleバージョン:1.8
FlinkすべてのjavaのexampleコードのGithubアドレス:github
AsyncIOの原理:Flink原理と実現:Aysnc I/O
AsyncIOExampleの例は簡単です.Flinkコードの組み込みFlinkミニクラスタでジョブをシミュレートします.
プロシージャ:データ・ソースから送信されたデータ・ストリーム要素をAsyncFunction(非同期関数)処理に渡し、各エレメント・シミュレーションに対して外部リクエストを送信します.AsyncFunction処理中は、送信リクエスト処理に失敗して異常が発生した場合をシミュレートし、checkpointメカニズムでjobのリカバリを行います.フロー・タスクを処理するプロセス全体は、ログとコードを組み合わせてよく理解できます.
mainから見ると、はっきりしているので、コードの順序を調整して、mainを一番前に置きました.
package org.myorg.quickstart.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AsyncFunction: I/O
* Example to illustrates how to use {@link AsyncFunction}.
*/
public class AsyncIOExample {
private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);
/**
* , equals
*/
private static final String EXACTLY_ONCE_MODE = "exactly_once";
private static final String EVENT_TIME = "EventTime";
private static final String INGESTION_TIME = "IngestionTime";
private static final String ORDERED = "ordered";
public static void main(String[] args) throws Exception {
//1.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//
final ParameterTool params = ParameterTool.fromArgs(args);
//
final String statePath;
//checkPoint
final String cpMode;
//
final int maxCount;
//
final long sleepFactor;
//
final float failRatio;
//
final String mode;
//task
final int taskNum;
//
final String timeType;
//
final long shutdownWaitTS;
//
final long timeout;
try {
/**
* 2. job
* params.get: , 。
*/
statePath = params.get("fsStatePath", null);
cpMode = params.get("checkpointMode", "exactly_once");
maxCount = params.getInt("maxCount", 1000);
sleepFactor = params.getLong("sleepFactor", 100);
failRatio = params.getFloat("failRatio", 0.001f);
mode = params.get("waitMode", "ordered");
taskNum = params.getInt("waitOperatorParallelism", 1);
timeType = params.get("eventType", "EventTime");
shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
timeout = params.getLong("timeout", 10000L);
} catch (Exception e) {
printUsage();
throw e;
}
// config
StringBuilder configStringBuilder = new StringBuilder();
//
: linux windows
final String lineSeparator = System.getProperty("line.separator");
//
configStringBuilder
.append("Job configuration").append(lineSeparator)
.append("FS state path=").append(statePath).append(lineSeparator)
.append("Checkpoint mode=").append(cpMode).append(lineSeparator)
.append("Max count of input from source=").append(maxCount).append(lineSeparator)
.append("Sleep factor=").append(sleepFactor).append(lineSeparator)
.append("Fail ratio=").append(failRatio).append(lineSeparator)
.append("Waiting mode=").append(mode).append(lineSeparator)
.append("Parallelism for async wait operator=").append(taskNum).append(lineSeparator)
.append("Event type=").append(timeType).append(lineSeparator)
.append("Shutdown wait timestamp=").append(shutdownWaitTS);
//
LOG.info(configStringBuilder.toString());
// null,
if (statePath != null) {
// setup state and checkpoint mode
env.setStateBackend(new FsStateBackend(statePath));
}
/**
* 3. cpMode checkpoint
* param1:checkpoint
*/
if (EXACTLY_ONCE_MODE.equals(cpMode)) {
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
}
else {
env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
}
//4. , eventTime: watermark
if (EVENT_TIME.equals(timeType)) {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
}
// ingestionTime: flink
else if (INGESTION_TIME.equals(timeType)) {
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
}
//5. :
DataStream inputStream = env.addSource(new SimpleSource(maxCount));
//6. async , “ ” i/o
AsyncFunction function =
new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);
//7.
DataStream result;
if (ORDERED.equals(mode)) {
/**
* AsyncDataStream DataStream
* :1. 2. 3. 4. 5. i/o
* setParallelism : 。
*/
result = AsyncDataStream.orderedWait(
inputStream,
function,
timeout,
TimeUnit.MILLISECONDS,
20).setParallelism(taskNum);
}
else {
result = AsyncDataStream.unorderedWait(
inputStream,
function,
timeout,
TimeUnit.MILLISECONDS,
20).setParallelism(taskNum);
}
//result.windowAll(assigner)
//8.
env.execute("Async IO Example");
result.print();
}
/**
* , flink SourceFunction
* flink SourceContext run
* : -> 0
*/
private static class SimpleSource implements SourceFunction, ListCheckpointed {
private static final long serialVersionUID = 1L;
// , :true
private volatile boolean isRunning = true;
//
private int counter = 0;
//
private int start = 0;
/**
* : , , start
*/
@Override
public List snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(start);
}
/**
* ,Flink , checkpoint
*/
@Override
public void restoreState(List state) throws Exception {
// checkpoint start
for (Integer i : state) {
this.start = i;
}
}
/**
* :
* @param maxNum
*/
public SimpleSource(int maxNum) {
this.counter = maxNum;
}
@Override
public void run(SourceContext ctx) throws Exception {
/**
* :( For the real use case in production environment, the thread pool may stay in the
* async client.
*/
private static class SampleAsyncFunction extends RichAsyncFunction {
private static final long serialVersionUID = 2098635244857937717L;
//
private transient ExecutorService executorService;
/**
* : , sleepFactor
*/
private final long sleepFactor;
/**
* : ~ :Exception : wahahaha...
*/
private final float failRatio;
private final long shutdownWaitTS;
//
SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
this.sleepFactor = sleepFactor;
this.failRatio = failRatio;
this.shutdownWaitTS = shutdownWaitTS;
}
/**
* : ,
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//
executorService = Executors.newFixedThreadPool(30);
}
/**
* ,
*/
@Override
public void close() throws Exception {
super.close();
//
ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
}
/**
* IO
* source
*/
@Override
public void asyncInvoke(final Integer input, final ResultFuture resultFuture) {
executorService.submit(() -> {
// : , sleep
long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
try {
Thread.sleep(sleep);
// : , , , flink checkpoint
if (ThreadLocalRandom.current().nextFloat() < failRatio) {
resultFuture.completeExceptionally(new Exception("wahahahaha..."));
} else {
// , Flink
resultFuture.complete(
Collections.singletonList("key-" + (input % 10)));
}
} catch (InterruptedException e) {
resultFuture.complete(new ArrayList<>(0));
}
});
}
}
/**
* flink
*/
private static void printUsage() {
System.out.println("To customize example, use: AsyncIOExample [--fsStatePath ] " +
"[--checkpointMode ] " +
"[--maxCount ] " +
"[--sleepFactor ] [--failRatio ] " +
"[--waitMode ] [--waitOperatorParallelism ] " +
"[--eventType ] [--shutdownWaitTS ]" +
"[--timeout ]");
}
}
mainメソッドを実行すると、完全なログは以下の通りです(psは実はログを見るのが面白いですが、flinkの多くのプロセスを見ることができます~):
15:34:25,957 INFO org.myorg.quickstart.example.AsyncIOExample - Job configuration
FS state path=null
Checkpoint mode=exactly_once
Max count of input from source=1000
Sleep factor=100
Fail ratio=0.001
Waiting mode=ordered
Parallelism for async wait operator=1
Event type=EventTime
Shutdown wait timestamp=20000
15:34:26,130 INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - Running job on local embedded Flink mini cluster
15:34:26,276 INFO org.apache.flink.runtime.minicluster.MiniCluster - Starting Flink Mini Cluster
15:34:26,279 INFO org.apache.flink.runtime.minicluster.MiniCluster - Starting Metrics Registry
15:34:26,327 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter configured, no metrics will be exposed/reported.
15:34:26,327 INFO org.apache.flink.runtime.minicluster.MiniCluster - Starting RPC Service(s)
15:34:26,584 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
15:34:26,603 INFO org.apache.flink.runtime.minicluster.MiniCluster - Trying to start actor system at :0
15:34:26,636 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
15:34:26,675 INFO akka.remote.Remoting - Starting remoting
15:34:26,929 INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://[email protected]:61481]
15:34:26,934 INFO org.apache.flink.runtime.minicluster.MiniCluster - Actor system started at akka.tcp://[email protected]:61481
15:34:26,936 INFO org.apache.flink.runtime.minicluster.MiniCluster - Starting high-availability services
15:34:26,948 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-094c3bce-9270-4e01-8c4b-38768a5893c1
15:34:26,954 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:61482 - max concurrent requests: 50 - max backlog: 1000
15:34:26,958 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Created BLOB cache storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-17590f89-39e2-4415-abfb-537ea3ed075e
15:34:26,961 INFO org.apache.flink.runtime.blob.TransientBlobCache - Created BLOB cache storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-089378be-f96d-468a-a962-9546304b6eb2
15:34:26,961 INFO org.apache.flink.runtime.minicluster.MiniCluster - Starting 1 TaskManger(s)
15:34:26,963 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Starting TaskManager with ResourceID: 15962233-f7e2-44c1-a442-cacdc19f537b
15:34:27,009 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file directory 'C:\Users\BONC\AppData\Local\Temp': total 100 GB, usable 5 GB (5.00% usable)
15:34:27,160 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 401 MB for network buffer pool (number of memory segments: 12856, bytes per segment: 32768).
15:34:27,164 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Starting the network environment and its components.
15:34:27,166 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
15:34:27,166 INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting managed memory to 0.7 of the currently free heap space (2527 MB), memory will be allocated lazily.
15:34:27,170 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory C:\Users\BONC\AppData\Local\Temp\flink-io-55753bcf-1076-4999-8ed1-57558521bf42 for spill files.
15:34:27,227 INFO org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration - Messages have a max timeout of 10000 ms
15:34:27,237 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 .
15:34:27,250 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Start job leader service.
15:34:27,251 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory C:\Users\BONC\AppData\Local\Temp\flink-dist-cache-740d690b-ccc2-4c10-b38a-9e6256fc6f08
15:34:27,289 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Starting rest endpoint.
15:34:27,476 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
15:34:27,477 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
15:34:27,485 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath.
15:34:27,803 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint listening at localhost:61501
15:34:27,805 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint@31be6b49 @ http://localhost:61501
15:34:27,817 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
15:34:27,832 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://localhost:61501 was granted leadership with leaderSessionID=cd92eef3-c2f3-4fc5-9234-309493602b84
15:34:27,832 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
15:34:27,833 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Received confirmation of leadership for leader http://localhost:61501 , session=cd92eef3-c2f3-4fc5-9234-309493602b84
15:34:27,843 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender org.apache.flink.runtime.dispatcher.StandaloneDispatcher@214a971 @ akka://flink/user/dispatcher
15:34:27,844 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@6544adb @ akka://flink/user/resourcemanager
15:34:27,845 INFO org.apache.flink.runtime.minicluster.MiniCluster - Flink Mini Cluster started successfully
15:34:27,847 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher akka://flink/user/dispatcher was granted leadership with fencing token b51436a0-a06d-456f-bd26-693e6ac9b5f9
15:34:27,851 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Recovering all persisted jobs.
15:34:27,852 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/dispatcher , session=b51436a0-a06d-456f-bd26-693e6ac9b5f9
15:34:27,870 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - ResourceManager akka://flink/user/resourcemanager was granted leadership with fencing token 866212de9f97abcf1f26c94a26294b48
15:34:27,870 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Starting the SlotManager.
15:34:27,872 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/resourcemanager , session=1f26c94a-2629-4b48-8662-12de9f97abcf
15:34:27,874 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Connecting to ResourceManager akka://flink/user/resourcemanager(866212de9f97abcf1f26c94a26294b48).
15:34:27,879 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration
15:34:27,879 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms)
15:34:27,883 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 53d61cb3e5941169150beec14320d110 (Async IO Example).
15:34:27,883 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 53d61cb3e5941169150beec14320d110 (Async IO Example).
15:34:27,885 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID 15962233-f7e2-44c1-a442-cacdc19f537b (akka://flink/user/taskmanager_0) at ResourceManager
15:34:27,886 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Successful registration at resource manager akka://flink/user/resourcemanager under registration id 3913ce156df2c5ced9df21c9edfa7931.
15:34:27,923 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_1 .
15:34:27,931 INFO org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,936 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,957 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
15:34:27,984 INFO org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,984 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
15:34:28,001 INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:28,010 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender org.apache.flink.runtime.jobmaster.JobManagerRunner@288896b8 @ akka://flink/user/jobmanager_1
15:34:28,012 INFO org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job Async IO Example (53d61cb3e5941169150beec14320d110) was granted leadership with session id 2a4b2033-ba80-4056-9b04-b4bcd97848f2 at akka://flink/user/jobmanager_1.
15:34:28,014 INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job Async IO Example (53d61cb3e5941169150beec14320d110) under job master id 9b04b4bcd97848f22a4b2033ba804056.
15:34:28,015 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state CREATED to RUNNING.
15:34:28,025 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from CREATED to SCHEDULED.
15:34:28,036 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{163e56cc239963ee2ea561e91c9e273b}]
15:34:28,043 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=2a4b2033-ba80-4056-9b04-b4bcd97848f2
15:34:28,044 INFO org.apache.flink.runtime.jobmaster.JobMaster - Connecting to ResourceManager akka://flink/user/resourcemanager(866212de9f97abcf1f26c94a26294b48)
15:34:28,045 INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration
15:34:28,045 INFO org.apache.flink.runtime.jobmaster.JobMaster - Registration at ResourceManager attempt 1 (timeout=100ms)
15:34:28,046 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,050 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registered job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,052 INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: 866212de9f97abcf1f26c94a26294b48.
15:34:28,053 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{163e56cc239963ee2ea561e91c9e273b}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
15:34:28,054 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 53d61cb3e5941169150beec14320d110 with allocation id 4e3bf224392a78dad3e11bc84402898f.
15:34:28,056 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request 4e3bf224392a78dad3e11bc84402898f for job 53d61cb3e5941169150beec14320d110 from resource manager with leader id 866212de9f97abcf1f26c94a26294b48.
15:34:28,056 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for 4e3bf224392a78dad3e11bc84402898f.
15:34:28,056 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 53d61cb3e5941169150beec14320d110 for job leader monitoring.
15:34:28,057 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source -> async wait operator (1/1) of job 53d61cb3e5941169150beec14320d110 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
15:34:28,058 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka://flink/user/jobmanager_1 with leader id 2a4b2033-ba80-4056-9b04-b4bcd97848f2.
15:34:28,058 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
15:34:28,058 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms)
15:34:28,060 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Successful registration at job manager akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,061 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish JobManager connection for job 53d61cb3e5941169150beec14320d110.
15:34:28,064 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved slots to the leader of job 53d61cb3e5941169150beec14320d110.
15:34:28,068 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from SCHEDULED to DEPLOYING.
15:34:28,068 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> async wait operator (1/1) (attempt #0) to 15962233-f7e2-44c1-a442-cacdc19f537b @ 127.0.0.1 (dataPort=-1)
15:34:28,085 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: Custom Source -> async wait operator (1/1).
15:34:28,085 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from CREATED to DEPLOYING.
15:34:28,085 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING]
15:34:28,085 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Activate slot 4e3bf224392a78dad3e11bc84402898f.
15:34:28,088 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING].
15:34:28,089 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING].
15:34:28,096 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from DEPLOYING to RUNNING.
15:34:28,096 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from DEPLOYING to RUNNING.
15:34:28,098 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:29,061 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1563003269057 for job 53d61cb3e5941169150beec14320d110.
15:34:29,086 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 28 ms).
15:34:29,967 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:29,968 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: wahahahaha...
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
... 2 more
Caused by: java.lang.Exception: wahahahaha...
at org.myorg.quickstart.example.AsyncIOExample$SampleAsyncFunction.lambda$0(AsyncIOExample.java:325)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
15:34:29,971 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:30,014 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:30,031 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [FAILED]
15:34:30,037 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> async wait operator 40afe509423f45913394ef061f465b1b.
15:34:30,048 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: wahahahaha...
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
... 2 more
Caused by: java.lang.Exception: wahahahaha...
at org.myorg.quickstart.example.AsyncIOExample$SampleAsyncFunction.lambda$0(AsyncIOExample.java:325)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
15:34:30,048 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RUNNING to FAILING.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: wahahahaha...
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
... 2 more
Caused by: java.lang.Exception: wahahahaha...
at org.myorg.quickstart.example.AsyncIOExample$SampleAsyncFunction.lambda$0(AsyncIOExample.java:325)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
15:34:30,051 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Discarding the results produced by task execution 40afe509423f45913394ef061f465b1b.
15:34:30,054 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Async IO Example (53d61cb3e5941169150beec14320d110) if no longer possible.
15:34:30,054 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state FAILING to RESTARTING.
15:34:30,055 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:30,061 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RESTARTING to CREATED.
15:34:30,061 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 53d61cb3e5941169150beec14320d110 from latest valid checkpoint: Checkpoint 1 @ 1563003269057 for 53d61cb3e5941169150beec14320d110.
15:34:30,068 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
15:34:30,069 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state CREATED to RUNNING.
15:34:30,069 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from CREATED to SCHEDULED.
15:34:30,071 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from SCHEDULED to DEPLOYING.
15:34:30,071 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> async wait operator (1/1) (attempt #1) to 15962233-f7e2-44c1-a442-cacdc19f537b @ 127.0.0.1 (dataPort=-1)
15:34:30,074 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: Custom Source -> async wait operator (1/1).
15:34:30,074 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from CREATED to DEPLOYING.
15:34:30,075 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING]
15:34:30,075 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING].
15:34:30,076 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING].
15:34:30,079 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from DEPLOYING to RUNNING.
15:34:30,079 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:30,079 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from DEPLOYING to RUNNING.
15:34:31,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1563003271003 for job 53d61cb3e5941169150beec14320d110.
15:34:31,009 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 4 ms).
15:34:32,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 @ 1563003272003 for job 53d61cb3e5941169150beec14320d110.
15:34:32,004 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job 53d61cb3e5941169150beec14320d110 (616 bytes in 1 ms).
15:34:33,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 @ 1563003273003 for job 53d61cb3e5941169150beec14320d110.
15:34:33,004 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 4 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 1 ms).
15:34:34,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 5 @ 1563003274003 for job 53d61cb3e5941169150beec14320d110.
15:34:34,005 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 5 for job 53d61cb3e5941169150beec14320d110 (646 bytes in 2 ms).
15:34:35,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1563003275003 for job 53d61cb3e5941169150beec14320d110.
15:34:35,004 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 1 ms).
15:34:36,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7 @ 1563003276003 for job 53d61cb3e5941169150beec14320d110.
15:34:36,005 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 2 ms).
15:34:37,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 8 @ 1563003277003 for job 53d61cb3e5941169150beec14320d110.
15:34:37,006 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 8 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 2 ms).
15:34:38,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 9 @ 1563003278003 for job 53d61cb3e5941169150beec14320d110.
15:34:38,005 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 9 for job 53d61cb3e5941169150beec14320d110 (631 bytes in 2 ms).
15:34:39,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 10 @ 1563003279003 for job 53d61cb3e5941169150beec14320d110.
15:34:39,005 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 10 for job 53d61cb3e5941169150beec14320d110 (641 bytes in 2 ms).
15:34:39,346 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from RUNNING to FINISHED.
15:34:39,346 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703).
15:34:39,347 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [FINISHED]
15:34:39,347 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> async wait operator e16dfc0cf2a7703cace518ff6cf85703.
15:34:39,349 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from RUNNING to FINISHED.
15:34:39,349 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RUNNING to FINISHED.
15:34:39,349 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 53d61cb3e5941169150beec14320d110.
15:34:39,349 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
15:34:39,357 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 53d61cb3e5941169150beec14320d110 reached globally terminal state FINISHED.
15:34:39,357 INFO org.apache.flink.runtime.minicluster.MiniCluster - Shutting down Flink Mini Cluster
15:34:39,358 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shutting down rest endpoint.
15:34:39,358 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.
15:34:39,359 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.
15:34:39,359 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Async IO Example(53d61cb3e5941169150beec14320d110).
15:34:39,359 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
15:34:39,360 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
15:34:39,361 INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 5df31933d01f3737bf7ac1e33e11defc: JobManager is shutting down..
15:34:39,361 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
15:34:39,363 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Disconnect job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110 from the resource manager.
15:34:39,370 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory C:\Users\BONC\AppData\Local\Temp\flink-io-55753bcf-1076-4999-8ed1-57558521bf42
15:34:39,370 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
15:34:39,378 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.
15:34:39,379 INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory C:\Users\BONC\AppData\Local\Temp\flink-dist-cache-740d690b-ccc2-4c10-b38a-9e6256fc6f08
15:34:39,379 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
15:34:39,384 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache directory C:\Users\BONC\AppData\Local\Temp\flink-web-ui
15:34:39,385 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.
15:34:39,387 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Shut down cluster because application is in CANCELED, diagnostics DispatcherResourceManagerComponent has been closed..
15:34:39,387 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping dispatcher akka://flink/user/dispatcher.
15:34:39,387 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping all currently running jobs of dispatcher akka://flink/user/dispatcher.
15:34:39,387 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Closing the SlotManager.
15:34:39,387 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Suspending the SlotManager.
15:34:39,387 INFO org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator - Shutting down stack trace sample coordinator.
15:34:39,387 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopped dispatcher akka://flink/user/dispatcher.
15:34:39,395 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
15:34:39,396 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
15:34:39,494 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
15:34:39,526 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
15:34:39,543 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
15:34:39,544 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
15:34:39,547 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:61482
15:34:39,547 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.