- Hadoop构建数据仓库实践
- 王雪迎
- 2705字
- 2020-11-28 16:11:55
3.4 Hadoop生态圈的其他组件
Hadoop诞生之初只有HDFS和MapReduce两个软件组件,以后得到非常快速的发展,开发人员贡献了众多组件,以至形成了Hadoop自己的生态圈。如图3-12所示,除MapReduce外,目前已经存在Spark、Tez等分布式处理引擎。生态圈中还有一系列迁移数据、管理集群的辅助工具。
图3-12 Hadoop生态圈
这些产品貌似各不相同,但是三种共同的特征把它们紧密联系起来。首先,它们都依赖于Hadoop的基本组件——YARN、HDFS或MapReduce。其次,它们都用来处理大数据,并提供建立端到端数据流水线所需的各种功能。最后,它们对于应该如何建立分布式系统的理念是共通的。
本书后面章节会详细介绍Sqoop、Hive、Oozie、Impala、Hue等组件,并使用一个简单的实例说明如何利用这些组件实现ETL、定时自动执行工作流、数据分析、数据可视化等完整的数据仓库功能。这里介绍另外一种分布式计算框架——Spark。
Apache Spark是一个开源的集群计算框架。它最初由加州大学伯克利分校的AMP实验室开发,后来Spark的源代码捐献给了Apache软件基金会,从此成了一个活跃的Apache项目。Spark提供了一套完整的集群编程接口,内含容错和并行数据处理能力。
Spark基本的数据结构叫做弹性分布式数据集(Resilient Distributed Datasets,简称RDD)。这是一个分布于集群节点的只读数据集合,并以容错的、并行的方式进行维护。传统的MapReduce框架强制在分布式编程中使用一种特定的线性数据流处理方式。MapReduce程序从磁盘读取输入数据,把数据分解成键/值对,经过混洗、排序、归并等数据处理后产生输出,并将最终结果保存在磁盘。Map阶段和Reduce阶段的结果均要写磁盘,这大大降低了系统性能。也是由于这个原因,MapReduce大都被用于执行批处理任务。为了解决MapReduce的性能问题,Spark使用RDD作为分布式程序的工作集合,它提供一种分布式共享内存的受限形式。在分布式共享内存系统中,应用可以向全局地址空间的任意位置进行读写操作,而RDD是只读的,对其只能进行创建、转化和求值等操作。
利用RDD可以方便地实现迭代算法,简单地说就是能够在一个循环中多次访问数据集合。RDD还适合探索式的数据分析,能够对数据重复执行类似于数据库风格的查询。相对于MapReduce的实现,Spark应用的延迟可以降低几个数量级,其中最为经典的迭代算法是用于机器学习系统的培训算法,这也是开发Spark的初衷。
Spark需要一个集群管理器和一个分布式存储系统作为支撑。对于集群管理,Spark支持独立管理(原生的Spark集群),HadoopYARN和Apache Mesos。对于分布式存储,Spark可以与多种系统对接,包括HDFS、MapR文件系统、Cassandra、OpenStack Swift、Amazon S3、Kudu,或者一个用户自己实现的文件系统。Spark还支持伪分布的本地部署模式,但通常仅用于开发和测试目的。本地模式不需要分布式存储,而是用本地文件系统代替。在这种场景中,Spark运行在一个机器上,每个CPU核是一个执行器(executor)。
Spark框架含有SparkCore、SparkSQL、SparkStreaming、MLlib Machine Learning Library、GraphX等几个主要组件。
1. Spark Core
SparkCore是所有Spark相关组件的基础。它以RDD这个抽象概念为核心,通过一组应用程序接口,提供分布式任务的分发、调度和基本的I/O功能。SparkCore的编程接口支持Java、Python、Scala和R等程序语言。这组接口使用的是函数式编程模式,即一个包含对RDD进行map、filter、reduce、join等并行操作的驱动程序,向Spark传递一个函数,然后Spark调度此函数在集群上并行执行。这些基本操作把RDD作为输入并产生新的RDD。RDD自身是一个不变的数据集,对RDD的所有转换操作都是lazy模式,即Spark不会立刻计算结果,而只是简单地记住所有对数据集的转换操作。这些转换只有遇到action操作的时候才会开始真正执行,这样的设计使Spark更加高效。容错功能是通过跟踪每个RDD的“血统”(lineage,指的是产生此RDD的一系列操作)实现的。一旦RDD的数据丢失,还可以使用血统进行重建。RDD可以由任意类型的Python、Java或Scala对象构成。除了面向函数的编程风格,Spark还有两种形式的共享变量:broadcast和accumulators。broadcast变量引用的是需要在所有节点上有效的只读数据,accumulators可以简便地对各节点返回给驱动程序的值进行聚合。
一个典型的Spark函数式编程的例子是,统计文本文件中每个单词出现的次数,也就是常说的词频统计。在下面这段Scala程序代码中,每个flatMap函数以一个空格作为分隔符,将文件分解为由单词组成的列表,map函数将每个单词列表条目转化为一个以单词为键,数字1为值的RDD对,reduceByKey函数对所有的单词进行计数。每个函数调用都将一个RDD转化为一个新的RDD。对比相同功能的Java代码,Scala语言的简洁性一目了然。
// 将一个本地文本文件读取到(文件名,文件内容)的RDD对。 val data = sc.textFile("file:///home/mysql/mysql-5.6.14/README") // 以一个空格作为分隔符,将文件分解成一个由单词组成的列表。 val words = data.flatMap(_.split(" ")) // 为每个单词添加计数,并进行聚合计算 val wordFreq = words.map((_, 1)).reduceByKey(_ + _) // 取得出现次数最多的10个单词 wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10)
在spark-shell(spark-shell是spark自带的一个快速原型开发的命令行工具)里,这段代码执行结果如下。可以看到the出现的次数最多,有26次。
scala> val data = sc.textFile("file:///home/mysql/mysql-5.6.14/README") data: org.apache.spark.rdd.RDD[String] = file:///home/mysql/mysql-5.6.14/README MapPartitionsRDD[13] at textFile at <console>:27 scala> val words = data.flatMap(_.split(" ")) words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at flatMap at <console>:29 scala> val wordFreq = words.map((_, 1)).reduceByKey(_ + _) wordFreq: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:31 scala> wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10) res1: Array[(Int, String)] = Array((26, the), (15, ""), (14, of), (9, MySQL), (7, to), (7, is), (6, version), (6, or), (6, in), (6, a))
2. Spark SQL
SparkSQL是基于SparkCore之上的一个组件,它引入了名为DataFrames的数据抽象。DataFrames能够支持结构化、半结构化数据。SparkSQL提供了一种“领域特定语言”(Domain-Specific Language,简称DSL),用于在Scala、Java或Python中操纵DataFrames。同时SparkSQL也通过命令行接口或ODBC/JDBC提供对SQL语言的支持。我们将在12.3节详细讨论SparkSQL。下面是一段Scala里的SparkSQL代码。
val url = "jdbc:mysql://127.0.0.1/information_schema? user=root&password=xxxxxx" val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.format("jdbc").option("url", url).option("db table", " tables").load() df.printSchema() val countsByDatabase = df.groupBy(" table_SCHEMA").count().show()
这段代码用SparkSQL连接本地的MySQL数据库,屏幕打印information_schema. tables的表结构,并按table_schema字段分组,计算并显示每组的记录数。其功能基本等价于下面的MySQL语句:
use information_schema; desc tables; select table_schema, count(*) from tables group by table_schema;
执行代码前先要在spark-env.sh文件的SPARK_CLASSPATH变量中添加MySQL JDBC驱动的JAR包,例如:
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/mysql-connector-java-5.1.31-bin.jar
然后进入spark-shell执行代码,最后一条语句显示的输出如下所示:
scala> val countsByDatabase = df.groupBy(" table_SCHEMA").count().show() +------------------+-----+ | table_SCHEMA|count| +------------------+-----+ |performance_schema| 52| | hadoop| 37| |information_schema| 59| | mysql| 28| +------------------+-----+ countsByDatabase: Unit = ()
3. Spark Streaming
SparkStreaming利用SparkCore的快速调度能力执行流数据的分析。它以最小批次获取数据,并对批次上的数据执行RDD转化。这样的设计,可以让用于批处理分析的Spark应用程序代码也可以用于流数据分析,因此便于实时大数据处理架构的实现。但是这种便利性带来的问题是处理最小批次数据的延时。其他流数据处理引擎,例如Storm和Flink的streaming组件,都是以事件而不是最小批次为单位处理流数据的。SparkStreaming支持从Kafka、Flume、Twitter、ZeroMQ、Kinesis和TCP/IP sockets接收数据。
4. MLlib Machine Learning Library
Spark中还包含一个机器学习程序库,叫做MLlib。MLlib提供了很多机器学习算法,包括分类、回归、聚类、协同过滤等,还支持模型评估、数据导入等额外的功能。MLlib还提供了一些更底层的机器学习原语,如一个通用的梯度下降算法等。所有这些方法都被设计为可以在集群上轻松伸缩的架构。
5. GraphX
GraphX是Spark上的图(如社交网络的朋友关系图)处理框架。可以进行并行的图计算。与SparkStreaming和SparkSQL类似,GraphX也扩展了Spark的RDDAPI,能用来创建一个顶点和边都包含任意属性的有向图。GraphX还支持针对图的各种操作,比如进行图分割的subgraph和操作所有顶点的mapVertices,以及一些常用的图算法,如PageRank和三角计算等。由于RDD是只读的,因此GraphX不适合需要更新图的场景。