8.3 Task全生命周期详解

本节讲解Task的生命过程,对Task在Driver和Executor中交互的全生命周期原理和源码进行详解。

8.3.1 Task的生命过程详解

Task的生命过程详解如下。

(1)当Driver中的CoarseGrainedSchedulerBackend给CoarseGrainedExecutorBackend发送LaunchTask之后,CoarseGrainedExecutorBackend收到LaunchTask消息后,首先会反序列化TaskDescription。

(2)Executor会通过launchTask执行Task,在launchTask方法中调用new()函数创建TaskRunner,TaskRunner继承自Runnable接口。

(3)TaskRunner在ThreadPool运行具体的Task,在TaskRunner的run方法中首先会通过调用statusUpdate给Driver发信息汇报自己的状态,说明自己是Running状态。其中execBackend是ExecutorBackend,ExecutorBackend是一个trait,其具体的实现子类是CoarseGrainedExecutorBackend,其中的statusUpdate方法中将向Driver提交StatusUpdate消息。

(4)TaskRunner内部会做一些准备工作:例如,反序列化Task的依赖,然后通过网络获取需要的文件、Jar等。

(5)然后是反序列Task本身。

(6)调用反序列化后的Task.run方法来执行任务,并获得执行结果。其中Task的run方法调用时会导致Task的抽象方法runTask的调用,在Task的runTask内部会调用RDD的iterator方法,该方法就是我们针对当前Task所对应的Partition进行计算的关键所在,在处理的内部会迭代Partition的元素,并交给我们自定义的function进行处理!

 对于ShuffleMapTask,首先要对RDD以及其依赖关系进行反序列化,最终计算会调用RDD的compute方法。具体计算时有具体的RDD,例如,MapPartitionsRDD的compute。compute方法其中的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码。

 对于ResultTask:调用rdd.iterator方法,最终计算仍然会调用RDD的compute方法。

(7)把执行结果序列化,并根据大小判断不同的结果传回给Driver。

(8)CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate来传输执行结果。DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程去分别处理Task执行成功和失败时的不同情况,最后告诉DAGScheduler任务处理结束的状况。

说明:

①在执行具体Task的业务逻辑前,会进行四次反序列:

a)TaskDescription的反序列化。

b)反序列化Task的依赖。

c)Task的反序列化。

d)RDD反序列化。

②在Spark 1.6中,AkkFrameSize是128MB,所以可以广播非常大的任务;而任务的执行结果最大可以达到1GB。Spark 2.2版本中,CoarseGrainedSchedulerBackend的launchTask方法中序列化任务大小的限制是maxRpcMessageSize为128MB。

8.3.2 Task在Driver和Executor中交互的全生命周期原理和源码详解

在Standalone模式中,Driver中的CoarseGrainedSchedulerBackend给CoarseGrained-ExecutorBackend发送launchTasks消息,CoarseGrainedExecutorBackend收到launchTasks消息以后会调用executor.launchTask。

CoarseGrainedExecutorBackend的receive方法如下,模式匹配收到LaunchTask消息:

(1)LaunchTask判断Executor是否存在,如果Executor不存在,则直接退出,然后会反序列化TaskDescription。

Spark 2.1.1版本的CoarseGrainedExecutorBackend的receive方法的源码如下。

1.  val taskDesc = ser.deserialize[TaskDescription](data.value)

Spark 2.2.0版本的CoarseGrainedExecutorBackend的receive方法的源码如下。

1.  val taskDesc = TaskDescription.decode(data.value)

(2)Executor会通过launchTask来执行Task,launchTask方法中分别传入taskId、尝试次数、任务名称、序列化后的任务本身。

Spark 2.1.1版本的CoarseGrainedExecutorBackend的receive方法的源码如下。

1.  executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber =
    taskDesc.attemptNumber,  taskDesc.name, taskDesc.serializedTask)

Spark 2.2.0版本的CoarseGrainedExecutorBackend的receive方法的源码如下。

1.  executor.launchTask(this, taskDesc)

进入Executor.scala的launchTask方法,在launchTask方法中调用new()函数创建一个TaskRunner,传入的参数包括taskId、尝试次数、任务名称、序列化后的任务本身。然后放入runningTasks数据结构,在threadPool中执行TaskRunner。

TaskRunner本身是一个Runnable接口。

下面看一下TaskRunner的run方法。TaskMemoryManager是内存的管理,deserialize-StartTime是反序列化开始的时间,setContextClassLoader是ClassLoader加载具体的类。ser是序列化器。

然后调用execBackend.statusUpdate,statusUpdate是ExecutorBackend的方法,Executor-Backend通过statusUpdate给Driver发信息,汇报自己的状态。

1.     private[spark] trait ExecutorBackend {
2.    def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
3.  }

