9.2 Spark中checkpoint原理和源码详解

本节对Spark中checkpoint原理及Spark中checkpoint源码进行详解。

9.2.1 Spark中checkpoint原理详解

checkpoint到底是什么?

(1)Spark在生产环境下经常会面临Tranformations的RDD非常多(例如,一个Job中包含10 000个RDD)或者具体Tranformation产生的RDD本身计算特别复杂和耗时(例如,计算时常超过1h),此时我们必须考虑对计算结果数据的持久化。

(2)Spark擅长多步骤迭代,同时擅长基于Job的复用,这时如果能够对曾经计算的过程产生的数据进行复用,就可以极大地提升效率。

(3)如果采用persist把数据放在内存中,虽然是最快速的,但是也是最不可靠的。如果放在磁盘上,也不是完全可靠的。例如,磁盘会损坏,管理员可能清空磁盘等。

(4)checkpoint的产生就是为了相对更加可靠地持久化数据,checkpoint可以指定把数据放在本地并且是多副本的方式,但是在正常的生产情况下是放在HDFS,这就自然地借助HDFS高容错、高可靠的特征完成了最大化的、可靠的持久化数据的方式。

(5)为确保RDD复用计算的可靠性,checkpoint把数据持久化到HDFS中,保证数据最大程度的安全性。

(6)checkpoint就是针对整个RDD计算链条中特别需要数据持久化的环节(后面会反复使用当前环节的RDD)开始基于HDFS等的数据持久化复用策略,通过对RDD启动checkpoint机制来实现容错和高可用。

9.2.2 Spark中checkpoint源码详解

1.checkpoint的运行原理和源码实现彻底详解

RDD进行计算前须先看一下是否有checkpoint,如果有checkpoint,就不需要再进行计算了。

RDD.scala的iterator方法的源码如下。

1.      final def iterator(split: Partition, context: TaskContext):
        Iterator[T] = {
2.    if (storageLevel != StorageLevel.NONE) {
3.      getOrCompute(split, context)
4.    } else {
5.      computeOrReadCheckpoint(split, context)
6.    }
7.  }

进入RDD.scala的getOrCompute方法,源码如下。

