4.1 Spark Driver Program剖析

SparkContext是通往Spark集群的唯一入口,是整个Application运行调度的核心。本节将深度剖析SparkContext。

4.1.1 Spark Driver Program

Spark Driver Program(以下简称Driver)是运行Application的main函数并且新建SparkContext实例的程序。其实,初始化SparkContext是为了准备Spark应用程序的运行环境,在Spark中,由SparkContext负责与集群进行通信、资源的申请、任务的分配和监控等。当Worker节点中的Executor运行完毕Task后,Driver同时负责将SparkContext关闭。通常也可以使用SparkContext来代表驱动程序(Driver)。

Driver(SparkContext)整体架构图如图4-1所示。

图4-1 Driver(SparkContext)整体架构图

4.1.2 SparkContext深度剖析

SparkContext是通往Spark集群的唯一入口,可以用来在Spark集群中创建RDDs、累加器(Accumulators)和广播变量(Broadcast Variables)。SparkContext也是整个Spark应用程序(Application)中至关重要的一个对象,可以说是整个Application运行调度的核心(不是指资源调度)。

SparkContext的核心作用是初始化Spark应用程序运行所需要的核心组件,包括高层调度器(DAGScheduler)、底层调度器(TaskScheduler)和调度器的通信终端(SchedulerBackend),同时还会负责Spark程序向Master注册程序等。

一般而言,通常为了测试或者学习Spark开发一个Application,在Application的main方法中,最开始几行编写的代码一般是这样的:首先,创建SparkConf实例,设置SparkConf实例的属性,以便覆盖Spark默认配置文件spark-env.sh,spark-default.sh和log4j.properties中的参数;然后,SparkConf实例作为SparkContext类的唯一构造参数来实例化SparkContext实例对象。SparkContext在实例化的过程中会初始化DAGScheduler、TaskScheduler和SchedulerBackend,而当RDD的action触发了作业(Job)后,SparkContext会调用DAGScheduler将整个Job划分成几个小的阶段(Stage),TaskScheduler会调度每个Stage的任务(Task)进行处理。还有,SchedulerBackend管理整个集群中为这个当前的Application分配的计算资源,即Executor。

如果用一个车来比喻Spark Application,那么SparkContext就是车的引擎,而SparkConf是关于引擎的配置参数。说明:只可以有一个SparkContext实例运行在一个JVM内存中,所以在创建新的SparkContext实例前,必须调用stop方法停止当前JVM唯一运行的SparkContext实例。

Spark程序在运行时分为Driver和Executor两部分:Spark程序编写是基于SparkContext的,具体包含两方面。

 Spark编程的核心基础RDD是由SparkContext最初创建的(第一个RDD一定是由SparkContext创建的)。

 Spark程序的调度优化也是基于SparkContext,首先进行调度优化。

 Spark程序的注册是通过SparkContext实例化时生产的对象来完成的(其实是SchedulerBackend来注册程序)。

 Spark程序在运行时要通过Cluster Manager获取具体的计算资源,计算资源获取也是通过SparkContext产生的对象来申请的(其实是SchedulerBackend来获取计算资源的)。

 SparkContext崩溃或者结束的时候,整个Spark程序也结束。

4.1.3 SparkContext源码解析

SparkContext是Spark应用程序的核心。我们运行WordCount程序,通过日志来深入了解SparkContext。

WordCount.scala的代码如下。

