Flink Java ExampleのAsyncIOExampleの詳細

47721 ワード

Flink の概念をひととおり学んだあとでも、実際の Flink プログラムの書き方はまったく分からない、ということに気づきます。そのような場合は、公式のサンプルコードを読んで学ぶのが有効です。本稿では、公式 GitHub リポジトリにある AsyncIOExample に詳細なコード注釈を付けて解説します。(ps:実際には、注釈がなくても皆さん読めるはずです)
Flink Exampleバージョン:1.8
FlinkすべてのjavaのexampleコードのGithubアドレス:github
AsyncIOの原理:Flink原理と実現:Async I/O
AsyncIOExample は単純な例で、プロセス内に組み込まれた Flink ミニクラスタ上でジョブをシミュレート実行します。
処理の流れ:データソースが送出したストリームの各要素を AsyncFunction(非同期関数)に渡し、要素ごとに外部へのリクエスト送信をシミュレートします。AsyncFunction の処理中には、リクエストが失敗して例外が発生するケースもシミュレートし、checkpoint の仕組みによってジョブをリカバリします。ストリーム処理全体の流れは、ログとコードを照らし合わせるとよく理解できます。
main から読むと流れが分かりやすいため、コードの順序を入れ替えて main を先頭に置いています。
package org.myorg.quickstart.example;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *         AsyncFunction:    I/O     
 * Example to illustrates how to use {@link AsyncFunction}.
 */
public class AsyncIOExample {

	private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);
	
	/**
	 *       ,  equals  
	 */
	private static final String EXACTLY_ONCE_MODE = "exactly_once";
	private static final String EVENT_TIME = "EventTime";
	private static final String INGESTION_TIME = "IngestionTime";
	private static final String ORDERED = "ordered";
	
	
	public static void main(String[] args) throws Exception {

		//1.        
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		//         
		final ParameterTool params = ParameterTool.fromArgs(args);
		//      
		final String statePath;
		//checkPoint   
		final String cpMode;
		//    
		final int maxCount;
		//    
		final long sleepFactor;
		//   
		final float failRatio;
		//    
		final String mode;
		//task   
		final int taskNum;
		//    
		final String timeType;
		//      
		final long shutdownWaitTS;
		//    
		final long timeout;

		try {
			/**
			 * 2.   job     
			 * params.get:        ,          。
			 */
			statePath = params.get("fsStatePath", null);
			cpMode = params.get("checkpointMode", "exactly_once");
			maxCount = params.getInt("maxCount", 1000);
			sleepFactor = params.getLong("sleepFactor", 100);
			failRatio = params.getFloat("failRatio", 0.001f);
			mode = params.get("waitMode", "ordered");
			taskNum = params.getInt("waitOperatorParallelism", 1);
			timeType = params.get("eventType", "EventTime");
			shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
			timeout = params.getLong("timeout", 10000L);
		} catch (Exception e) {
			printUsage();
			throw e;
		}
		
		//    config
		StringBuilder configStringBuilder = new StringBuilder();
		
		//     
: linux windows final String lineSeparator = System.getProperty("line.separator"); // configStringBuilder .append("Job configuration").append(lineSeparator) .append("FS state path=").append(statePath).append(lineSeparator) .append("Checkpoint mode=").append(cpMode).append(lineSeparator) .append("Max count of input from source=").append(maxCount).append(lineSeparator) .append("Sleep factor=").append(sleepFactor).append(lineSeparator) .append("Fail ratio=").append(failRatio).append(lineSeparator) .append("Waiting mode=").append(mode).append(lineSeparator) .append("Parallelism for async wait operator=").append(taskNum).append(lineSeparator) .append("Event type=").append(timeType).append(lineSeparator) .append("Shutdown wait timestamp=").append(shutdownWaitTS); // LOG.info(configStringBuilder.toString()); // null, if (statePath != null) { // setup state and checkpoint mode env.setStateBackend(new FsStateBackend(statePath)); } /** * 3. cpMode checkpoint * param1:checkpoint */ if (EXACTLY_ONCE_MODE.equals(cpMode)) { env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE); } else { env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE); } //4. , eventTime: watermark if (EVENT_TIME.equals(timeType)) { env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); } // ingestionTime: flink else if (INGESTION_TIME.equals(timeType)) { env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); } //5. : DataStream inputStream = env.addSource(new SimpleSource(maxCount)); //6. async , “ ” i/o AsyncFunction function = new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS); //7. DataStream result; if (ORDERED.equals(mode)) { /** * AsyncDataStream DataStream * :1. 2. 3. 4. 5. 
i/o * setParallelism : 。 */ result = AsyncDataStream.orderedWait( inputStream, function, timeout, TimeUnit.MILLISECONDS, 20).setParallelism(taskNum); } else { result = AsyncDataStream.unorderedWait( inputStream, function, timeout, TimeUnit.MILLISECONDS, 20).setParallelism(taskNum); } //result.windowAll(assigner) //8. env.execute("Async IO Example"); result.print(); } /** * , flink SourceFunction * flink SourceContext run * : -> 0 */ private static class SimpleSource implements SourceFunction, ListCheckpointed { private static final long serialVersionUID = 1L; // , :true private volatile boolean isRunning = true; // private int counter = 0; // private int start = 0; /** * : , , start */ @Override public List snapshotState(long checkpointId, long timestamp) throws Exception { return Collections.singletonList(start); } /** * ,Flink , checkpoint */ @Override public void restoreState(List state) throws Exception { // checkpoint start for (Integer i : state) { this.start = i; } } /** * : * @param maxNum */ public SimpleSource(int maxNum) { this.counter = maxNum; } @Override public void run(SourceContext ctx) throws Exception { /** * :( For the real use case in production environment, the thread pool may stay in the * async client. */ private static class SampleAsyncFunction extends RichAsyncFunction { private static final long serialVersionUID = 2098635244857937717L; // private transient ExecutorService executorService; /** * : , sleepFactor */ private final long sleepFactor; /** * : ~ :Exception : wahahaha... 
*/ private final float failRatio; private final long shutdownWaitTS; // SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) { this.sleepFactor = sleepFactor; this.failRatio = failRatio; this.shutdownWaitTS = shutdownWaitTS; } /** * : , */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // executorService = Executors.newFixedThreadPool(30); } /** * , */ @Override public void close() throws Exception { super.close(); // ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService); } /** * IO * source */ @Override public void asyncInvoke(final Integer input, final ResultFuture resultFuture) { executorService.submit(() -> { // : , sleep long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor); try { Thread.sleep(sleep); // : , , , flink checkpoint if (ThreadLocalRandom.current().nextFloat() < failRatio) { resultFuture.completeExceptionally(new Exception("wahahahaha...")); } else { // , Flink resultFuture.complete( Collections.singletonList("key-" + (input % 10))); } } catch (InterruptedException e) { resultFuture.complete(new ArrayList<>(0)); } }); } } /** * flink */ private static void printUsage() { System.out.println("To customize example, use: AsyncIOExample [--fsStatePath ] " + "[--checkpointMode ] " + "[--maxCount ] " + "[--sleepFactor ] [--failRatio ] " + "[--waitMode ] [--waitOperatorParallelism ] " + "[--eventType ] [--shutdownWaitTS ]" + "[--timeout ]"); } }