其中,execBackend是ExecutorBackend,ExecutorBackend是一个trait,其具体的实现子类是CoarseGrainedExecutorBackend。execBackend实例是在CoarseGrainedExecutorBackend的receive方法收到LaunchTask消息,调用executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)时将CoarseGrainedExecutorBackend自己本身的this实例传进来的。这里调用CoarseGrained-ExecutorBackend的statusUpdate方法。statusUpdate方法将向Driver提交StatusUpdate消息。

CoarseGrainedExecutorBackend的statusUpdate的源码如下。

1.      override def statusUpdate(taskId: Long, state: TaskState, data:
        ByteBuffer) {
2.    val msg = StatusUpdate(executorId, taskId, state, data)
3.    driver match {
4.      case Some(driverRef) => driverRef.send(msg)
5.      case None => logWarning(s"Drop $msg because has not yet connected
        to driver")
6.    }
7.  }

(3)TaskRunner的run方法中,TaskRunner在ThreadPool中运行具体的Task,在TaskRunner的run方法中首先会通过调用statusUpdate给Driver发信息汇报自己的状态,说明自己是Running状态。

1.  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)

其中,EMPTY_BYTE_BUFFER没有具体内容。

1.  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))

接下来通过Task.deserializeWithDependencies(serializedTask)反序列化Task,得到一个Tuple,获取到taskFiles、taskJars、taskProps、taskBytes等信息。

(4)Executor会通过TaskRunner在ThreadPool中运行具体的Task,TaskRunner内部会做一些准备工作:反序列化Task的依赖。

Spark 2.1.1版本的Executor.scala的源码如下。

1.  val (taskFiles, taskJars, taskProps, taskBytes) =
2.            Task.deserializeWithDependencies(serializedTask)

Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第1~2行Properties、addedFiles、addedJars、serializedTask等信息调整为从taskDescription中获取。

1.   .......
2.    Executor.taskDeserializationProps.set(taskDescription.properties)
3.          updateDependencies(taskDescription.addedFiles,
            taskDescription.addedJars)
4.          task = ser.deserialize[Task[Any]](
5.            taskDescription.serializedTask, Thread.currentThread.
              getContextClassLoader)
6.          task.localProperties = taskDescription.properties
7.          task.setTaskMemoryManager(taskMemoryManager)
8.  ........

然后通过网络来获取需要的文件、Jar等。

Spark 2.1.1版本的Executor.scala的源码如下。

1.  updateDependencies(taskFiles, taskJars)

Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中taskFiles、taskJars等信息调整为从taskDescription.addedFiles,taskDescription. addedJars中获取。

1.  updateDependencies(taskDescription.addedFiles,
    taskDescription.addedJars)

再来看一下updateDependencies方法。从SparkContext收到一组新的文件JARs,下载Task运行需要的依赖Jars,在类加载机中加载新的JARs包。updateDependencies方法的源码如下。

Spark 2.1.1版本的Executor.scala的源码如下。

1.   private def updateDependencies(newFiles: HashMap[String, Long], newJars:
     HashMap[String, Long]) {
2.      Lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
3.      synchronized {
4.        //获取将要计算的依赖关系
5.        for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name,
          -1L) < timestamp) {
6.          logInfo("Fetching " + name + " with timestamp " + timestamp)
7.          //使用useCache获取文件,本地模式关闭缓存
8.          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
9.            env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
10.         currentFiles(name) = timestamp
11.       }
12.       for ((name, timestamp) <- newJars) {
13.         val localName = name.split("/").last
14.         val currentTimeStamp = currentJars.get(name)
15.           .orElse(currentJars.get(localName))
16.           .getOrElse(-1L)
17.         if (currentTimeStamp < timestamp) {
18.           logInfo("Fetching " + name + " with timestamp " + timestamp)
19.           //使用useCache获取文件,本地模式关闭缓存
20.           Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
21.             env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
22.           currentJars(name) = timestamp
23.           //将它增加到类装入器中
24.           val url = new File(SparkFiles.getRootDirectory(), localName).
              toURI.toURL
25.           if (!urlClassLoader.getURLs().contains(url)) {
26.             logInfo("Adding " + url + " to class loader")
27.             urlClassLoader.addURL(url)
28.           }
29.         }
30.       }
31.     }
32.   }

Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第1行newFiles、newJars的数据类型由HashMap[String, Long]调整为Map[String, Long]。

