- Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
- 王家林
- 3564字
- 2021-03-30 21:55:48
2.4 Spark 2.2 Streaming
Spark 2.0为我们带来了一个新的流处理框架Structured Streaming,这是一个基于Spark SQL和Catalyst优化器构建的高级流API。它允许用户使用与操作静态数据的DataFrame/Dataset API对流数据进行编程,利用Catalyst优化器自动地增量化查询计划,并且它不但支持流数据的不断写入,还支持其他的静态数据的插入。
Apache Spark 2.2.0版本中Structured Streaming的更新:
1.整体可用性
SPARK-20844:结构化流式API现在已经是可行的,不再被标注为实验性。
2.Kafka提升
SPARK-19719:支持从Apache Kafka流式传输或批量读取和写入数据。
SPARK-19968:低延迟kafka到kafka流的缓存生产者。
3.API更新
SPARK-19067:支持复杂的状态处理和[flat]MapGroupsWithState的超时处理。
SPARK-19876:支持一次触发。
4.其他变化
SPARK-20979:用于测试和基准测试的速率源。
2.4.1 Structured Streaming
Structured Streaming会通过Checkpoint(检查点)机制和预写日志的方式来确保对流数据的语义一致性exactly-once处理,让整个处理过程更加可靠。
在Spark 2.2版本中增加了对kafka 0.10的支持,为记录添加了可见的基于事件时间水印的延迟,并且现在Structured Streaming已经支持对所有格式的文件操作。
到Spark 2.1版本为止,Structured Streaming还处于试验阶段,还需几个版本的迭代,才能稳定到可以在生产环境下使用的程度。
下面结合官方文档了解一下Structured Streaming。
首先写一个程序来监听网络端口发来的内容,然后进行WordCount。
第一步:创建程序入口SparkSession,并引入spark.implicits来允许Scalaobject隐式转换为DataFrame。
1. val spark = SparkSession.builder 2. .appName("StructuredNetworkCount").getOrCreate() 3. import spark.implicits._
第二步:创建流。配置从socket读取流数据,地址和端口为localhost:9999。
1. val lines = spark.readStream.format("socket") 2. .option("host","localhost").option("port","9999") 3. .load()
第三步:进行单词统计。这里lines是DataFrame,使用as[String]给它定义类型转换为DataSet。之后在DataSet里进行单词统计。
1. val words = lines.as[String].flatMap(_.split(" ")) 2. val wordcount = words.groupBy("values").count()
第四步:创建查询句柄,定义打印结果方式并启动程序。这里使用writeStream方法,输出模式为全部输出到控制台。
1. val query = wordcount.writeStream 2. .outputMode("complete").format("console").start() 3. //调用awaitTermination方法来防止程序在处理数据时停止 4. query.awaitTermination()
接下来运行该程序,在Linux命令窗口运行nc–lk 9999开启9999端口。然后在Spark的Home目录下提交程序,传入IP和端口号。
1. .bin/run-example 2. org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount 3. localhost 9999
程序启动之后,在前面的命令窗口中输入单词apache spark,程序会运行并打印出结果,如图2-1所示。
最后再输入hello spark,程序会再次运行并打印出如图2-2所示的结果。
图2-1 Spark Streaming运行示意图1
图2-2 Spark Streaming运行示意图2
可见,融合了DataSetAPI的流处理框架的程序代码十分简捷,并且执行效率也会比原来的SparkStreaming更高。
Structured Streaming的关键思想如图2-3所示:把数据流视作一张数据不断增加的表,这样用户就可以基于这张表进行数据处理,就好像使用批处理来处理静态数据一样,但实际上Spark底层是把新数据不断地增量添加到这张无界的表的下一行中。
图2-3 Structured Streaming的关键思想
Structured Streaming共有3种输出模式,这3种模式都只适用于某些类型的查询:
(1)CompleteMode:完整模式。整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整张表的写入。聚合操作以及聚合之后的排序操作支持这种模式。
(2)AppendMode:附加模式。只有自上次触发执行后在结果表中附加的新行会被写入外部存储器。这仅适用于结果表中的现有行不会更改的查询,如select、where、map、flatMap、filter、join等操作支持这种模式。
(3)UpdateMode:更新模式(这个模式将在以后的版本中实现)。只有自上次触发执行后在结果表中更新的行将被写入外部存储器(不输出未更改的行)。
2.4.2 增量输出模式
上面例子中使用的是CompleteMode,程序中接收数据的输入表是lines,它是一个DataFrame,新来的数据会被添加进去。之后的wordCounts是结果表。当程序启动时,Spark会不断检测是否有新数据加入到lines中,如果有新数据,则运行一个增量的查询,与上一次查询的结果合并,并且更新结果表,如图2-4所示。
图2-4 Spark Streaming示意图
这个看似非常简单的设计,背后的实现逻辑却并不简单,因为其他的流处理框架用户是需要自己推理怎么解决容错和数据一致性的问题,如经典的at-most-once、at-least-once、exactly-once问题,而在上面的CompleteMode下,Spark因为只在有新数据进来的时候才会更新结果,所以帮用户解决了这些问题。我们可以通过StructuredStreaming在基于event-time的操作和延迟数据的处理这两个问题上的解决方式来简单地了解背后的实现机制。
event-time是嵌入事件本身的时间,记录了事件发生的时间。很多时候我们需要用这个时间来实现业务逻辑,例如,我们要获取IOT设备每分钟产生的事件数量,则可能需要使用生成数据的时间,而不是Spark接收它们的时间。在这个模式下,event-time作为每行数据中的一列,可以用于基于时间窗口的聚合,也正因如此,基于event-time的窗口函数可以同样被定义在静态数据集上(如日志文件等)。
而关于容错方面,提供端到端的exactly-once语义是Structured Streaming的主要设计目标之一,要实现exactly-once,就要考虑数据源(sources)、执行引擎(execution)和存储(sinks)3个方面。Structured Streaming是这样实现的:假定每个数据源都有偏移量(与kafka的offsets类似)用来追溯数据在数据流中的位置;在执行引擎中会通过checkpoint(检查点)和WAL (writeaheadlogs预写日志)记录包括被处理的数据的偏移量范围在内的程序运行进度信息;在存储层设计成多次处理结果幂等,即处理多次结果相同。这样确保了Structured Streaming端到端exactly-once的语义一致性。
上面提到的event-time列带来的另一个好处是,可以很自然地处理“迟到”的数据(如没有按照event-time的时间顺序被Spark接收),因为更新旧的结果表时,可以完全控制更新和清除旧的聚合结果来限制处于中间状态的数据(窗口函数中,这些数据可以是有状态的)的大小。Spark 2.1引入的watermarking允许用户指定延迟数据的阈值,也允许引擎清除掉旧的状态。
先来看一下基础的窗口函数在Structured Streaming中的应用。例如,要对10min的单词进行计数,每5min统计一次,我们要把滑动窗口的大小设置为10min,每5min滑动一次。具体代码如下。
1. //假如输入的数据words格式是timestamp:Timestamp,word:String 2. val windowedCounts = words 3. .groupBy( 4. //设置窗口按照timestamp列为参照时间,10min为窗口大小,5min滑动一次,并且按照word //进行分组计数 5. window($"timestamp","10 minutes","5 minutes"),word 6. ).count
如图2-5所示,从12:00开始,12:05启动第一次查询,数据是12:00-12:10时间段的,当然,此时只有前5min的数据被统计进来。在12:10分时统计12:05-12:15时间段的数据(此时也是只有前5min的数据),并把上一个窗口的后5min的数据(即12:05-12:10时间段的数据)的统计结果合并到上一个窗口的结果中去,之后每次启动查询,都会把上一个窗口的查询结果补全,并把本窗口的前5min数据的统计结果记下来。
但是,我们可能会遇到这种情况:如图2-6所示,12:04产生的数据,一直到12:10之后才被程序接收到,此时数据依然会被正确地合并到对应的窗口中去,但是这样会导致查询结果长时间处于中间状态,而如果要长时间运行程序,就必须限制累积到内存中的中间状态结果的大小,这就要求系统知道什么时候清理这些中间状态的数据。
图2-5 Spark流处理示意图
图2-6 流处理Watermark数据示意图
在Spark 2.1中引入水印(watermarking),使得系统可以自动跟踪目前的event-time,并按照用户指定的时间清理旧的状态。使用方法如下。
1. val windowedCounts = words 2. .withWatermark("timestamp", "10 minutes") 3. .groupBy(window($"timestamp", "10 minutes", "5 minutes"),$"word") 4. .count()
这里设置时间为10min,在Append模式下,具体执行逻辑如图2-7所示。
图2-7中点线表示目前收到的数据的最大event-time,实线是水印(计算方法是运算截止到触发点时收到的数据最大的event-time减去latethreshold,也就是减去10),当水印时间小于窗口的结束时间时,计算的数据都被保留为中间数据,当水印时间大于窗口结束时间时,就把这个窗口的运算结果加入到结果表中去,之后即使再收到属于这个窗口的数据,也不再进行计算,而直接忽略掉。图中在12:15时刻最大的event-time为12:14,所以水印时间为12:04,小于第一个窗口的12:10,所以此时计算的数据处于中间状态,存放在内存中,在下一个触发点也就是12:20时,最大event-time为12:21,水印时间是12:11,大于12:10,此时认为12:00-12:10窗口的数据已经完全到达,把中间结果中属于这个窗口的数据写入到结果表中,并且之后不再对这个结果进行更新。
图2-7 Append逻辑示意图
需要注意的是,在Append模式下,系统在输出某个窗口的运行结果之前一定会根据设置等待延迟数据,这就意味着如果用户把延迟阈值设置为1天,那么在这一天内就无法看到这个窗口的(中间)结果。不过,在以后的版本中,Spark会加入Update模式来解决这一问题。
Structured Streaming目前有4种输出数据的处理方式:
(1)writeStream.format("parquet").start(),输出到文件中,Append模式支持这种方式。这种方式自带容错机制。
(2)writeStream.format("console").start(),输出到控制台,用于调试。Append模式和Complete模式支持这种方式。
(3)writeStream.format("memory").queryName("table").start(),以table的形式输出到内存,可以在之后的程序中使用spark.sql(select * from table).show来对结果进行处理。这种方式同样用于调试。Append模式和Complete模式支持这种方式。
(4)writeStream.foreach(…).start(),对输出结果进行进一步处理。要使用它,用户必须传入一个实现ForeachWriter接口的writer类,这个类必须是可序列化的,因为稍后会把结果发送到不同的executor进行分布式计算。这个接口有3个方法要实现,即open、process、close。
open方法:open方法有两个入参:version、partitionId,其中partitionId是分区ID,version是重复数据删除的唯一ID标识。open方法用于处理Executor分区的初始化(如打开连接启动事物等)。version用于数据失败时重复数据的删除,当从失败中恢复时,一些数据可能生成多次,但它们具有相同的版本。如果此方法发现使用的partitionId和version这个分区已经处理后,可以返回false,以跳过进一步的数据处理,但close仍然将被要求清理资源。
process方法:调用此方法处理Executor中的数据。此方法只在open时调用,返回true。
close方法:停止执行Executor一个分区中的新数据时调用,保证被调用open时返回true,或者返回false。然而,在下列情况下,close不被调用:JVM崩溃,没有抛出异常Throwable;open方法抛出异常Throwable。
最后要说明的是,为了程序在重启之后可以接着上次的执行结果继续执行,需要设置检查点,方式如下:
1. aggDF.writeStream.outputMode("complete") 2. .option("checkpointLocation", "path/to/HDFS/dir") 3. .format("memory").start()