1.  private[spark] def getOrCompute(partition: Partition, context: TaskContext):
    Iterator[T] = {
2.      val blockId = RDDBlockId(id, partition.index)
3.      var readCachedBlock = true
4.      //这种方法被Executors调用,所以我们需要调用SparkEnv.get代替sc.env
5.      SparkEnv.get.blockManager.getOrElseUpdate(blockId,               storageLevel,
        elementClassTag, () => {
6.        readCachedBlock = false
7.        computeOrReadCheckpoint(partition, context)
8.      }) match {

getOrCompute方法的getOrElseUpdate方法传入的第四个参数是匿名函数,调用computeOrReadCheckpoint(partition, context)检查checkpoint中是否有数据。

RDD.scala的computeOrReadCheckpoint的源码如下。

1.        private[spark]      def   computeOrReadCheckpoint(split:  Partition,
          context: TaskContext): Iterator[T] =
2.  {
3.    if (isCheckpointedAndMaterialized) {
4.      firstParent[T].iterator(split, context)
5.    } else {
6.      compute(split, context)
7.    }
8.  }

computeOrReadCheckpoint方法中的isCheckpointedAndMaterialized是一个布尔值,判断这个RDD是否checkpointed和被物化,Spark 2.0 checkpoint中有两种方式:reliably或者locally。computeOrReadCheckpoint作为isCheckpointed语义的别名返回值。

isCheckpointedAndMaterialized方法的源码如下。

1.   private[spark] def isCheckpointedAndMaterialized: Boolean =
2.  checkpointData.exists(_.isCheckpointed)

回到RDD.scala的computeOrReadCheckpoint,如果已经持久化及物化isCheckpointed-AndMaterialized,就调用firstParent[T]的iterator。如果没有持久化,则进行compute。

2.checkpoint原理机制

(1)通过调用SparkContext.setCheckpointDir方法指定进行checkpoint操作的RDD把数据放在哪里,在生产集群中是放在HDFS上的,同时为了提高效率,在进行checkpoint的使用时,可以指定很多目录。

SparkContext为即将计算的RDD设置checkpoint保存的目录。如果在集群中运行,必须是HDFS的目录路径。

SparkContext.scala的setCheckpointDir的源码如下。

1.     def setCheckpointDir(directory: String) {
2.
3.  /**
      *如果在集群上运行,如目录是本地的,则记录一个警告。否则,driver可能会试图从它自己
      *的本地文件系统重建RDD的checkpoint检测点,因为checkpoint检查点文件不正确。
      *实际上是在Executor机器上
     */
4.     if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
5.       logWarning("Spark is not running in local mode, therefore the
         checkpoint directory " +
6.         s"must not be on the local filesystem. Directory '$directory' " +
7.         "appears to be on the local filesystem.")
8.     }
9.
10.    checkpointDir = Option(directory).map { dir =>
11.      val path = new Path(dir, UUID.randomUUID().toString)
12.      val fs = path.getFileSystem(hadoopConfiguration)
13.      fs.mkdirs(path)
14.      fs.getFileStatus(path).getPath.toString
15.    }
16.  }

RDD.scala的checkpoint方法标记RDD的检查点checkpoint。它将保存到SparkContext# setCheckpointDir的目录检查点内的文件中,所有引用它的父RDDs将被移除。须在任何作业之前调用此函数。建议RDD在内存中缓存,否则保存在文件中时需要重新计算。

RDD.scala的checkpoint的源码如下。

1.  def checkpoint(): Unit = RDDCheckpointData.synchronized {
2.      //注意:我们在这里使用全局锁,原因是下游的复杂性:子RDD分区指向正确的父分区。未
        //来我们应该重新考虑这个问题
3.      if (context.checkpointDir.isEmpty) {
4.        throw new SparkException("Checkpoint directory has not been set in
          the SparkContext")
5.      } else if (checkpointData.isEmpty) {
6.        checkpointData = Some(new ReliableRDDCheckpointData(this))
7.      }
8.    }

其中的checkpointData是RDDCheckpointData。

1.  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

RDDCheckpointData标识某个RDD要进行checkpoint。如果某个RDD要进行checkpoint,那在Spark框架内部就会生成RDDCheckpointData。

1.   private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient
     private val rdd: RDD[T])
2.    extends Serializable {
3.
4.    import CheckpointState._
5.
6.    //相关的RDD检查状态
7.    protected var cpState = Initialized
8.
9.    //RDD包含检查点数据
10.   private var cpRDD: Option[CheckpointRDD[T]] = None
11.
12.   //待办事宜:确定需要在下面的方法中使用全局锁吗
13.
14.   /**
15.     *返回RDD的checkpoint数据是否已经持久化
16.     */
17.   def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
18.     cpState == Checkpointed
19.   }
20.
21.   /**
22.     *物化RDD和持久化其内容
23.     *RDD的第一个行动完成以后立即触发调用
24.     */
25.   final def checkpoint(): Unit = {
26.    //防止多个线程同时对相同RDDCheckpointing,这RDDCheckpointData状态自动翻转
27.     RDDCheckpointData.synchronized {
28.       if (cpState == Initialized) {
29.         cpState = CheckpointingInProgress
30.       } else {
31.         return
32.       }
33.     }
34.
35.     val newRDD = doCheckpoint()
36.
37.     //更新我们的状态和截断RDD的血统
38.     RDDCheckpointData.synchronized {
39.       cpRDD = Some(newRDD)
40.       cpState = Checkpointed
41.       rdd.markCheckpointed()
42.     }
43.   }
44.
45.   /**
46.     *物化RDD和持久化其内容
47.     *
48.     *子类应重写此方法,以定义自定义检查点行为
49.     * @return the Checkpoint RDD 在进程中创建
50.     */
51.   protected def doCheckpoint(): CheckpointRDD[T]
52.   /**
        *返回包含我们的检查点数据。如果checkpoint的状态是Checkpointed,才定义
        */
53.
54.   def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData.
      synchronized { cpRDD }
55.   /**
        *返回checkpoint RDD的分区,仅用于测试
        */
56.
57.   def getPartitions: Array[Partition] = RDDCheckpointData.synchronized {
58.     cpRDD.map(_.partitions).getOrElse { Array.empty }
59.   }
60.
61. }
62. /**
      *同步检查点操作的全局锁
      */
63.
64. private[spark] object RDDCheckpointData

