6.3 Revisiting the Driver from the Perspective of Application Submission

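This section revisits the Driver from the perspective of application submission. It explains when exactly the Driver comes into being, how the Driver interacts with the Master, and the source code behind that interaction.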

6.3.1 When Is the Driver Actually Created?

When SparkContext is instantiated, it calls createTaskScheduler to create a TaskSchedulerImpl and a StandaloneSchedulerBackend.

The relevant source in SparkContext.scala is as follows.

```scala
class SparkContext(config: SparkConf) extends Logging {
  ......
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts

  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
  ......
  // (defined in the SparkContext companion object)
  private def createTaskScheduler(
    ......
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
  ......
```

Inside createTaskScheduler, scheduler.initialize(backend) is called, and the StandaloneSchedulerBackend is passed in as its argument.
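The `case SPARK_REGEX(sparkUrl)` branch turns a comma-separated standalone master URL (for example, several Masters listed for high availability) into the masterUrls array. The standalone sketch below shows that step. The regex is an assumption about the pattern's shape: "spark://" followed by the rest of the string. `MasterUrlSketch` is a hypothetical name, not Spark code.

```scala
object MasterUrlSketch {
  // Assumed shape of the standalone master pattern: "spark://" followed by a host:port list.
  private val SparkRegex = """spark://(.*)""".r

  /** Splits "spark://h1:7077,h2:7077" into one "spark://" URL per Master; None otherwise. */
  def masterUrls(master: String): Option[List[String]] = master match {
    case SparkRegex(sparkUrl) => Some(sparkUrl.split(",").map("spark://" + _).toList)
    case _                    => None
  }
}
```

With two Masters, each host:port gets its own "spark://" prefix. A master such as `local[*]` does not match and falls through to `None`, which in the real code would be handled by a different branch.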

The source of TaskSchedulerImpl.initialize is as follows.

```scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  ......
```

initialize only receives the StandaloneSchedulerBackend and assigns it to TaskSchedulerImpl's backend field. It does not start the StandaloneSchedulerBackend yet.

When TaskSchedulerImpl's start method is called, it calls backend.start, and the application is registered inside that start method.

The code in SparkContext.scala that creates and starts the task scheduler is as follows.

```scala
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
......
_taskScheduler.start()
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
......
```

Here _taskScheduler.start() is called. start is declared in the TaskScheduler trait:

```scala
private[spark] trait TaskScheduler {
  ......

  def start(): Unit
  ......
```

TaskScheduler's start() is abstract. The implementation in its subclass TaskSchedulerImpl is as follows.

```scala
override def start() {
  backend.start()
  ......
```

TaskSchedulerImpl.start() calls backend.start(), which runs StandaloneSchedulerBackend's start method.
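The two-phase wiring described above can be sketched with stripped-down types: initialize only stores the backend, and start is what actually starts it. `BackendLike` and `SchedulerSketch` are hypothetical names for illustration, not Spark's classes.

```scala
trait BackendLike {
  def start(): Unit
}

final class SchedulerSketch {
  private var backend: Option[BackendLike] = None

  // Phase 1: just remember the backend; nothing is started yet.
  def initialize(b: BackendLike): Unit = backend = Some(b)

  // Phase 2: starting the scheduler starts the backend it was given.
  def start(): Unit = backend match {
    case Some(b) => b.start()
    case None    => throw new IllegalStateException("initialize() must be called before start()")
  }
}
```

Separating the two phases lets SparkContext build the scheduler and the backend first, then start them once the DAGScheduler and other components are in place.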

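In StandaloneSchedulerBackend.start, a command is packaged and registered with the Master, and the Master in turn has Workers launch the actual Executors. The command already specifies how to launch each Executor: the entry class of the Executor process is CoarseGrainedExecutorBackend. start then creates a StandaloneAppClient with new and starts it via client.start().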

StandaloneAppClient.start creates a new ClientEndpoint:

```scala
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
```

The source of ClientEndpoint is as follows.

```scala
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
  with Logging {
  ......
  override def onStart(): Unit = {
    try {
      registerWithMaster(1)
    } catch {
      case e: Exception =>
        logWarning("Failed to connect to master", e)
        markDisconnected()
        stop()
    }
  }
```

ClientEndpoint is a ThreadSafeRpcEndpoint. Its onStart() method calls registerWithMaster(1) to register the application with the Master. registerWithMaster in StandaloneAppClient.scala is as follows.

```scala
private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  ......
```

registerWithMaster calls tryRegisterAllMasters. In tryRegisterAllMasters, ClientEndpoint sends a RegisterApplication message to the Master to register the application.

The relevant source in StandaloneAppClient.scala is as follows.

```scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  ......
        masterRef.send(RegisterApplication(appDescription, self))
  ......
```

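Once the application is registered, the Master allocates resources through schedule() and tells Workers to launch Executors. The process launched for each Executor is CoarseGrainedExecutorBackend. After it starts, the Executor registers back with the Driver. From the Executor's point of view, the Driver is really DriverEndpoint, a message loop inside CoarseGrainedSchedulerBackend, which is the parent class of StandaloneSchedulerBackend.

The source of the receive method in Master.scala is as follows.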

```scala
override def receive: PartialFunction[Any, Unit] = {
  case RegisterApplication(description, driver) =>
    .......
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      persistenceEngine.addApplication(app)
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }
```
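The registration round trip can be simulated in a single JVM, with plain queues standing in for RpcEndpointRefs. In the real flow, ClientEndpoint sends RegisterApplication, and the Master records the app and replies with RegisteredApplication.

This sketch relies on simplifying assumptions:
- There is no network, threading, or persistence engine.
- The message classes (`RegisterApp`, `RegisteredApp`) only borrow Spark's naming.
- The `app-NNNN` ID format is made up.

```scala
import scala.collection.mutable

// A queue standing in for an endpoint's inbox (hypothetical; not Spark's RpcEndpointRef).
final class Inbox {
  val messages = mutable.Queue.empty[Any]
  def send(msg: Any): Unit = messages.enqueue(msg)
}

final case class RegisterApp(appName: String, replyTo: Inbox)
final case class RegisteredApp(appId: String)

final class MasterSim {
  val inbox = new Inbox
  val apps = mutable.LinkedHashMap.empty[String, String]
  private var nextAppNumber = 0

  // Like Master.receive: a PartialFunction that pattern-matches on message types.
  val receive: PartialFunction[Any, Unit] = {
    case RegisterApp(appName, replyTo) =>
      val appId = f"app-$nextAppNumber%04d"
      nextAppNumber += 1
      apps(appId) = appName
      replyTo.send(RegisteredApp(appId))
  }

  // Processes every queued message; messages the PartialFunction does not handle are dropped.
  def drain(): Unit =
    while (inbox.messages.nonEmpty) {
      val msg = inbox.messages.dequeue()
      if (receive.isDefinedAt(msg)) receive(msg)
    }
}
```

The reply travels through the sender's own inbox, which mirrors how the real Master answers via the `driver` reference carried inside the RegisterApplication message.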

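The Master's receive method calls schedule. schedule allocates the currently available resources among the waiting applications. It is called every time a new application joins or resource availability changes.

The source of schedule in Master.scala is as follows.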

```scala
private def schedule(): Unit = {
  .......
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
```
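The fragment above shows only the inner loop of driver placement. The self-contained sketch below reproduces the round-robin walk using simplified types, `WorkerSlot` and `DriverRequest`, both hypothetical.

To our understanding, the real schedule() first shuffles the alive workers and then visits each worker at most once per driver. The sketch keeps the visit limit and the `curPos = (curPos + 1) % numWorkersAlive` step. It skips the shuffle so that the result is deterministic.

```scala
final class WorkerSlot(val id: String, var memoryFree: Int, var coresFree: Int)
final case class DriverRequest(id: String, mem: Int, cores: Int)

object RoundRobinSketch {
  /** Returns driverId -> workerId for each driver that fits on some worker. */
  def placeDrivers(workers: IndexedSeq[WorkerSlot], waiting: Seq[DriverRequest]): Map[String, String] = {
    val numWorkersAlive = workers.size
    val placed = scala.collection.mutable.LinkedHashMap.empty[String, String]
    var curPos = 0
    for (driver <- waiting if numWorkersAlive > 0) {
      var launched = false
      var numWorkersVisited = 0
      // Visit each worker at most once per driver, continuing from where the last driver stopped.
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = workers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.memoryFree -= driver.mem // stands in for launchDriver reserving resources
          worker.coresFree -= driver.cores
          placed(driver.id) = worker.id
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    placed.toMap
  }
}
```

Suppose worker w1 has 1024 MB and worker w2 has 4096 MB. A 2048 MB driver skips w1 and lands on w2, and the next 512 MB driver then lands on w1. A driver that fits nowhere stays unplaced, corresponding to it remaining in waitingDrivers.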

schedule in Master.scala calls launchDriver, which sends a LaunchDriver message to the Worker. The source of launchDriver in Master.scala is as follows.

```scala
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
```

The LaunchDriver message itself is a case class that carries driverId and driverDesc.

```scala
case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
  extends DeployMessage
```

DriverDescription contains jarUrl, mem (memory), cores, supervise, and command.

```scala
private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command) {

  override def toString: String = s"DriverDescription (${command.mainClass})"
}
```

In Master.scala, launchDriver launches the Driver, and launchExecutor then launches the Executors. The source of launchExecutor in Master.scala is as follows.

```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```

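The Master sends a Worker a LaunchDriver message to launch the Driver, and then launches Executors through launchExecutor. Executor launching has its own scheduling logic. Once resources are allocated, launchExecutor likewise sends the Worker a LaunchExecutor message.

The Worker therefore receives both LaunchDriver and LaunchExecutor messages from the Master.

Figure 6-2 shows the Worker's internals and workflow.

Figure 6-2 Worker internals and workflow

The Master and the Workers are deployed on different machines and run as separate processes. The Master sends Workers two kinds of instructions: LaunchDriver and LaunchExecutor.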

• On receiving a LaunchDriver message from the Master, the Worker creates a new DriverRunner and calls driver.start().

The corresponding source in Worker.scala is as follows.

```scala
case LaunchDriver(driverId, driverDesc) =>
  ......
  val driver = new DriverRunner(
  ......
  driver.start()
```

• On receiving a LaunchExecutor message from the Master, the Worker creates a new ExecutorRunner and calls manager.start().

The corresponding source in Worker.scala is as follows.

```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  ......
  val manager = new ExecutorRunner(
  ......
  manager.start()
```

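When start is called on the Worker's DriverRunner or ExecutorRunner, it spawns a Thread that handles launching the Driver or the Executor. Take as an example the DriverRunner that the Worker creates on receiving a LaunchDriver message. The start method in DriverRunner.scala is as follows.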

```scala
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // Prepare the driver's jars and run the driver
        val exitCode = prepareAndRunDriver()

        // Set the final state depending on whether the driver was forcibly killed and on the exit code
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // Notify the worker of the driver's final state and any exception
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
```
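The decision at the end of the DriverRunner thread can be isolated as a pure function plus a small runner. This sketch simplifies the real code in several ways:
- `DriverStateSketch`, `stateFor` and `startRunner` are hypothetical names.
- The driver process is replaced by a `body` function that returns an exit code.
- The `report` callback stands in for worker.send(DriverStateChanged(...)).

```scala
object DriverStateSketch extends Enumeration {
  val FINISHED, KILLED, FAILED, ERROR = Value
}

object DriverRunnerSketch {
  import DriverStateSketch._

  // Same precedence as the original: exit code 0 wins, then "killed", otherwise failure.
  def stateFor(exitCode: Int, killed: Boolean): DriverStateSketch.Value =
    if (exitCode == 0) FINISHED else if (killed) KILLED else FAILED

  /** Runs `body` on a thread named like the original and reports the final state. */
  def startRunner(driverId: String, killed: () => Boolean, body: () => Int)(
      report: DriverStateSketch.Value => Unit): Thread = {
    val thread = new Thread("DriverRunner for " + driverId) {
      override def run(): Unit = {
        val finalState =
          try stateFor(body(), killed())
          catch { case _: Exception => ERROR } // mirrors the catch block that sets ERROR
        report(finalState)
      }
    }
    thread.start()
    thread
  }
}
```

An exit code of 0 yields FINISHED even if the driver was flagged as killed, exactly as in the if/else chain above. An exception thrown by the body yields ERROR.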

DriverRunner.start calls prepareAndRunDriver, which prepares the Driver's jar and launches the Driver. The source of prepareAndRunDriver is as follows.

```scala
private[worker] def prepareAndRunDriver(): Int = {
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // TODO: If we add the ability to submit multiple jars, they should also be added here
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
```
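substituteVariables replaces whole-argument placeholders in the command template with values known only on the Worker. The standalone version below passes the Worker URL and the local jar path in explicitly so it can be tested; in DriverRunner they come from the enclosing scope. The argument list and URL in the checks are invented for illustration.

```scala
object SubstituteSketch {
  // Only an argument that is exactly a placeholder is replaced; substrings are left alone.
  def substituteVariables(workerUrl: String, localJarFilename: String)(argument: String): String =
    argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}"   => localJarFilename
      case other            => other
    }
}
```

Because the match is on the whole string, an argument such as `--jar={{USER_JAR}}` passes through unchanged. Placeholders must therefore occupy an argument of their own.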

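The LaunchDriver process breaks down as follows.

• Worker process: the Worker's DriverRunner calls start, which uses a Thread to handle launching the Driver. DriverRunner creates a working directory for the Driver on the local file system (that is, a Linux directory), so each job gets its own directory. It then assembles the Driver's launch Command and starts the Driver through a ProcessBuilder. All of this happens inside the Worker process.

• Driver process: the Driver that gets launched runs in its own Driver process.

The LaunchExecutor process breaks down as follows.

• Worker process: the Worker's ExecutorRunner calls start, which uses a Thread to handle launching the Executor. ExecutorRunner creates a working directory for the Executor on the local file system (that is, a Linux directory), so each job gets its own directory. It then assembles the Executor's launch Command and starts the Executor through a ProcessBuilder. All of this happens inside the Worker process.

• Executor process: the launched Executor runs in its own Executor process. The Executor lives inside an ExecutorBackend, which in Spark standalone mode is CoarseGrainedExecutorBackend, a class that extends ExecutorBackend. Executor and ExecutorBackend are one-to-one: each ExecutorBackend holds one Executor. Inside the Executor, a thread pool processes the Tasks submitted by Spark concurrently.

• Once started, the Executor registers with the Driver, that is, with the SchedulerBackend.

The source of CoarseGrainedExecutorBackend is as follows.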

```scala
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

  private[this] val stopping = new AtomicBoolean(false)
  var executor: Executor = null
  @volatile var driver: Option[RpcEndpointRef] = None
  ......
```

Let us look at the Master's schedule method again.

```scala
private def schedule(): Unit = {
  ......
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
```

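In the Master's schedule method, a Driver that runs inside the cluster (cluster deploy mode) is launched through launchDriver. launchDriver sends a message to the Worker's endpoint, using the RPC communication mechanism.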

```scala
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
```

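The part of schedule that launches Executors goes through startExecutorsOnWorkers, which also communicates via RPC.

Within startExecutorsOnWorkers, Master.scala calls allocateWorkerResourceToExecutors to perform the actual allocation.

During that allocation, allocateWorkerResourceToExecutors launches each Executor through launchExecutor.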

```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id,
    exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(ExecutorAdded(exec.id, worker.id,
    worker.hostPort, exec.cores, exec.memory))
}
```

So the Master sends Workers two messages, LaunchDriver and LaunchExecutor, and the Worker receives both. Let us now look at the Worker.

```scala
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {
```

Worker takes part in RPC communication by extending ThreadSafeRpcEndpoint, a trait that lets other RPC endpoints send it messages.

```scala
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
```

The Worker receives messages in its receive method. Think of it as a mailbox that is checked in a loop, with each message being a piece of mail.

```scala
override def receive: PartialFunction[Any, Unit] = synchronized {
  case SendHeartbeat =>
    ......
  case WorkDirCleanup =>
    ......
  case MasterChanged(masterRef, masterWebUiUrl) =>
    ......
  case ReconnectWorker(masterUrl) =>
    .......
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    ......
  case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    ......
  case KillExecutor(masterUrl, appId, execId) =>
    ......
  case LaunchDriver(driverId, driverDesc) =>
    ......
```

The LaunchDriver case in Worker.scala's receive method, which launches the Driver, is as follows.

```scala
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
```

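The LaunchDriver handler first logs a message containing the driverId carried by the message. When a Driver or Executor is launched, the process it runs in always satisfies the memory requirement but not necessarily the cores requirement: the actual number of cores may be more or fewer than requested.

logInfo wraps the underlying logger. Because msg is a by-name parameter (=> String), the message string is built only when INFO logging is enabled.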

```scala
protected def logInfo(msg: => String) {
  if (log.isInfoEnabled) log.info(msg)
}
```

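Back in the LaunchDriver handler, a new DriverRunner is created. It receives the driverId, the working directory (workDir), the Spark installation path (sparkHome), driverDesc, workerUri, securityMgr, and so on.

The statement drivers(driverId) = driver stores the runner in drivers, a HashMap whose key is the Driver ID and whose value is the DriverRunner. A Worker may launch many Drivers and Executors, so each DriverRunner must be tracked by its ID.

DriverRunner uses a thread to launch the Driver as a separate process, and acts as a proxy for the process in which the Driver runs.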

```scala
val drivers = new HashMap[String, DriverRunner]
```

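Back in Worker.scala's LaunchDriver handler: before starting the Driver, the Worker stores the DriverRunner in its in-memory data structure (drivers) and then calls driver.start(). After start, it adds the cores and memory the Driver consumes to coresUsed and memoryUsed.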
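The bookkeeping in the LaunchDriver handler amounts to a map plus two counters. handleDriverStateChanged later performs the reverse steps. In the minimal sketch below, a hypothetical `DriverSpec` replaces DriverRunner and DriverDescription, and `WorkerBookkeeping` is likewise an illustrative name.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the parts of DriverRunner/DriverDescription the Worker counts.
final case class DriverSpec(mem: Int, cores: Int)

final class WorkerBookkeeping {
  val drivers = mutable.HashMap.empty[String, DriverSpec] // driverId -> running driver
  val finishedDrivers = mutable.LinkedHashMap.empty[String, DriverSpec]
  var coresUsed = 0
  var memoryUsed = 0

  // Mirrors the LaunchDriver handler: record the driver, then account for its resources.
  def launch(driverId: String, spec: DriverSpec): Unit = {
    drivers(driverId) = spec
    coresUsed += spec.cores
    memoryUsed += spec.mem
  }

  // Mirrors handleDriverStateChanged: move to finishedDrivers and release the resources.
  def finish(driverId: String): Unit =
    drivers.remove(driverId).foreach { spec =>
      finishedDrivers(driverId) = spec
      coresUsed -= spec.cores
      memoryUsed -= spec.mem
    }
}
```

One difference is deliberate. The original uses `drivers.remove(driverId).get`, which throws for an unknown ID, whereas this sketch silently ignores unknown IDs.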

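Next, let us look at DriverRunner.scala. DriverRunner manages the execution of a Driver, including restarting it automatically when it fails. When the Driver runs in cluster mode, submitting it with the supervise option (--supervise in spark-submit) enables this automatic restart.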

```scala
private[deploy] class DriverRunner(
    conf: SparkConf,
    val driverId: String,
    val workDir: File,
    val sparkHome: File,
    val driverDesc: DriverDescription,
    val worker: RpcEndpointRef,
    val workerUrl: String,
    val securityManager: SecurityManager)
  extends Logging {
```

The source of DriverDescription is shown below. Its member supervise is a Boolean. If supervise is true and the Driver fails while running in cluster mode, the Worker is responsible for restarting the Driver.

```scala
private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command) {

  override def toString: String = s"DriverDescription (${command.mainClass})"
}
```

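Back in Worker.scala's LaunchDriver handler: once the DriverRunner is constructed, its start method is called, and a thread then manages the Driver, covering both starting and stopping it. In Thread("DriverRunner for " + driverId), the string "DriverRunner for <driverId>" is the thread's name. Thread is a Java class, which Scala can use directly because it interoperates seamlessly with Java.

DriverRunner.start calls prepareAndRunDriver to prepare the Driver's jar and launch the Driver.

prepareAndRunDriver calls createWorkingDirectory, which creates the Driver's working directory with Java's new File. If the directory does not exist and cannot be created, an IOException is thrown. Creating a directory on the local file system rarely fails, except in cases such as a full disk. The source of createWorkingDirectory is as follows.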

```scala
private def createWorkingDirectory(): File = {
  val driverDir = new File(workDir, driverId)
  if (!driverDir.exists() && !driverDir.mkdirs()) {
    throw new IOException("Failed to create directory " + driverDir)
  }
  driverDir
}
```

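Back in prepareAndRunDriver, downloadUserJar downloads the jar. The code we write is packaged as a jar, and this step downloads the user's jar to the local machine. When the jar is stored in HDFS, it is fetched from HDFS and copied locally.

The source of downloadUserJar is as follows.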

```scala
private def downloadUserJar(driverDir: File): String = {
  val jarFileName = new URI(driverDesc.jarUrl).getPath.split("/").last
  val localJarFile = new File(driverDir, jarFileName)
  if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
    logInfo(s"Copying user jar ${driverDesc.jarUrl} to $localJarFile")
    Utils.fetchFile(
      driverDesc.jarUrl,
      driverDir,
      conf,
      securityManager,
      SparkHadoopUtil.get.newConfiguration(conf),
      System.currentTimeMillis(),
      useCache = false)
    if (!localJarFile.exists()) { // Verify copy succeeded
      throw new IOException(
        s"Can not find expected jar $jarFileName which should have been loaded in $driverDir")
    }
  }
  localJarFile.getAbsolutePath
}
```
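createWorkingDirectory and the file-name logic of downloadUserJar can be exercised without a cluster. The sketch below copies both checks into a hypothetical `DriverFilesSketch` object. It leaves out the actual download (Utils.fetchFile), which needs a SparkConf and a Hadoop configuration. The driver ID and HDFS URL in the checks are made up.

```scala
import java.io.{File, IOException}
import java.net.URI

object DriverFilesSketch {
  /** Same check as createWorkingDirectory: reuse the directory or fail loudly. */
  def createWorkingDirectory(workDir: File, driverId: String): File = {
    val driverDir = new File(workDir, driverId)
    if (!driverDir.exists() && !driverDir.mkdirs()) {
      throw new IOException("Failed to create directory " + driverDir)
    }
    driverDir
  }

  /** Same file-name extraction as downloadUserJar: last segment of the URI path. */
  def jarFileName(jarUrl: String): String = new URI(jarUrl).getPath.split("/").last
}
```

Calling createWorkingDirectory twice for the same driver returns the existing directory instead of failing. This matters when a supervised driver is relaunched.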

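downloadUserJar calls Utils.fetchFile. For HDFS URLs, fetchFile uses Hadoop to download the file from HDFS. At submission time the jar is uploaded to HDFS once, and every node can then download it from there. The source of Utils.fetchFile is as follows.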

```scala
def fetchFile(
    url: String,
    targetDir: File,
    conf: SparkConf,
    securityMgr: SecurityManager,
    hadoopConf: Configuration,
    timestamp: Long,
    useCache: Boolean) {
  val fileName = decodeFileNameInURI(new URI(url))
  val targetFile = new File(targetDir, fileName)
  val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
  if (useCache && fetchCacheEnabled) {
    val cachedFileName = s"${url.hashCode}${timestamp}_cache"
    val lockFileName = s"${url.hashCode}${timestamp}_lock"
    val localDir = new File(getLocalDir(conf))
    val lockFile = new File(localDir, lockFileName)
    val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
    // Only one executor enters here at a time. The FileLock only synchronizes executors
    // downloading the file; it is safe regardless of lock type (mandatory or advisory).
    val lock = lockFileChannel.lock()
    val cachedFile = new File(localDir, cachedFileName)
    try {
      if (!cachedFile.exists()) {
        doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
      }
    } finally {
      lock.release()
      lockFileChannel.close()
    }
    copyFile(
      url,
      cachedFile,
      targetFile,
      conf.getBoolean("spark.files.overwrite", false)
    )
  } else {
    doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
  }
  ......
}
```
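The cached branch of fetchFile follows a general pattern:
1. Take an exclusive FileLock on a lock file shared by all processes on the node.
2. Download only if the cache file is missing.
3. Release the lock.
4. Copy the cached file into each target location.

The sketch below uses only java.io and java.nio. Two things are swapped in as assumptions: the download step is a function argument standing in for doFetchFile, and a plain overwriting Files.copy replaces Spark's copyFile.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.file.{Files, StandardCopyOption}

object FetchCacheSketch {
  /** Downloads `url` at most once per (url, timestamp) into `localDir`, then copies it to `targetFile`. */
  def fetchWithCache(url: String, timestamp: Long, localDir: File, targetFile: File)(
      download: File => Unit): Unit = {
    val cachedFile = new File(localDir, s"${url.hashCode}${timestamp}_cache")
    val lockFile = new File(localDir, s"${url.hashCode}${timestamp}_lock")
    val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
    val lock = lockFileChannel.lock() // blocks until no other process holds the lock
    try {
      if (!cachedFile.exists()) download(cachedFile)
    } finally {
      lock.release()
      lockFileChannel.close()
    }
    Files.copy(cachedFile.toPath, targetFile.toPath, StandardCopyOption.REPLACE_EXISTING)
  }
}
```

A FileLock coordinates separate processes, such as several Executor JVMs on one node. Threads inside a single JVM that contend for the same lock would additionally need in-process synchronization.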

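Back in prepareAndRunDriver: driverDesc.command specifies which class to run, and CommandUtils.buildProcessBuilder builds a process with that class as its entry point. runDriver then launches the Driver.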

```scala
private[worker] def prepareAndRunDriver(): Int = {
  .......
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
```

runDriver in DriverRunner.scala is shown below. It redirects the process's stdout and stderr to files, so execution can be inspected through these log files. Finally, it calls runCommandWithRetry.

```scala
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)

    val stderr = new File(baseDir, "stderr")
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    Files.append(header, stderr, StandardCharsets.UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
```
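The header written at the top of the stderr file quotes every argument of the launch command, so arguments containing spaces remain unambiguous. The small sketch below reproduces that formatting. `LaunchHeaderSketch` is a hypothetical name, and the example command is invented.

```scala
object LaunchHeaderSketch {
  /** Wraps each argument in double quotes and joins them with spaces, as runDriver does. */
  def formatCommand(command: Seq[String]): String = command.mkString("\"", "\" \"", "\"")

  /** Header line, followed by a 40-character separator and a blank line. */
  def header(command: Seq[String]): String =
    "Launch Command: %s\n%s\n\n".format(formatCommand(command), "=" * 40)
}
```

For `Seq("/usr/bin/java", "-Xmx1024M", "com.example.Main")`, formatCommand yields `"/usr/bin/java" "-Xmx1024M" "com.example.Main"`. This is the line you would see first in the driver's stderr file.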

The argument passed to runCommandWithRetry is ProcessBuilderLike(builder). This creates a new ProcessBuilderLike whose overridden start method calls processBuilder.start(). The source of ProcessBuilderLike is as follows.

```scala
private[deploy] object ProcessBuilderLike {
  def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
    override def start(): Process = processBuilder.start()
    override def command: Seq[String] = processBuilder.command().asScala
  }
}
```

The source of runCommandWithRetry is as follows.

```scala
private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  var exitCode = -1
  // Time to wait between submission retries
  var waitSeconds = 1
  // A run of at least this many seconds resets the exponential back-off
  val successfulRunDuration = 5
  var keepTrying = !killed

  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

    synchronized {
      if (killed) { return exitCode }
      process = Some(command.start())
      initialize(process.get)
    }

    val processStart = clock.getTimeMillis()
    exitCode = process.get.waitFor()

    // Check whether to attempt another run
    keepTrying = supervise && exitCode != 0 && !killed
    if (keepTrying) {
      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
        waitSeconds = 1
      }
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }
  }

  exitCode
}
```

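The first launch attempt may not succeed, so runCommandWithRetry retries in a loop. DriverRunner starts the process with command.start(), which ultimately calls ProcessBuilder.start(), and then blocks in process.get.waitFor() until the process exits.

If supervise is true, the exit code is non-zero, and the driver has not been killed, keepTrying stays true. The loop then relaunches the process after a wait that doubles each time (exponential back-off). If the previous run lasted longer than successfulRunDuration (5 seconds), the wait is first reset to 1 second.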
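The supervise and back-off logic is easier to check when the process is replaced by a function. In the sketch below, `RetrySketch` and its parameters are hypothetical. `launch` returns an (exitCode, runtimeMillis) pair instead of starting a real process, and `sleep` receives the wait in seconds, so the back-off sequence can be observed without actually waiting.

```scala
object RetrySketch {
  /**
   * Re-launches while supervise is on, the exit code is non-zero and the run was not killed.
   * `launch` returns (exitCode, runtimeMillis) in place of starting a real process.
   */
  def runWithRetry(
      supervise: Boolean,
      killed: () => Boolean,
      launch: () => (Int, Long),
      sleep: Int => Unit,
      successfulRunDuration: Int = 5): Int = {
    var exitCode = -1
    var waitSeconds = 1 // time to wait between retries
    var keepTrying = !killed()
    while (keepTrying) {
      val (code, runtimeMillis) = launch()
      exitCode = code
      keepTrying = supervise && exitCode != 0 && !killed()
      if (keepTrying) {
        // A run that lasted long enough counts as healthy, so the back-off starts over.
        if (runtimeMillis > successfulRunDuration * 1000L) waitSeconds = 1
        sleep(waitSeconds)
        waitSeconds *= 2 // exponential back-off
      }
    }
    exitCode
  }
}
```

Three quick failures followed by a success produce waits of 1, 2 and 4 seconds. If the third run lasts 6 seconds instead, the waits become 1, 2 and 1. With supervise off, the first exit code is returned immediately.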

Back to the LaunchDriver handler in Worker.scala:

```scala
case LaunchDriver(driverId, driverDesc) =>
  ......
  drivers(driverId) = driver
  driver.start()
```

driver.start() launches the Driver. Here is start once more:

```scala
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      ......
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // Notify the worker of the driver's final state and any exception
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
```

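When the thread in start reaches the point where finalState is set, the Driver process has ended. It may have FINISHED normally, or it may have been KILLED, FAILED, or hit an ERROR. The thread then uses worker.send to send a DriverStateChanged message to its own Worker, reporting the state change. Below is the DriverStateChanged case in Worker.scala.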

```scala
case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
  handleDriverStateChanged(driverStateChanged)
```

It calls handleDriverStateChanged, whose source is as follows.

```scala
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  val state = driverStateChanged.state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  sendToMaster(driverStateChanged)
  val driver = drivers.remove(driverId).get
  finishedDrivers(driverId) = driver
  trimFinishedDriversIfNecessary()
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}
```

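handleDriverStateChanged in Worker.scala logs a message appropriate to each state. The key line is sendToMaster(driverStateChanged), which forwards the driverStateChanged message to the Master to report that the Driver process has ended. The method then moves the driver from drivers to finishedDrivers and releases its memory and cores. The source of sendToMaster is as follows.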

```scala
private def sendToMaster(message: Any): Unit = {
  master match {
    case Some(masterRef) => masterRef.send(message)
    case None =>
      logWarning(
        s"Dropping $message because the connection to master has not yet been established")
  }
}
```

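Now the Master side. When the Master receives DriverStateChanged with a state of DriverState.ERROR, FINISHED, KILLED, or FAILED, it removes the Driver from its in-memory data structures and clears the Driver's data from the persistence engine. Any other state is treated as unexpected and raises an exception.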

```scala
case DriverStateChanged(driverId, state, exception) =>
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
```

removeDriver cleans up the related data and then calls schedule again, because the Driver's resources are now free.

```scala
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
```
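The completedDrivers bookkeeping in removeDriver keeps the history bounded. Once the buffer reaches RETAINED_DRIVERS entries, it drops the oldest tenth (at least one entry) before appending. The generic sketch below shows that policy, with the limit passed in as a parameter. `RetentionSketch` is a hypothetical name, and the limit of 200 in the check is only an example value.

```scala
import scala.collection.mutable.ArrayBuffer

object RetentionSketch {
  /** Appends `item`, first trimming the oldest entries once `retained` is reached. */
  def addCompleted[T](completed: ArrayBuffer[T], item: T, retained: Int): Unit = {
    if (completed.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      completed.trimStart(toRemove)
    }
    completed += item
  }
}
```

Trimming in batches of one tenth, rather than one entry at a time, means the buffer is shifted only occasionally instead of on every completed driver.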

Next, let us look at launching an Executor. The LaunchExecutor case in the Spark 2.1.1 version of Worker.scala is as follows.

```scala
1.  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
2.    if (masterUrl != activeMasterUrl) {
3.      logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
4.    } else {
5.      try {
6.        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
7.
8.        // Create the executor's working directory
9.        val executorDir = new File(workDir, appId + "/" + execId)
10.       if (!executorDir.mkdirs()) {
11.         throw new IOException("Failed to create directory " + executorDir)
12.       }
13.
14.       // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
15.       val appLocalDirs = appDirectories.getOrElse(appId,
16.         Utils.getOrCreateLocalRootDirs(conf).map { dir =>
17.           val appDir = Utils.createDirectory(dir, namePrefix = "executor")
18.           Utils.chmod700(appDir)
19.           appDir.getAbsolutePath()
20.         }.toSeq)
21.       appDirectories(appId) = appLocalDirs
22.       val manager = new ExecutorRunner(
23.         appId,
24.         execId,
25.         appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
26.         cores_,
27.         memory_,
28.         self,
29.         workerId,
30.         host,
31.         webUi.boundPort,
32.         publicAddress,
33.         sparkHome,
34.         executorDir,
35.         workerUri,
36.         conf,
37.         appLocalDirs, ExecutorState.RUNNING)
38.       executors(appId + "/" + execId) = manager
39.       manager.start()
40.       coresUsed += cores_
41.       memoryUsed += memory_
42.       sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
43.     } catch {
44.       case e: Exception =>
45.         logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
46.         if (executors.contains(appId + "/" + execId)) {
47.           executors(appId + "/" + execId).kill()
48.           executors -= appId + "/" + execId
49.         }
50.         sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
51.           Some(e.toString), None))
52.     }
53.   }
```

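Compared with Spark 2.1.1, the Spark 2.2.0 version of Worker.scala replaces lines 16 to 20 of the code above with the code below, which handles failures when creating the Executor's local directories. A root directory in which no subfolder can be created is logged and skipped. An IOException is thrown only if no directory can be created at all.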

```scala
.....
        val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
        val dirs = localRootDirs.flatMap { dir =>
          try {
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            Utils.chmod700(appDir)
            Some(appDir.getAbsolutePath())
          } catch {
            case e: IOException =>
              logWarning(s"${e.getMessage}. Ignoring this directory.")
              None
          }
        }.toSeq
        if (dirs.isEmpty) {
          throw new IOException("No subfolder can be created in " +
            s"${localRootDirs.mkString(",")}.")
        }
        dirs
      })
.....
```
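The Spark 2.2.0 change is an instance of "tolerate partial failure, fail only on total failure". The sketch below makes several substitutions, all assumptions:
- A `createUnder` function stands in for Utils.createDirectory.
- chmod700 is omitted.
- The warning goes to stderr instead of Spark's logger.
- `LocalDirsSketch` is a hypothetical name.

```scala
import java.io.{File, IOException}

object LocalDirsSketch {
  /**
   * Tries to create one app directory under each root. Roots that fail with an
   * IOException are skipped; only a total failure is an error.
   */
  def createAppDirs(roots: Seq[File], createUnder: File => File): Seq[String] = {
    val dirs = roots.flatMap { root =>
      try Some(createUnder(root).getAbsolutePath)
      catch {
        case e: IOException =>
          System.err.println(s"${e.getMessage}. Ignoring this directory.")
          None
      }
    }
    if (dirs.isEmpty) {
      throw new IOException("No subfolder can be created in " + roots.mkString(",") + ".")
    }
    dirs
  }
}
```

In the checks, `Files.createTempDirectory(root, "executor-")` plays the role of the directory factory. It throws an IOException when the root does not exist, which simulates a bad local disk.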

Now look at manager.start: it starts a Thread whose run method calls fetchAndRunExecutor.

The source of fetchAndRunExecutor is as follows.

```scala
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; the executor may exit with code 0 (when the driver instructs
    // it to shut down) or with a non-zero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
```

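fetchAndRunExecutor works much like the Driver launch. It first builds a ProcessBuilder with CommandUtils.buildProcessBuilder and then calls builder.start(). When the process exits, it sends an ExecutorStateChanged message to the Worker.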
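The sketch below reduces fetchAndRunExecutor to one question: how does the outcome of the executor process become a state? It is a minimal model under stated assumptions. ExecState, StateChanged and runExecutor are hypothetical stand-ins for ExecutorState and ExecutorStateChanged. The waitForExit function replaces builder.start() and process.waitFor(). killProcess is not modeled, so the sketch reports no exit code on the KILLED and FAILED paths.

```scala
// Hypothetical stand-ins for ExecutorState values and the ExecutorStateChanged message
sealed trait ExecState
case object Exited extends ExecState
case object Killed extends ExecState
case object Failed extends ExecState

final case class StateChanged(state: ExecState, message: Option[String], exitStatus: Option[Int])

object ExecutorRunSketch {
  // waitForExit plays the role of builder.start() followed by process.waitFor()
  def runExecutor(waitForExit: () => Int): StateChanged =
    try {
      val exitCode = waitForExit()
      // Zero and nonzero exit codes alike are reported as EXITED, with the code attached
      StateChanged(Exited, Some("Command exited with code " + exitCode), Some(exitCode))
    } catch {
      // Interrupted runner thread: the real code calls killProcess(None) here
      case _: InterruptedException => StateChanged(Killed, None, None)
      // Any other failure: the real code calls killProcess(Some(e.toString)) here
      case e: Exception => StateChanged(Failed, Some(e.toString), None)
    }
}
```

The point of the model is that a nonzero exit code is still EXITED. KILLED and FAILED come only from exceptions in the runner thread, which is why the Master has to inspect exitStatus separately.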

The ExecutorStateChanged case in Worker.scala is as follows.

case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  handleExecutorStateChanged(executorStateChanged)

Inside handleExecutorStateChanged, sendToMaster(executorStateChanged) forwards the ExecutorStateChanged message to the Master.

private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
  Unit = {
  sendToMaster(executorStateChanged)
  val state = executorStateChanged.state
  if (ExecutorState.isFinished(state)) {
    val appId = executorStateChanged.appId
    val fullId = appId + "/" + executorStateChanged.execId
    val message = executorStateChanged.message
    val exitStatus = executorStateChanged.exitStatus
    executors.get(fullId) match {
      case Some(executor) =>
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        executors -= fullId
        finishedExecutors(fullId) = executor
        trimFinishedExecutorsIfNecessary()
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      case None =>
        logInfo("Unknown Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
    }
    maybeCleanupApplication(appId)
  }
}
}
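The bookkeeping in handleExecutorStateChanged can be modeled roughly as follows. This is a sketch, not Spark code. WorkerBookkeeping and ExecutorSlot are hypothetical, the `finished` flag stands for ExecutorState.isFinished(state), and forwardToMaster replaces sendToMaster. The sketch shows two things. The message is always forwarded first. Only a finished, known executor moves to finishedExecutors and gives back its cores and memory.

```scala
import scala.collection.mutable

// Hypothetical record of the resources one executor holds
final case class ExecutorSlot(cores: Int, memory: Int)

// Hypothetical, simplified model of the Worker-side bookkeeping
class WorkerBookkeeping(forwardToMaster: String => Unit) {
  val executors = mutable.HashMap.empty[String, ExecutorSlot]
  val finishedExecutors = mutable.LinkedHashMap.empty[String, ExecutorSlot]
  var coresUsed = 0
  var memoryUsed = 0

  def launch(fullId: String, slot: ExecutorSlot): Unit = {
    executors(fullId) = slot
    coresUsed += slot.cores
    memoryUsed += slot.memory
  }

  // finished stands for ExecutorState.isFinished(state) in the real code
  def onStateChanged(fullId: String, finished: Boolean): Unit = {
    forwardToMaster(fullId)                 // always forwarded first, like sendToMaster
    if (finished) {
      executors.remove(fullId) match {
        case Some(slot) =>
          finishedExecutors(fullId) = slot
          coresUsed -= slot.cores           // release the executor's resources
          memoryUsed -= slot.memory
        case None => ()                     // unknown executor: Spark only logs it
      }
    }
  }
}
```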

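Next, look at Master.scala. When the Master receives an ExecutorStateChanged message, it records the executor's new state. It then uses exec.application.driver.send to forward an ExecutorUpdated message to the Driver; the flow is essentially the same as for launching a Driver. The ExecutorStateChanged handler is as follows.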

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) =>
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }

      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and the application
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If the application has already finished, keep its state so that
        // its information is displayed correctly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry a limited number of times so we don't go into an infinite loop.
        // Important: this code path is not exercised by tests, so be very careful
        // when changing this if condition
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
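The retry rule at the end of this handler is easy to misread, so the sketch below isolates just that condition. AppRetryTracker is hypothetical. maxRetries plays the role of MAX_EXECUTOR_RETRIES, which the Master reads from spark.deploy.maxExecutorRetries. Because && short-circuits, the counter is incremented only for abnormal exits, and a negative limit disables the kill path entirely.

```scala
// Hypothetical model of the application-killing condition; not Spark code
final class AppRetryTracker(maxRetries: Int) {
  private var retryCount = 0

  // Mirrors appInfo.incrementRetryCount(): bump the counter and return it
  private def incrementRetryCount(): Int = { retryCount += 1; retryCount }

  // Mirrors appInfo.resetRetryCount(), called when an executor reaches RUNNING
  def resetRetryCount(): Unit = retryCount = 0

  // Returns true when the application should be removed as FAILED
  def shouldRemoveApp(exitStatus: Option[Int], anyExecutorRunning: Boolean): Boolean = {
    val normalExit = exitStatus == Some(0)
    // Same operand order as the Master, so the counter only moves on abnormal exits
    if (!normalExit && incrementRetryCount() >= maxRetries && maxRetries >= 0)
      !anyExecutorRunning   // keep the app alive while any executor is still RUNNING
    else
      false
  }
}
```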

6.3.2 Principles of the Driver-Master Interaction

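In the interaction between the Driver and the Master, the Master is a message loop. This section explains how the Driver's own message loop is created. Once that loop exists, the Driver and the Master can exchange messages.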

A Spark application is submitted with the spark-submit script, which directly runs the org.apache.spark.deploy.SparkSubmit object. The script reads as follows.

#!/usr/bin/env bash
SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
export PYTHONHASHSEED=0
# Run SparkSubmit
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Inside SparkSubmit, the main function looks like this.

The relevant source in SparkSubmit.scala is as follows.

def main(args: Array[String]): Unit = {
  // Build a SparkSubmitArguments object from the arguments passed to main
  val appArgs = new SparkSubmitArguments(args)
  // In verbose mode, print the parsed arguments
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  appArgs.action match {
    // Submit: call submit
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    // Kill: call kill
    case SparkSubmitAction.KILL => kill(appArgs)
    // Request status: call requestStatus
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

The command-line arguments passed by the spark-submit script arrive in main's args. main hands them to SparkSubmitArguments for parsing. It then matches on the action type in appArgs and performs submit, kill or requestStatus.

Now step into SparkSubmitArguments to see how the arguments are parsed. Its key code is shown below.

The relevant source in SparkSubmitArguments.scala is as follows.

// Parse the individual arguments from the command line
try {
  parse(args.asJava)
} catch {
  // On IllegalArgumentException, print the error and exit
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}
// Merge in the default Spark properties; values passed explicitly override the defaults
mergeDefaultSparkProperties()
// Remove properties that do not start with "spark." from sparkProperties
ignoreNonSparkProperties()
// Load settings from environment variables
loadEnvironmentArguments()
// Check that the arguments are valid
validateArguments()

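In the code above, each step does one job:

- parse(args.asJava) parses the command-line arguments.
- mergeDefaultSparkProperties merges in the default configuration.
- ignoreNonSparkProperties drops properties that do not start with "spark.".
- loadEnvironmentArguments loads settings from environment variables.
- validateArguments checks that the arguments are valid.

How does this configuration get submitted? In main, the line case SparkSubmitAction.SUBMIT => submit(appArgs) decides whether to submit and run the program. If the action matches SparkSubmitAction.SUBMIT, submit(appArgs) is called. appArgs is a SparkSubmitArguments and carries every submitted parameter, both those given on the command line and the defaults.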
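Two of these steps amount to simple map operations, modeled below with immutable Maps. SubmitArgsSketch and its methods are hypothetical names. The real code mutates the sparkProperties field and also prints a warning for every ignored key. As in mergeDefaultSparkProperties, a value passed explicitly wins over the default for the same key.

```scala
// Hypothetical, immutable version of two SparkSubmitArguments steps
object SubmitArgsSketch {
  // Like mergeDefaultSparkProperties: defaults only fill keys that were not passed explicitly
  def mergeDefaults(passed: Map[String, String], defaults: Map[String, String]): Map[String, String] =
    defaults ++ passed   // on duplicate keys the right-hand map (passed) wins

  // Like ignoreNonSparkProperties: keep only keys that start with "spark."
  def ignoreNonSpark(props: Map[String, String]): Map[String, String] =
    props.filter { case (k, _) => k.startsWith("spark.") }
}
```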

The submit(appArgs) method does two main things:

(1) It prepares the submission environment.

(2) It runs the main method to complete the submission.

First, consider how Spark prepares the environment. The submit(appArgs) method contains the following code.

The relevant source in SparkSubmit.scala is as follows.

private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  .......
  runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
  ......

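Here prepareSubmitEnvironment(args) prepares the submission environment. It returns a 4-tuple: the child arguments, the child classpath entries, a map of system properties and the child main class. Once the environment is ready, the child main class is started. Spark calls it the "child", although runMain invokes it through reflection inside the same JVM. In Standalone cluster mode without REST, the class started is org.apache.spark.deploy.Client. The actual execution happens in runMain, whose key code is shown below.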

The relevant source in SparkSubmit.scala is as follows.

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  ......
  Thread.currentThread.setContextClassLoader(loader)  // install loader as the context class loader
  for (jar <- childClasspath) {                        // iterate over the classpath entries
    addJarToClasspath(jar, loader)                     // add each jar to the classpath through loader
  }
  for ((key, value) <- sysProps) {                     // copy sysProps into the JVM system properties
    System.setProperty(key, value)
  }
  var mainClass: Class[_] = null
  mainClass = Utils.classForName(childMainClass)       // load the main class to start
  ......  // look up the main method of that class
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  ......  // invoke main through reflection, passing childArgs as its argument
  mainMethod.invoke(null, childArgs.toArray)
}

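In the code above, the classForName method of the Utils helper locates the main class. getMethod on mainClass then obtains its main method, and invoke on mainMethod runs it. Note that invoke also receives childArgs, which carries the configuration.

Utils.classForName(childMainClass) returns the main class to run, so which class is childMainClass? That depends on the deploy mode. In Standalone cluster mode without REST, childMainClass is org.apache.spark.deploy.Client, as the following source shows.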
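The three reflection steps in runMain can be tried in isolation:

1. Load a class by name.
2. Look up a method by name and parameter types.
3. Invoke it with a null receiver, because main is static.

In this sketch java.lang.Integer.parseInt stands in for a main(String[]) method, so no extra class is needed. invokeStatic is a hypothetical helper. Class.forName with the context class loader is used in place of Spark's Utils.classForName.

```scala
// Hypothetical helper showing the load / getMethod / invoke(null, ...) sequence of runMain
object ReflectiveInvokeSketch {
  def invokeStatic(className: String, methodName: String,
                   paramType: Class[_], arg: AnyRef): AnyRef = {
    val loader = Thread.currentThread.getContextClassLoader
    val clazz = Class.forName(className, true, loader)   // comparable to Utils.classForName
    val method = clazz.getMethod(methodName, paramType)  // comparable to getMethod("main", ...)
    method.invoke(null, arg)                             // null receiver: a static call
  }
}
```

For a real main method, the whole String array is passed as a single argument, which is what mainMethod.invoke(null, childArgs.toArray) does. From Scala, an array handed to Method.invoke's varargs parameter without `: _*` is treated as one element.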

The relevant source in SparkSubmit.scala is as follows.

// In prepareSubmitEnvironment: check whether this is Standalone cluster mode
if (args.isStandaloneCluster) {
  // With REST, childMainClass is org.apache.spark.deploy.rest.RestSubmissionClient
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // Without REST, childMainClass is org.apache.spark.deploy.Client
    childMainClass = "org.apache.spark.deploy.Client"
    if (args.supervise) { childArgs += "--supervise" }
    // Set the driver memory
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    // Set the driver cores
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}

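The code first checks the deploy mode with args.isStandaloneCluster. In Standalone cluster mode without the REST gateway, childMainClass = "org.apache.spark.deploy.Client". childArgs also receives the driver's memory (--memory) and cores (--cores) settings. As described for runMain, the program then starts the org.apache.spark.deploy.Client class and runs its main method.

What does Client do? First, look at how it is invoked. Below are the Client object and its main method.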
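The branch above can be restated as a pure function, which makes the resulting childArgs easy to inspect. SubmitOpts and StandaloneClusterSketch are hypothetical simplifications of SparkSubmitArguments and prepareSubmitEnvironment. They cover only Standalone cluster mode. Optional settings are modeled with Option instead of the nullable strings the real code wraps with Option(...).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical subset of SparkSubmitArguments relevant to Standalone cluster mode
final case class SubmitOpts(
    useRest: Boolean,
    supervise: Boolean,
    driverMemory: Option[String],
    driverCores: Option[String],
    master: String,
    primaryResource: String,
    mainClass: String,
    appArgs: Seq[String])

object StandaloneClusterSketch {
  def childMainClassAndArgs(o: SubmitOpts): (String, List[String]) = {
    val childArgs = ArrayBuffer.empty[String]
    val childMainClass =
      if (o.useRest) {
        childArgs ++= Seq(o.primaryResource, o.mainClass)
        "org.apache.spark.deploy.rest.RestSubmissionClient"
      } else {
        if (o.supervise) childArgs += "--supervise"
        o.driverMemory.foreach(m => childArgs ++= Seq("--memory", m))   // driver, not executor
        o.driverCores.foreach(c => childArgs ++= Seq("--cores", c))
        childArgs += "launch"
        childArgs ++= Seq(o.master, o.primaryResource, o.mainClass)
        "org.apache.spark.deploy.Client"
      }
    childArgs ++= o.appArgs   // the user's own arguments always come last
    (childMainClass, childArgs.toList)
  }
}
```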

The relevant source in Client.scala is as follows.

object Client {
  def main(args: Array[String]) {
    // Warn when not launched through spark-submit (the SPARK_SUBMIT system property is unset)
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println
    // Create the SparkConf
    val conf = new SparkConf()
    // Create ClientArguments, which holds the driver-side arguments
    val driverArgs = new ClientArguments(args)

    // Default the RPC ask timeout to 10s if it is not set
    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
    // Create the RPC environment with RpcEnv.create
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    // Turn each master URL into an RpcEndpointRef used to talk to the Master
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    // Block until rpcEnv terminates
    rpcEnv.awaitTermination()
  }
}

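The code first creates a SparkConf object. Through this configuration object, settings such as appName and the Master address can be specified in code. val driverArgs = new ClientArguments(args) builds a ClientArguments object from the incoming args. This object also keeps the passed-in configuration, such as the driver's memory and cores.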

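The RpcEnv.create factory method creates rpcEnv. rpcEnv sets up references to the Master's endpoints, and all communication with the Master goes through them. Since Spark 2.0, RPC (Remote Procedure Call) is implemented on top of Netty by default, and nodes communicate through this asynchronous RPC mechanism.

rpcEnv.setupEndpoint registers a newly created Driver ClientEndpoint. ClientEndpoint is a ThreadSafeRpcEndpoint message loop, so at this point the Driver ClientEndpoint exists. In its onStart method, ClientEndpoint submits to the Master: it sends a RequestSubmitDriver(driverDescription) request through masterEndpoint, which registers the Driver.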
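The endpoint pattern can be illustrated with a toy, single-threaded model: register an endpoint, receive onStart, then process messages one at a time. Everything here is hypothetical. ToyRpcEnv, ToyRef and the two endpoints imitate RpcEnv, RpcEndpointRef, ClientEndpoint and the Master in shape only, with no networking, threads or Netty. As ClientEndpoint does with RequestSubmitDriver, the client sends its submission in onStart. Queued messages are delivered strictly in order.

```scala
import scala.collection.mutable

// Toy endpoint: onStart runs once at registration, receive handles one message at a time
trait ToyEndpoint {
  def onStart(): Unit = ()
  def receive(msg: Any, sender: ToyRef): Unit
}

// Toy reference to a named endpoint; sending only enqueues the message
final class ToyRef(val name: String, env: ToyRpcEnv) {
  def send(msg: Any, sender: ToyRef): Unit = env.enqueue(name, msg, sender)
}

final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, ToyEndpoint]
  private val inbox = mutable.Queue.empty[(String, Any, ToyRef)]

  def ref(name: String): ToyRef = new ToyRef(name, this)   // loosely like setupEndpointRef
  def setupEndpoint(name: String, ep: ToyEndpoint): Unit = {
    endpoints(name) = ep
    ep.onStart()                                            // like RpcEndpoint.onStart
  }
  def enqueue(to: String, msg: Any, sender: ToyRef): Unit = inbox.enqueue((to, msg, sender))

  // Deliver queued messages one after another, never concurrently
  def drain(): Unit =
    while (inbox.nonEmpty) {
      val (to, msg, sender) = inbox.dequeue()
      endpoints.get(to).foreach(_.receive(msg, sender))
    }
}

final case class SubmitDriverRequest(mainClass: String)
final case class SubmitDriverReply(success: Boolean, driverId: Option[String])

// Plays the Master: assigns an id to each submitted driver and replies to the sender
final class ToyMasterEndpoint(self: ToyRef) extends ToyEndpoint {
  private var nextId = 0
  def receive(msg: Any, sender: ToyRef): Unit = msg match {
    case SubmitDriverRequest(_) =>
      val id = s"driver-$nextId"
      nextId += 1
      sender.send(SubmitDriverReply(success = true, Some(id)), self)
    case _ => ()
  }
}

// Plays ClientEndpoint: submits in onStart and records the Master's reply
final class ToyClientEndpoint(self: ToyRef, master: ToyRef, mainClass: String) extends ToyEndpoint {
  var reply: Option[SubmitDriverReply] = None
  override def onStart(): Unit = master.send(SubmitDriverRequest(mainClass), self)
  def receive(msg: Any, sender: ToyRef): Unit = msg match {
    case r: SubmitDriverReply => reply = Some(r)
    case _ => ()
  }
}
```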

The onStart method in Client.scala is as follows.

private class ClientEndpoint(
    override val rpcEnv: RpcEnv,
    driverArgs: ClientArguments,
    masterEndpoints: Seq[RpcEndpointRef],
    conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging {

  override def onStart(): Unit = {
    driverArgs.cmd match {
      ........
        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))

......

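When the Master receives the RequestSubmitDriver message from the Driver ClientEndpoint, it persists the driver through persistenceEngine and adds it to the waitingDrivers and drivers data structures. It then calls schedule() to allocate resources, and the Master sends a LaunchDriver message to a Worker.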

The relevant source in Master.scala is as follows.

case RequestSubmitDriver(description) =>
  .......
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()
......

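From the onStart code in Client.scala onward, the submitted configuration keeps being passed between objects and nodes. The Master loads the Driver onto a Worker node and starts it, and the Driver running on that Worker carries the same configuration.

When the SparkContext on the Driver side starts and instantiates the DAGScheduler and TaskScheduler, StandaloneSchedulerBackend does one more thing: it instantiates StandaloneAppClient. StandaloneAppClient contains a ClientEndpoint, which is also an RPC endpoint used to communicate with the Master. In that endpoint's onStart method, a RegisterApplication(appDescription, self) request is sent to the Master.

The Master receives the request, calls schedule, and sends a Worker the request LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory). The Worker node then starts an ExecutorRunner. The ExecutorRunner launches CoarseGrainedExecutorBackend, which registers with the Driver.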

The main method of CoarseGrainedExecutorBackend contains the following code.

var argv = args.toList                          // convert args to a List
while (!argv.isEmpty) {                         // loop until argv is exhausted
  argv match {
    case ("--driver-url") :: value :: tail =>
      driverUrl = value                         // driver URL
      argv = tail
    case ("--executor-id") :: value :: tail =>
      executorId = value                        // executor id
      argv = tail
    case ("--hostname") :: value :: tail =>
      hostname = value                          // host name
      argv = tail
    case ("--cores") :: value :: tail =>
      cores = value.toInt                       // number of cores configured for this Executor
      argv = tail
    case ("--app-id") :: value :: tail =>
      appId = value                             // application id
      argv = tail
    case ("--worker-url") :: value :: tail =>
      workerUrl = Some(value)                   // Worker URL
      argv = tail
    case ("--user-class-path") :: value :: tail =>
      userClassPath += new URL(value)           // user class path entry
      argv = tail
    case Nil =>
    case tail =>
      System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
      printUsageAndExit()                       // print usage and exit
  }
}
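The argv loop above can also be written as a side-effect-free, tail-recursive function, which makes each case easy to test. ExecutorArgs and ExecutorArgsParser are hypothetical names. Unknown or incomplete options produce a Left instead of calling printUsageAndExit(). URI.create(value).toURL is used in place of new URL(value).

```scala
import java.net.{URI, URL}

// Hypothetical holder for the values the argv loop extracts
final case class ExecutorArgs(
    driverUrl: Option[String] = None,
    executorId: Option[String] = None,
    hostname: Option[String] = None,
    cores: Int = 0,
    appId: Option[String] = None,
    workerUrl: Option[String] = None,
    userClassPath: List[URL] = Nil)

object ExecutorArgsParser {
  // Same "--option" :: value :: tail matching as above, as a tail-recursive function
  @annotation.tailrec
  def parse(argv: List[String], acc: ExecutorArgs = ExecutorArgs()): Either[String, ExecutorArgs] =
    argv match {
      case "--driver-url" :: value :: tail  => parse(tail, acc.copy(driverUrl = Some(value)))
      case "--executor-id" :: value :: tail => parse(tail, acc.copy(executorId = Some(value)))
      case "--hostname" :: value :: tail    => parse(tail, acc.copy(hostname = Some(value)))
      case "--cores" :: value :: tail       => parse(tail, acc.copy(cores = value.toInt))
      case "--app-id" :: value :: tail      => parse(tail, acc.copy(appId = Some(value)))
      case "--worker-url" :: value :: tail  => parse(tail, acc.copy(workerUrl = Some(value)))
      case "--user-class-path" :: value :: tail =>
        parse(tail, acc.copy(userClassPath = acc.userClassPath :+ URI.create(value).toURL))
      case Nil  => Right(acc)
      // Unknown or incomplete options: report them instead of exiting
      case rest => Left(s"Unrecognized options: ${rest.mkString(" ")}")
    }
}
```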

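The configuration is passed along all the way from program submission to the launch of the CoarseGrainedExecutorBackend process. CoarseGrainedExecutorBackend extracts the cores setting and passes it to run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath). CoarseGrainedExecutorBackend runs as a separate JVM process, started with the amount of resources allocated to it. Note that one Worker node can run several Executors, as long as its available cores and memory meet the requirements of each Executor it launches.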
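The capacity condition in the last sentence is simple integer arithmetic. The number of equally sized executors a Worker can host is limited by whichever of cores or memory runs out first. The helper below is a hypothetical back-of-the-envelope check, not the Master's placement code. It also assumes the application sets spark.executor.cores. According to the Spark configuration documentation, without that setting standalone mode runs only one executor per application on each Worker.

```scala
// Hypothetical capacity check: how many equally sized executors fit on one Worker
object ExecutorFit {
  def executorsThatFit(freeCores: Int, freeMemoryMb: Int,
                       coresPerExecutor: Int, memoryPerExecutorMb: Int): Int = {
    require(coresPerExecutor > 0 && memoryPerExecutorMb > 0, "executor size must be positive")
    // Whichever resource runs out first limits the count
    math.min(freeCores / coresPerExecutor, freeMemoryMb / memoryPerExecutorMb)
  }
}
```

For example, a Worker with 16 free cores and 20480 MB of free memory can host only 2 executors of 2 cores and 8192 MB each. Its cores would allow 8, but its memory allows 2.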

6.3.3 The Driver-Master Interaction in the Source Code

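The spark-submit script shows that the class launched when an application is submitted is org.apache.spark.deploy.SparkSubmit; this is the class the user ultimately runs. SparkSubmit is the main entry point for launching a Spark application.

When the cluster manager is STANDALONE and the deploy mode is CLUSTER, SparkSubmit sets childMainClass to a different class for each of the two submission gateways. It also converts the supplied args.mainClass (the main class set at submission) and its arguments, according to the cluster manager and deploy mode, into the arguments the new main class requires. With the REST gateway (Spark 1.3+), childMainClass is "org.apache.spark.deploy.rest.RestSubmissionClient"; with the legacy gateway, it is "org.apache.spark.deploy.Client".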

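The rest of this section follows the REST gateway. When an application is submitted through REST (Spark 1.3+), its main class and related information are wrapped in a RestSubmissionClient, which sends the submission request to RestSubmissionServer. On receiving the request, RestSubmissionServer sends a RequestSubmitDriver message to the Master. The Master then launches the corresponding Driver in the cluster according to its resource scheduling policy, and the Driver runs the submitted application. Figure 6-3 shows the deployment and execution framework in cluster deploy mode.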

Figure 6-3 Deployment and execution framework in cluster deploy mode

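The figure is drawn as a framework diagram to show how the components are deployed relative to each other. The interactions among the classes or components can also be read as a sequence diagram. The Master and Worker components are labeled in the upper-left corner of their boxes, and every other box represents a concrete instance.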

RestSubmissionClient is the class on the submitting client that wraps the submitted application. The components then interact as follows.

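(1) Step 1, constructSubmitRequest: the RestSubmissionClient instance builds a submission request from the submitted application's information.

(2) Step 2, createSubmission: this step sends a POST request to RestSubmissionServer. That request is step 3 in Figure 6-3, although it is actually issued from within step 2.

(3) When RestSubmissionServer receives the POST request, the matching servlet, StandaloneSubmitRequestServlet, handles it. This is step 4, in which doPost is called to handle the POST request.

(4) doPost continues with step 5, handleSubmit, which processes the submission request. In doing so it sends a RequestSubmitDriver message to the Master's RPC endpoint, which is step 6 in the figure.

(5) On receiving this message, the Master performs step 7, createDriver, to create the Driver. The Driver must then be placed by the Master's scheduling mechanism in step 8, schedule. After resources are allocated, the Master sends a LaunchDriver message to the RPC endpoint of a Worker; Workers register with the Master when they start.

(6) The Worker handles the message at its RPC endpoint, instantiates a DriverRunner and runs the previously wrapped application.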
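Steps (1) and (2) come down to one HTTP POST with a JSON body. The sketch below only assembles the URL and body, by hand and without a JSON library, to make the request shape concrete. It assumes the standalone REST server's layout of http://<host>:<port>/v1/submissions/create, with port 6066 by default, and uses field names as found in a CreateSubmissionRequest. RestSubmitSketch is hypothetical, sending the request is left out, and the server may require properties not shown here.

```scala
// Hypothetical helper that assembles a REST submission request by hand
object RestSubmitSketch {
  // "spark://host:6066" -> "http://host:6066/v1/submissions/create"
  def createUrl(master: String): String = {
    require(master.startsWith("spark://"), "not a standalone master URL: " + master)
    "http://" + master.stripPrefix("spark://").stripSuffix("/") + "/v1/submissions/create"
  }

  // Quote a string as a JSON string literal (only backslash and double quote are escaped)
  private def q(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  private def field(name: String, json: String): String = q(name) + ": " + json

  private def obj(m: Map[String, String]): String =
    m.toSeq.sortBy(_._1).map { case (k, v) => field(k, q(v)) }.mkString("{", ", ", "}")

  def createBody(appResource: String, mainClass: String, appArgs: Seq[String],
                 sparkProperties: Map[String, String], clientSparkVersion: String): String =
    Seq(
      field("action", q("CreateSubmissionRequest")),
      field("appResource", q(appResource)),
      field("mainClass", q(mainClass)),
      field("appArgs", appArgs.map(q).mkString("[", ", ", "]")),
      field("sparkProperties", obj(sparkProperties)),
      field("environmentVariables", "{}"),
      field("clientSparkVersion", q(clientSparkVersion))
    ).mkString("{", ", ", "}")
}
```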

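Note: the deployment framework and terminology above explain the naming. The main method of the submitted application contains a SparkContext instance, so the application is also called the Driver Program. That is why the framework uses the name Driver, not Application, on both the Master and the Worker.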

The main source code and its analysis follow.

(1) The run method in RestSubmissionClient is shown below.

The relevant source in RestSubmissionClient.scala is as follows.

def run(
    appResource: String,
    mainClass: String,
    appArgs: Array[String],
    conf: SparkConf,
    env: Map[String, String] = Map()): SubmitRestProtocolResponse = {
  val master = conf.getOption("spark.master").getOrElse {
    throw new IllegalArgumentException("'spark.master' must be set.")
  }
  val sparkProperties = conf.getAll.toMap
  // Create a REST submission client
  val client = new RestSubmissionClient(master)
  // Wrap the application information, including the primary resource and main class
  val submitRequest = client.constructSubmitRequest(
    appResource, mainClass, appArgs, sparkProperties, env)
  // Create the submission; this sends a POST request to RestSubmissionServer
  client.createSubmission(submitRequest)
}

(2) After receiving the submitted POST request, StandaloneSubmitRequestServlet sends a RequestSubmitDriver request to the Master's RPC endpoint, as shown below.

The relevant source in StandaloneRestServer.scala (Spark 2.1.1) is as follows.

protected override def handleSubmit(
    requestMessageJson: String,
    requestMessage: SubmitRestProtocolMessage,
    responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
  requestMessage match {
    case submitRequest: CreateSubmissionRequest =>
      // Build the description of the driver program (the application that contains
      // the SparkContext) as a DriverDescription, then send a RequestSubmitDriver
      // message to the Master's RPC endpoint masterEndpoint
      val driverDescription = buildDriverDescription(submitRequest)
      val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
        DeployMessages.RequestSubmitDriver(driverDescription))

      val submitResponse = new CreateSubmissionResponse
      submitResponse.serverSparkVersion = sparkVersion
      submitResponse.message = response.message
      submitResponse.success = response.success
      submitResponse.submissionId = response.driverId.orNull
......

In Spark 2.2.0, StandaloneRestServer.scala differs from Spark 2.1.1 in one respect: the masterEndpoint.askWithRetry call in the code above is replaced with masterEndpoint.askSync.

......
val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
  DeployMessages.RequestSubmitDriver(driverDescription))
......

(3) The buildDriverDescription method, which builds the DriverDescription, is shown below.

The relevant source in StandaloneRestServer.scala is as follows.

private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = {
  ......
  // Build the Command: mainClass is wrapped in DriverWrapper (visible as DriverWrapper in jps)
  val command = new Command(
    "org.apache.spark.deploy.worker.DriverWrapper",
    Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
    environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
  ......

  // Build the driver program's DriverDescription
  new DriverDescription(
    appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
}
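The {{WORKER_URL}} and {{USER_JAR}} entries are placeholders. On the Worker side, DriverRunner replaces them with the Worker's URL and the path of the locally downloaded jar before starting DriverWrapper. The function below restates that substitution as a standalone sketch with a hypothetical name and signature.

```scala
// Hypothetical restatement of the placeholder substitution done before launching DriverWrapper
object DriverCommandSketch {
  def substitute(args: Seq[String], workerUrl: String, localJar: String): Seq[String] =
    args.map {
      case "{{WORKER_URL}}" => workerUrl   // URL of the Worker hosting the driver
      case "{{USER_JAR}}"   => localJar    // local copy of the application jar
      case other            => other       // main class and user arguments pass through
    }
}
```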

(4) The Master receives RequestSubmitDriver, handles the message and replies with a SubmitDriverResponse message.

The relevant source in Master.scala is as follows.

case RequestSubmitDriver(description) =>
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()

    // TODO: It might be better to have the submitting client poll the Master to
    // determine the driver's current status. For now it is simply "fire and forget".

    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
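The decision logic of this handler can be isolated in a small model. ToyMaster, MasterState and SubmitResult are hypothetical. Persistence, the drivers set and schedule() appear only as comments. The rejection message is paraphrased. The id format is simplified: the real driver id also contains a submission timestamp.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical subset of the Master's recovery states
sealed trait MasterState
case object Alive extends MasterState
case object Standby extends MasterState

final case class SubmitResult(success: Boolean, driverId: Option[String], message: String)

final class ToyMaster(var state: MasterState) {
  val waitingDrivers = ArrayBuffer.empty[String]
  private var counter = 0

  def requestSubmitDriver(mainClass: String): SubmitResult =
    if (state != Alive) {
      // A non-ALIVE Master (e.g. a standby) refuses the submission
      SubmitResult(success = false, None, s"Master is $state; can only accept driver submissions in ALIVE state.")
    } else {
      val id = f"driver-$counter%04d"   // simplified id; the real one embeds a timestamp
      counter += 1
      // The real Master also calls persistenceEngine.addDriver, drivers.add and schedule()
      waitingDrivers += id
      SubmitResult(success = true, Some(id), s"Driver successfully submitted as $id")
    }
}
```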

(5) The Master's schedule() method, which implements the scheduling mechanism, is shown below.

The relevant source in Master.scala is as follows.

private def schedule(): Unit = {
  ......
      launchDriver(worker, driver)
  .....
  startExecutorsOnWorkers()
}
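In Spark 2.x, the elided part of schedule() places each waiting driver on a Worker. The sketch below simplifies that loop and makes it deterministic. Each driver is offered to the Workers in round-robin order, continuing from where the previous driver stopped, and is launched on the first Worker with enough free memory and cores. WorkerSlot, DriverReq and DriverPlacementSketch are hypothetical. The real Master first shuffles the list of alive Workers and calls launchDriver; this sketch replaces that call with direct bookkeeping.

```scala
// Hypothetical, mutable view of a Worker's free resources
final class WorkerSlot(val id: String, var coresFree: Int, var memoryFree: Int)
final case class DriverReq(id: String, cores: Int, mem: Int)

object DriverPlacementSketch {
  // Returns driverId -> workerId for the drivers that could be placed
  def placeDrivers(workers: IndexedSeq[WorkerSlot], waiting: Seq[DriverReq]): Map[String, String] = {
    var placed = Map.empty[String, String]
    var curPos = 0
    for (driver <- waiting) {
      var visited = 0
      var launched = false
      // Visit each Worker at most once per driver
      while (visited < workers.size && !launched) {
        val w = workers(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          w.coresFree -= driver.cores     // stands in for launchDriver booking the resources
          w.memoryFree -= driver.mem
          placed += driver.id -> w.id
          launched = true
        }
        curPos = (curPos + 1) % workers.size
      }
    }
    placed
  }
}
```

A driver that fits nowhere simply stays unplaced. In the Master it would remain in waitingDrivers until a later schedule() call.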

(6) The code that launches the Driver on the Worker is shown below.

The relevant source in Worker.scala is as follows.

case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem

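The Driver Client manages the Driver. It submits the Driver to the Master, asks the Master to kill a Driver, and so on. The messages exchanged between the Driver Client and the Master are as follows.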

The relevant source in DeployMessages.scala is as follows.

// DriverClient <-> Master
// Driver Client asks the Master to submit a Driver
case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
// Master tells the Driver Client whether the submission succeeded
case class SubmitDriverResponse(
    master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
  extends DeployMessage
// Driver Client asks the Master to kill a Driver
case class RequestKillDriver(driverId: String) extends DeployMessage
// Master replies whether the kill succeeded
case class KillDriverResponse(
    master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
  extends DeployMessage
// Driver Client asks the Master for a Driver's status
case class RequestDriverStatus(driverId: String) extends DeployMessage
// Master returns the requested status to the Driver Client
case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
  workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])

In its handleSubmit method, StandaloneSubmitRequestServlet sends a RequestSubmitDriver message to the Master to request submission of the Driver.

The Master receives the RequestSubmitDriver message sent by StandaloneSubmitRequestServlet and processes it. It then replies to StandaloneSubmitRequestServlet with a SubmitDriverResponse message.


Similarly, the Master receives the RequestKillDriver message sent from StandaloneKillRequestServlet and processes it. It then replies to StandaloneKillRequestServlet with a KillDriverResponse message.

Likewise, the Master receives the RequestDriverStatus message sent from StandaloneStatusRequestServlet and processes it. It then replies to StandaloneStatusRequestServlet with a DriverStatusResponse message.
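The kill and status exchanges follow the same request/reply pattern as submission. The toy model below answers trimmed-down versions of RequestKillDriver and RequestDriverStatus from an in-memory map; all names are hypothetical. One simplification matters. Here a kill marks the driver KILLED immediately, whereas the real Master forwards the kill to the Worker running the driver and learns the final state later.

```scala
// Hypothetical, trimmed-down request and reply messages
sealed trait ClientRequest
final case class KillDriver(driverId: String) extends ClientRequest
final case class DriverStatus(driverId: String) extends ClientRequest

sealed trait MasterReply
final case class KillReply(driverId: String, success: Boolean, message: String) extends MasterReply
final case class StatusReply(found: Boolean, state: Option[String]) extends MasterReply

// Hypothetical Master-side registry mapping driverId -> state name
final class ToyDriverRegistry(initial: Map[String, String]) {
  private var states = initial

  def handle(req: ClientRequest): MasterReply = req match {
    case KillDriver(id) if states.contains(id) =>
      states += id -> "KILLED"          // simplification: no round trip to the Worker
      KillReply(id, success = true, s"Kill request for $id submitted")
    case KillDriver(id) =>
      KillReply(id, success = false, s"Driver $id has already finished or does not exist")
    case DriverStatus(id) =>
      StatusReply(found = states.contains(id), states.get(id))
  }
}
```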