12_Flink Streaming cluster


So far we have looked at how an application is written and how its topology is built; once the topology is submitted to the cluster, it is executed there. Most of the runtime modules are written in Scala, presumably because they rely on Akka for communication, whereas the earlier DAG-building code is almost entirely Java.
Flink has two kinds of clusters: LocalFlinkMiniCluster and FlinkMiniCluster. LocalFlinkMiniCluster is used for local execution, FlinkMiniCluster for cluster execution. LocalFlinkMiniCluster simulates distributed computation with multiple threads inside a single JVM, which makes it less interesting to study, so we will focus on FlinkMiniCluster.
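For reference, this is roughly what local execution looks like from the API side. It is a minimal sketch using the Scala streaming API (the job name, data, and parallelism are arbitrary): createLocalEnvironment backs the job with a LocalFlinkMiniCluster inside the current JVM, i.e. the multi-threaded simulation mentioned above, while a real cluster submission targets a standalone FlinkMiniCluster.

import org.apache.flink.streaming.api.scala._

object LocalMiniClusterSketch {
  def main(args: Array[String]): Unit = {
    // createLocalEnvironment starts a LocalFlinkMiniCluster in this JVM:
    // JobManager and TaskManager run as actors inside the same process.
    val env = StreamExecutionEnvironment.createLocalEnvironment(2)

    env.fromElements("to", "be", "or", "not", "to", "be")
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("local-mini-cluster-sketch")
  }
}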
FlinkMiniCluster
Before looking at the cluster itself, let's first go over Flink's architecture. A Flink runtime cluster consists of one JobManager (in the non-HA setup) and several TaskManagers.
The JobManager accepts requests from the client and manages the TaskManagers, much like the nimbus/worker relationship in Storm.
The TaskManager manages the execution of tasks.
In the following, JobManager and TaskManager are abbreviated as JM and TM.
JM and TM communicate via Akka. Both implement the FlinkActor trait and talk to each other by sending different messages, which are dispatched in each actor's handleMessage method. The FlinkActor base trait looks like this:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime

import _root_.akka.actor.Actor
import grizzled.slf4j.Logger

/** Base trait for Flink's actors.
  *
  * The message handling logic is defined in the handleMessage method. This allows to mixin
  * stackable traits which change the message receiving behaviour.
  */
trait FlinkActor extends Actor {
  val log: Logger

  override def receive: Receive = handleMessage

  /** Handle incoming messages
    *
    * @return
    */
  def handleMessage: Receive

  /** Factory method for messages. This method can be used by mixins to decorate messages
    *
    * @param message The message to decorate
    * @return The decorated message
    */
  def decorateMessage(message: Any): Any = {
    message
  }
}
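The interesting part is that receive simply delegates to handleMessage and that decorateMessage is a no-op by default: mixins can override both so that every message gets wrapped, for example with a leader session ID. The following is a self-contained sketch of that stackable-trait pattern. SimpleFlinkActor, SessionFilter, SessionMessage, and EchoActor are illustrative stand-ins rather than Flink's actual classes; Flink's own LeaderSessionMessageFilter works along these lines.

import java.util.UUID
import akka.actor.{Actor, ActorSystem, Props}

// Stand-in for FlinkActor: receive delegates to handleMessage, decorateMessage is identity.
trait SimpleFlinkActor extends Actor {
  override def receive: Receive = handleMessage
  def handleMessage: Receive
  def decorateMessage(message: Any): Any = message
}

// Envelope used by the mixin below.
case class SessionMessage(sessionId: UUID, payload: Any)

// Stackable mixin: wraps outgoing messages and unwraps/filters incoming ones.
trait SessionFilter extends SimpleFlinkActor {
  def sessionId: UUID

  abstract override def receive: Receive = {
    case SessionMessage(id, payload) if id == sessionId => super.receive(payload)
    case SessionMessage(id, payload) =>
      println(s"dropping $payload: wrong session $id")
    case other => super.receive(other)
  }

  abstract override def decorateMessage(message: Any): Any =
    SessionMessage(sessionId, super.decorateMessage(message))
}

class EchoActor(val sessionId: UUID) extends SimpleFlinkActor with SessionFilter {
  override def handleMessage: Receive = {
    case msg => println(s"handled: $msg")
  }
}

object SessionFilterDemo extends App {
  val system = ActorSystem("demo")
  val id = UUID.randomUUID()
  val echo = system.actorOf(Props(new EchoActor(id)), "echo")

  echo ! SessionMessage(id, "hello")                // handled: hello
  echo ! SessionMessage(UUID.randomUUID(), "stale") // dropped, wrong session
  system.terminate()
}

Because SessionFilter is mixed in after SimpleFlinkActor, its receive runs first and only forwards valid messages to handleMessage, without the concrete actor knowing anything about sessions. With that in mind, here are the two real handlers.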
TaskManager
/**
   * Central handling of actor messages. This method delegates to the more specialized
   * methods for handling certain classes of messages.
   */
  override def handleMessage: Receive = {
    // task messages are most common and critical, we handle them first
    case message: TaskMessage => handleTaskMessage(message)

    // messages for coordinating checkpoints
    case message: AbstractCheckpointMessage => handleCheckpointingMessage(message)

    case JobManagerLeaderAddress(address, newLeaderSessionID) =>
      handleJobManagerLeaderAddress(address, newLeaderSessionID)

    // registration messages for connecting and disconnecting from / to the JobManager
    case message: RegistrationMessage => handleRegistrationMessage(message)

    // task sampling messages
    case message: StackTraceSampleMessages => handleStackTraceSampleMessage(message)

    // ----- miscellaneous messages ----

    // periodic heart beats that transport metrics
    case SendHeartbeat => sendHeartbeatToJobManager()

    // sends the stack trace of this TaskManager to the sender
    case SendStackTrace => sendStackTrace(sender())

    // registers the message sender to be notified once this TaskManager has completed
    // its registration at the JobManager
    case NotifyWhenRegisteredAtJobManager =>
      if (isConnected) {
        sender ! decorateMessage(RegisteredAtJobManager)
      } else {
        waitForRegistration += sender
      }

    // this message indicates that some actor watched by this TaskManager has died
    case Terminated(actor: ActorRef) =>
      if (isConnected && actor == currentJobManager.orNull) {
          handleJobManagerDisconnect(sender(), "JobManager is no longer reachable")
          triggerTaskManagerRegistration()
      } else {
        log.warn(s"Received unrecognized disconnect message " +
            s"from ${if (actor == null) null else actor.path}.")
      }

    case Disconnect(msg) =>
      handleJobManagerDisconnect(sender(), s"JobManager requested disconnect: $msg")
      triggerTaskManagerRegistration()

    case msg: StopCluster =>
      log.info(s"Stopping TaskManager with final application status ${msg.finalStatus()} " +
        s"and diagnostics: ${msg.message()}")
      shutdown()

    case FatalError(message, cause) =>
      killTaskManagerFatal(message, cause)

    case RequestTaskManagerLog(requestType : LogTypeRequest) =>
      blobService match {
        case Some(_) =>
          handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get)
        case None =>
          sender() ! new IOException("BlobService not available. Cannot upload TaskManager logs.")
      }
  }

  /**
   * Handle unmatched messages with an exception.
   */
  override def unhandled(message: Any): Unit = {
    val errorMessage = "Received unknown message " + message
    val error = new RuntimeException(errorMessage)
    log.error(errorMessage)

    // terminate all we are currently running (with a dedicated message)
    // before the actor is stopped
    cancelAndClearEverything(error)

    // let the actor crash
    throw error
  }
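Other actors interact with this handler through plain Akka tells and asks. The sketch below shows how a test or client actor might wait for the NotifyWhenRegisteredAtJobManager branch above to answer; it assumes taskManager is an ActorRef to a running TaskManager and that the message objects live in org.apache.flink.runtime.messages.TaskManagerMessages (the names are exactly those used in the handler above).

object RegistrationProbe {
  import akka.actor.ActorRef
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.Await
  import scala.concurrent.duration._
  import org.apache.flink.runtime.messages.TaskManagerMessages._

  // Hypothetical helper: blocks until the TaskManager confirms its registration
  // at the JobManager, via the NotifyWhenRegisteredAtJobManager case above.
  def waitUntilRegistered(taskManager: ActorRef): Unit = {
    implicit val timeout: Timeout = Timeout(30.seconds)
    val response = taskManager ? NotifyWhenRegisteredAtJobManager
    // The handler replies with RegisteredAtJobManager once isConnected is true.
    Await.result(response, timeout.duration)
  }
}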
JobManager
/**
   * Central work method of the JobManager actor. Receives messages and reacts to them.
   *
   * @return
   */
  override def handleMessage: Receive = {

    case GrantLeadership(newLeaderSessionID) =>
      log.info(s"JobManager $getAddress was granted leadership with leader session ID " +
        s"$newLeaderSessionID.")

      leaderSessionID = newLeaderSessionID

      // confirming the leader session ID might be blocking, thus do it in a future
      future {
        leaderElectionService.confirmLeaderSessionID(newLeaderSessionID.orNull)

        // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task
        // managers etc.)
        if (recoveryMode != RecoveryMode.STANDALONE) {
          log.info(s"Delaying recovery of all jobs by $jobRecoveryTimeout.")

          context.system.scheduler.scheduleOnce(
            jobRecoveryTimeout,
            self,
            decorateMessage(RecoverAllJobs))(
            context.dispatcher)
        }
      }(context.dispatcher)

    case RevokeLeadership =>
      log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")

      val newFuturesToComplete = cancelAndClearEverything(
        new Exception("JobManager is no longer the leader."),
        removeJobFromStateBackend = false)

      futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)

      // disconnect the registered task managers
      instanceManager.getAllRegisteredInstances.asScala.foreach {
        _.getActorGateway().tell(
          Disconnect("JobManager is no longer the leader"),
          new AkkaActorGateway(self, leaderSessionID.orNull))
      }

      instanceManager.unregisterAllTaskManagers()

      leaderSessionID = None

    case msg: RegisterResourceManager =>
      log.debug(s"Resource manager registration: $msg")

      // ditch current resource manager (if any)
      currentResourceManager = Option(msg.resourceManager())

      val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
        instance => instance.getResourceId).toList.asJava

      // confirm registration and send known task managers with their resource ids
      sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))

    case msg: ReconnectResourceManager =>
      log.debug(s"Resource manager reconnect: $msg")

      /**
        * In most cases, the ResourceManager handles the reconnect itself (due to leader change)
        * but in case it doesn't we're sending a TriggerRegistrationAtJobManager message until we
        * receive a registration of this or another ResourceManager.
        */
      def reconnectRepeatedly(): Unit = {
        msg.resourceManager() ! decorateMessage(new TriggerRegistrationAtJobManager(self))
        // try again after some delay
        context.system.scheduler.scheduleOnce(2 seconds) {
          self ! decorateMessage(msg)
        }(context.dispatcher)
      }

      currentResourceManager match {
        case Some(rm) if rm.equals(msg.resourceManager()) =>
          // we should ditch the current resource manager
          log.debug(s"Disconnecting resource manager $rm and forcing a reconnect.")
          currentResourceManager = None
          reconnectRepeatedly()
        case Some(rm) =>
          // we have registered with another ResourceManager in the meantime, stop sending
          // TriggerRegistrationAtJobManager messages to the old ResourceManager
        case None =>
          log.warn(s"No resource manager ${msg.resourceManager()} connected. " +
            s"Telling old ResourceManager to register again.")
          reconnectRepeatedly()
      }

    case msg @ RegisterTaskManager(
          resourceId,
          connectionInfo,
          hardwareInformation,
          numberOfSlots) =>
      // we are being informed by the ResourceManager that a new task manager is available
      log.debug(s"RegisterTaskManager: $msg")

      val taskManager = sender()

      currentResourceManager match {
        case Some(rm) =>
          val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout)
          future.onComplete {
            case scala.util.Success(response) =>
              // the resource manager is available and answered
              self ! response
            case scala.util.Failure(t) =>
              log.error("Failure while asking ResourceManager for RegisterResource", t)
              // slow or unreachable resource manager, register anyway and let the rm reconnect
              self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
              self ! decorateMessage(new ReconnectResourceManager(rm))
          }(context.dispatcher)

        case None =>
          log.info("Task Manager Registration but not connected to ResourceManager")
          // ResourceManager not yet available
          // sending task manager information later upon ResourceManager registration
          self ! decorateMessage(new RegisterResourceSuccessful(taskManager, msg))
      }

    case msg: RegisterResourceSuccessful =>

      val originalMsg = msg.getRegistrationMessage
      val taskManager = msg.getTaskManager

      // ResourceManager knows about the resource, now let's try to register TaskManager
      if (instanceManager.isRegistered(taskManager)) {
        val instanceID = instanceManager.getRegisteredInstance(taskManager).getId

        taskManager ! decorateMessage(
          AlreadyRegistered(
            instanceID,
            libraryCacheManager.getBlobServerPort))
      } else {
        try {
          val instanceID = instanceManager.registerTaskManager(
            taskManager,
            originalMsg.resourceId,
            originalMsg.connectionInfo,
            originalMsg.resources,
            originalMsg.numberOfSlots,
            leaderSessionID.orNull)

          taskManager ! decorateMessage(
            AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort))

          // to be notified when the taskManager is no longer reachable
          context.watch(taskManager)
        } catch {
          // registerTaskManager throws an IllegalStateException if it is already shut down
          // let the actor crash and restart itself in this case
          case e: Exception =>
            log.error("Failed to register TaskManager at instance manager", e)

            taskManager ! decorateMessage(
              RefuseRegistration(e))
        }
      }

    case msg: RegisterResourceFailed =>

      val taskManager = msg.getTaskManager
      val resourceId = msg.getResourceID
      log.warn(s"TaskManager's resource id $resourceId failed to register at ResourceManager. " +
        s"Refusing registration because of
${msg.getMessage}.") taskManager ! decorateMessage( RefuseRegistration(new IllegalStateException( s"Resource $resourceId not registered with resource manager."))) case msg: ResourceRemoved => // we're being informed by the resource manager that a resource has become unavailable val resourceID = msg.resourceId() log.debug(s"Resource has been removed: $resourceID") val instance = instanceManager.getRegisteredInstance(resourceID) // trigger removal of task manager handleTaskManagerTerminated(instance.getActorGateway.actor()) case RequestNumberRegisteredTaskManager => sender ! decorateMessage(instanceManager.getNumberOfRegisteredTaskManagers) case RequestTotalNumberOfSlots => sender ! decorateMessage(instanceManager.getTotalNumberOfSlots) case SubmitJob(jobGraph, listeningBehaviour) => val client = sender() val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(), jobGraph.getSessionTimeout) submitJob(jobGraph, jobInfo) case RecoverSubmittedJob(submittedJobGraph) => if (!currentJobs.contains(submittedJobGraph.getJobId)) { submitJob( submittedJobGraph.getJobGraph(), submittedJobGraph.getJobInfo(), isRecovery = true) } else { log.info(s"Ignoring job recovery for ${submittedJobGraph.getJobId}, " + s"because it is already submitted.") } case RecoverJob(jobId) => future { try { // The ActorRef, which is part of the submitted job graph can only be // de-serialized in the scope of an actor system. akka.serialization.JavaSerializer.currentSystem.withValue( context.system.asInstanceOf[ExtendedActorSystem]) { log.info(s"Attempting to recover job $jobId.") val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId) submittedJobGraphOption match { case Some(submittedJobGraph) => if (!leaderElectionService.hasLeadership()) { // we've lost leadership. mission: abort. log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.") } else { self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) } case None => log.info(s"Attempted to recover job $jobId, but no job graph found.") } } } catch { case t: Throwable => log.error(s"Failed to recover job $jobId.", t) } }(context.dispatcher) case RecoverAllJobs => future { try { // The ActorRef, which is part of the submitted job graph can only be // de-serialized in the scope of an actor system. akka.serialization.JavaSerializer.currentSystem.withValue( context.system.asInstanceOf[ExtendedActorSystem]) { log.info(s"Attempting to recover all jobs.") val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala if (!leaderElectionService.hasLeadership()) { // we've lost leadership. mission: abort. log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " + s"jobs.") } else { log.info(s"Re-submitting ${jobGraphs.size} job graphs.") jobGraphs.foreach{ submittedJobGraph => self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) } } } } catch { case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t) } }(context.dispatcher) case CancelJob(jobID) => log.info(s"Trying to cancel job with ID $jobID.") currentJobs.get(jobID) match { case Some((executionGraph, _)) => // execute the cancellation asynchronously Future { executionGraph.cancel() }(context.dispatcher) sender ! decorateMessage(CancellationSuccess(jobID)) case None => log.info(s"No job found with ID $jobID.") sender ! 
decorateMessage( CancellationFailure( jobID, new IllegalArgumentException(s"No job found with ID $jobID.")) ) } case StopJob(jobID) => log.info(s"Trying to stop job with ID $jobID.") currentJobs.get(jobID) match { case Some((executionGraph, _)) => try { if (!executionGraph.isStoppable()) { sender ! decorateMessage( StoppingFailure( jobID, new IllegalStateException(s"Job with ID $jobID is not stoppable.")) ) } else if (executionGraph.getState() != JobStatus.RUNNING) { sender ! decorateMessage( StoppingFailure( jobID, new IllegalStateException(s"Job with ID $jobID is in state " + executionGraph.getState().name() + " but stopping is only allowed in state " + "RUNNING.")) ) } else { executionGraph.stop() sender ! decorateMessage(StoppingSuccess(jobID)) } } catch { case t: Throwable => sender ! decorateMessage(StoppingFailure(jobID, t)) } case None => log.info(s"No job found with ID $jobID.") sender ! decorateMessage( StoppingFailure( jobID, new IllegalArgumentException(s"No job found with ID $jobID.")) ) } case UpdateTaskExecutionState(taskExecutionState) => if (taskExecutionState == null) { sender ! decorateMessage(false) } else { currentJobs.get(taskExecutionState.getJobID) match { case Some((executionGraph, _)) => val originalSender = sender() Future { val result = executionGraph.updateState(taskExecutionState) originalSender ! decorateMessage(result) }(context.dispatcher) case None => log.error("Cannot find execution graph for ID " + s"${taskExecutionState.getJobID} to change state to " + s"${taskExecutionState.getExecutionState}.") sender ! decorateMessage(false) } } case RequestNextInputSplit(jobID, vertexID, executionAttempt) => val serializedInputSplit = currentJobs.get(jobID) match { case Some((executionGraph,_)) => val execution = executionGraph.getRegisteredExecutions.get(executionAttempt) if (execution == null) { log.error(s"Can not find Execution for attempt $executionAttempt.") null } else { val slot = execution.getAssignedResource val taskId = execution.getVertex.getParallelSubtaskIndex val host = if (slot != null) { slot.getInstance().getInstanceConnectionInfo.getHostname } else { null } executionGraph.getJobVertex(vertexID) match { case vertex: ExecutionJobVertex => vertex.getSplitAssigner match { case splitAssigner: InputSplitAssigner => val nextInputSplit = splitAssigner.getNextInputSplit(host, taskId) log.debug(s"Send next input split $nextInputSplit.") try { InstantiationUtil.serializeObject(nextInputSplit) } catch { case ex: Exception => log.error(s"Could not serialize the next input split of " + s"class ${nextInputSplit.getClass}.", ex) vertex.fail(new RuntimeException("Could not serialize the next input split " + "of class " + nextInputSplit.getClass + ".", ex)) null } case _ => log.error(s"No InputSplitAssigner for vertex ID $vertexID.") null } case _ => log.error(s"Cannot find execution vertex for vertex ID $vertexID.") null } } case None => log.error(s"Cannot find execution graph for job ID $jobID.") null } sender ! decorateMessage(NextInputSplit(serializedInputSplit)) case checkpointMessage : AbstractCheckpointMessage => handleCheckpointMessage(checkpointMessage) case TriggerSavepoint(jobId) => currentJobs.get(jobId) match { case Some((graph, _)) => val savepointCoordinator = graph.getSavepointCoordinator() if (savepointCoordinator != null) { // Immutable copy for the future val senderRef = sender() future { try { // Do this async, because checkpoint coordinator operations can // contain blocking calls to the state backend or ZooKeeper. 
val savepointFuture = savepointCoordinator.triggerSavepoint( System.currentTimeMillis()) savepointFuture.onComplete { // Success, respond with the savepoint path case scala.util.Success(savepointPath) => senderRef ! TriggerSavepointSuccess(jobId, savepointPath) // Failure, respond with the cause case scala.util.Failure(t) => senderRef ! TriggerSavepointFailure( jobId, new Exception("Failed to complete savepoint", t)) }(context.dispatcher) } catch { case e: Exception => senderRef ! TriggerSavepointFailure(jobId, new Exception( "Failed to trigger savepoint", e)) } }(context.dispatcher) } else { sender() ! TriggerSavepointFailure(jobId, new IllegalStateException( "Checkpointing disabled. You can enable it via the execution environment of " + "your job.")) } case None => sender() ! TriggerSavepointFailure(jobId, new IllegalArgumentException("Unknown job.")) } case DisposeSavepoint(savepointPath) => val senderRef = sender() future { try { log.info(s"Disposing savepoint at '$savepointPath'.") val savepoint = savepointStore.getState(savepointPath) log.debug(s"$savepoint") // Discard the associated checkpoint savepoint.discard(getClass.getClassLoader) // Dispose the savepoint savepointStore.disposeState(savepointPath) senderRef ! DisposeSavepointSuccess } catch { case t: Throwable => log.error(s"Failed to dispose savepoint at '$savepointPath'.", t) senderRef ! DisposeSavepointFailure(t) } }(context.dispatcher) case JobStatusChanged(jobID, newJobStatus, timeStamp, error) => currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => executionGraph.getJobName log.info( s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.", error) if (newJobStatus.isTerminalState()) { jobInfo.end = timeStamp future{ // TODO If removing the JobGraph from the SubmittedJobGraphsStore fails, the job will // linger around and potentially be recovered at a later time. There is nothing we // can do about that, but it should be communicated with the Client. if (jobInfo.sessionAlive) { jobInfo.setLastActive() val lastActivity = jobInfo.lastActive context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { // remove only if no activity occurred in the meantime if (lastActivity == jobInfo.lastActive) { self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) } }(context.dispatcher) } else { self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) } // is the client waiting for the job result? if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { newJobStatus match { case JobStatus.FINISHED => try { val accumulatorResults = executionGraph.getAccumulatorsSerialized() val result = new SerializedJobExecutionResult( jobID, jobInfo.duration, accumulatorResults) jobInfo.client ! decorateMessage(JobResultSuccess(result)) } catch { case e: Exception => log.error(s"Cannot fetch final accumulators for job $jobID", e) val exception = new JobExecutionException(jobID, "Failed to retrieve accumulator results.", e) jobInfo.client ! decorateMessage(JobResultFailure( new SerializedThrowable(exception))) } case JobStatus.CANCELED => // the error may be packed as a serialized throwable val unpackedError = SerializedThrowable.get( error, executionGraph.getUserClassLoader()) jobInfo.client ! decorateMessage(JobResultFailure( new SerializedThrowable( new JobCancellationException(jobID, "Job was cancelled.", unpackedError)))) case JobStatus.FAILED => val unpackedError = SerializedThrowable.get( error, executionGraph.getUserClassLoader()) jobInfo.client ! 
decorateMessage(JobResultFailure( new SerializedThrowable( new JobExecutionException(jobID, "Job execution failed.", unpackedError)))) case x => val exception = new JobExecutionException(jobID, s"$x is not a terminal state.") jobInfo.client ! decorateMessage(JobResultFailure( new SerializedThrowable(exception))) throw exception } } }(context.dispatcher) } case None => self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) } case ScheduleOrUpdateConsumers(jobId, partitionId) => currentJobs.get(jobId) match { case Some((executionGraph, _)) => sender ! decorateMessage(Acknowledge) executionGraph.scheduleOrUpdateConsumers(partitionId) case None => log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " + s"consumers.") sender ! decorateMessage( Failure( new IllegalStateException("Cannot find execution graph for job ID " + s"$jobId to schedule or update consumers.") ) ) } case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) => val state = currentJobs.get(jobId) match { case Some((executionGraph, _)) => val execution = executionGraph.getRegisteredExecutions.get(partitionId.getProducerId) if (execution != null) execution.getState else null case None => // Nothing to do. This is not an error, because the request is received when a sending // task fails during a remote partition request. log.debug(s"Cannot find execution graph for job $jobId.") null } sender ! decorateMessage( PartitionState( taskExecutionId, taskResultId, partitionId.getPartitionId, state) ) case RequestJobStatus(jobID) => currentJobs.get(jobID) match { case Some((executionGraph,_)) => sender ! decorateMessage(CurrentJobStatus(jobID, executionGraph.getState)) case None => // check the archive archive forward decorateMessage(RequestJobStatus(jobID)) } case RequestRunningJobs => val executionGraphs = currentJobs map { case (_, (eg, jobInfo)) => eg } sender ! decorateMessage(RunningJobs(executionGraphs)) case RequestRunningJobsStatus => try { val jobs = currentJobs map { case (_, (eg, _)) => new JobStatusMessage( eg.getJobID, eg.getJobName, eg.getState, eg.getStatusTimestamp(JobStatus.CREATED) ) } sender ! decorateMessage(RunningJobsStatus(jobs)) } catch { case t: Throwable => log.error("Exception while responding to RequestRunningJobsStatus", t) } case RequestJob(jobID) => currentJobs.get(jobID) match { case Some((eg, _)) => sender ! decorateMessage(JobFound(jobID, eg)) case None => // check the archive archive forward decorateMessage(RequestJob(jobID)) } case RequestBlobManagerPort => sender ! decorateMessage(libraryCacheManager.getBlobServerPort) case RequestArchive => sender ! decorateMessage(ResponseArchive(archive)) case RequestRegisteredTaskManagers => sender ! decorateMessage( RegisteredTaskManagers( instanceManager.getAllRegisteredInstances.asScala ) ) case RequestTaskManagerInstance(instanceID) => sender ! 
decorateMessage( TaskManagerInstance(Option(instanceManager.getRegisteredInstanceById(instanceID))) ) case Heartbeat(instanceID, metricsReport, accumulators) => log.debug(s"Received heartbeat message from $instanceID.") updateAccumulators(accumulators) instanceManager.reportHeartBeat(instanceID, metricsReport) case message: AccumulatorMessage => handleAccumulatorMessage(message) case message: InfoMessage => handleInfoRequestMessage(message, sender()) case RequestStackTrace(instanceID) => val gateway = instanceManager.getRegisteredInstanceById(instanceID).getActorGateway gateway.forward(SendStackTrace, new AkkaActorGateway(sender, leaderSessionID.orNull)) case Terminated(taskManagerActorRef) => handleTaskManagerTerminated(taskManagerActorRef) case RequestJobManagerStatus => sender() ! decorateMessage(JobManagerStatusAlive) case RemoveJob(jobID, clearPersistedJob) => currentJobs.get(jobID) match { case Some((graph, info)) => removeJob(graph.getJobID, clearPersistedJob) match { case Some(futureToComplete) => futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete) case None => } case None => } case RemoveCachedJob(jobID) => currentJobs.get(jobID) match { case Some((graph, info)) => if (graph.getState.isTerminalState) { removeJob(graph.getJobID, removeJobFromStateBackend = true) match { case Some(futureToComplete) => futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete) case None => } } else { // triggers removal upon completion of job info.sessionAlive = false } case None => } case Disconnect(msg) => val taskManager = sender() if (instanceManager.isRegistered(taskManager)) { log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.") instanceManager.unregisterTaskManager(taskManager, false) context.unwatch(taskManager) } case msg: StopCluster => log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " + s"and diagnostics: ${msg.message()}") // stop all task managers instanceManager.getAllRegisteredInstances.asScala foreach { instance => instance.getActorGateway.tell(msg) } // send resource manager the ok currentResourceManager match { case Some(rm) => // inform rm rm ! decorateMessage(msg) case None => // ResourceManager not available // we choose not to wait here beacuse it might block the shutdown forever } sender() ! decorateMessage(StopClusterSuccessful.getInstance()) shutdown() case RequestLeaderSessionID => sender() ! ResponseLeaderSessionID(leaderSessionID.orNull) case RequestWebMonitorPort => sender() ! ResponseWebMonitorPort(webMonitorPort) }
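The SubmitJob branch is the one the client path exercises: the client wraps a JobGraph in a SubmitJob message and asks the JobManager actor. Below is a minimal sketch of that interaction, with the caveat that it is illustrative only: it assumes jobManager is an ActorRef (or gateway) to the leader JobManager, jobGraph is an already-built JobGraph, and that SubmitJob and ListeningBehaviour come from Flink's JobManagerMessages and runtime.akka packages as used in the handler above. The value returned by the ask is whatever submitJob sends back first (an acknowledgement or failure), which is not shown in this section.

object SubmitJobSketch {
  import akka.actor.ActorRef
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.Await
  import scala.concurrent.duration._

  import org.apache.flink.runtime.akka.ListeningBehaviour
  import org.apache.flink.runtime.jobgraph.JobGraph
  import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob

  // Hypothetical helper: submits a JobGraph to the JobManager actor and waits for
  // its first reply, matching the SubmitJob case in the handler above.
  def submitAndWait(jobManager: ActorRef, jobGraph: JobGraph): Any = {
    implicit val timeout: Timeout = Timeout(10.minutes)
    val response = jobManager ? SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT)
    Await.result(response, timeout.duration)
  }
}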