(2)在进行RDD的checkpoint的时候,其所依赖的所有的RDD都会从计算链条中清空掉。

(3)作为最佳实践,一般在进行checkpoint方法调用前都要进行persist把当前RDD的数据持久化到内存或者磁盘上,这是因为checkpoint是Lazy级别,必须有Job的执行,且在Job执行完成后,才会从后往前回溯哪个RDD进行了checkpoint标记,然后对标记过的RDD新启动一个Job执行具体的checkpoint过程。

(4)checkpoint改变了RDD的Lineage。

(5)当调用checkpoint方法要对RDD进行checkpoint操作,此时框架会自动生成RDDCheckpointData,当RDD上运行过一个Job后,就会立即触发RDDCheckpointData中的checkpoint方法,在其内部会调用doCheckpoint,实际上在生产时会调用ReliableRDDCheckpointData的doCheckpoint,在生产过程中会导致ReliableCheckpointRDD的writeRDDToCheckpointDirectory的调用,而在writeRDDToCheckpointDirectory方法内部,会触发runJob来执行把当前的RDD中的数据写到checkpoint的目录中,同时会产生ReliableCheckpointRDD实例。

RDDCheckpointData.scala的checkpoint方法进行真正的checkpoint:在RDDCheckpointData. synchronized同步块中先判断cpState的状态,然后调用doCheckpoint()。

RDDCheckpointData.scala的checkpoint方法的源码如下。

1.     final def checkpoint(): Unit = {
2.    //防止多个线程同时对相同RDDcheckpointing,这RDDCheckpointData状态自动翻转
3.  RDDCheckpointData.synchronized {
4.        if (cpState == Initialized) {
5.          cpState = CheckpointingInProgress
6.        } else {
7.          return
8.        }
9.      }
10.
11.     val newRDD = doCheckpoint()
12.
13.     //更新我们的状态和截断RDD的血统
14.     RDDCheckpointData.synchronized {
15.       cpRDD = Some(newRDD)
16.       cpState = Checkpointed
17.       rdd.markCheckpointed()
18.     }
19.   }

其中的doCheckpoint方法是RDDCheckpointData.scala中的方法,这里没有具体的实现。

1.  protected def doCheckpoint(): CheckpointRDD[T]

RDDCheckpointData的子类包括LocalRDDCheckpointData、ReliableRDDCheckpointData。ReliableRDDCheckpointData子类中doCheckpoint方法具体的实现,在方法中进行writeRDDToCheckpointDirectory的调用。

ReliableRDDCheckpointData.scala的doCheckpoint的源码如下。

1.  protected override def doCheckpoint(): CheckpointRDD[T] = {
2.  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd,
    cpDir)
3.
4.      //如果引用超出范围,则可选地清理检查点文件
5.      if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.
        cleanCheckpoints", false)) {
6.        rdd.context.cleaner.foreach { cleaner =>
7.          cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
8.        }
9.      }
10.
11.     logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is
        RDD ${newRDD.id}")
12.     newRDD
13.   }
14.
15. }

writeRDDToCheckpointDirectory将RDD的数据写入到checkpoint的文件中,返回一个ReliableCheckpointRDD。

 首先找到sparkContext,赋值给sc变量。

 基于checkpointDir创建checkpointDirPath。

 fs获取文件系统的内容。

 然后是广播sc.broadcast,将路径信息广播给所有的Executor。

 接下来是sc.runJob,触发runJob执行,把当前的RDD中的数据写到checkpoint的目录中。

 最后返回ReliableCheckpointRDD。无论是对哪个RDD进行checkpoint,最终都会产生ReliableCheckpointRDD,以checkpointDirPath.toString中的数据为数据来源;以originalRDD.partitioner的分区器partitioner作为partitioner;这里的originalRDD就是要进行checkpoint的RDD。

writeRDDToCheckpointDirectory的源码如下。

