6.1 Spark Application到底是如何提交给集群的

本节讲解Application提交参数配置、Application提交给集群原理、Application提交给集群源码等内容,将彻底解密Spark Application到底是如何提交给集群的。

6.1.1 Application提交参数配置详解

用户应用程序可以使用bin/spark-submit脚本来启动。spark-submit脚本负责使用Spark及其依赖关系设置类路径,并可支持Spark支持的不同群集管理器和部署模式。

bin/spark-submit脚本示例如下。

1.   ./bin/spark-submit \
2.  --class <main-class> \
3.  --master <master-url> \
4.  --deploy-mode <deploy-mode> \
5.  --conf <key>=<value> \
6.  ... # other options
7.  <application-jar> \
8.  [application-arguments]

spark-submit脚本提交参数配置中一些常用的选项。

--class:应用程序的入口点(如org.apache.spark.examples.SparkPi)。

--master:集群的主URL(如spark://23.195.26.187:7077)。

--deploy-mode:将Driver程序部署在集群Worker节点(cluster);或作为外部客户端(client)部署在本地(默认值:client)。

--conf:任意Spark配置属性,使用key = value格式。对于包含空格的值,用引号括起来,如“key = value”。

application-jar:包含应用程序和所有依赖关系Jar包的路径。该URL必须在集群内全局可见。例如,所有节点上存在的hdfs://路径或file://路径。

application-arguments:传递给主类的main方法的参数。

6.1.2 Application提交给集群原理详解

在Spark官网部署页面(http://spark.apache.org/docs/latest/cluster-overview.html),可以看到当前集群支持以下3种集群管理器(cluster manager)。

(1)Standalone:Spark原生的简单集群管理器。使用Standalone可以很方便地搭建一个集群。

(2)Apache Mesos:一个通用的集群管理器,可以在上面运行HadoopMapReduce和一些服务型的应用。

(3)Hadoop YARN:在Hadoop 2中提供的资源管理器。

另外,Spark提供的EC2启动脚本,可以很方便地在Amazon EC2上启动一个Standalone集群。

实际上,除了上面这些通用的集群管理器外,Spark内部也提供一些方便我们测试、学习的简单集群部署模式。为了更全面地理解,我们会从Spark应用程序部署点切入,也就是从提交一个Spark应用程序开始,引出并详细解析各种部署模式。

说明:下面涉及类的描述时,如果可以通过类名唯一确定一个类,将直接给出类名,如果不能,会先给出全路径的类名,然后在不出现歧义的地方再简写为类名。

为了简化应用程序提交的复杂性,Spark提供了各种应用程序提交的统一入口,即spark-submit脚本,应用程序的提交都间接或直接地调用了该脚本。下面简单分析几个脚本,包含./bin/spark-shell、./bin/pyspark、./bin/sparkR、./bin/spark-sql、./bin/run-example、./bin/speak-submit,以及所有脚本最终都调用到的一个执行Java类的脚本./bin/spark-class。

1.脚本./bin/spark-shell

通过该脚本可以打开使用Scala语言进行开发、调试的交互式界面,脚本的代码如下所示。

1.  ......
2.  function main() {
3.  ......
4.  "${SPARK_HOME}"/bin/spark-submit  --class  org.apache.spark.repl.Main
    --name "Spark shell" "$@"
5.  sttyicanon echo > /dev/null 2>&1
6.  else
7.  export SPARK_SUBMIT_OPTS
8.  "${SPARK_HOME}"/bin/spark-submit  --class  org.apache.spark.repl.Main
    --name "Spark shell" "$@"
9.  fi
10. }
11. ......

对应在第4行和第8行处,调用了应用程序提交脚本./bin/spark-submit。脚本./bin/spark-shell的基本用法如下所示:

1.  "Usage: ./bin/spark-shell [options]"

其他脚本类似。下面分别针对各个脚本的用法(具体用法可查看脚本的帮助信息,如通过--help选项来获取)与关键执行语句等进行简单解析。了解工具(如脚本)如何使用,最根本的是先查看其帮助信息,然后在此基础上进行扩展。

2.脚本./bin/pyspark

通过该脚本可以打开使用Python语言开发、调试的交互式界面。

(1)该脚本的用法如下。

1.  "Usage: ./bin/pyspark [options]"

(2)该脚本的执行语句如下。

1.  exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name
    "PySparkShell" "$@"
3.脚本./bin/sparkR

通过该脚本可以打开使用sparkR开发、调试的交互式界面。

(1)该脚本的用法如下。

1.  "Usage: ./bin/sparkR [options]"

(2)该脚本的执行语句如下。

1.  exec "${SPARK_HOME}"/bin/spark-submit sparkr-shell-main "$@"
4.脚本./bin/spark-sql

通过该脚本可以打开使用SparkSql开发、调试的交互式界面。

(1)该脚本的用法如下。

1.  "Usage: ./bin/spark-sql [options] [cli option]"

(2)该脚本的执行语句如下。

1.  exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.
    thriftserver.SparkSQLCLIDriver "$@"
5.脚本./bin/run-example

可以通过该脚本运行Spark自带的案例代码。该脚本中会自动补全案例类的路径。

(1)该脚本的用法如下。

1.  echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
2.  echo "  - set MASTER=XX to use a specific master" 1>&2
3.  echo "  - can use abbreviated example class name relative to com.apache.
    spark.examples" 1>&2
4.  echo " (e.g. SparkPi, mllib.LinearRegression, streaming.
    KinesisWordCountASL)" 1>&2

(2)该脚本的执行语句如下。

1.  exec "${SPARK_HOME}"/bin/spark-submit \
2.    --master $EXAMPLE_MASTER \
3.    --class $EXAMPLE_CLASS \
4.    "$SPARK_EXAMPLES_JAR" \
5.    "$@"
6.脚本./bin/spark-submit

./bin/spark-submit是提交Spark应用程序最常用的一个脚本。从前面各个脚本的解析可以看出,各个脚本最终都调用了./bin/spark-submit脚本。

(1)该脚本的用法如下。

该脚本的用法需要从源码中获取,具体源码位置参考SparkSubmitArguments类的方法printUsageAndExit,代码如下。

1.  val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
2.  """Usage: spark-submit [options] <app jar | python file> [app arguments]
3.      |Usage: spark-submit --kill [submission ID] --master [spark://...]
4.      |Usage: spark-submit --status [submission ID] --master [spark:
        //...]""".stripMargin)