1.   package com.dt.spark.sparksql
2.
3.  import org.apache.log4j.{Level, Logger}
4.  import org.apache.spark.rdd.RDD
5.  import org.apache.spark.{SparkConf, SparkContext}
6.
7.  /**
8.    * 使用Scala开发本地测试的Spark WordCount程序
9.    * @author DT大数据梦工厂
10.   * 新浪微博:http://weibo.com/ilovepains/
11.   */
12. object WordCount {
13.   def main(args: Array[String]){
14.     Logger.getLogger("org").setLevel(Level.ALL)
15.     /**
16.       * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
          * 例如,通过setMaster设置程序要链接的Spark集群的Master的URL,如果设置
          * 为local,则代表Spark程序在本地运行,特别适合于机器配置非常差(如只有1GB
          * 的内存)的初学者
17.       */
18.
19.     val conf = new SparkConf() //创建SparkConf对象
20.     conf.setAppName("Wow,WordCountJobRuntime!")
                                 //设置应用程序的名称,在程序运行的监控界面中可以看到名称
21.     conf.setMaster("local")    //此时,程序在本地运行,不需要安装Spark集群
22.
23.     /**
24.       * 第2步:创建SparkContext对象
25.       * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、
          * R等都必须有一个SparkContext
26.       * SparkContext    核心作用:初始化  Spark  应用程序运行所需要的核心组件,包括
          * DAGScheduler、TaskScheduler、SchedulerBackend
27.       * 同时还会负责Spark程序往Master注册程序等
28.       * SparkContext是整个Spark应用程序中至关重要的一个对象
29.       */
30.     val sc = new SparkContext(conf)
                                       //创建SparkContext对象,通过传入SparkConf
                                       //实例来定制Spark运行的具体参数和配置信息
31.     /**
32.       * 第 3  步:根据具体的数据来源(如  HDFS、HBase、Local FS、DB、S3      等)通过
          * SparkContext来创建RDD
33.       * RDD的创建有3种方式:根据外部的数据来源(如HDFS),根据Scala集合,由其他
          * 的RDD操作
34.       * 数据会被RDD划分成一系列的Partitions,分配到每个Partition的数据属于一
          * 个Task的处理范畴
35.       */
36.     val lines = sc.textFile("data/wordcount/helloSpark.txt")
                                               //读取本地文件并设置为一个Partition
37.
38.     /**
39.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
40.       *  第4.1步:将每一行的字符串拆分成单个单词
41.       */
42.
43.     val words = lines.flatMap { line => line.split(" ")}
                                 //对每一行的字符串进行单词拆分,并把所有行的拆分结果通过
                                 //flat合并成为一个大的单词集合
44.
45.     /**
46.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
47.       *  第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)
48.       */
49.     val pairs: RDD[(String, Int)] = words.map { word => (word, 1) }
50.     pairs.cache()
51.     /**
52.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
53.       *  第4.3步:在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数
54.       */
55.     val wordCountsOdered = pairs.reduceByKey(_+_).saveAsTextFile("data/
        wordcount/wordCountResult.log")
56.
57.     while(true){
58.
59.     }
60.     sc.stop()
61.
62.   }
63. }

在IDEA中运行WordCount.scala代码,日志显示如下。

1.  Using Spark's default log4j profile: org/apache/spark/log4j-defaults.
    properties
2.  17/06/16 06:00:49 INFO SparkContext: Running Spark version 2.1.0
3.  ........
4.  17/06/16 06:00:54 TRACE BlockInfoManager: Task -1024 releasing lock for
    broadcast_0_piece0
5.  17/06/16 06:00:54 DEBUG BlockManager: Putting block broadcast_0_piece0
    without replication took  377 ms
6.  17/06/16 06:00:54 INFO SparkContext: Created broadcast 0 from textFile
    at WordCountJobRuntime.scala:39
7.  ........
8.  17/06/16 06:00:54 INFO SparkContext: Starting job: saveAsTextFile at
    WordCountJobRuntime.scala:58
9.  .......

程序一开始,日志里显示的是:INFO SparkContext: Running Spark version 2.1.0,日志中间部分是一些随着SparkContext创建而创建的对象,另一条比较重要的日志信息,作业启动了并正在运行:INFO SparkContext: Starting job: saveAsTextFile at WordCountJobRuntime.scala:58。

在程序运行的过程中会创建TaskScheduler、DAGScheduler和SchedulerBackend,它们有各自的功能。DAGScheduler是面向Job的Stage的高层调度器;TaskScheduler是底层调度器。SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现。程序打印结果后便开始结束。日志显示:INFO SparkContext: Successfully stopped SparkContext。

1.  ........
2.  17/06/16 06:00:56 INFO BlockManagerMaster: BlockManagerMaster stopped
3.  17/06/16 06:00:56 INFO OutputCommitCoordinator$OutputCommitCoordinator
    Endpoint: OutputCommitCoordinator stopped!
4.  17/06/16 06:00:56 INFO SparkContext: Successfully stopped SparkContext
5.  17/06/16 06:00:56 INFO ShutdownHookManager: Shutdown hook called
6.  .......

通过这个例子可以感受到Spark程序的运行到处都可以看到SparkContext的存在,我们将SparkContext作为Spark源码阅读的入口,来理解Spark的所有内部机制。

图4-2是从一个整体去看SparkContext创建的实例对象。首先,SparkContext构建的顶级三大核心为DAGScheduler、TaskScheduler、SchedulerBackend,其中,DAGScheduler是面向Job的Stage的高层调度器;TaskScheduler是一个接口,是底层调度器,根据具体的ClusterManager的不同会有不同的实现,Standalone模式下具体的实现是TaskSchedulerImpl。SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现。Standalone模式下具体的实现是StandaloneSchedulerBackend。

图4-2 SparkContext整体运行图

从整个程序运行的角度讲,SparkContext包含四大核心对象:DAGScheduler、TaskScheduler、SchedulerBackend、MapOutputTrackerMaster。StandaloneSchedulerBackend有三大核心功能:负责与Master连接,注册当前程序RegisterWithMaster;接收集群中为当前应用程序分配的计算资源Executor的注册并管理Executors;负责发送Task到具体的Executor执行。

第一步:程序一开始运行时会实例化SparkContext里的对象,所有不在方法里的成员都会被实例化!一开始实例化时第一个关键的代码是createTaskScheduler,它位于SparkContext的PrimaryConstructor中,当它实例化时会直接被调用,这个方法返回的是taskScheduler和dagScheduler的实例,然后基于这个内容又构建了DAGScheduler,最后调用taskScheduler的start()方法。要先创建taskScheduler,然后再创建dagScheduler,因为taskScheduler是受dagScheduler管理的。

SparkContext.scala的源码如下。

1.  //创建和启动调度器
2.     val (sched, ts) = SparkContext.createTaskScheduler(this, master,
       deployMode)
3.     _schedulerBackend = sched
4.     _taskScheduler = ts
5.      _dagScheduler = new DAGScheduler(this)
6.      _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
7.      //在DAGScheduler构造器中设置taskScheduler的引用以后,启动TaskScheduler
8.      _taskScheduler.start()
9.  ......

第二步:调用createTaskScheduler,这个方法创建了TaskSchedulerImpl和StandaloneSchedulerBackend,createTaskScheduler方法的第一个入参是SparkContext,传入的this对象是在应用程序中创建的sc,第二个入参是master的地址。

以下是WordCount.scala创建SparkConf和SparkContext的上下文信息。

1.  val conf = new SparkConf() //创建SparkConf对象
2.     conf.setAppName("Wow,WordCount")
             //设置应用程序的名称,在程序运行的监控界面中可以看到名称
3.     conf.setMaster("local")
4.  val sc = new SparkContext(conf)

当SparkContext调用createTaskScheduler方法时,根据集群的条件创建不同的调度器,例如,createTaskScheduler第二个入参master如传入local参数,SparkContext将创建TaskSchedulerImpl实例及LocalSchedulerBackend实例,在测试代码的时候,可以尝试传入local[*]或者是local[2]的参数,然后跟踪代码,看看创建了什么样的实例对象。

SparkContext中的SparkMasterRegex对象定义不同的正则表达式,从master字符串中根据正则表达式适配master信息。

SparkContext.scala的源码如下。

1.   private object SparkMasterRegex {
2.    //正则表达式 local[N]和 local[*] 用于master 格式
3.    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
4.    //正则表达式 local[N, maxRetries]用于失败任务的测试
5.    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
6.    //正则表达式用于模拟Spark 本地集群 [N, cores, memory]
7.    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)
      \s*,\s*([0-9]+)\s*]""".r
8.    //用于连接到Spark部署集群的正则表达式
9.    val SPARK_REGEX = """spark://(.*)""".r
10. }

