11.3 Spark与Job Server整合原理与实战

本节讲解Spark与Job Server整合原理及Spark与Job Server整合实战。

11.3.1 Spark与Job Server整合原理

Spark-jobserver提供了一个RESTful接口来提交和管理spark的jobs、jars和job contexts。Spark-jobserver项目包含了完整的Spark job server的项目,包括单元测试和项目部署脚本。

Spark-jobserver的特性如下。

 “Spark as Service”:针对job和contexts的各个方面提供了REST风格的api接口进行管理。

 支持SparkSQL、Hive、Streaming Contexts/jobs以及定制job contexts。

 通过集成Apache Shiro来支持LDAP权限验证。

 通过长期运行的job contexts支持亚秒级别低延迟的任务。

 可以通过结束context停止运行的作业(job)。

 分割jar上传步骤,以提高job的启动。

 异步和同步的job API,其中同步API对低延时作业非常有效。

 支持Standalone Spark和Mesos、yarn。

 Job和jar信息通过一个可插拔的DAO接口来持久化。

 对RDD或DataFrame对象命名并缓存,通过该名称获取RDD或DataFrame,这样可以提高对象在作业间的共享和重用。

 支持Scala 2.10版本和2.11版本。

Spark-jobserver的部署如下。

(1)复制conf/local.sh.template文件到local.sh 。备注:如果需要编译不同版本的Spark,则须修改SPARK_VERSION属性。

(2)复制config/shiro.ini.template文件到shiro.ini。备注:仅需authentication = on时执行这一步。

(3)复制config/local.conf.template到<environment>.conf。

(4)bin/server_deploy.sh <environment>,这一步将job-server以及配置文件打包,并一同推送到配置的远程服务器上。

(5)在远程服务器上部署的文件目录下通过执行server_start.sh启动服务,如需关闭服务,可执行server_stop.sh。

Spark-jobserver的各种运行方式如下。

 Docker模式:尝试使用作业服务器预先打包Spark分发的Docker容器,允许启动并部署。

 本地模式:在SBT内以本地开发模式构建并运行Job Server 。注意:这不适用于YARN,实际上仅推荐spark.master设置为local[*]。

 集群模式:将作业服务器部署到集群,有两种部署方式:

 server_deploy.sh将作业服务器部署到远程主机上的目录。

 server_package.sh将作业服务器部署到本地的目录,为Mesos或YARN部署创建.tar.gz。

 EC2部署脚本:按照EC2中的说明,使用作业服务器和示例应用程序启动Spark群集。

 EMR部署指令:按照EMR中的说明进行操作。

11.3.2 Spark与Job Server整合实战

本节根据Spark-jobserver本地模式,在SBT内以本地开发模式构建并运行Job Server。

Spark与Job Server本地模式的整合步骤如下。

1.Spark-jobserver本地模式服务的启动

(1)Linux系统中要先安装SBT。设置当前版本。

1.  export VER=`sbt version | tail -1 | cut -f2`

(2)在Linux系统中下载安装Spark-jobserver。Spark-jobserver的下载地址为https://github. com/spark-jobserver/spark-jobserver#users。

(3)进入spark-jobserver-master的安装目录,在Linux系统提示符下输入sbt,在SBT shell中键入reStart,使用默认配置文件启动Spark-jobserver服务。可选参数是替代配置文件的路径,还可以在“---”之后指定JVM参数。包括所有选项如下所示:

1.  job-server-extras/reStart /path/to/my.conf --- -Xmx8g

(4)Spark-jobserver服务启动测试验证:在浏览器中打开Url地址:http://localhost:8090,将显示Spark Job Server UI的Web页面。

2.Spark-jobserver的示例WordCountExample

(1)首先,将WordCountExample代码(WordCountExample代码功能是进行单词计数:统计输入字符串中每个单词出现的次数)打成Jar包:sbt job-server-tests/package,然后上传jar包到作业服务器。

1.  curl--data-binary@job-server-tests/target/scala-2.10/job-server-  tests-
    $VER.jar localhost:8090/jars/test

(2)上述job-server-tests-$VER.jar的jar包作为应用程序test上传服务器。接下来,开始进行单词计数作业,作业服务器将创建自己的SparkContext,并返回一个作业ID,用于后续查询。

1.   curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=
     test&classPath=spark.jobserver.WordCountExample"
2.  {
3.    "duration": "Job not done yet",
4.    "classPath": "spark.jobserver.WordCountExample",
5.    "startTime": "2016-06-19T16:27:12.196+05:30",
6.    "context": "b7ea0eb5-spark.jobserver.WordCountExample",
7.    "status": "STARTED",
8.    "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4"
9.  }

在input.string参数中传入字符串"a b c a b see",将字符串"a b c a b see"提交给job-server服务器的test应用(即之前我们上传的WordCountExample服务),然后启动服务,应用程序的jobId号是5453779a-f004-45fc-a11d-a39dae0f9bf4。

(3)根据jobId号查询job-server服务器作业的计算结果。在curl语句中输入的jobs查询参数就是上述应用程序的jobId号,是5453779a-f004-45fc-a11d-a39dae0f9bf4。

1.    curl localhost:8090/jobs/5453779a-f004-45fc-a11d-a39dae0f9bf4
2.  {
3.    "duration": "6.341 secs",
4.    "classPath": "spark.jobserver.WordCountExample",
5.    "startTime": "2015-10-16T03:17:03.127Z",
6.    "context": "b7ea0eb5-spark.jobserver.WordCountExample",
7.    "result": {
8.      "a": 2,
9.      "b": 2,
10.     "c": 1,
11.     "see": 1
12.   },
13.   "status": "FINISHED",
14.   "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4"
15. }

从job-server服务器返回计算结果为"a": 2次, "b": 2次,"c": 1次,"see": 1次,计算结果准确。上述是异步模式获取计算结果,须根据JobID号查询结果。如果Spark分布式计算数据量不是很大,我们也可以在curl语句中配置参数&sync=true,在POST到job-server服务请求时同步返回结果。

1.  curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=
    test&classPath=spark.jobserver.WordCountExample&sync=true"

Job Server的更多内容,读者可以登录Job Server的github网站(https://github.com/ spark-jobserver/spark-jobserver#users)进行学习。