1.  private def updateDependencies(newFiles: Map[String, Long], newJars:
    Map[String, Long]) {.......

Executor的updateDependencies方法中,Executor运行具体任务时进行下载,下载文件使用synchronized关键字,因为Executor在线程中运行,同一个Stage内部不同的任务线程要共享这些内容,因此ExecutorBackend多条线程资源操作的时候,需要通过同步块加锁。

updateDependencies方法的Utils.fetchFile将文件或目录下载到目标目录,支持各种方式获取文件,包括HTTP,Hadoop兼容的文件系统、标准文件系统的文件,基于URL参数。获取目录只支持从Hadoop兼容的文件系统。如果usecache设置为true,第一次尝试取文件到本地缓存,执行同一应用程序进行共享。usecache主要用于executors,而不是本地模式。如果目标文件已经存在,并有不同于请求文件的内容,将抛出SparkException异常。

1.     def fetchFile(
2.        url: String,
3.        targetDir: File,
4.        conf: SparkConf,
5.        securityMgr: SecurityManager,
6.        hadoopConf: Configuration,
7.        timestamp: Long,
8.        useCache: Boolean) {
9.  ......
10.    doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
11. .......

doFetchFile方法如下,包括spark、http | https | ftp、file各种协议方式的下载。

1.   private def doFetchFile(
2.        url: String,
3.        targetDir: File,
4.        filename: String,
5.        conf: SparkConf,
6.        securityMgr: SecurityManager,
7.        hadoopConf: Configuration) {
8.      val targetFile = new File(targetDir, filename)
9.      val uri = new URI(url)
10.     val fileOverwrite = conf.getBoolean("spark.files.overwrite",
        defaultValue = false)
11.     Option(uri.getScheme).getOrElse("file") match {
12.       case "spark" =>
13.      ......
14.         downloadFile(url, is, targetFile, fileOverwrite)
15.       case "http" | "https" | "ftp" =>
16.    ......
17.         downloadFile(url, in, targetFile, fileOverwrite)
18.       case "file" =>
19.       ......
20.       copyFile(url, sourceFile, targetFile, fileOverwrite)
21.       case _ =>
22.         val fs = getHadoopFileSystem(uri, hadoopConf)
23.         val path = new Path(uri)
24.         fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
25.                       filename = Some(filename))
26.     }
27.   }

(5)回到TaskRunner的run方法,所有依赖的Jar都下载完成后,然后是反序列Task本身。

Spark 2.1.1版本的Executor.scala的源码如下。

1.  task    =  ser.deserialize[Task[Any]](taskBytes,  Thread.currentThread.
    getContextClassLoader)

Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:

1.  task = ser.deserialize[Task[Any]](
2.        taskDescription.serializedTask, Thread.currentThread.
          getContextClassLoader)

在执行具体Task的业务逻辑前会进行四次反序列。

(a)TaskDescription的反序列化。

(b)反序列化Task的依赖。

(c)Task的反序列化。

(d)RDD反序列化。

(6)回到TaskRunner的run方法,调用反序列化后的Task.run方法来执行任务并获得执行结果。

其中,Task的run方法调用时会导致Task的抽象方法runTask的调用,在Task的runTask内部会调用RDD的iterator方法,该方法就是针对当前Task所对应的Partition进行计算的关键所在,在处理的内部会迭代Partition的元素并交给自定义的function进行处理。

进入task.run方法,在run方法里面再调用runTask方法。

1.        final def run(
2.        taskAttemptId: Long,
3.        attemptNumber: Int,
4.        metricsSystem: MetricsSystem): T = {
5.      SparkEnv.get.blockManager.registerTask(taskAttemptId)
6.      context = new TaskContextImpl(
7.    ......
8.      TaskContext.setTaskContext(context)
9.     ......
10.     try {
11.       runTask(context)
12.  ......

进入Task.scala的runTask方法,这里是一个抽象方法,没有具体的实现。

1.  def runTask(context: TaskContext): T

Task包括两种Task:ResultTask和ShuffleMapTask。抽象runTask方法由子类的runTask实现。先看一下ShuffleMapTask的runTask方法,runTask实际运行的时候会调用RDD的iterator,然后针对partition进行计算。

1.      override def runTask(context: TaskContext): MapStatus = {
2.   ......
3.      val ser = SparkEnv.get.closureSerializer.newInstance()
4.      val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
5.      ......
6.        val manager = SparkEnv.get.shuffleManager
7.       writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId,
         context)
8.        writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator
          [_ <: Product2[Any, Any]]])
9.        writer.stop(success = true).get
10.    ......

ShuffleMapTask在计算具体的Partition之后实际上会通过shuffleManager获得的shuffleWriter把当前Task计算内容根据具体的shuffleManager实现写入到具体的文件中。操作完成以后会把MapStatus发送给DAGscheduler,Driver的DAGScheduler的MapOutputTracker会收到注册的信息。

同样地,ResultTask的runTask方法也是调用RDD的iterator,然后针对partition进行计算。MapOutputTracker会把ShuffleMapTask执行结果交给ResultTask,ResultTask根据前面Stage的执行结果进行Shuffle,产生整个Job最后的结果。

1.    override def runTask(context: TaskContext): U = {
2.    ......
3.     val ser = SparkEnv.get.closureSerializer.newInstance()
4.     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T])
       => U)](
5.  ......
6.     func(context, rdd.iterator(partition, context))
7.   }

ResultTask、ShuffleMapTask的runTask方法真正执行的时候,调用RDD的iterator,对Partition进行计算。RDD.scala的iterator方法的源码如下。