这是设计模式中的策略模式,它会根据实际需要创建出不同的SchedulerBackend的子类。

SparkContext.scala的createTaskScheduler方法的源码如下。

1.   /**
       *基于给定的主URL创建任务调度器,返回一个二元调度程序的后台和任务调度
2.     */
3.
4.    private def createTaskScheduler(
5.        sc: SparkContext,
6.        master: String,
7.        deployMode: String): (SchedulerBackend, TaskScheduler) = {
8.      import SparkMasterRegex._
9.
10.     //当在本地运行时,不要试图在失败时重新执行任务
11.     val MAX_LOCAL_TASK_FAILURES = 1
12.
13.  master match {
14.    case "local" =>
15.      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES,
         isLocal = true)
16.      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
17.      scheduler.initialize(backend)
18.      (backend, scheduler)
19.
20.    case LOCAL_N_REGEX(threads) =>
21.      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
22.      //local[*]估计机器上的核数; local[N] 精确地使用N个线程
23.      val threadCount = if (threads == "*") localCpuCount else
         threads.toInt
24.      if (threadCount <= 0) {
25.        throw new SparkException(s"Asked to run locally with $threadCount
           threads")
26.      }
27.      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES,
         isLocal = true)
28.      val backend = new LocalSchedulerBackend(sc.getConf, scheduler,
         threadCount)
29.      scheduler.initialize(backend)
30.      (backend, scheduler)
31.
32.    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
33.      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
34.      //local[*, M] 计算机核发生M个故障
35.      //local[N, M] 意味着N个线程M个故障
36.      val threadCount = if (threads == "*") localCpuCount else threads.
         toInt
37.      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt,
         isLocal = true)