(2)该脚本的执行语句如下。

1.  exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.
    SparkSubmit "$@"
7.脚本./bin/spark-class

该脚本是所有其他脚本最终都调用到的一个执行Java类的脚本。其中关键的执行语句如下所示。

1.  CMD=()
2.  while IFS= read -d '' -r ARG; do
3.    CMD+=("$ARG")
4.  done <<("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main
    "$@")
5.  exec "${CMD[@]}"

其中,负责运行的RUNNER变量设置如下。

1.  # Find the java binary
2.  //将RUNNER设置为Java
3.  if [ -n "${JAVA_HOME}" ]; then
4.    RUNNER="${JAVA_HOME}/bin/java"
5.  else
6.    if [ `command -v java` ]; then
7.      RUNNER="java"
8.    else
9.      echo "JAVA_HOME is not set" >&2
10.     exit 1
11.   fi
12. fi

在脚本中,LAUNCH_CLASSPATH变量对应Java命令运行时所需的classpath信息。最终Java命令启动的类是org.apache.spark.launcher.Main。Main类的入口函数main,会根据输入参数构建出最终执行的命令,即这里返回的${CMD[@]}信息,然后通过exec执行。

6.1.3 Application提交给集群源码详解

本节从应用部署的角度解析相关的源码,主要包括脚本提交时对应JVM进程启动的主类org.apache.spark.launcher.Main、定义应用程序提交的行为类型的类org.apache.spark.deploy. SparkSubmitAction、应用程序封装底层集群管理器和部署模式的类org.apache.spark.deploy. SparkSubmit,以及代表一个应用程序的驱动程序的类org.apache.spark.SparkContext。

