- Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
- 王家林
- 6243字
- 2021-03-30 21:55:55
6.1 Spark Application到底是如何提交给集群的
本节讲解Application提交参数配置、Application提交给集群原理、Application提交给集群源码等内容,将彻底解密Spark Application到底是如何提交给集群的。
6.1.1 Application提交参数配置详解
用户应用程序可以使用bin/spark-submit脚本来启动。spark-submit脚本负责使用Spark及其依赖关系设置类路径,并可支持Spark支持的不同群集管理器和部署模式。
bin/spark-submit脚本示例如下。
1. ./bin/spark-submit \ 2. --class <main-class> \ 3. --master <master-url> \ 4. --deploy-mode <deploy-mode> \ 5. --conf <key>=<value> \ 6. ... # other options 7. <application-jar> \ 8. [application-arguments]
spark-submit脚本提交参数配置中一些常用的选项。
--class:应用程序的入口点(如org.apache.spark.examples.SparkPi)。
--master:集群的主URL(如spark://23.195.26.187:7077)。
--deploy-mode:将Driver程序部署在集群Worker节点(cluster);或作为外部客户端(client)部署在本地(默认值:client)。
--conf:任意Spark配置属性,使用key = value格式。对于包含空格的值,用引号括起来,如“key = value”。
application-jar:包含应用程序和所有依赖关系Jar包的路径。该URL必须在集群内全局可见。例如,所有节点上存在的hdfs://路径或file://路径。
application-arguments:传递给主类的main方法的参数。
6.1.2 Application提交给集群原理详解
在Spark官网部署页面(http://spark.apache.org/docs/latest/cluster-overview.html),可以看到当前集群支持以下3种集群管理器(cluster manager)。
(1)Standalone:Spark原生的简单集群管理器。使用Standalone可以很方便地搭建一个集群。
(2)Apache Mesos:一个通用的集群管理器,可以在上面运行HadoopMapReduce和一些服务型的应用。
(3)Hadoop YARN:在Hadoop 2中提供的资源管理器。
另外,Spark提供的EC2启动脚本,可以很方便地在Amazon EC2上启动一个Standalone集群。
实际上,除了上面这些通用的集群管理器外,Spark内部也提供一些方便我们测试、学习的简单集群部署模式。为了更全面地理解,我们会从Spark应用程序部署点切入,也就是从提交一个Spark应用程序开始,引出并详细解析各种部署模式。
说明:下面涉及类的描述时,如果可以通过类名唯一确定一个类,将直接给出类名,如果不能,会先给出全路径的类名,然后在不出现歧义的地方再简写为类名。
为了简化应用程序提交的复杂性,Spark提供了各种应用程序提交的统一入口,即spark-submit脚本,应用程序的提交都间接或直接地调用了该脚本。下面简单分析几个脚本,包含./bin/spark-shell、./bin/pyspark、./bin/sparkR、./bin/spark-sql、./bin/run-example、./bin/speak-submit,以及所有脚本最终都调用到的一个执行Java类的脚本./bin/spark-class。
1.脚本./bin/spark-shell
通过该脚本可以打开使用Scala语言进行开发、调试的交互式界面,脚本的代码如下所示。
1. ...... 2. function main() { 3. ...... 4. "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" 5. sttyicanon echo > /dev/null 2>&1 6. else 7. export SPARK_SUBMIT_OPTS 8. "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" 9. fi 10. } 11. ......
对应在第4行和第8行处,调用了应用程序提交脚本./bin/spark-submit。脚本./bin/spark-shell的基本用法如下所示:
1. "Usage: ./bin/spark-shell [options]"
其他脚本类似。下面分别针对各个脚本的用法(具体用法可查看脚本的帮助信息,如通过--help选项来获取)与关键执行语句等进行简单解析。了解工具(如脚本)如何使用,最根本的是先查看其帮助信息,然后在此基础上进行扩展。
2.脚本./bin/pyspark
通过该脚本可以打开使用Python语言开发、调试的交互式界面。
(1)该脚本的用法如下。
1. "Usage: ./bin/pyspark [options]"
(2)该脚本的执行语句如下。
1. exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"
3.脚本./bin/sparkR
通过该脚本可以打开使用sparkR开发、调试的交互式界面。
(1)该脚本的用法如下。
1. "Usage: ./bin/sparkR [options]"
(2)该脚本的执行语句如下。
1. exec "${SPARK_HOME}"/bin/spark-submit sparkr-shell-main "$@"
4.脚本./bin/spark-sql
通过该脚本可以打开使用SparkSql开发、调试的交互式界面。
(1)该脚本的用法如下。
1. "Usage: ./bin/spark-sql [options] [cli option]"
(2)该脚本的执行语句如下。
1. exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive. thriftserver.SparkSQLCLIDriver "$@"
5.脚本./bin/run-example
可以通过该脚本运行Spark自带的案例代码。该脚本中会自动补全案例类的路径。
(1)该脚本的用法如下。
1. echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2 2. echo " - set MASTER=XX to use a specific master" 1>&2 3. echo " - can use abbreviated example class name relative to com.apache. spark.examples" 1>&2 4. echo " (e.g. SparkPi, mllib.LinearRegression, streaming. KinesisWordCountASL)" 1>&2
(2)该脚本的执行语句如下。
1. exec "${SPARK_HOME}"/bin/spark-submit \ 2. --master $EXAMPLE_MASTER \ 3. --class $EXAMPLE_CLASS \ 4. "$SPARK_EXAMPLES_JAR" \ 5. "$@"
6.脚本./bin/spark-submit
./bin/spark-submit是提交Spark应用程序最常用的一个脚本。从前面各个脚本的解析可以看出,各个脚本最终都调用了./bin/spark-submit脚本。
(1)该脚本的用法如下。
该脚本的用法需要从源码中获取,具体源码位置参考SparkSubmitArguments类的方法printUsageAndExit,代码如下。
1. val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse( 2. """Usage: spark-submit [options] <app jar | python file> [app arguments] 3. |Usage: spark-submit --kill [submission ID] --master [spark://...] 4. |Usage: spark-submit --status [submission ID] --master [spark: //...]""".stripMargin)
(2)该脚本的执行语句如下。
1. exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy. SparkSubmit "$@"
7.脚本./bin/spark-class
该脚本是所有其他脚本最终都调用到的一个执行Java类的脚本。其中关键的执行语句如下所示。
1. CMD=() 2. while IFS= read -d '' -r ARG; do 3. CMD+=("$ARG") 4. done <<("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@") 5. exec "${CMD[@]}"
其中,负责运行的RUNNER变量设置如下。
1. # Find the java binary 2. //将RUNNER设置为Java 3. if [ -n "${JAVA_HOME}" ]; then 4. RUNNER="${JAVA_HOME}/bin/java" 5. else 6. if [ `command -v java` ]; then 7. RUNNER="java" 8. else 9. echo "JAVA_HOME is not set" >&2 10. exit 1 11. fi 12. fi
在脚本中,LAUNCH_CLASSPATH变量对应Java命令运行时所需的classpath信息。最终Java命令启动的类是org.apache.spark.launcher.Main。Main类的入口函数main,会根据输入参数构建出最终执行的命令,即这里返回的${CMD[@]}信息,然后通过exec执行。
6.1.3 Application提交给集群源码详解
本节从应用部署的角度解析相关的源码,主要包括脚本提交时对应JVM进程启动的主类org.apache.spark.launcher.Main、定义应用程序提交的行为类型的类org.apache.spark.deploy. SparkSubmitAction、应用程序封装底层集群管理器和部署模式的类org.apache.spark.deploy. SparkSubmit,以及代表一个应用程序的驱动程序的类org.apache.spark.SparkContext。
1.Main解析
从前面的脚本分析,得出最终都是通过org.apache.spark.launcher.Main类(下面简称Main类)启动应用程序的。因此,首先解析一下Main类。
在Main类的源码中,类的注释如下。
1. /** 2. * Spark 启动器的命令行接口。在Spark脚本内部使用 3. * 4. */
对应地,在Main对象的入口方法main的注释如下。
Main.java源码如下。
1. 2. /** 3. * Usage: Main [class] [class args] 4. * <p> 5. * 命令行界面工作在两种模式下: 6. * <ul> 7. * <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy. * SparkSubmit", the {@link SparkLauncher} class is used to launch a * Spark application. </li> 8. * <li>"spark-class": if another class is provided, an internal Spark * class is run.</li> 9. * </ul> 10. ....... 11. public static void main(String[] argsArray) throws Exception { 12. ......
Main类主要有两种工作模式,分别描述如下。
(1)spark-submit
启动器要启动的类为org.apache.spark.deploy.SparkSubmit时,对应为spark-submit工作模式。此时,使用SparkSubmitCommandBuilder类来构建启动命令。
(2)spark-class
启动器要启动的类是除SparkSubmit之外的其他类时,对应为spark-class工作模式。此时使用SparkClassCommandBuilder类的buildCommand方法来构建启动命令。
Main.java的源码如下。
1. public static void main(String[] argsArray) throws Exception { 2. ....... 3. String className = args.remove(0); 4. ....... 5. if (className.equals("org.apache.spark.deploy.SparkSubmit")) { 6. try { 7. builder = new SparkSubmitCommandBuilder(args); 8. ....... 9. } else { 10. builder = new SparkClassCommandBuilder(className, args); 11. } 12. ......
以spark-submit工作模式为例,对应的在构建启动命令的SparkSubmitCommandBuilder类中,上述调用的SparkClassCommandBuilder构造函数定义如下。
SparkSubmitCommandBuilder.java的源码如下。
1. SparkSubmitCommandBuilder(List<String> args) { 2. this.allowsMixedArguments = false; 3. this.sparkArgs = new ArrayList<>(); 4. boolean isExample = false; 5. List<String> submitArgs = args; 6. //根据输入的第一个参数设置,包括主资源appResource等 7. if (args.size() > 0) { 8. switch (args.get(0)) { 9. case PYSPARK_SHELL: 10. this.allowsMixedArguments = true; 11. appResource = PYSPARK_SHELL; 12. submitArgs = args.subList(1, args.size()); 13. break; 14. 15. case SPARKR_SHELL: 16. this.allowsMixedArguments = true; 17. appResource = SPARKR_SHELL; 18. submitArgs = args.subList(1, args.size()); 19. break; 20. 21. case RUN_EXAMPLE: 22. isExample = true; 23. submitArgs = args.subList(1, args.size()); 24. } 25. 26. this.isExample = isExample; 27. OptionParser parser = new OptionParser(); 28. parser.parse(submitArgs); 29. this.isAppResourceReq = parser.isAppResourceReq; 30. } else { 31. this.isExample = isExample; 32. this.isAppResourceReq = false; 33. } 34. }
从这些初步的参数解析可以看出,前面脚本中的参数与最终对应的主资源间的对应关系见表6-1。
表6-1 脚本中的参数与主资源间的对应关系
如果继续跟踪appResource赋值的源码,可以跟踪到一些特殊类的类名与最终对应的主资源间的对应关系,见表6-2。
表6-2 特殊类的类名与主资源间的对应关系
如果有兴趣,可以继续跟踪SparkClassCommandBuilder类的buildCommand方法的源码,查看构建的命令具体有哪些。
通过Main类的简单解析,可以将前面的脚本分析结果与后面即将进行分析的SparkSubmit类关联起来,以便进一步解析与应用程序提交相关的其他源码。
从前面的脚本分析可以看到,提交应用程序时,Main启动的类,也就是用户最终提交执行的类是org.apache.spark.deploy.SparkSubmit。因此,下面开始解析SparkSubmit相关的源码,包括提交行为的定义、提交时的参数解析以及最终提交运行的代码解析。
2.SparkSubmitAction解析
SparkSubmitAction定义了提交应用程序的行为类型,源码如下所示。
SparkSubmit.scala的源码如下。
1. private[deploy] object SparkSubmitAction extends Enumeration { 2. type SparkSubmitAction = Value 3. val SUBMIT, KILL, REQUEST_STATUS = Value 4. }
从源码中可以看到,分别定义了SUBMIT、KILL、REQUEST_STATUS这3种行为类型,对应提交应用、停止应用、查询应用的状态。
3.SparkSubmit解析
SparkSubmit的全路径为org.apache.spark.deploy.SparkSubmit。从SparkSubmit类的注释可以看出,SparkSubmit是启动一个Spark应用程序的主入口点,这和前面从脚本分析得到的结论一致。首先看一下SparkSubmit类的注释,如下所示。
1. /** 2. *启动一个Spark应用程序的主入口点 3. * 4. * 5. *这个程序处理与Spark依赖相关的类路径设置,提供Spark支持的在不同集群管理器的部 *署模式 6. * 7. */
SparkSubmit会帮助我们设置Spark相关依赖包的classpath,同时,为了帮助用户简化提交应用程序的复杂性,SparkSubmit提供了一个抽象层,封装了底层复杂的集群管理器与部署模式的各种差异点,即通过SparkSubmit的封装,集群管理器与部署模式对用户是透明的。
在SparkSubmit中体现透明性的集群管理器定义的源码如下所示。
SparkSubmit.scala的源码如下。
1. //集群管理器 2. //Cluster managers 3. private val YARN = 1 4. private val STANDALONE = 2 5. private val MESOS = 4 6. private val LOCAL = 8 7. private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
在SparkSubmit中体现透明性的部署模式定义的源码如下。
1. //部署模式 2. //Deploy modes 3. private val CLIENT = 1 4. private val CLUSTER = 2 5. private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
作为提交应用程序的入口点,SparkSubmit中根据具体的集群管理器进行参数转换、参数校验等操作,如对模式的检查,代码中给出了针对特定情况,不支持的集群管理器与部署模式,在这些模式下提交应用程序会直接报错退出。
SparkSubmit.scala的源码如下。
1. //不支持的集群管理器与部署模式 2. 3. (clusterManager, deployMode) match { 4. case (STANDALONE, CLUSTER) if args.isPython => 5. printErrorAndExit("Cluster deploy mode is currently not supported for python " +"applications on standalone clusters.") 6. 7. case (STANDALONE, CLUSTER) if args.isR => 8. printErrorAndExit("Cluster deploy mode is currently not supported for R " +"applications on standalone clusters.") 9. 10. case (LOCAL, CLUSTER) => 11. printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"") 12. case (_, CLUSTER) if isShell(args.primaryResource) => 13. printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") 14. case (_, CLUSTER) if isSqlShell(args.mainClass) => 15. printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.") 16. case (_, CLUSTER) if isThriftServer(args.mainClass) => 17. printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.") 18. case _ => 19. }
首先,一个程序运行的入口点对应单例对象的main函数,因此在执行SparkSubmit时,对应的入口点是objectSparkSubmit的main函数,具体代码如下。
SparkSubmit.scala的源码如下。
1. //入口点函数main的定义 2. def main(args: Array[String]): Unit = { 3. val appArgs = new SparkSubmitArguments(args) 4. ...... 5. //根据3种行为分别进行处理 6. appArgs.action match { 7. case SparkSubmitAction.SUBMIT => submit(appArgs) 8. case SparkSubmitAction.KILL => kill(appArgs) 9. case SparkSubmitAction.REQUEST_STATUS =>requestStatus(appArgs) 10. } 11. }
其中,SparkSubmitArguments类对应用户调用提交脚本spark-submit时传入的参数信息。对应的脚本的帮助信息(./bin/spark-submit --help),也是由该类的printUsageAndExit方法提供的。
找到上面的入口点代码之后,就可以开始分析其内部的源码。对应参数信息的SparkSubmitArguments可以参考脚本的帮助信息,来查看具体参数对应的含义。参数分析后,便是对各种提交行为的具体处理。SparkSubmit支持SparkSubmitAction包含的3种行为,下面以行为SparkSubmitAction.SUBMIT为例进行分析,其他行为也可以通过各自的具体处理代码进行分析。
对应处理SparkSubmitAction.SUBMIT行为的代码入口点为submit(appArgs),进入该方法,即进入提交应用程序的处理方法的具体代码如下所示。
SparkSubmit.scala的源码如下。
1. private def submit(args: SparkSubmitArguments): Unit = { 2. //准备应用程序提交的环境,该步骤包含了内部封装的各个细节处理 3. val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) 4. 5. def doRunMain(): Unit = { 6. if (args.proxyUser != null) { 7. val proxyUser = UserGroupInformation.createProxyUser (args.proxyUser,UserGroupInformation.getCurrentUser()) 8. 9. try { 10. proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { 11. override def run(): Unit = { 12. runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 13. } 14. }) 15. } catch { 16. case e: Exception => 17. //hadoop的AuthorizationException抑制异常堆栈跟踪,通过JVM打印输 //出的消息不是很有帮助。这里检测异常以及空栈,对其采用不同的处理 18. if (e.getStackTrace().length == 0) { 19. //scalastyle:off println 20. printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}") 21. //scalastyle:on println 22. exitFn(1) 23. } else { 24. throw e 25. } 26. } 27. } else { 28. runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 29. } 30. } 31. 32. //Standalone 集群模式下,有两种提交应用程序的方式 33. //1.传统的RPC网关方式使用o.a.s.deploy.Client进行封装 34. //2.Spark 1.3使用新REST-based 网关方式,作为Spark 1.3的默认方法,如果Master //节点不是REST服务器节点,Spark应用程序提交时会切换到传统的网关模式 35. if (args.isStandaloneCluster && args.useRest) { 36. try { 37. //scalastyle:off println 38. printStream.println("Running Spark using the REST application submission protocol.") 39. //scalastyle:on println 40. doRunMain() 41. } catch { 42. //如果失败,则使用传统的提交方式 43. case e: SubmitRestConnectionException => 44. printWarning(s"Master endpoint ${args.master} was not a REST server. " + "Falling back to legacy submission gateway instead.") 45. 46. //重新设置提交方式的控制开关 47. args.useRest = false 48. submit(args) 49. } 50. //在所有其他模式中,只要准备好主类就可以 51. } else { 52. doRunMain() 53. } 54. }
其中,最终运行所需的参数都由prepareSubmitEnvironment方法负责解析、转换,然后根据其结果执行。解析的结果包含以下4部分。
子进程运行所需的参数。
子进程运行时的classpath列表。
系统属性的映射。
子进程运行时的主类。
解析之后调用runMain方法,该方法中除了一些环境设置等操作外,最终会调用解析得到的childMainClass的main方法。下面简单分析一下prepareSubmitEnvironment方法,通过该方法来了解SparkSubmit是如何帮助底层的集群管理器和部署模式的封装的。里面涉及的各种细节比较多,这里以不同集群管理器和部署模式下最终运行的childMainClass类的解析为主线进行分析。
(1)当部署模式为CLIENT时,将childMainClass设置为传入的mainClass,对应代码如下所示。
1. //在CLIENT模式下,直接启动应用程序的主类 2. if (deployMode == CLIENT || isYarnCluster) { 3. childMainClass = args.mainClass 4. if (isUserJar(args.primaryResource)) { 5. childClasspath += args.primaryResource 6. } 7. if (args.jars != null) { childClasspath ++= args.jars.split(",") } 8. } 9. 10. if (deployMode == CLIENT) { 11. if (args.childArgs != null) { childArgs ++= args.childArgs } 12. }
(2)当集群管理器为STANDALONE、部署模式为CLUSTER时,根据提交的两种方式将childMainClass分别设置为不同的类,同时将传入的args.mainClass(提交应用程序时设置的主类)及其参数根据不同集群管理器与部署模式进行转换,并封装到新的主类所需的参数中。对应的设置见表6-3。
表6-3 STANDALONE+CLUSTER时两种不同提交方式下的childMainClass封装
其中,表述性状态传递(Representational State Transfer,REST)是Roy Fielding博士在2000年他的博士论文中提出来的一种软件架构风格。
这些设置的主类相当于封装了应用程序提交时的主类,运行后负责向Master节点申请启动提交的应用程序。
(3)当集群管理器为YARN、部署模式为CLUSTER时,childMainClass以及对应的mainClass的设置见表6-4。
表6-4 YARN+CLUSTER时childMainClass下的childMainClass封装
(4)当集群管理器为MESOS、部署模式为CLUSTER时,childMainClass以及对应的mainClass的设置见表6-5。
表6-5 MESOS+CLUSTER时childMainClass下的childMainClass封装
从上面的分析中可以看到,使用CLIENT部署模式进行提交时,由于设置的childMainClass为应用程序提交时的主类,因此是直接在提交点执行设置的主类,即mainClass,当使用CLUSTER部署模式进行提交时,则会根据具体集群管理器等信息,使用相应的封装类。这些封装类会向集群申请提交应用程序的请求,然后在由集群调度分配得到的节点上,启动所申请的应用程序。
以封装类设置为org.apache.spark.deploy.Client为例,从该类主入口main方法查看,可以看到构建了一个ClientEndpoint实例,该实例构建时,会将提交应用程序时设置的mainClass等信息封装到DriverDescription实例中,然后发送到Master,申请执行用户提交的应用程序。
对应各种集群管理器与部署模式的组合,实际代码中的处理细节非常多。这里仅给出一种源码阅读的方式,和对应的大数据处理一样,通常采用化繁为简的方式去阅读复杂的源码。例如,这里在理解整个大框架的调用过程后,以childMainClass的设置作为主线去解读源码,对应地,在扩展阅读其他源码时,也可以采用这种方式,以某种集群管理器与部署模式为主线,详细阅读相关的代码。最后,在了解各种组合的处理细节之后,通过对比、抽象等方法,对整个SparkSubmit进行归纳总结。
提交的应用程序的驱动程序(Driver Program)部分对应包含了一个SparkContext实例。因此,接下来从该实例出发,解析驱动程序在不同的集群管理器的部署细节。
4.SparkContext解析
在详细解析SparkContext实例前,首先查看一下SparkContext类的注释部分,具体如下所示。
1. /** 2. * Spark功能的主入口点。一个SparkContext代表连接到Spark集群,并可用于在集群中 * 创建RDDs、累加器和广播变量 3. * ...... 4. * @param 描述应用程序配置的配置对象。在该配置的任何设置将覆盖默认的配置以及系统属性 5. */
SparkContext类是Spark功能的主入口点。一个SparkContext实例代表了与一个Spark集群的连接,并且通过该实例,可以在集群中构建RDDs、累加器以及广播变量。SparkContext实例的构建参数config描述了应用程序的Spark配置。在该参数中指定的配置属性会覆盖默认的配置属性以及系统属性。
在SparkContext类文件中定义了一个描述集群管理器类型的单例对象SparkMasterRegex,在该对象中详细给出了当前Spark支持的各种集群管理器类型。
SparkContext.scala的源码如下。
1. /** 2. * 定义了从Master信息中抽取集群管理器类型的一个正则表达式集合 3. * 4. */ 5. private object SparkMasterRegex { 6. //对应Master格式如local[N] 和local[*]的正则表达式 7. //对应的Master格式如local[N]和local[*]的正则表达式 8. val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r 9. 10. //对应的Master格式如local[N, maxRetries]的正则表达式 11. //这种集群管理器类型用于具有任务失败尝试功能的测试 12. 13. val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r 14. 15. //一种模拟Spark集群的本地模式的正则表达式,对应的Master格式如local-cluster[N, 16. //cores, memory] 17. 18. LOCAL_CLUSTER_REGEX = """local-cluster\ [\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s* ([0-9]+)\s*]""".r 19. 20. //连接Spark部署集群的正则表达式 21. 22. val SPARK_REGEX = """spark://(.*)""".r 23. }
在SparkContext类中的主要流程可以归纳如下:
(1)createSparkEnv:创建Spark的执行环境对应的SparkEnv实例。
对应代码如下所示。
1. //Create the Spark execution environment (cache, map output tracker, //etc) 2. _env = createSparkEnv(_conf, isLocal, listenerBus) 3. SparkEnv.set(_env)
(2)createTaskScheduler:创建作业调度器实例。
对应代码如下所示。
1. //创建和启动调度器scheduler 2. val (sched, ts) = SparkContext.createTaskScheduler(this, master) 3. _schedulerBackend = sched 4. _taskScheduler = ts
其中,TaskScheduler是低层次的任务调度器,负责任务的调度。通过该接口提供可插拔的任务调度器。每个TaskScheduler负责调度一个SparkContext实例中的任务,负责调度上层DAG调度器中每个Stage提交的任务集(TaskSet),并将这些任务提交到集群中运行,在任务提交执行时,可以使用失败重试机制设置失败重试的次数。上述对应高层的DAG调度器的实例构建参见下一步。
(3)new DAGScheduler:创建高层Stage调度的DAG调度器实例。
对应代码如下。
1. _dagScheduler = new DAGScheduler(this)
DAGScheduler是高层调度模块,负责作业(Job)的Stage拆分,以及最终将Stage对应的任务集提交到低层次的任务调度器上。
下面基于这些主要流程,针对SparkMasterRegex单例对象中给出的各种集群部署模式进行解析。对应不同集群模式,这些流程中构建了包括TaskScheduler与SchedulerBackend的不同的具体子类,所构建的相关实例具体见表6-6。
表6-6 各种情况下TaskScheduler与SchedulerBackend的不同的具体子类
与TaskScheduler和SchedulerBackend不同的是,在不同集群模式中,应用程序的高层调度器DAGScheduler的实例是相同的,即对应在Spark on YARN与Mesos等集群管理器中,应用程序内部的高层Stage调度是相同的。