1.   def writeRDDToCheckpointDirectory[T: ClassTag](
2.       originalRDD: RDD[T],
3.       checkpointDir: String,
4.       blockSize: Int = -1): ReliableCheckpointRDD[T] = {
5.
6.     val sc = originalRDD.sparkContext
7.
8.     //为检查点创建输出路径
9.     val checkpointDirPath = new Path(checkpointDir)
10.    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
11.    if (!fs.mkdirs(checkpointDirPath)) {
12.      throw new SparkException(s"Failed to create checkpoint path
         $checkpointDirPath")
13.    }
14.
15.    //保存文件,并重新加载它作为一个RDD
16.    val broadcastedConf = sc.broadcast(
17.      new SerializableConfiguration(sc.hadoopConfiguration))
18.    //待办事项:这是代价昂贵的,因为它又一次计算RDD是不必要的(SPARK-8582)
       sc.runJob(originalRDD,
19.      writePartitionToCheckpointFile[T](checkpointDirPath.toString,
         broadcastedConf) _)
20.
21.    if (originalRDD.partitioner.nonEmpty) {
22.      writePartitionerToCheckpointDir(sc,          originalRDD.partitioner.get,
         checkpointDirPath)
23.    }
24.
25.    val newRDD = new ReliableCheckpointRDD[T](
26.      sc, checkpointDirPath.toString, originalRDD.partitioner)
27.    if (newRDD.partitions.length != originalRDD.partitions.length) {
28.      throw new SparkException(
29.        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has
           different " +
30.          s"number of partitions from original RDD $originalRDD
             (${originalRDD.partitions.length})")
31.    }
32.    newRDD
33.  }

ReliableCheckpointRDD是读取以前写入可靠存储系统检查点文件数据的RDD。其中的partitioner是构建ReliableCheckpointRDD的时候传进来的。其中的getPartitions是构建一个一个的分片。其中,getPreferredLocations获取数据本地性,fs.getFileBlockLocations获取文件的位置信息。compute方法通过ReliableCheckpointRDD.readCheckpointFile读取数据。

ReliableCheckpointRDD.scala的源码如下。

1.    private[spark] class ReliableCheckpointRDD[T: ClassTag](
2.      sc: SparkContext,
3.      val checkpointPath: String,
4.      _partitioner: Option[Partitioner] = None
5.    ) extends CheckpointRDD[T](sc) {
6.
7.    @transient private val hadoopConf = sc.hadoopConfiguration
8.    @transient private val cpath = new Path(checkpointPath)
9.    @transient private val fs = cpath.getFileSystem(hadoopConf)
10.   private val broadcastedConf = sc.broadcast(new SerializableConfiguration
      (hadoopConf))
11.   //如果检查点目录不存在,则快速失败
12.   require(fs.exists(cpath),        s"Checkpoint     directory    does   not   exist:
      $checkpointPath")
13.   /**
        *返回checkpoint的路径,RDD从中读取数据
        */
14.
15.   override val getCheckpointFile: Option[String] = Some(checkpointPath)
16.   override val partitioner: Option[Partitioner] = {
17.     _partitioner.orElse {
18.       ReliableCheckpointRDD.readCheckpointedPartitionerFile(context,
          checkpointPath)
19.     }
20.   }
21.  /**
       *返回检查点目录中的文件所描述的分区
       *由于原来的RDD可能属于一个之前的应用,没办法知道之前的分区数。此方法假定在应用
       *生命周期,原始集检查点文件完全保存在可靠的存储里面
       */
22.
23.   protected override def getPartitions: Array[Partition] = {
24.     //如果路径不存在,listStatus就抛出异常
25.     val inputFiles = fs.listStatus(cpath)
26.       .map(_.getPath)
27.       .filter(_.getName.startsWith("part-"))
28.       .sortBy(_.getName.stripPrefix("part-").toInt)
29.     //如果输入文件无效,则快速失败
30.     inputFiles.zipWithIndex.foreach { case (path, i) =>
31.       if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
32.         throw new SparkException(s"Invalid checkpoint file: $path")
33.       }
34.     }
35.     Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
36.   }
37.  /**
       *返回与给定分区关联的检查点文件的位置
38.    */
39.   protected override def getPreferredLocations(split: Partition):
      Seq[String] = {
40.     val status = fs.getFileStatus(
41.       new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName
         (split.index)))
42.     val locations = fs.getFileBlockLocations(status, 0, status.getLen)
43.     locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
44.   }
45.
46.   /**
       *读取与给定分区关联的检查点文件的内容
47.    */
48.   override def compute(split: Partition, context: TaskContext): I
      terator[T] = {
49.     val file = new Path(checkpointPath, ReliableCheckpointRDD.
        checkpointFileName(split.index))
50.     ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
51.   }
52.
53. }
54. .......

下面看一下ReliableCheckpointRDD.scala中compute方法中的ReliableCheckpointRDD. readCheckpointFile。readCheckpointFile读取指定检查点文件checkpoint的内容。readCheckpointFile方法通过deserializeStream反序列化fileInputStream文件输入流,然后将deserializeStream变成一个Iterator。

Spark 2.1.1版本的ReliableCheckpointRDD.scala的readCheckpointFile的源码如下。

1.   def readCheckpointFile[T](
2.        path: Path,
3.        broadcastedConf: Broadcast[SerializableConfiguration],
4.        context: TaskContext): Iterator[T] = {
5.      val env = SparkEnv.get
6.      val fs = path.getFileSystem(broadcastedConf.value.value)
7.      val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
8.      val fileInputStream = fs.open(path, bufferSize)
9.      val serializer = env.serializer.newInstance()
10.     val deserializeStream = serializer.deserializeStream(fileInputStream)
11.
12.     //注册一个任务完成回调以,关闭输入流
13.     context.addTaskCompletionListener(context => deserializeStream.close())
14.
15.     deserializeStream.asIterator.asInstanceOf[Iterator[T]]
16.   }
17.
18. }