1.Main解析

从前面的脚本分析,得出最终都是通过org.apache.spark.launcher.Main类(下面简称Main类)启动应用程序的。因此,首先解析一下Main类。

在Main类的源码中,类的注释如下。

1.  /**
2.    * Spark 启动器的命令行接口。在Spark脚本内部使用
3.    *
4.    */

对应地,在Main对象的入口方法main的注释如下。

Main.java源码如下。

1.
2.   /**
3.     * Usage: Main [class] [class args]
4.     * <p>
5.     * 命令行界面工作在两种模式下:
6.     * <ul>
7.     *   <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.
       *   SparkSubmit", the {@link SparkLauncher} class is used to launch a
       *   Spark application. </li>
8.     *   <li>"spark-class": if another class is provided, an internal Spark
       *   class is run.</li>
9.     * </ul>
10.  .......
11. public static void main(String[] argsArray) throws Exception {
12. ......

Main类主要有两种工作模式,分别描述如下。

(1)spark-submit

启动器要启动的类为org.apache.spark.deploy.SparkSubmit时,对应为spark-submit工作模式。此时,使用SparkSubmitCommandBuilder类来构建启动命令。

(2)spark-class

启动器要启动的类是除SparkSubmit之外的其他类时,对应为spark-class工作模式。此时使用SparkClassCommandBuilder类的buildCommand方法来构建启动命令。

Main.java的源码如下。

1.  public static void main(String[] argsArray) throws Exception {
2.  .......
3.  String className = args.remove(0);
4.  .......
5.      if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
6.        try {
7.          builder = new SparkSubmitCommandBuilder(args);
8.  .......
9.   } else {
10.       builder = new SparkClassCommandBuilder(className, args);
11.     }
12. ......

以spark-submit工作模式为例,对应的在构建启动命令的SparkSubmitCommandBuilder类中,上述调用的SparkClassCommandBuilder构造函数定义如下。

SparkSubmitCommandBuilder.java的源码如下。

1.     SparkSubmitCommandBuilder(List<String> args) {
2.      this.allowsMixedArguments = false;
3.      this.sparkArgs = new ArrayList<>();
4.      boolean isExample = false;
5.      List<String> submitArgs = args;
6.   //根据输入的第一个参数设置,包括主资源appResource等
7.      if (args.size() > 0) {
8.        switch (args.get(0)) {
9.          case PYSPARK_SHELL:
10.           this.allowsMixedArguments = true;
11.           appResource = PYSPARK_SHELL;
12.           submitArgs = args.subList(1, args.size());
13.           break;
14.
15.         case SPARKR_SHELL:
16.           this.allowsMixedArguments = true;
17.           appResource = SPARKR_SHELL;
18.           submitArgs = args.subList(1, args.size());
19.           break;
20.
21.         case RUN_EXAMPLE:
22.           isExample = true;
23.           submitArgs = args.subList(1, args.size());
24.       }
25.
26.       this.isExample = isExample;
27.       OptionParser parser = new OptionParser();
28.       parser.parse(submitArgs);
29.       this.isAppResourceReq = parser.isAppResourceReq;
30.     }  else {
31.       this.isExample = isExample;
32.       this.isAppResourceReq = false;
33.     }
34.   }

从这些初步的参数解析可以看出,前面脚本中的参数与最终对应的主资源间的对应关系见表6-1。

表6-1 脚本中的参数与主资源间的对应关系

如果继续跟踪appResource赋值的源码,可以跟踪到一些特殊类的类名与最终对应的主资源间的对应关系,见表6-2。

表6-2 特殊类的类名与主资源间的对应关系

如果有兴趣,可以继续跟踪SparkClassCommandBuilder类的buildCommand方法的源码,查看构建的命令具体有哪些。

通过Main类的简单解析,可以将前面的脚本分析结果与后面即将进行分析的SparkSubmit类关联起来,以便进一步解析与应用程序提交相关的其他源码。

从前面的脚本分析可以看到,提交应用程序时,Main启动的类,也就是用户最终提交执行的类是org.apache.spark.deploy.SparkSubmit。因此,下面开始解析SparkSubmit相关的源码,包括提交行为的定义、提交时的参数解析以及最终提交运行的代码解析。

2.SparkSubmitAction解析

SparkSubmitAction定义了提交应用程序的行为类型,源码如下所示。

SparkSubmit.scala的源码如下。

1.    private[deploy] object SparkSubmitAction extends Enumeration {
2.    type SparkSubmitAction = Value
3.    val SUBMIT, KILL, REQUEST_STATUS = Value
4.  }

从源码中可以看到,分别定义了SUBMIT、KILL、REQUEST_STATUS这3种行为类型,对应提交应用、停止应用、查询应用的状态。

3.SparkSubmit解析

SparkSubmit的全路径为org.apache.spark.deploy.SparkSubmit。从SparkSubmit类的注释可以看出,SparkSubmit是启动一个Spark应用程序的主入口点,这和前面从脚本分析得到的结论一致。首先看一下SparkSubmit类的注释,如下所示。

1.  /**
2.    *启动一个Spark应用程序的主入口点
3.    *
4.    *
5.    *这个程序处理与Spark依赖相关的类路径设置,提供Spark支持的在不同集群管理器的部
      *署模式
6.    *
7.    */