1.    override def runTask(context: TaskContext): U = {
2.    ......
3.     val ser = SparkEnv.get.closureSerializer.newInstance()
4.     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T])
       => U)](
5.  ......
6.     func(context, rdd.iterator(partition, context))
7.   }

RDD.scala的iterator方法中,如果storageLevel不等于NONE,就直接获取或者计算得到RDD的分区;如果storageLevel是空,就从checkpoint中读取或者计算RDD分区。

进入computeOrReadCheckpoint:

1.      private[spark] def computeOrReadCheckpoint(split: Partition,
        context: TaskContext): Iterator[T] =
2.  {
3.    if (isCheckpointedAndMaterialized) {
4.      firstParent[T].iterator(split, context)
5.    } else {
6.      compute(split, context)
7.    }
8.  }

最终计算会调用RDD的compute方法。

1.  def compute(split: Partition, context: TaskContext): Iterator[T]

RDD的compute方法中的Partition是一个trait。

1.      trait Partition extends Serializable {
2.    def index: Int
3.    override def hashCode(): Int = index
4.    override def equals(other: Any): Boolean = super.equals(other)
5.  }

RDD的compute方法中的TaskContext里面有很多方法,包括任务是否完成、任务是否中断、任务是否在本地运行、任务运行完成时的监听器、任务运行失败的监听器、stageId、partitionId、重试的次数等。

1.   abstract class TaskContext extends Serializable {
2.  def isCompleted(): Boolean
3.  def isInterrupted(): Boolean
4.  def isRunningLocally(): Boolean
5.  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
6.  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
7.  def addTaskFailureListener(listener: TaskFailureListener): TaskContext
8.  def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext
9.  def stageId()
10. def partitionId(): Int
11.   def attemptNumber(): Int
12. ......

下面看一下TaskContext具体的实现TaskContextImpl。TaskContextImpl维持了很多上下文信息,如stageId、partitionId、taskAttemptId、重试次数、taskMemoryManager等。

1.   private[spark] class TaskContextImpl(
2.      val stageId: Int,
3.      val partitionId: Int,
4.      override val taskAttemptId: Long,
5.      override val attemptNumber: Int,
6.      override val taskMemoryManager: TaskMemoryManager,
7.      localProperties: Properties,
8.      @transient private val metricsSystem: MetricsSystem,
9.      //默认值仅用于测试
10.     override val taskMetrics: TaskMetrics = TaskMetrics.empty)
11.   extends TaskContext
12.   with Logging {
13. ......

RDD的compute方法具体计算的时候有具体的RDD,如MapPartitionsRDD的compute、传进去的Partition及TaskContext上下文。

MapPartitionsRDD.scala的compute的源码如下。

1.  override def compute(split: Partition, context: TaskContext): Iterator[U] =
2.     f(context, split.index, firstParent[T].iterator(split, context))

MapPartitionsRDD.scala的compute中的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码。f是函数,是我们自己写的业务逻辑。Stage从后往前推,把所有的RDD合并变成一个,函数也会变成一个链条,展开成一个很大的函数。Compute返回的是一个Iterator。

Task包括两种Task:ResultTask和ShuffleMapTask。

先看一下ShuffleMapTask的runTask方法,从ShuffleMapTask的角度讲,rdd.iterator获得数据记录以后,对rdd.iterator计算后的Iterator记录进行write。

1.  val manager = SparkEnv.get.shuffleManager
2.      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId,
        context)
3.       writer.write(rdd.iterator(partition, context).asInstanceOf
         [Iterator[_ <: Product2[Any, Any]]])
4.       writer.stop(success = true).get

ResultTask.scala的runTask方法较简单:在ResultTask中,rdd.iterator获得数据记录以后,直接调用func函数。func函数是Task任务反序列化后直接获得的fun函数。

1.  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
2.        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.
          getContextClassLoader)
3.   ......
4.     func(context, rdd.iterator(partition, context))

(7)回到TaskRunner的run方法,把执行结果序列化,并根据大小判断不同的结果传回给Driver。

 task.run运行的结果赋值给value。

 resultSer.serialize(value)把task.run的执行结果value序列化。

 maxResultSize > 0 && resultSize > maxResultSize对任务执行结果的大小进行判断,并进行相应的处理。任务执行完以后,任务的执行结果最大可以达到1GB。

如果任务执行结果特别大,超过1GB,日志就会提示超出任务大小限制。返回元数据ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))。

如果任务执行结果小于1GB,大于maxDirectResultSize(128MB),就放入blockManager,返回元数据ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))。

如果任务执行结果小于128MB,就直接返回serializedDirectResult。

TaskRunner的run方法如下。

Spark 2.1.1版本的Executor.scala的源码如下。