38.      val backend = new LocalSchedulerBackend(sc.getConf, scheduler,
         threadCount)
39.      scheduler.initialize(backend)
40.      (backend, scheduler)
41.
42.    case SPARK_REGEX(sparkUrl) =>
43.      val scheduler = new TaskSchedulerImpl(sc)
44.      val masterUrls = sparkUrl.split(",").map("spark://" + _)
45.      val   backend    =  new   StandaloneSchedulerBackend(scheduler,      sc,
         masterUrls)
46.      scheduler.initialize(backend)
47.      (backend, scheduler)
48.
49.    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
50.      //确认请求的内存<= memoryPerSlave,否则Spark将会挂起
51.      val memoryPerSlaveInt = memoryPerSlave.toInt
52.      if (sc.executorMemory > memoryPerSlaveInt) {
53.        throw new SparkException(
54.          "Asked to launch cluster with %d MB RAM / worker but requested
             %d MB/worker".format(
55.            memoryPerSlaveInt, sc.executorMemory))
56.      }
57.
58.      val scheduler = new TaskSchedulerImpl(sc)
59.      val localCluster = new LocalSparkCluster(
60.        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
61.      val masterUrls = localCluster.start()
62.        val   backend    =  new   StandaloneSchedulerBackend(scheduler,        sc,
           masterUrls)
63.        scheduler.initialize(backend)
64.        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
65.          localCluster.stop()
66.        }
67.        (backend, scheduler)
68.
69.      case masterUrl =>
70.        val cm = getClusterManager(masterUrl) match {
71.          case Some(clusterMgr) => clusterMgr
72.          case None => throw new SparkException("Could not parse Master URL:
             '" + master + "'")
73.        }
74.        try {
75.          val scheduler = cm.createTaskScheduler(sc, masterUrl)
76.          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
77.          cm.initialize(scheduler, backend)
78.          (backend, scheduler)
79.        } catch {
80.          case se: SparkException => throw se
81.          case NonFatal(e) =>
82.            throw new SparkException("External scheduler cannot be
               instantiated", e)
83.        }
84.    }
85.  }

在实际生产环境下,我们都是用集群模式,即以spark://开头,此时在程序运行时,框架会创建一个TaskSchedulerImpl和StandaloneSchedulerBackend的实例,在这个过程中也会初始化taskscheduler,把StandaloneSchedulerBackend的实例对象作为参数传入。StandaloneSchedulerBackend被TaskSchedulerImpl管理,最后返回TaskScheduler和StandaloneSchdeulerBackend。

SparkContext.scala的源码如下。

1.  case SPARK_REGEX(sparkUrl) =>
2.      val scheduler = new TaskSchedulerImpl(sc)
3.      val masterUrls = sparkUrl.split(",").map("spark://" + _)
4.      val backend = new StandaloneSchedulerBackend(scheduler, sc,
        masterUrls)
5.      scheduler.initialize(backend)
6.      (backend, scheduler)

