2.3 运行Spark应用程序

运行Spark应用程序主要包括Local模式运行、Standalone模式运行、YARN模式运行、Mesos模式运行(参考官方文档)。

2.3.1 Local模式运行Spark应用程序

Local模式运行Spark应用程序是最简单的方式,以计算圆周率的程序为例,进入安装主目录,如spark-1.5.0,执行命令:

            # 提交Spark任务的入口
            ./bin/spark-submit \
            #主程序设置本地,local[*],其中*是指设置线程数
            --master local[10] \
          #选择主类名称
          --class org.apache.spark.examples.SparkPi \
          #examples Jar包
          /home/hadoop/spark-1.5.0/lib/spark-examples-1.5.0-hadoop2.3.0.jar

执行结果如下:

          *** INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:35
          ……
          *** INFO spark.SparkContext: Job finished: reduce at SparkPi.scala:35, took 0.751093979 s
          Pi is roughly 3.14826 # 计算出Pi值,表明运行成功。

2.3.2 Standalone模式运行Spark应用程序

Spark独立模式下应用程序的运行以及资源调度、监控和日志等内容会在接下来做简单的介绍。

1. spark-shell运行应用程序

在Spark集群上运行应用程序,需要将Master的Spark://IP:PORT传递给SparkContext构造函数。

在集群上运行交互式的Spark命令spark-shell,该命令将会用spark-env.sh中的SPARK_MASTER_IP和SPARK_MASTER_PORT自动设置Master。

./bin/spark-shell --master spark:// IP:PORT

另外,还可以通过传递一个--cores<numCore>来控制spark-shell在集群上使用的核心的数目。

2. spark-submit启动应用程序

spark-submit脚本提供了一种向集群提交Spark应用程序的最直接的方法,对于一个独立部署模式的集群,Spark目前支持Client部署模式,即在提交应用的客户端进程中部署Driver。

如果应用程序通过spark-submit启动,那么应用程序的Jar包将会自动地分配给所有的Worker节点;对于任何其他运行应用程序时需要依赖的Jar包,可以通过-jar声明,Jar包名之间用逗号隔开。

以SparkPi为例,提交应用程序命令如下:

          ./bin/spark-submit \
              --class org.apache.spark.examples.SparkPi \
              --master spark://$YOUR_MASTER_IP:7077 \
              --executor-memory 2G \
              --total-executor-cores 2 \
          $YOUR_SPARK_HOME/lib/spark-examples-1.5.0-hadoop2.3.0.jar

其中,spark-submit为提交Spark任务的脚本,--class org.apache.spark.examples.SparkPi为Spark项目包中指定运行的类的全路径。--master spark://$YOUR_MASTER_IP:7077为主节点的路径和端口号。$YOUR_SPARK_HOME/lib/spark-examples-1.5.0-hadoop2.3.0.jar是Spark项目程序包的路径。

直接运行即可验证Spark在Standalone模式下的计算圆周率的程序。如果启动上述应用程序成功,则执行结果如下:

            *** INFO client.AppClient$ClientActor: Connecting to master spark://$YOUR_MASTER_
            IP:7077...
            *** INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:35
            ……
            *** INFO spark.SparkContext: Job finished: reduce at SparkPi.scala:35, took
            15.530349566 s
            Pi is roughly 3.14058

当出现Pi is roughly 3.14058,表明计算成功。

3.资源调度

独立部署模式支持FIFO作业调度策略。不过,为了允许多并发执行,你可以控制每一个应用可获得的资源最大值。默认情况下,如果集群中一次只运行一个应用程序,它就会获得所有CPU核。你可以在SparkConf中设置spark.cores.max来配置获得最多内核的数量,示例如下所示:

            val conf = new SparkConf()
                      .setMaster(...)
                      .setAppName(...)
                      .set("spark.cores.max", "10")
            val sc = new SparkContext(conf)

除此之外,还可以通过配置集群上的spark.deploy.defaultCores来改变应用程序使用的默认值,而不必设置spark.cores.max,需要在spark-env.sh中加入以下内容:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

当用户不会在共享集群上独立配置CPU核数最大值的时候,这显得特别重要。

4.监控和日志

Spark独立部署模式提供了一个基于Web的用户接口,监控集群运行状况。Master和每一个Worker都会有一个WebUI来显示集群的统计信息。默认情况下,可以通过8080端口访问Master的WebUI。

另外,每个Slave节点上作业运行的日志也会详细地记录到默认的$SPARK_HOME/work目录下每个作业会对应两个文件:stdout和stderr,包含了控制台上所有的历史输出。

如图2-2所示,本书所用数据平台是通过$YOUR_SPARK_MASTER_IP:8080访问集群统计信息情况。

图2-2 Spark集群统计信息

2.3.3 YARN模式运行Spark

Spark 0.6版本以后,加入了对在Hadoop YARN上运行的支持,并在之后发布版本中不断演进,逐渐发展成Spark应用程序运行的主要模式。

Hadoop与Spark部署完毕后,即可在YARN上运行Spark程序。其任务提交的方式与独立模式类似,只是工作原理有一些不同。

(1)在YARN上启动Spark应用

在YARN上启动Spark应用有两种模式:yarn-cluster模式和yarn-client模式。

在yarn-cluster模式下,框架会在集群中启动的Driver程序;

在yarn-client模式中,框架会在client端启动Driver程序。在YARN中,Resourcemanager的地址是从Hadoop配置中选取的。因此,master参数可以简单配置为yarn-client或yarn-cluster。

要在yarn-cluster模式下调用Spark应用,示例如下:

          ./bin/spark-submit \
          --class path.to.your.Class \
          --master yarn-cluster [options] <app jar> [app options]