1.           override def run(): Unit = {
2.  ......
3.  val value = try {
4.            val res = task.run(
5.              taskAttemptId = taskId,
6.              attemptNumber = attemptNumber,
7.              metricsSystem = env.metricsSystem)
8.            threwException = false
9.            Res
10. ......
11.    val valueBytes = resultSer.serialize(value)
12. ......
13.  val directResult = new DirectTaskResult(valueBytes, accumUpdates)
14.         val serializedDirectResult = ser.serialize(directResult)
15.         val resultSize = serializedDirectResult.limit
16. ......
17.
18.  val serializedResult: ByteBuffer = {
19.           if (maxResultSize > 0 && resultSize > maxResultSize) {
20.         .......
21.       ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId
          (taskId), resultSize))
22.           } else if (resultSize > maxDirectResultSize) {
23.             val blockId = TaskResultBlockId(taskId)
24.             env.blockManager.putBytes(
25.               blockId,
26.               new ChunkedByteBuffer(serializedDirectResult.duplicate()),
27.               StorageLevel.MEMORY_AND_DISK_SER)
28.           ......
29.             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
30.           } else {
31.        ......
32.             serializedDirectResult
33.           }
34.         }

Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行attemptNumber调整为taskDescription.attemptNumber。

1.  ......
2.            attemptNumber = taskDescription.attemptNumber,
3.       .......

其中的maxResultSize大小是1GB,任务的执行结果最大可以达到1GB。

1.    Executor.scala
2.  //对结果的总大小限制的字节数(默认为1GB)
3.    private val maxResultSize = Utils.getMaxResultSize(conf)
4.  .......
5.  Utils.scala
6.    //对结果的总大小限制的字节数(默认为1GB)
7.    def getMaxResultSize(conf: SparkConf): Long = {
8.      memoryStringToMb(conf.get("spark.driver.maxResultSize",
        "1g")).toLong << 20
9.    }

其中的Executor.scala中的maxDirectResultSize大小,取spark.task.maxDirectResultSize和RpcUtils.maxMessageSizeBytes的最小值。其中spark.rpc.message.maxSize默认配置是128MB。spark.task.maxDirectResultSize在配置文件中进行配置。

1.    private val maxDirectResultSize = Math.min(
2.      conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
3.      RpcUtils.maxMessageSizeBytes(conf))
4.  ......
5.  def maxMessageSizeBytes(conf: SparkConf): Int = {
6.      val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
7.      if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
8.        throw new IllegalArgumentException(
9.          s"spark.rpc.message.maxSize should not be greater than $MAX_
            MESSAGE_SIZE_IN_MB MB")
10.     }
11.     maxSizeInMB * 1024 * 1024
12.   }

补充说明:Driver发消息给Executor,Spark 1.6版本中CoarseGrainedSchedulerBackend的launchTask方法中序列化任务大小的限制是akkaFrameSize-AkkaUtils.reservedSizeBytes。其中,akkaFrameSize是128MB,reservedSizeBytes是200B。

Spark 1.6.0版本的CoarseGrainedSchedulerBackend.scala的源码如下。

1.  private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
2.      ......
3.          if (serializedTask.limit >= akkaFrameSize - AkkaUtils.
            reservedSizeBytes) {
4.  ......
5.  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
6.  .......
7.    def maxFrameSizeBytes(conf: SparkConf): Int = {
8.      val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
9.      if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
10.       throw new IllegalArgumentException(
11.         s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_
            SIZE_IN_MB MB")
12.     }
13.     frameSizeInMB * 1024 * 1024
14.   }
15. .......
16. val reservedSizeBytes = 200 * 1024
17. ......

Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的源码与Spark 1.6.0版本相比具有如下特点。

 上段代码中第3行Driver发消息给Executor,发送任务的序列化大小的限制serializedTask.limit从akkaFrameSize - AkkaUtils.reservedSizeBytes调整为maxRpc-MessageSize。

 上段代码中第5行AkkaUtils.maxFrameSizeBytes(conf)调整为RpcUtils.maxMessage-SizeBytes(conf)。

 上段代码中第7~14行maxFrameSizeBytes函数整体替换为以下代码。Spark 2.2.0版本中,CoarseGrainedSchedulerBackend的launchTasks方法中序列化任务大小的限制是maxRpcMessageSize为128MB。

1.      ......
2.          if (serializedTask.limit >= maxRpcMessageSize) {
3.  ......
4.
5.  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
6.
7.  def maxMessageSizeBytes(conf: SparkConf): Int = {
8.      val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
9.      if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
10.       throw new IllegalArgumentException(
11.         s"spark.rpc.message.maxSize should not be greater than
            $MAX_MESSAGE_SIZE_IN_MB MB")
12.     }
13.     maxSizeInMB * 1024 * 1024
14.   }
15. }

回到TaskRunner的run方法,execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)给Driver发送一个消息,消息中将taskId、TaskState.FINISHED、serializedResult放进去。

statusUpdate方法的源码如下。