createTaskScheduler方法执行完毕后,调用了taskscheduler.start()方法来正式启动taskscheduler,这里虽然调用了taskscheduler.start方法,但实际上是调用了taskSchedulerImpl的start方法,因为taskSchedulerImpl是taskScheduler的子类。

Task默认失败重试次数是4次,如果任务不容许失败,就可以调大这个参数。调大spark.task.maxFailures参数有助于确保重要的任务失败后可以重试多次。

初始化TaskSchedulerImpl:调用createTaskScheduler方法时会初始化TaskSchedulerImpl,然后把StandaloneSchedulerBackend当作参数传进去,初始化TaskSchedulerImpl时首先是创建一个Pool来初定义资源分布的模式Scheduling Mode,默认是先进先出(FIFO)的模式。

Spark 2.1.1版本的TaskSchedulerImpl.scala的initialize的源码如下。

1.  def initialize(backend: SchedulerBackend) {
2.     this.backend = backend
3.     //临时设置rootPool名字为空
4.     rootPool = new Pool("", schedulingMode, 0, 0)
5.     schedulableBuilder = {
6.       schedulingMode match {
7.         case SchedulingMode.FIFO =>
8.           new FIFOSchedulableBuilder(rootPool)
9.         case SchedulingMode.FAIR =>
10.          new FairSchedulableBuilder(rootPool, conf)
11.        case _ =>
12.          throw new IllegalArgumentException(s"Unsupported spark.
             scheduler.mode: $schedulingMode")
13.      }
14.    }
15.    schedulableBuilder.buildPools()
16.  }

Spark 2.2.0版本的TaskSchedulerImpl.scala的initialize的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行rootPool变量的创建从initialize方法内移动至initialize方法外;rootPool作为TaskSchedulerImpl类的成员变量,在构建TaskSchedulerImpl时初始化。

1.  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
2.  ......

可以设置spark.scheduler.mode参数来定义资源调度池,例如FAIR、FIFO,默认资源调度池是先进先出(FIFO)模式。

Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。

1.       private val schedulingModeConf = conf.get("spark.scheduler.mode",
         "FIFO")
2.  val schedulingMode: SchedulingMode = try {
3.    SchedulingMode.withName(schedulingModeConf.toUpperCase)
4.  } catch {
5.    case e: java.util.NoSuchElementException =>
6.      throw    new    SparkException(s"Unrecognized         spark.scheduler.mode:
        $schedulingModeConf")
7.  }

Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第1行Conf配置文件获取属性的代码进行了微调,调整为从object TaskSchedulerImpl中获取。

 上段代码中第3行toUpperCase更新为toUpperCase(Locale.ROOT)。

 上段代码中第6行异常提示字符串更新为$SCHEDULER_MODE_PROPERTY。

1.    private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY,
      SchedulingMode.FIFO.toString)
2.   .....
3.        SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
4.    .....
5.          throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY:
            $schedulingModeConf")
6.      }
7.  .......
8.  private[spark] object TaskSchedulerImpl {
9.
10.   val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
11. .....
12. SchedulingMode.scala
13.
14. object SchedulingMode extends Enumeration {
15.
16.   type SchedulingMode = Value
17.   val FAIR, FIFO, NONE = Value
18. }

回到taskScheduler start方法,taskScheduler.start方法调用时会再调用schedulerbackend的start方法。

TaskSchedulerImpl.scala的start方法的源码如下。

1.   override def start() {
2.      backend.start()
3.
4.      if (!isLocal && conf.getBoolean("spark.speculation", false)) {
5.        logInfo("Starting speculative execution thread")
6.        speculationScheduler.scheduleAtFixedRate(new Runnable {
7.          override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
8.            checkSpeculatableTasks()
9.          }
10.       }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.
          MILLISECONDS)
11.     }
12.   }

SchedulerBackend包含多个子类,分别是LocalSchedulerBackend、CoarseGrainedScheduler-Backend和StandaloneSchedulerBackend、MesosCoarseGrainedSchedulerBackend、YarnScheduler-Backend。