main メソッドを実行したときの完全なログは以下のとおりです(ps:ログを読むのも意外と面白く、Flink 内部の多くの処理の流れを確認できます):
15:34:25,957 INFO  org.myorg.quickstart.example.AsyncIOExample                   - Job configuration
FS state path=null
Checkpoint mode=exactly_once
Max count of input from source=1000
Sleep factor=100
Fail ratio=0.001
Waiting mode=ordered
Parallelism for async wait operator=1
Event type=EventTime
Shutdown wait timestamp=20000
15:34:26,130 INFO  org.apache.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
15:34:26,276 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Starting Flink Mini Cluster
15:34:26,279 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Starting Metrics Registry
15:34:26,327 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
15:34:26,327 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Starting RPC Service(s)
15:34:26,584 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
15:34:26,603 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Trying to start actor system at :0
15:34:26,636 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
15:34:26,675 INFO  akka.remote.Remoting                                          - Starting remoting
15:34:26,929 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://[email protected]:61481]
15:34:26,934 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Actor system started at akka.tcp://[email protected]:61481
15:34:26,936 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Starting high-availability services
15:34:26,948 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-094c3bce-9270-4e01-8c4b-38768a5893c1
15:34:26,954 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:61482 - max concurrent requests: 50 - max backlog: 1000
15:34:26,958 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Created BLOB cache storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-17590f89-39e2-4415-abfb-537ea3ed075e
15:34:26,961 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-089378be-f96d-468a-a962-9546304b6eb2
15:34:26,961 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Starting 1 TaskManger(s)
15:34:26,963 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Starting TaskManager with ResourceID: 15962233-f7e2-44c1-a442-cacdc19f537b
15:34:27,009 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file directory 'C:\Users\BONC\AppData\Local\Temp': total 100 GB, usable 5 GB (5.00% usable)
15:34:27,160 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 401 MB for network buffer pool (number of memory segments: 12856, bytes per segment: 32768).
15:34:27,164 INFO  org.apache.flink.runtime.io.network.NetworkEnvironment        - Starting the network environment and its components.
15:34:27,166 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation      - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
15:34:27,166 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Limiting managed memory to 0.7 of the currently free heap space (2527 MB), memory will be allocated lazily.
15:34:27,170 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory C:\Users\BONC\AppData\Local\Temp\flink-io-55753bcf-1076-4999-8ed1-57558521bf42 for spill files.
15:34:27,227 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
15:34:27,237 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 .
15:34:27,250 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Start job leader service.
15:34:27,251 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory C:\Users\BONC\AppData\Local\Temp\flink-dist-cache-740d690b-ccc2-4c10-b38a-9e6256fc6f08
15:34:27,289 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
15:34:27,476 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Log file environment variable 'log.file' is not set.
15:34:27,477 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
15:34:27,485 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath.
15:34:27,803 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at localhost:61501
15:34:27,805 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint@31be6b49 @ http://localhost:61501
15:34:27,817 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
15:34:27,832 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://localhost:61501 was granted leadership with leaderSessionID=cd92eef3-c2f3-4fc5-9234-309493602b84
15:34:27,832 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
15:34:27,833 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader http://localhost:61501 , session=cd92eef3-c2f3-4fc5-9234-309493602b84
15:34:27,843 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.dispatcher.StandaloneDispatcher@214a971 @ akka://flink/user/dispatcher
15:34:27,844 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@6544adb @ akka://flink/user/resourcemanager
15:34:27,845 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Flink Mini Cluster started successfully
15:34:27,847 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka://flink/user/dispatcher was granted leadership with fencing token b51436a0-a06d-456f-bd26-693e6ac9b5f9
15:34:27,851 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
15:34:27,852 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/dispatcher , session=b51436a0-a06d-456f-bd26-693e6ac9b5f9
15:34:27,870 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka://flink/user/resourcemanager was granted leadership with fencing token 866212de9f97abcf1f26c94a26294b48
15:34:27,870 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
15:34:27,872 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/resourcemanager , session=1f26c94a-2629-4b48-8662-12de9f97abcf
15:34:27,874 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka://flink/user/resourcemanager(866212de9f97abcf1f26c94a26294b48).
15:34:27,879 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved ResourceManager address, beginning registration
15:34:27,879 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager attempt 1 (timeout=100ms)
15:34:27,883 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission 53d61cb3e5941169150beec14320d110 (Async IO Example).
15:34:27,883 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job 53d61cb3e5941169150beec14320d110 (Async IO Example).
15:34:27,885 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 15962233-f7e2-44c1-a442-cacdc19f537b (akka://flink/user/taskmanager_0) at ResourceManager
15:34:27,886 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Successful registration at resource manager akka://flink/user/resourcemanager under registration id 3913ce156df2c5ced9df21c9edfa7931.
15:34:27,923 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_1 .
15:34:27,931 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,936 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,957 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
15:34:27,984 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,984 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 0 ms.
15:34:28,001 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:28,010 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmaster.JobManagerRunner@288896b8 @ akka://flink/user/jobmanager_1
15:34:28,012 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManager runner for job Async IO Example (53d61cb3e5941169150beec14320d110) was granted leadership with session id 2a4b2033-ba80-4056-9b04-b4bcd97848f2 at akka://flink/user/jobmanager_1.
15:34:28,014 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Starting execution of job Async IO Example (53d61cb3e5941169150beec14320d110) under job master id 9b04b4bcd97848f22a4b2033ba804056.
15:34:28,015 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state CREATED to RUNNING.
15:34:28,025 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from CREATED to SCHEDULED.
15:34:28,036 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{163e56cc239963ee2ea561e91c9e273b}]
15:34:28,043 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=2a4b2033-ba80-4056-9b04-b4bcd97848f2
15:34:28,044 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Connecting to ResourceManager akka://flink/user/resourcemanager(866212de9f97abcf1f26c94a26294b48)
15:34:28,045 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Resolved ResourceManager address, beginning registration
15:34:28,045 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Registration at ResourceManager attempt 1 (timeout=100ms)
15:34:28,046 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,050 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,052 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - JobManager successfully registered at ResourceManager, leader id: 866212de9f97abcf1f26c94a26294b48.
15:34:28,053 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Requesting new slot [SlotRequestId{163e56cc239963ee2ea561e91c9e273b}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
15:34:28,054 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 53d61cb3e5941169150beec14320d110 with allocation id 4e3bf224392a78dad3e11bc84402898f.
15:34:28,056 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Receive slot request 4e3bf224392a78dad3e11bc84402898f for job 53d61cb3e5941169150beec14320d110 from resource manager with leader id 866212de9f97abcf1f26c94a26294b48.
15:34:28,056 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Allocated slot for 4e3bf224392a78dad3e11bc84402898f.
15:34:28,056 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Add job 53d61cb3e5941169150beec14320d110 for job leader monitoring.
15:34:28,057 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> async wait operator (1/1) of job 53d61cb3e5941169150beec14320d110 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
15:34:28,058 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Try to register at job manager akka://flink/user/jobmanager_1 with leader id 2a4b2033-ba80-4056-9b04-b4bcd97848f2.
15:34:28,058 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Resolved JobManager address, beginning registration
15:34:28,058 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Registration at JobManager attempt 1 (timeout=100ms)
15:34:28,060 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Successful registration at job manager akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,061 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Establish JobManager connection for job 53d61cb3e5941169150beec14320d110.
15:34:28,064 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Offer reserved slots to the leader of job 53d61cb3e5941169150beec14320d110.
15:34:28,068 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from SCHEDULED to DEPLOYING.
15:34:28,068 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source -> async wait operator (1/1) (attempt #0) to 15962233-f7e2-44c1-a442-cacdc19f537b @ 127.0.0.1 (dataPort=-1)
15:34:28,085 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task Source: Custom Source -> async wait operator (1/1).
15:34:28,085 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from CREATED to DEPLOYING.
15:34:28,085 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING]
15:34:28,085 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Activate slot 4e3bf224392a78dad3e11bc84402898f.
15:34:28,088 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING].
15:34:28,089 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING].
15:34:28,096 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from DEPLOYING to RUNNING.
15:34:28,096 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from DEPLOYING to RUNNING.
15:34:28,098 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:29,061 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1563003269057 for job 53d61cb3e5941169150beec14320d110.
15:34:29,086 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 28 ms).
15:34:29,967 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:29,968 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: wahahahaha...
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
	... 2 more