1.  override def statusUpdate(taskId: Long, state: TaskState, data:
    ByteBuffer) {
2.    val msg = StatusUpdate(executorId, taskId, state, data)
3.    driver match {
4.      case Some(driverRef) => driverRef.send(msg)
5.      case None => logWarning(s"Drop $msg because has not yet connected
        to driver")
6.    }
7.  }

(8)CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate来传输执行结果,DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程去分别处理Task执行成功和失败的不同情况,最后告诉DAGScheduler任务处理结束的状况。

CoarseGrainedSchedulerBackend.scala中DriverEndpoint的receive方法如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.      case StatusUpdate(executorId, taskId, state, data) =>
3.        scheduler.statusUpdate(taskId, state, data.value)
4.        if (TaskState.isFinished(state)) {
5.          executorDataMap.get(executorId) match {
6.            case Some(executorInfo) =>
7.              executorInfo.freeCores += scheduler.CPUS_PER_TASK
8.              makeOffers(executorId)
9.            case None =>
10.             //忽略更新,因为我们不知道Executor
11.             logWarning(s"Ignored task status update ($taskId state $state)" +
12.               s"from unknown executor with ID $executorId")
13.         }
14.       }

DriverEndpoint的receive方法中,StatusUpdate调用scheduler.statusUpdate,然后释放资源,再次进行资源调度makeOffers(executorId)。

TaskSchedulerImpl的statusUpdate中:

 如果是TaskState.LOST,则记录下原因,将Executor清理掉。

 如果是TaskState.isFinished,则从taskSet中运行的任务中remove掉任务,调用taskResultGetter.enqueueSuccessfulTask处理。

 如果是TaskState.FAILED、TaskState.KILLED、TaskState.LOST,则调用taskResultGetter. enqueueFailedTask处理。

TaskSchedulerImpl的statusUpdate的源码如下。

1.        def statusUpdate(tid: Long, state: TaskState, serializedData:
          ByteBuffer) {
2.   var failedExecutor: Option[String] = None
3.   var reason: Option[ExecutorLossReason] = None
4.   synchronized {
5.     try {
6.       taskIdToTaskSetManager.get(tid) match {
7.         case Some(taskSet) =>
8.           if (state == TaskState.LOST) {
9.    //TaskState.LOST只被废弃的Mesos 细粒度的调度模式使用,每个Executor对应单
      //个任务,因此将Executor标记为失败
10.            val execId = taskIdToExecutorId.getOrElse(tid, throw new
               IllegalStateException(
11.             "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.
                contains(tid)"))
12.            if (executorIdToRunningTaskIds.contains(execId)) {
13.              reason = Some(
14.                SlaveLost(s"Task $tid was lost, so marking the executor
                   as lost as well."))
15.              removeExecutor(execId, reason.get)
16.              failedExecutor = Some(execId)
17.            }
18.          }
19.          if (TaskState.isFinished(state)) {
20.            cleanupTaskState(tid)
21.              taskSet.removeRunningTask(tid)
22.              if (state == TaskState.FINISHED) {
23.                taskResultGetter.enqueueSuccessfulTask(taskSet, tid,
                   serializedData)
24.              } else if (Set(TaskState.FAILED, TaskState.KILLED, T
                 askState.LOST).contains(state)) {
25.                taskResultGetter.enqueueFailedTask(taskSet, tid, state,
                   serializedData)
26.              }
27.            }
28.          case None =>
29.            logError(
30.              ("Ignoring update with state %s for TID %s because its task
                 set is gone (this is " +
31.                "likely the result of receiving duplicate task finished
                   status updates) or its " +
32.                "executor has been marked as failed.")
33.                .format(state, tid))
34.        }
35.      } catch {
36.        case e: Exception => logError("Exception in statusUpdate", e)
37.      }
38.    }
39.    //更新DAGScheduler时没持有这个锁,所以可能导致死锁
40.    if (failedExecutor.isDefined) {
41.      assert(reason.isDefined)
42.      dagScheduler.executorLost(failedExecutor.get, reason.get)
43.      backend.reviveOffers()
44.    }
45.  }

其中,taskResultGetter是TaskResultGetter的实例化对象。

1.  private[spark] var taskResultGetter = new TaskResultGetter(sc.env,
    this)

TaskResultGetter.scala的源码如下。

1.     private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler:
       TaskSchedulerImpl)
2.    extends Logging {
3.
4.    private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.
      threads", 4)
5.
6.    //用于测试
7.    protected val getTaskResultExecutor: ExecutorService =
8.      ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter")
9.  .......
10. def enqueueSuccessfulTask(
11.       taskSetManager: TaskSetManager,
12.       tid: Long,
13.       serializedData: ByteBuffer): Unit = {
14.     getTaskResultExecutor.execute(new Runnable {
15.       override def run(): Unit = Utils.logUncaughtExceptions {
16.         try {
17.          val (result, size) = serializer.get().deserialize[TaskResult[_]]
             (serializedData) match {
18.             case directResult: DirectTaskResult[_] =>
19.               if (!taskSetManager.canFetchMoreResults(serializedData.
               limit())) {
20.              return
21.            }
22.    //反序列化“值”时不持有任何锁,所以不会阻止其他线程。我们在这里调用它,这样在
       //TaskSetManager.handleSuccessfulTask中,当它再次被调用时,不需要反序列化值
23.            directResult.value(taskResultSerializer.get())
24.            (directResult, serializedData.limit())
25.          case IndirectTaskResult(blockId, size) =>
26.            if (!taskSetManager.canFetchMoreResults(size)) {
27.              //如果大小超过maxResultSize,将被Executor丢弃
28.              sparkEnv.blockManager.master.removeBlock(blockId)
29.              return
30.            }
31.            logDebug("Fetching indirect task result for TID %s".format(tid))
32.            scheduler.handleTaskGettingResult(taskSetManager, tid)
33.            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes
               (blockId)
34.
35.            if (!serializedTaskResult.isDefined) {
36.              /*如果运行任务的机器失败,我们将无法获得任务结果
                   当任务结束,我们试图取结果时,块管理器必须刷新结果*/
37.              scheduler.handleFailedTask(
38.                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
39.              return
40.            }
41.            val deserializedResult = serializer.get().deserialize
               [DirectTaskResult[_]](
42.              serializedTaskResult.get.toByteBuffer)
43.            //反序列化获取值
44.            deserializedResult.value(taskResultSerializer.get())
45.            sparkEnv.blockManager.master.removeBlock(blockId)
46.            (deserializedResult, size)
47.        }
48.
49.  //从Executors接收的累加器更新中设置任务结果大小,我们需要在Driver上执行此操
     //作,因为如果我们在Executors 上执行此操作,那么将结果更新大小后须进行序列化
50.        result.accumUpdates = result.accumUpdates.map { a =>
51.          if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
52.            val acc = a.asInstanceOf[LongAccumulator]
53.            assert(acc.sum == 0L, "task result size should not have been
               set on the executors")
54.            acc.setValue(size.toLong)
55.            acc
56.          } else {
57.            a
58.          }
59.        }
60.
61.        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
62.      } catch {
63.        case cnf: ClassNotFoundException =>
64.          val loader = Thread.currentThread.getContextClassLoader
65.          taskSetManager.abort("ClassNotFound with classloader: " +
             loader)
66.        //匹配NonFatal,所以我们不从上面的return捕获ControlThrowable 异常
67.        case NonFatal(ex) =>
68.          logError("Exception while getting task result", ex)
69.          taskSetManager.abort("Exception while getting task result:
              %s".format(ex))
70.        }
71.      }
72.    })
73.  }

TaskResultGetter.scala的enqueueSuccessfulTask方法中,处理成功任务的时候开辟了一条新线程,先将结果反序列化,然后根据接收的结果类型DirectTaskResult、IndirectTaskResult分别处理。

如果是DirectTaskResult,则直接获得结果并返回。

如果是IndirectTaskResult,就通过blockManager.getRemoteBytes远程获取。获取以后再进行反序列化。

最后是scheduler.handleSuccessfulTask。

TaskSchedulerImpl的handleSuccessfulTask的源码如下。

1.   def handleSuccessfulTask(
2.      taskSetManager: TaskSetManager,
3.      tid: Long,
4.      taskResult: DirectTaskResult[_]): Unit = synchronized {
5.    taskSetManager.handleSuccessfulTask(tid, taskResult)
6.  }

TaskSchedulerImpl中也有失败任务的相应处理。

Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。

1.     def handleFailedTask(
2.       taskSetManager: TaskSetManager,
3.       tid: Long,
4.       taskState: TaskState,
5.       reason: TaskFailedReason): Unit = synchronized {
6.     taskSetManager.handleFailedTask(tid, taskState, reason)
7.     if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
8.       //任务集管理状态更新后,需要再次分配资源,失败的任务需要重新运行
9.       backend.reviveOffers()
10.    }
11.  }

Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第7行if语句判断条件更新。

1.  ......
2.   if (!taskSetManager.isZombie && !taskSetManager.someAttemptSucceeded
     (tid)) {
3.  .......

TaskSchedulerImpl的handleSuccessfulTask交给TaskSetManager调用handleSuccessfulTask,告诉DAGScheduler任务处理结束的状况,并且Kill掉其他尝试的相同任务(因为一个任务已经尝试成功,其他的相同任务没必要再次去尝试)。

Spark 2.1.1版本的TaskSetManager的handleSuccessfulTask的源码如下。

1.  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit
    = {
2.    val info = taskInfos(tid)
3.    val index = info.index
4.     info.markFinished(TaskState.FINISHED)
5.     removeRunningTask(tid)
6.   /**这种方法被   TaskSchedulerImpl.handleSuccessfulTask          调用,其持有  Task
       *SchedulerImpl锁直至退出。为了避免SPARK-7655的问题,当持有一个锁的时候,我
       *们不应该反序列化值,以避免阻塞其他线程。所以,我们在TaskResultGetter.enqueue-
       *SuccessfulTask中调用result.value()。注意:result.value()只在第一次调用
       *时反序列化值,所以在这里result.value()只是返回值,并不会阻止其他线程
7.      */
8.
9.     sched.dagScheduler.taskEnded(tasks(index), Success, result.value(),
       result.accumUpdates, info)
10.    //杀掉同一任务的任何其他尝试(因为现在不需要这些任务,所以一次尝试成功)
11.    for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
12.      logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task
         ${attemptInfo.id} " +
13.        s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on
           ${attemptInfo.host} " +
14.        s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
15.     sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId,
        true)
16.    }
17.    if (!successful(index)) {
18.      tasksSuccessful += 1
19.      logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID
         ${info.taskId}) in" +
20.        s" ${info.duration} ms on ${info.host} (executor ${info.
           executorId})" +
21.        s" ($tasksSuccessful/$numTasks)")
22.      //如果所有的任务都成功了,就标记成功并停止
23.      successful(index) = true
24.      if (tasksSuccessful == numTasks) {
25.        isZombie = true
26.      }
27.    } else {
28.      logInfo("Ignoring task-finished event for " + info.id + " in stage
         " + taskSet.id +
29.        " because task " + index + " has already completed successfully")
30.    }
31.    maybeFinishTaskSet()
32.  }

Spark 2.2.0版本的TaskSetManager的handleSuccessfulTask的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第4行info.markFinished新增第2个参数clock.getTimeMillis()获取时间。

 上段代码中第4行之后新增if (speculationEnabled)的处理代码。

 上段代码中第9行sched.dagScheduler.taskEnded代码置后,放到maybeFinishTaskSet()方法之前。

 上段代码中第15行sched.backend.killTask的第3个参数调整为interruptThread = true,新增第4个参数reason。

1.  ......
2.     info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
3.      if (speculationEnabled) {
4.        successfulTaskDurations.insert(info.duration)
5.      }
6.   ......
7.          interruptThread = true,
8.          reason = "another attempt succeeded")
9.    ......
10.     sched.dagScheduler.taskEnded(tasks(index), Success, result.value(),
        result.accumUpdates, info)