StandaloneSchedulerBackend的start方法调用了CoarseGraninedSchedulerBackend的start方法,通过StandaloneSchedulerBackend注册程序把command提交给Master:Command ("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)来创建一个StandaloneAppClient的实例。

Spark 2.1.1版本的StandaloneSchedulerBackend.scala的start方法的源码如下。

1.   override def start() {
2.      super.start()
3.      launcherBackend.connect()
4.
5.      //executors 节点与我们通信的端点
6.      val driverUrl = RpcEndpointAddress(
7.        sc.conf.get("spark.driver.host"),
8.        sc.conf.get("spark.driver.port").toInt,
9.        CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
10.     val args = Seq(
11.       "--driver-url", driverUrl,
12.       "--executor-id", "{{EXECUTOR_ID}}",
13.       "--hostname", "{{HOSTNAME}}",
14.       "--cores", "{{CORES}}",
15.       "--app-id", "{{APP_ID}}",
16.       "--worker-url", "{{WORKER_URL}}")
17.     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
18.       .map(Utils.splitCommandString).getOrElse(Seq.empty)
19.    val classPathEntries = sc.conf.getOption("spark.executor. extraClassPath")
20.      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
21.    val libraryPathEntries = sc.conf.getOption("spark.executor.
       extraLibraryPath")
22.      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
23.
24.    //测试时,将父类路径公开给子对象,由compute-classpath.{cmd,sh}计算路径。当
       //“*-provided”配置启用,子进程可使用所有需要的jar包
25.    val testingClassPath =
26.      if (sys.props.contains("spark.testing")) {
27.        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
28.      } else {
29.        Nil
30.      }
31.
32.    //使用注册调度必要的一些配置启动executors
33.    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.
       isExecutorStartupConf)
34.    val javaOpts = sparkJavaOpts ++ extraJavaOpts
35.    val    command    =   Command("org.apache.spark.executor.CoarseGrained-
       ExecutorBackend",
36.      args, sc.executorEnvs, classPathEntries ++ testingClassPath,
         libraryPathEntries, javaOpts)
37.    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
38.    val coresPerExecutor = conf.getOption("spark.executor.cores").map
       (_.toInt)
39.    //如果使用动态分配,现在将我们的初始执行器限制设置为0,
       //ExecutorAllocationManager将实际的初始限制发送给Master节点
40.    val initialExecutorLimit =
41.      if (Utils.isDynamicAllocationEnabled(conf)) {
42.        Some(0)
43.      } else {
44.        None
45.      }
46.    val   appDesc    =   new   ApplicationDescription(sc.appName,         maxCores,
       sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec,
       coresPerExecutor, initialExecutorLimit)
47.   client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this,
      conf)
48.    client.start()
49.    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
50.    waitForRegistration()
51.    launcherBackend.setState(SparkAppHandle.State.RUNNING)
52.  }

Spark 2.2.0版本的StandaloneSchedulerBackend.scala的start方法的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第3行增加了对SparkContext部署方式的判断。

 上段代码中第37行appUIAddress变量名称调整为webUrl。

 上段代码中第46行构建应用程序的描述信息ApplicationDescription第5个参数appUIAddress更新为webUrl参数。

1.  ....
2.    //SPARK-21159:只有在client模式下scheduler backend去连接launcher。在
      //cluster 集群下,提交应用程序应提交给Master
3.    if (sc.deployMode == "client") {
4.        launcherBackend.connect()
5.      }
6.  ......
7.   val webUrl = sc.ui.map(_.webUrl).getOrElse("")
8.  ......
9.  val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor,
    initialExecutorLimit)
10. ......

Master发指令给Worker去启动Executor所有的进程时加载的Main方法所在的入口类就是command中的CoarseGrainedExecutorBackend,在CoarseGrainedExecutorBackend中启动Executor(Executor是先注册,再实例化),Executor通过线程池并发执行Task,然后再调用它的run方法。

CoarseGrainedExecutorBackend.scala的源码如下。