Spark 2.2.0版本的ReliableCheckpointRDD.scala的readCheckpointFile的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行整体替换,新增fileInputStream变量中对CHECKPOINT_COMPRESS压缩配置的判断。如果CHECKPOINT压缩配置为true,则对fileStream文件流进行压缩。

1.   ......
2.     val fileInputStream = {
3.        val fileStream = fs.open(path, bufferSize)
4.        if (env.conf.get(CHECKPOINT_COMPRESS)) {
5.          CompressionCodec.createCodec(env.conf).compressedInputStream
           (fileStream)
6.        } else {
7.          fileStream
8.        }
9.      }
10. ......

ReliableRDDCheckpointData.scala的cleanCheckpoint方法,清理RDD数据相关的checkpoint文件。

1.   def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
2.    checkpointPath(sc, rddId).foreach { path =>
3.      path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
4.    }
5.  }

在生产环境中不使用LocalCheckpointRDD。LocalCheckpointRDD的getPartitions直接从toArray级别中调用new()函数创建CheckpointRDDPartition。LocalCheckpointRDD的compute方法直接报异常。

LocalCheckpointRDD的源码如下。

1.    private[spark] class LocalCheckpointRDD[T: ClassTag](
2.      sc: SparkContext,
3.      rddId: Int,
4.      numPartitions: Int)
5.    extends CheckpointRDD[T](sc) {
6.  ......
7.  protected override def getPartitions: Array[Partition] = {
8.      (0 until numPartitions).toArray.map { i => new CheckpointRDDPartition(i) }
9.    }
10. .......
11.   override def compute(partition: Partition, context: TaskContext):
      Iterator[T] = {
12.     throw new SparkException(
13.       s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found!
          Either the executor " +
14.       s"that originally checkpointed this partition is no longer alive,
          or the original RDD is " +
15.       s"unpersisted. If this problem persists, you may consider using
          'rdd.checkpoint()' " +
16.       s"instead, which is slower than local checkpointing but more fault-
         tolerant.")
17.   }
18.
19. }

checkpoint运行流程图如图9-2所示。

图9-2 Checkpoint运行流程图

通过SparkContext设置Checkpoint数据保存的目录,RDD调用checkpoint方法,生产RDDCheckpointData,当RDD上运行一个Job后,就会立即触发RDDCheckpointData中的checkpoint方法,在其内部会调用doCheckpoint;然后调用ReliableRDDCheckpointData的doCheckpoint;ReliableCheckpointRDD的writeRDDToCheckpointDirectory的调用;在writeRDDToCheckpointDirectory方法内部会触发runJob,来执行把当前的RDD中的数据写到Checkpoint的目录中,同时会产生ReliableCheckpointRDD实例。

checkpoint保存在HDFS中,具有多个副本;persist保存在内存中或者磁盘中。在Job作业调度的时候,checkpoint沿着finalRDD的“血统”关系lineage从后往前回溯向上查找,查找哪些RDD曾标记为要进行checkpoint,标记为checkpointInProgress;一旦进行checkpoint,RDD所有父RDD就被清空。