11. ......

speculationEnabled默认设置为spark.speculation=false,用于推测执行慢的任务;如果设置为true,successfulTaskDurations使用MedianHeap记录成功任务的持续时间,这样就可以确定什么时候启动推测性任务,这种情况只在启用推测时使用,以避免不使用堆时增加堆中的开销。

TaskSetManager的handleSuccessfulTask中调用了maybeFinishTaskSet。maybeFinishTaskSet的源码如下。

Spark 2.1.1版本的TaskSetManager.scala的源码如下。

1.  private def maybeFinishTaskSet() {
2.    if (isZombie && runningTasks == 0) {
3.      sched.taskSetFinished(this)
4.    }
5.  }

Spark 2.2.0版本的TaskSetManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第3行之后增加了tasksSuccessful == numTasks的逻辑处理。BlacklistTracker设计跟踪问题的Executors和nodes。blacklistTracker循环遍历更新黑名单列表。

1.  ......
2.        if (tasksSuccessful == numTasks) {
3.          blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
4.            taskSet.stageId,
5.            taskSet.stageAttemptId,
6.            taskSetBlacklistHelperOpt.get.execToFailures))
7.        }
8.      }

TaskSetManager:单TaskSet的任务调度在TaskSchedulerImpl中进行。TaskSetManager类跟踪每项任务,如果任务重试失败(超过有限的次数),对于TaskSet处理本地调度主要的接口是resourceOffer,询问TaskSet是否要在一个节点上运行任务,进行状态更新statusUpdate,告诉TaskSet的一个任务的状态发生了改变(如已完成)。线程:这个类被设计成只在具有锁的代码TaskScheduler上调用(如事件处理程序),不应该从其他线程调用。

总结:

Task执行及结果处理原理流程图如图8-3所示。任务从Driver上发送过来,CoarseGrainedSchedulerBackend发送任务,CoarseGrainedExecutorBackend收到任务后,交给Executor处理,Executor会通过launchTask执行Task。TaskRunner内部会做很多准备工作:反序列化Task的依赖,通过网络获取需要的文件、Jar、反序列Task本身等待;然后调用Task的runTask执行,runTask有ShuffleMapTask、ResultTask两种。通过iterator()方法根据业务逻辑循环遍历,如果是ShuffleMapTask,就把MapStatus汇报给MapOutTracker;如果是ResultTask,就从前面的MapOutTracker中获取信息。

图8-3 Task执行及结果处理原理流程图