1.   def main(args: Array[String]) {
2.     var driverUrl: String = null
3.     var executorId: String = null
4.     var hostname: String = null
5.     var cores: Int = 0
6.     var appId: String = null
7.     var workerUrl: Option[String] = None
8.     val userClassPath = new mutable.ListBuffer[URL]()
9.
10.    var argv = args.toList
11.    while (!argv.isEmpty) {
12.      argv match {
13.        case ("--driver-url") :: value :: tail =>
14.          driverUrl = value
15.          argv = tail
16.        case ("--executor-id") :: value :: tail =>
17.          executorId = value
18.          argv = tail
19.        case ("--hostname") :: value :: tail =>
20.          hostname = value
21.          argv = tail
22.        case ("--cores") :: value :: tail =>
23.          cores = value.toInt
24.          argv = tail
25.        case ("--app-id") :: value :: tail =>
26.          appId = value
27.          argv = tail
28.        case ("--worker-url") :: value :: tail =>
29.          //Worker url 用于spark standalone 模式,以加强与Worker的分享
30.          workerUrl = Some(value)
31.          argv = tail
32.        case ("--user-class-path") :: value :: tail =>
33.          userClassPath += new URL(value)
34.          argv = tail
35.        case Nil =>
36.        case tail =>
37.          //scalastyle:off println
38.          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
39.          //scalastyle:on println
40.          printUsageAndExit()
41.      }
42.    }
43.
44.    if (driverUrl == null || executorId == null || hostname == null || cores
       <= 0 ||
45.      appId == null) {
46.      printUsageAndExit()
47.    }
48.
49.    run(driverUrl, executorId, hostname, cores, appId, workerUrl,
       userClassPath)
50.    System.exit(0)
51.  }

CoarseGrainedExecutorBackend的main入口方法中调用了run方法。

Spark 2.1.1版本的CoarseGrainedExecutorBackend的run入口方法的源码如下。

1.   private def run(
2.      driverUrl: String,
3.      executorId: String,
4.      hostname: String,
5.      cores: Int,
6.      appId: String,
7.      workerUrl: Option[String],
8.      userClassPath: Seq[URL]) {
9.
10.   Utils.initDaemon(log)
11.
12.   SparkHadoopUtil.get.runAsSparkUser { () =>
13.     //Debug 代码
14.     Utils.checkHost(hostname)
15.
16.     //Bootstrap 去抓取 driver节点 Spark属性
17.     val executorConf = new SparkConf
18.     val port = executorConf.getInt("spark.executor.port", 0)
19.     val fetcher = RpcEnv.create(
20.       "driverPropsFetcher",
21.       hostname,
22.       port,
23.       executorConf,
24.       new SecurityManager(executorConf),
25.       clientMode = true)
26.     val driver = fetcher.setupEndpointRefByURI(driverUrl)
27.     val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
28.     val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.
        app.id", appId))
29.     fetcher.shutdown()
30.
31.     //从driver 节点获取属性信息,创建SparkEnv
32.     val driverConf = new SparkConf()
33.     for ((key, value) <- props) {
34.       //这是SSL在独立模式下需要的
35.       if (SparkConf.isExecutorStartupConf(key)) {
36.         driverConf.setIfMissing(key, value)
37.       } else {
38.         driverConf.set(key, value)
39.       }
40.     }
41.     if (driverConf.contains("spark.yarn.credentials.file")) {
42.       logInfo("Will periodically update credentials from: " +
43.         driverConf.get("spark.yarn.credentials.file"))
44.        SparkHadoopUtil.get.startCredentialUpdater(driverConf)
45.      }
46.
47.      val env = SparkEnv.createExecutorEnv(
48.       driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey,
          isLocal = false)
49.
50.      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
51.        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath,
           env))
52.      workerUrl.foreach { url =>
53.        env.rpcEnv.setupEndpoint("WorkerWatcher",             new    WorkerWatcher
           (env.rpcEnv, url))
54.      }
55.      env.rpcEnv.awaitTermination()
56.      SparkHadoopUtil.get.stopCredentialUpdater()
57.    }
58.  }

Spark 2.2.0版本的CoarseGrainedExecutorBackend的run入口方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第27行Spark 2.2.0版本将Rpc消息终端引用RpcEndpointRef的askWithRetry方法调整为askSync方法。CoarseGrainedExecutorBackend通过消息循环体向driver发送RetrieveSparkAppConfig消息,RetrieveSparkAppConfig是一个case object。Driver端的CoarseGrainedSchedulerBackend消息循环体收到消息以后,将Spark的属性信息sparkProperties及加密key等内容封装成SparkAppConfig消息,将SparkAppConfig消息再回复给CoarseGrainedExecutorBackend。

