8.2 Stage划分内幕

本节讲解Stage划分原理及Stage划分源码。一个Application中,每个Job由一个或多个Stage构成,Stage根据宽依赖(如reducByKey、groupByKey算子等)进行划分。

8.2.1 Stage划分原理详解

Spark Application中可以因为不同的Action触发众多的Job。也就是说,一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage。也就是说,只有前面依赖的Stage计算完毕后,后面的Stage才会运行。

Stage划分的依据就是宽依赖,什么时候产生宽依赖呢?例如,reducByKey、groupByKey等;Action(如collect)导致SparkContext.runJob的执行,最终导致DAGScheduler中的submitJob的执行,其核心是通过发送一个case class JobSubmitted对象给eventProcessLoop。

eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGSchedulerEvent-ProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法。onReceive方法转过来回调doOnReceive。在doOnReceive中通过模式匹配的方式把执行路由到JobSubmitted,在handleJobSubmitted中首先创建finalStage,创建finalStage时会建立父Stage的依赖链条。

8.2.2 Stage划分源码详解

Spark的Action算子执行SparkContext.runJob,提交至DAGScheduler中的submitJob,submitJob发送JobSubmitted对象到eventProcessLoop循环消息队列,提交该任务,其中JobSubmitted的源码如下。

DAGSchedulerEvent.scala的源码如下。

1.  private[scheduler] case class JobSubmitted(
2.     jobId: Int,
3.     finalRDD: RDD[_],
4.     func: (TaskContext, Iterator[_]) => _,
5.     partitions: Array[Int],
6.     callSite: CallSite,
7.     listener: JobListener,
8.     properties: Properties = null)
9.   extends DAGSchedulerEvent

eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGScheduler-EventProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive。

DAGScheduler.scala的源码如下。

1.  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
2.      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener,
        properties) =>
3.        dagScheduler.handleJobSubmitted(jobId,           rdd,    func,    partitions,
          callSite, listener, properties)
4.
5.      case MapStageSubmitted(jobId, dependency, callSite, listener,
        properties) =>
6.        dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite,
          listener, properties)
7.   ......