Caused by: java.lang.Exception: wahahahaha...
	at org.myorg.quickstart.example.AsyncIOExample$SampleAsyncFunction.lambda$0(AsyncIOExample.java:325)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
15:34:29,971 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:30,014 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:30,031 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [FAILED]
15:34:30,037 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> async wait operator 40afe509423f45913394ef061f465b1b.
15:34:30,048 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: wahahahaha...
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
	... 2 more
Caused by: java.lang.Exception: wahahahaha...
	at org.myorg.quickstart.example.AsyncIOExample$SampleAsyncFunction.lambda$0(AsyncIOExample.java:325)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
15:34:30,048 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RUNNING to FAILING.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: wahahahaha...
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
	... 2 more
Caused by: java.lang.Exception: wahahahaha...
	at org.myorg.quickstart.example.AsyncIOExample$SampleAsyncFunction.lambda$0(AsyncIOExample.java:325)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
15:34:30,051 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Discarding the results produced by task execution 40afe509423f45913394ef061f465b1b.
15:34:30,054 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job Async IO Example (53d61cb3e5941169150beec14320d110) if no longer possible.
15:34:30,054 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state FAILING to RESTARTING.
15:34:30,055 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:30,061 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RESTARTING to CREATED.
15:34:30,061 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job 53d61cb3e5941169150beec14320d110 from latest valid checkpoint: Checkpoint 1 @ 1563003269057 for 53d61cb3e5941169150beec14320d110.
15:34:30,068 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - No master state to restore
15:34:30,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state CREATED to RUNNING.
15:34:30,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from CREATED to SCHEDULED.
15:34:30,071 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from SCHEDULED to DEPLOYING.
15:34:30,071 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source -> async wait operator (1/1) (attempt #1) to 15962233-f7e2-44c1-a442-cacdc19f537b @ 127.0.0.1 (dataPort=-1)
15:34:30,074 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task Source: Custom Source -> async wait operator (1/1).
15:34:30,074 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from CREATED to DEPLOYING.
15:34:30,075 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING]
15:34:30,075 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING].
15:34:30,076 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING].
15:34:30,079 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from DEPLOYING to RUNNING.
15:34:30,079 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:30,079 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from DEPLOYING to RUNNING.
15:34:31,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1563003271003 for job 53d61cb3e5941169150beec14320d110.
15:34:31,009 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 4 ms).
15:34:32,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1563003272003 for job 53d61cb3e5941169150beec14320d110.
15:34:32,004 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 for job 53d61cb3e5941169150beec14320d110 (616 bytes in 1 ms).
15:34:33,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1563003273003 for job 53d61cb3e5941169150beec14320d110.
15:34:33,004 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 1 ms).
15:34:34,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1563003274003 for job 53d61cb3e5941169150beec14320d110.
15:34:34,005 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 for job 53d61cb3e5941169150beec14320d110 (646 bytes in 2 ms).
15:34:35,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1563003275003 for job 53d61cb3e5941169150beec14320d110.
15:34:35,004 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 1 ms).
15:34:36,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 7 @ 1563003276003 for job 53d61cb3e5941169150beec14320d110.
15:34:36,005 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 7 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 2 ms).
15:34:37,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 8 @ 1563003277003 for job 53d61cb3e5941169150beec14320d110.
15:34:37,006 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 8 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 2 ms).
15:34:38,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 9 @ 1563003278003 for job 53d61cb3e5941169150beec14320d110.
15:34:38,005 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 9 for job 53d61cb3e5941169150beec14320d110 (631 bytes in 2 ms).
15:34:39,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 10 @ 1563003279003 for job 53d61cb3e5941169150beec14320d110.
15:34:39,005 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 10 for job 53d61cb3e5941169150beec14320d110 (641 bytes in 2 ms).
15:34:39,346 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from RUNNING to FINISHED.
15:34:39,346 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703).
15:34:39,347 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [FINISHED]
15:34:39,347 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> async wait operator e16dfc0cf2a7703cace518ff6cf85703.
15:34:39,349 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from RUNNING to FINISHED.
15:34:39,349 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RUNNING to FINISHED.
15:34:39,349 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping checkpoint coordinator for job 53d61cb3e5941169150beec14320d110.
15:34:39,349 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - Shutting down
15:34:39,357 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Job 53d61cb3e5941169150beec14320d110 reached globally terminal state FINISHED.
15:34:39,357 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Shutting down Flink Mini Cluster
15:34:39,358 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Shutting down rest endpoint.
15:34:39,358 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Stopping TaskExecutor akka://flink/user/taskmanager_0.
15:34:39,359 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Stop job leader service.
15:34:39,359 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the JobMaster for job Async IO Example(53d61cb3e5941169150beec14320d110).
15:34:39,359 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting down TaskExecutorLocalStateStoresManager.
15:34:39,360 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Suspending SlotPool.
15:34:39,361 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Close ResourceManager connection 5df31933d01f3737bf7ac1e33e11defc: JobManager is shutting down..
15:34:39,361 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Stopping SlotPool.
15:34:39,363 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110 from the resource manager.
15:34:39,370 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager removed spill file directory C:\Users\BONC\AppData\Local\Temp\flink-io-55753bcf-1076-4999-8ed1-57558521bf42
15:34:39,370 INFO  org.apache.flink.runtime.io.network.NetworkEnvironment        - Shutting down the network environment and its components.
15:34:39,378 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Stop job leader service.
15:34:39,379 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory C:\Users\BONC\AppData\Local\Temp\flink-dist-cache-740d690b-ccc2-4c10-b38a-9e6256fc6f08
15:34:39,379 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Stopped TaskExecutor akka://flink/user/taskmanager_0.
15:34:39,384 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Removing cache directory C:\Users\BONC\AppData\Local\Temp\flink-web-ui
15:34:39,385 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Shut down complete.
15:34:39,387 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Shut down cluster because application is in CANCELED, diagnostics DispatcherResourceManagerComponent has been closed..
15:34:39,387 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Stopping dispatcher akka://flink/user/dispatcher.
15:34:39,387 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Stopping all currently running jobs of dispatcher akka://flink/user/dispatcher.
15:34:39,387 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Closing the SlotManager.
15:34:39,387 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Suspending the SlotManager.
15:34:39,387 INFO  org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator  - Shutting down stack trace sample coordinator.
15:34:39,387 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Stopped dispatcher akka://flink/user/dispatcher.
15:34:39,395 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
15:34:39,396 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
15:34:39,494 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
15:34:39,526 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
15:34:39,543 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
15:34:39,544 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
15:34:39,547 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:61482
15:34:39,547 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.