1.   ........
2.  val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
3.  ........

回到StandaloneSchedulerBackend.scala的start方法:其中创建了一个很重要的对象,即StandaloneAppClient对象,然后调用它的client.start()方法。

在start方法中创建一个ClientEndpoint对象。

StandaloneAppClient.scala的star方法的源码如下。

1.  def start() {
2.    //启动 rpcEndpoint; it will call back into the listener.
3.    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint
      (rpcEnv)))
4.  }

ClientEndpoint是一个RpcEndPoint,首先调用自己的onStart方法,接下来向Master注册。

StandaloneAppClient.scala的ClientEndpoint类的源码如下。

1.  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends
    ThreadSafeRpcEndpoint
2.  .......
3.   override def onStart(): Unit = {
4.     try {
5.       registerWithMaster(1)
6.     } catch {
7.       case e: Exception =>
8.           logWarning("Failed to connect to master", e)
9.           markDisconnected()
10.          stop()
11.       }
12.     }
13. .......

调用registerWithMaster方法,从registerWithMaster调用tryRegisterAllMasters,开一条新的线程来注册,然后发送一条信息(RegisterApplication的case class)给Master。

StandaloneAppClient.scala的registerWithMaster的源码如下。

1.     private def registerWithMaster(nthRetry: Int) {
2.        registerMasterFutures.set(tryRegisterAllMasters())
3.        registrationRetryTimer.set(registrationRetryThread.schedule(new
          Runnable {
4.          override def run(): Unit = {
5.            if (registered.get) {
6.              registerMasterFutures.get.foreach(_.cancel(true))
7.              registerMasterThreadPool.shutdownNow()
8.            } else if (nthRetry >= REGISTRATION_RETRIES) {
9.              markDead("All masters are unresponsive! Giving up.")
10.           } else {
11.             registerMasterFutures.get.foreach(_.cancel(true))
12.             registerWithMaster(nthRetry + 1)
13.           }
14.         }
15.       }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
16.     }
17. ......

StandaloneAppClient.scala的tryRegisterAllMasters的源码如下。

1.       private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.        for (masterAddress <- masterRpcAddresses) yield {
3.          registerMasterThreadPool.submit(new Runnable {
4.            override def run(): Unit = try {
5.              if (registered.get) {
6.                return
7.              }
8.              logInfo("Connecting to master " + masterAddress.toSparkURL +
                "...")
9.              val masterRef = rpcEnv.setupEndpointRef(masterAddress,
                Master.ENDPOINT_NAME)
10.             masterRef.send(RegisterApplication(appDescription, self))
11.           } catch {
12.             case ie: InterruptedException => //Cancelled
13.             case NonFatal(e) => logWarning(s"Failed to connect to master
                $masterAddress", e)
14.           }
15.         })
16.       }
17.     }
18. ......

Master收到RegisterApplication信息后便开始注册,注册后再次调用schedule()方法。

Master.scala的receive方法的源码如下。

1.  override def receive: PartialFunction[Any, Unit] = {
2.     .......
3.
4.      case RegisterApplication(description, driver) =>
5.        //待办事宜:防止某些driver重复注册
6.        if (state == RecoveryState.STANDBY) {
7.          //忽略,不要发送响应
8.        } else {
9.          logInfo("Registering app " + description.name)
10.         val app = createApplication(description, driver)
11.         registerApplication(app)
12.         logInfo("Registered app " + description.name + " with ID " + app.id)
13.         persistenceEngine.addApplication(app)
14.         driver.send(RegisteredApplication(app.id, self))
15.         schedule()
16.       }
17. .......

总结:从SparkContext创建taskSchedulerImpl初始化不同的实例对象来完成最终向Master注册的任务,中间包括调用scheduler的start方法和创建StandaloneAppClient来间接创建ClientEndPoint完成注册工作。

我们把SparkContext称为天堂之门,SparkContext开启天堂之门:Spark程序是通过SparkContext发布到Spark集群的;SparkContext导演天堂世界:Spark程序的运行都是在SparkContext为核心的调度器的指挥下进行的;SparkContext关闭天堂之门:SparkContext崩溃或者结束的时候整个Spark程序也结束。