How to read checkpointed RDD

Question

This question is not something new, but after a lot of googling with no luck, I'm posting it here.

sc.setCheckpointDir("C:\\mydrive\\Checkpoint")  // where checkpoint files are written
val data = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9)
val base = sc.parallelize(data)
base.checkpoint()                      // mark the RDD for checkpointing
base.collect().foreach { println(_) }  // first action triggers the checkpoint write

The code above checkpoints the RDD, but I'm not sure whether it reads the checkpointed data on a subsequent run. Please find the log details below:

17/05/17 15:37:48 DEBUG ReliableCheckpointRDD: No partitioner file

17/05/17 15:37:48 INFO ReliableRDDCheckpointData: Done checkpointing RDD 0 to file:/C:/mydrive/Checkpoint/d10861cd-70c3-4e60-bdd3-a4753dfee1b2/rdd-0, new parent is RDD 1

The log gives me the impression that on subsequent runs it is writing the checkpointed data, not reading it. If that is the case, how do I read checkpointed data in subsequent runs? What am I missing here?

Answer

Checkpointing is the process of truncating an RDD's lineage graph and saving it to a reliable distributed (HDFS) or local file system. There are two types of checkpointing:

  • reliable - in Spark (core), RDD checkpointing that saves the actual intermediate RDD data to a reliable distributed file system, e.g. HDFS.

  • local - in Spark Streaming or GraphX - RDD checkpointing that truncates the RDD lineage graph.
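
You can observe the lineage truncation that reliable checkpointing performs by comparing an RDD's debug string before and after the checkpoint is materialized. A minimal sketch using the standard RDD API (toDebugString, isCheckpointed, getCheckpointFile); the local path is just for illustration:

sc.setCheckpointDir("C:\\mydrive\\Checkpoint")
val rdd = sc.parallelize(1 to 9).map(_ * 2).filter(_ > 4)
println(rdd.toDebugString)     // full lineage, down to the ParallelCollectionRDD
rdd.checkpoint()               // mark for checkpointing; nothing is written yet
rdd.count()                    // the first action materializes the checkpoint
println(rdd.toDebugString)     // lineage is now rooted at a ReliableCheckpointRDD
println(rdd.isCheckpointed)    // true
println(rdd.getCheckpointFile) // Some(file:/C:/mydrive/Checkpoint/<uuid>/rdd-<id>)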

It's up to the Spark application developer to decide when and how to checkpoint, using the RDD.checkpoint() method. Before checkpointing is used, a Spark developer has to set the checkpoint directory using the SparkContext.setCheckpointDir(directory: String) method.

Reliable Checkpointing

You call SparkContext.setCheckpointDir(directory: String) to set the checkpoint directory - the directory where RDDs are checkpointed. The directory must be an HDFS path if running on a cluster. The reason is that the driver may attempt to reconstruct the checkpointed RDD from its own local file system, which is incorrect because the checkpoint files are actually on the executor machines.
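
On a cluster, that means pointing the checkpoint directory at HDFS rather than a local path; the namenode host, port, and path below are placeholders:

sc.setCheckpointDir("hdfs://namenode:8020/user/spark/checkpoints")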

You mark an RDD for checkpointing by calling RDD.checkpoint(). The RDD will be saved to a file inside the checkpoint directory, and all references to its parent RDDs will be removed. This function has to be called before any job has been executed on this RDD.

It is strongly recommended that a checkpointed RDD be persisted in memory; otherwise, saving it to a file will require it to be recomputed.
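
Putting those two rules together, a typical sequence persists the RDD first, then marks it for checkpointing, then runs the first action. A minimal sketch (the input path is hypothetical):

import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("hdfs://namenode:8020/data/logs")  // hypothetical input
logs.persist(StorageLevel.MEMORY_ONLY)  // keep it in memory so writing the
                                        // checkpoint does not recompute the lineage
logs.checkpoint()                       // must precede any action on this RDD
logs.count()                            // first action writes the checkpoint files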

Let's assume you persisted an RDD named "users". To reuse the preserved RDD by looking it up by name, refer to the code snippet below:

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// UsersRDDBuilder, Reputation and User are assumed to be defined elsewhere;
// build(sc) constructs the RDD[(Reputation, User)] when it is not cached yet.
trait UsersSparkJob extends spark.jobserver.SparkJob
    with spark.jobserver.NamedRddSupport
    with UsersRDDBuilder {
  val rddName = "users"

  // accept every configuration; a real job would validate its input here
  def validate(sc: SparkContext, config: Config): spark.jobserver.SparkJobValidation =
    spark.jobserver.SparkJobValid
}

object GetOrCreateUsers extends UsersSparkJob {

  override def runJob(sc: SparkContext, config: Config) = {
    // look up the RDD by name in the job server's registry,
    // building and caching it only if it does not exist yet
    val users: RDD[(Reputation, User)] = namedRdds.getOrElseCreate(
      rddName,
      build(sc))

    users.take(5)
  }
}
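
A later job running against the same Spark context can then fetch the cached RDD by name. A sketch, assuming spark-jobserver's namedRdds.get (the GetUsers object name is hypothetical):

object GetUsers extends UsersSparkJob {

  override def runJob(sc: SparkContext, config: Config) = {
    // get returns None if the named RDD has not been created yet
    val users: RDD[(Reputation, User)] = namedRdds.get[(Reputation, User)](rddName).get

    users.take(5)
  }
}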