SparkSubmit会帮助我们设置Spark相关依赖包的classpath,同时,为了帮助用户简化提交应用程序的复杂性,SparkSubmit提供了一个抽象层,封装了底层复杂的集群管理器与部署模式的各种差异点,即通过SparkSubmit的封装,集群管理器与部署模式对用户是透明的。

在SparkSubmit中体现透明性的集群管理器定义的源码如下所示。

SparkSubmit.scala的源码如下。

1.  //集群管理器
2.  //Cluster managers
3.  private val YARN = 1
4.  private val STANDALONE = 2
5.  private val MESOS = 4
6.  private val LOCAL = 8
7.  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

在SparkSubmit中体现透明性的部署模式定义的源码如下。

1.  //部署模式
2.  //Deploy modes
3.  private val CLIENT = 1
4.  private val CLUSTER = 2
5.  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

作为提交应用程序的入口点,SparkSubmit中根据具体的集群管理器进行参数转换、参数校验等操作,如对模式的检查,代码中给出了针对特定情况,不支持的集群管理器与部署模式,在这些模式下提交应用程序会直接报错退出。

SparkSubmit.scala的源码如下。

1.   //不支持的集群管理器与部署模式
2.
3.      (clusterManager, deployMode) match {
4.        case (STANDALONE, CLUSTER) if args.isPython =>
5.          printErrorAndExit("Cluster deploy mode is currently not supported
              for python " +"applications on standalone clusters.")
6.
7.        case (STANDALONE, CLUSTER) if args.isR =>
8.          printErrorAndExit("Cluster deploy mode is currently not supported
              for R " +"applications on standalone clusters.")
9.
10.       case (LOCAL, CLUSTER) =>
11.         printErrorAndExit("Cluster deploy mode is not compatible with
            master \"local\"")
12.       case (_, CLUSTER) if isShell(args.primaryResource) =>
13.         printErrorAndExit("Cluster deploy mode is not applicable to Spark
            shells.")
14.       case (_, CLUSTER) if isSqlShell(args.mainClass) =>
15.         printErrorAndExit("Cluster deploy mode is not applicable to Spark
            SQL shell.")
16.       case (_, CLUSTER) if isThriftServer(args.mainClass) =>
17.         printErrorAndExit("Cluster deploy mode is not applicable to Spark
            Thrift server.")
18.       case _ =>
19.     }

首先,一个程序运行的入口点对应单例对象的main函数,因此在执行SparkSubmit时,对应的入口点是objectSparkSubmit的main函数,具体代码如下。

SparkSubmit.scala的源码如下。

1.  //入口点函数main的定义
2.  def main(args: Array[String]): Unit = {
3.    val appArgs = new SparkSubmitArguments(args)
4.   ......
5.   //根据3种行为分别进行处理
6.  appArgs.action match {
7.      case SparkSubmitAction.SUBMIT => submit(appArgs)
8.      case SparkSubmitAction.KILL => kill(appArgs)
9.      case SparkSubmitAction.REQUEST_STATUS =>requestStatus(appArgs)
10.   }
11. }

其中,SparkSubmitArguments类对应用户调用提交脚本spark-submit时传入的参数信息。对应的脚本的帮助信息(./bin/spark-submit --help),也是由该类的printUsageAndExit方法提供的。

找到上面的入口点代码之后,就可以开始分析其内部的源码。对应参数信息的SparkSubmitArguments可以参考脚本的帮助信息,来查看具体参数对应的含义。参数分析后,便是对各种提交行为的具体处理。SparkSubmit支持SparkSubmitAction包含的3种行为,下面以行为SparkSubmitAction.SUBMIT为例进行分析,其他行为也可以通过各自的具体处理代码进行分析。

对应处理SparkSubmitAction.SUBMIT行为的代码入口点为submit(appArgs),进入该方法,即进入提交应用程序的处理方法的具体代码如下所示。

SparkSubmit.scala的源码如下。

1.     private def submit(args: SparkSubmitArguments): Unit = {
2.   //准备应用程序提交的环境,该步骤包含了内部封装的各个细节处理
3.      val (childArgs, childClasspath, sysProps, childMainClass) =
        prepareSubmitEnvironment(args)
4.
5.      def doRunMain(): Unit = {
6.        if (args.proxyUser != null) {
7.          val proxyUser = UserGroupInformation.createProxyUser
              (args.proxyUser,UserGroupInformation.getCurrentUser())
8.
9.          try {
10.           proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
11.             override def run(): Unit = {
12.               runMain(childArgs, childClasspath, sysProps, childMainClass,
                  args.verbose)
13.             }
14.           })
15.         } catch {
16.           case e: Exception =>
17.             //hadoop的AuthorizationException抑制异常堆栈跟踪,通过JVM打印输
                //出的消息不是很有帮助。这里检测异常以及空栈,对其采用不同的处理
18.             if (e.getStackTrace().length == 0) {
19.               //scalastyle:off println
20.               printStream.println(s"ERROR:            ${e.getClass().getName()}:
                  ${e.getMessage()}")
21.               //scalastyle:on println
22.               exitFn(1)
23.             } else {
24.               throw e
25.             }
26.         }
27.       } else {
28.         runMain(childArgs,       childClasspath,      sysProps,    childMainClass,
            args.verbose)
29.      }
30.    }
31.
32.   //Standalone 集群模式下,有两种提交应用程序的方式
33.   //1.传统的RPC网关方式使用o.a.s.deploy.Client进行封装
34.   //2.Spark 1.3使用新REST-based 网关方式,作为Spark 1.3的默认方法,如果Master
      //节点不是REST服务器节点,Spark应用程序提交时会切换到传统的网关模式
35.    if (args.isStandaloneCluster && args.useRest) {
36.      try {
37.        //scalastyle:off println
38.        printStream.println("Running Spark using the REST application
           submission protocol.")
39.        //scalastyle:on println
40.        doRunMain()
41.      } catch {
42.          //如果失败,则使用传统的提交方式
43.        case e: SubmitRestConnectionException =>
44.          printWarning(s"Master endpoint ${args.master} was not a REST
             server. " + "Falling back to legacy submission gateway instead.")
45.
46.       //重新设置提交方式的控制开关
47.          args.useRest = false
48.          submit(args)
49.      }
50.    //在所有其他模式中,只要准备好主类就可以
51.    } else {
52.      doRunMain()
53.    }
54.  }

其中,最终运行所需的参数都由prepareSubmitEnvironment方法负责解析、转换,然后根据其结果执行。解析的结果包含以下4部分。

 子进程运行所需的参数。

 子进程运行时的classpath列表。

 系统属性的映射。

 子进程运行时的主类。

解析之后调用runMain方法,该方法中除了一些环境设置等操作外,最终会调用解析得到的childMainClass的main方法。下面简单分析一下prepareSubmitEnvironment方法,通过该方法来了解SparkSubmit是如何帮助底层的集群管理器和部署模式的封装的。里面涉及的各种细节比较多,这里以不同集群管理器和部署模式下最终运行的childMainClass类的解析为主线进行分析。

(1)当部署模式为CLIENT时,将childMainClass设置为传入的mainClass,对应代码如下所示。

1.     //在CLIENT模式下,直接启动应用程序的主类
2.  if (deployMode == CLIENT || isYarnCluster) {
3.       childMainClass = args.mainClass
4.       if (isUserJar(args.primaryResource)) {
5.         childClasspath += args.primaryResource
6.       }
7.       if (args.jars != null) { childClasspath ++= args.jars.split(",") }
8.     }
9.
10.  if (deployMode == CLIENT) {
11.    if (args.childArgs != null) { childArgs ++= args.childArgs }
12.  }

(2)当集群管理器为STANDALONE、部署模式为CLUSTER时,根据提交的两种方式将childMainClass分别设置为不同的类,同时将传入的args.mainClass(提交应用程序时设置的主类)及其参数根据不同集群管理器与部署模式进行转换,并封装到新的主类所需的参数中。对应的设置见表6-3。

表6-3 STANDALONE+CLUSTER时两种不同提交方式下的childMainClass封装

其中,表述性状态传递(Representational State Transfer,REST)是Roy Fielding博士在2000年他的博士论文中提出来的一种软件架构风格。

这些设置的主类相当于封装了应用程序提交时的主类,运行后负责向Master节点申请启动提交的应用程序。

(3)当集群管理器为YARN、部署模式为CLUSTER时,childMainClass以及对应的mainClass的设置见表6-4。

表6-4 YARN+CLUSTER时childMainClass下的childMainClass封装

(4)当集群管理器为MESOS、部署模式为CLUSTER时,childMainClass以及对应的mainClass的设置见表6-5。

表6-5 MESOS+CLUSTER时childMainClass下的childMainClass封装

从上面的分析中可以看到,使用CLIENT部署模式进行提交时,由于设置的childMainClass为应用程序提交时的主类,因此是直接在提交点执行设置的主类,即mainClass,当使用CLUSTER部署模式进行提交时,则会根据具体集群管理器等信息,使用相应的封装类。这些封装类会向集群申请提交应用程序的请求,然后在由集群调度分配得到的节点上,启动所申请的应用程序。

以封装类设置为org.apache.spark.deploy.Client为例,从该类主入口main方法查看,可以看到构建了一个ClientEndpoint实例,该实例构建时,会将提交应用程序时设置的mainClass等信息封装到DriverDescription实例中,然后发送到Master,申请执行用户提交的应用程序。

对应各种集群管理器与部署模式的组合,实际代码中的处理细节非常多。这里仅给出一种源码阅读的方式,和对应的大数据处理一样,通常采用化繁为简的方式去阅读复杂的源码。例如,这里在理解整个大框架的调用过程后,以childMainClass的设置作为主线去解读源码,对应地,在扩展阅读其他源码时,也可以采用这种方式,以某种集群管理器与部署模式为主线,详细阅读相关的代码。最后,在了解各种组合的处理细节之后,通过对比、抽象等方法,对整个SparkSubmit进行归纳总结。

提交的应用程序的驱动程序(Driver Program)部分对应包含了一个SparkContext实例。因此,接下来从该实例出发,解析驱动程序在不同的集群管理器的部署细节。

4.SparkContext解析

在详细解析SparkContext实例前,首先查看一下SparkContext类的注释部分,具体如下所示。

1.  /**
2.   * Spark功能的主入口点。一个SparkContext代表连接到Spark集群,并可用于在集群中
     * 创建RDDs、累加器和广播变量
3.   * ......
4.   * @param 描述应用程序配置的配置对象。在该配置的任何设置将覆盖默认的配置以及系统属性
5.   */

SparkContext类是Spark功能的主入口点。一个SparkContext实例代表了与一个Spark集群的连接,并且通过该实例,可以在集群中构建RDDs、累加器以及广播变量。SparkContext实例的构建参数config描述了应用程序的Spark配置。在该参数中指定的配置属性会覆盖默认的配置属性以及系统属性。

在SparkContext类文件中定义了一个描述集群管理器类型的单例对象SparkMasterRegex,在该对象中详细给出了当前Spark支持的各种集群管理器类型。

SparkContext.scala的源码如下。

1.  /**
2.   * 定义了从Master信息中抽取集群管理器类型的一个正则表达式集合
3.   *
4.   */
5.  private object SparkMasterRegex {
6.   //对应Master格式如local[N] 和local[*]的正则表达式
7.   //对应的Master格式如local[N]和local[*]的正则表达式
8.   val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
9.
10.  //对应的Master格式如local[N, maxRetries]的正则表达式
11.  //这种集群管理器类型用于具有任务失败尝试功能的测试
12.
13.   val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
14.
15. //一种模拟Spark集群的本地模式的正则表达式,对应的Master格式如local-cluster[N,
16. //cores, memory]
17.
18.                                         LOCAL_CLUSTER_REGEX = """local-cluster\
                                             [\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*
                                             ([0-9]+)\s*]""".r
19.
20. //连接Spark部署集群的正则表达式
21.
22.   val SPARK_REGEX = """spark://(.*)""".r
23. }

在SparkContext类中的主要流程可以归纳如下:

(1)createSparkEnv:创建Spark的执行环境对应的SparkEnv实例。

对应代码如下所示。

1.      //Create the Spark execution environment (cache, map output tracker,
        //etc)
2.      _env = createSparkEnv(_conf, isLocal, listenerBus)
3.  SparkEnv.set(_env)

(2)createTaskScheduler:创建作业调度器实例。

对应代码如下所示。

1.  //创建和启动调度器scheduler
2.  val (sched, ts) = SparkContext.createTaskScheduler(this, master)
3.  _schedulerBackend = sched
4.  _taskScheduler = ts

其中,TaskScheduler是低层次的任务调度器,负责任务的调度。通过该接口提供可插拔的任务调度器。每个TaskScheduler负责调度一个SparkContext实例中的任务,负责调度上层DAG调度器中每个Stage提交的任务集(TaskSet),并将这些任务提交到集群中运行,在任务提交执行时,可以使用失败重试机制设置失败重试的次数。上述对应高层的DAG调度器的实例构建参见下一步。

(3)new DAGScheduler:创建高层Stage调度的DAG调度器实例。

对应代码如下。

1.  _dagScheduler = new DAGScheduler(this)

DAGScheduler是高层调度模块,负责作业(Job)的Stage拆分,以及最终将Stage对应的任务集提交到低层次的任务调度器上。

下面基于这些主要流程,针对SparkMasterRegex单例对象中给出的各种集群部署模式进行解析。对应不同集群模式,这些流程中构建了包括TaskScheduler与SchedulerBackend的不同的具体子类,所构建的相关实例具体见表6-6。

表6-6 各种情况下TaskScheduler与SchedulerBackend的不同的具体子类

与TaskScheduler和SchedulerBackend不同的是,在不同集群模式中,应用程序的高层调度器DAGScheduler的实例是相同的,即对应在Spark on YARN与Mesos等集群管理器中,应用程序内部的高层Stage调度是相同的。