以SparkPi为例,提交应用程序命令如下:

          $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
              --master yarn-cluster \
              --num-executors 3 \
              --driver-memory 4g \
              --executor-memory 2g \
            --executor-cores 2 \
            $YOUR_SPARK_HOME/lib/spark-examples-1.5.0-hadoop2.3.0.jar \
            10

首先启动一个Application Master的客户程序,之后SparkPi将作为Application Master的一个子进程运行。

Client会周期性地检查Application Master以获得状态更新,当应用程序运行结束时,Client会立刻退出。

要以yarn-client模式启动一个Spark应用,操作相同,用“yarn-client”替换“yarn-cluster”即可。

在YARN模式下,通过spark-submit提交任务应用程序,示例如下:

            ./bin/spark-submit \
                --class org.apache.spark.examples.SparkPi \
                --master yarn-client \
                --executor-memory 2G \
                --num-executors 3 \
            $YOUR_SPARK_HOME/lib/spark-examples-1.5.0-hadoop2.3.0.jar

直接运行上述程序即可测试Spark在YARN模式下计算圆周率的程序。

(2)添加其他的Jar包

在yarn-cluster模式下,Driver和Client运行于不同的机器,SparkContext.addJar不会作用于Client本地文件。要使SparkContext.addJar可使用Client的本地文件,在启动指令中的--jars选项中加入文件。

            $ ./bin/spark-submit
            --class my.main.Class \
            --master yarn-cluster \
            --jars my-other-jar.jar,my-other-other-jar.jar
            my-main-jar.jar app_arg1 app_arg2

(3)监控和日志

程序运行完毕后,如果Hadoop集群配置完毕,那么在Hadoop集群Cluster页面上,也就是在http://$YOUR_MASTER_IP:8088/cluster/apps/FINISHED路径下可以看到执行完成的Spark程序及其输出日志,或者在http://$YOUR_MASTER_IP:8080/路径下可以看到执行的Spark任务。

如图2-3所示,本书所用数据平台是通过$YOUR_MASTER_IP:8088访问集群统计信息情况。

图2-3 Spark YARN模式任务统计信息

2.3.4 应用程序提交和参数传递

提交任务是Spark运行的起步,理解任务提交时不同参数的含义十分重要。

1.应用程序提交过程

Spark应用程序在集群上以独立进程集合的形式运行,接受用户Driver程序中main函数SparkContext对象的协调。

确切来说,当任务提交到集群上,SparkContext对象可以与多种集群管理器(Standalone部署、Mesos、YARN模式)连接,这些集群管理器负责为所有应用分配资源。

一旦连接建立,Spark可以在集群的节点上获得Executor,这些Executor进程负责执行计算和为应用程序存储数据。

接下来,Spark会将应用程序代码(传递给SparkContext的Jar或Python文件)发送给Executor。

最后,SparkContext将任务发送至Executor执行,图2-4所示为Spark任务提交流程。

图2-4 Spark任务提交流程图

执行过程中,需要注意以下情况:

1)每个应用程序都会获得自己拥有的Executor进程,这些进程的生命周期持续在整个应用运行期间,并以多线程的方式执行任务。无论是在Driver驱动程序,还是Executor执行进程,都能很好地隔离应用程序,如果没有将数据写到外部存储,该数据就不能被其他Spark应用共享。

2)Spark对集群管理器是不可知的,只要Spark能够获取Executor进程,并且这些Executor之间可以相互通信,Spark不关注集群管理器是否支持其他的业务,如YARN同时支持MapReduce程序的运行。

3)因为Driver负责与集群上的任务交互,所以Driver应该运行于距离Worker节点近的地方,最好在同一个本地局域网之中。如果想要给集群发送远程请求,那么请为Driver安装RPC协议并从附近提交,而不是在远离Worker节点的地方运行Driver。

2.配置参数传递

在SparkConf中显式配置的参数具有最高优先级,然后是传递给spark-submit的标志位,最后是默认属性配置文件中的值。

如果没有在SparkConf中显示配置,又不想在spark-submit中提交,可以通过默认属性配置文件获取值。spark-submit脚本可以从属性文件中加载Spark配置值,并将它们传递给应用程序。

默认情况下,spark-submit脚本将会从Spark目录下的conf/spark-defaults.conf中读取配置选项。例如,如果Spark的Master属性已经设置,spark-submit提交时可以省略--master标志。

如果应用程序代码依赖于其他的项目,需要将它们与应用程序一起打包,以便应用程序的代码可以分发到Spark集群之上。为此,需要创建一个包含应用程序代码和其依赖的Jar包(当创建Jar包时,需要将Spark和Hadoop作为提供依赖项;这两者不需要被打入包中,在运行时由集群管理器提供加载)。

一旦拥有Jar包,就可以在调用bin/spark-submit脚本时传递Jar包。应用程序的Jar包和所有在--jars选项中的Jar包都会被自动地传递到集群。Spark使用下面的URL支持按不同的策略散播的Jar包。

□ f ile:Driver的HTTP文件服务器提供了绝对路径和f ile:// URI,所有的Executor节点从Driver的HTTP服务器处获得文件。

□ hdfs:从HDFS的URI获取文件和Jar包。

□ local以每个Worker节点相同的local:/开头的URI文件。

注意

在Executor节点上的每个SparkContext对象的Jar包和文件会被复制到工作目录下。这可能会占用大量的空间,并且需要被及时清除。在YARN集群上,清除是自动执行的。在Standalone模式部署集群上,自动清除可以通过spark.worker.cleamup. appDataTtl属性配置。

当在yarn-cluster模式下对本地文件使用--jars选项时,该选项允许你使用SparkContext. addJar函数。若对于HDFS、HTTP、HTTPS或FTP文件,则不必使用。