Spark Dataframe.cache() behavior for changing source

Problem description

My use case:

  1. Create a dataframe from a Cassandra table.
  2. Create an output dataframe by filtering on a column and modifying that column's value.
  3. Write the output dataframe to Cassandra with a TTL set, so that all the modified records are deleted after a short time (2s).
  4. Return the output dataframe to the caller, which writes it to the filesystem after some time. I can only return the dataframe to the caller and have no further control. Also, I cannot increase the TTL.

By the time step 4 is executed, the output dataframe is empty. This is because Spark re-evaluates the dataframe on the action, and due to the lineage the Cassandra query is run again, which now yields no records.
To avoid this, I added a step after step 2:

2a) outputDataframe.cache()

This ensures that during step 4, Cassandra is not queried, and I get the desired output records in my file as well. I have the following questions about this approach:

  1. Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query? If yes, what is the way to avoid that in all cases?
  2. I have seen another way of doing the caching: df.rdd.cache(). Is this any different than calling cache() on the dataframe?

For reference, my current code looks as follows:

// 1. Create a dataframe from the Cassandra table
val dfOrig = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable", "pushdown" -> "true"))
      .load()
// 2. Filter on del_flag and modify the column's value
val df = dfOrig.filter("del_flag = 'N'").withColumn("del_flag", lit("Y"))
// 3. Write the modified records back to Cassandra with a TTL
df.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable", "spark.cassandra.output.ttl" -> "120"))
      .mode("append")
      .save()
// 4. The output dataframe is returned to the caller and written to a file later
// <After quite some processing, mostly after the TTL, and in the calling code>
df.write.format("csv").save("some.csv") 

Answer

Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query?

Yes, it is possible. Cached data can be removed by the cache cleaner (primarily in MEMORY_ONLY mode), and can be lost when the corresponding node is decommissioned (crashed, preempted, or released by dynamic allocation). Additionally, other options, like speculative execution, can affect cache behavior.

Finally, the data might not be fully cached in the first place.
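
As an aside, one reason for incomplete caching is that cache() itself is lazy: nothing is stored until an action runs over the data. A minimal sketch (reusing the df from the question; the count() call is only there to force materialization) looks like this, though it still does not turn the cache into a strong guarantee:

val cachedDf = df.cache()   // marks the dataframe for caching, but is lazy
cachedDf.count()            // runs an action so the partitions are actually computed and stored
// later actions on cachedDf can be served from the cache, as long as the
// cached blocks have not been evicted or lost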

If yes, what is the way to avoid that in all cases?

Don't use cache / persist if you require strong consistency guarantees - it wasn't designed with use cases like this one in mind. Instead, export the data to persistent, reliable storage (like HDFS) and read it back from there.
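
A rough sketch of that approach, assuming an HDFS path of your choosing (the path below is just a placeholder): write the output dataframe once, then hand the re-read dataframe to the caller, so its lineage starts at the files rather than at the Cassandra table:

val snapshotPath = "hdfs:///tmp/mytable_snapshot"   // placeholder location
df.write.mode("overwrite").parquet(snapshotPath)    // materialize to reliable storage
val stableDf = spark.read.parquet(snapshotPath)     // lineage now points at the parquet files
// return stableDf instead of df; step 4 then reads from HDFS, not from Cassandra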

You could also use checkpoint with HDFS checkpointDir.
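
A minimal sketch of the checkpoint variant, assuming the checkpoint directory is set once per application:

spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // placeholder dir
// Dataset.checkpoint() is eager by default: it runs a job, writes the data to the
// checkpoint directory, and returns a dataframe whose lineage is truncated there
val checkpointedDf = df.checkpoint()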

You might be tempted to use a more reliable caching mode like MEMORY_AND_DISK_2 - this might reduce the probability of recomputing the data, at the cost of the extra memory, disk, and network overhead of keeping a second replica of each cached partition.
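
For completeness, that storage level is chosen through persist rather than the parameterless cache():

import org.apache.spark.storage.StorageLevel

// keep two replicas of each cached partition, spilling to disk when memory runs out
val replicatedDf = df.persist(StorageLevel.MEMORY_AND_DISK_2)
replicatedDf.count()   // action to actually populate the cache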

df.rdd.cache(). Is this any different than calling cache() on the dataframe?

It is different (the primary difference is the serialization strategy), but not when it comes to the properties which are of interest in the scope of this question.
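
To make the difference concrete (this is general Spark behavior, not specific to this pipeline), the two calls default to different storage levels and in-memory representations:

// Dataset.cache() is persist(StorageLevel.MEMORY_AND_DISK) and stores the data
// in Spark SQL's compact columnar format
df.cache()

// RDD.cache() is persist(StorageLevel.MEMORY_ONLY) on the underlying RDD[Row],
// which keeps deserialized Java objects in memory
df.rdd.cache()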

Important:

Please note that caching behavior might not be the biggest issue in your code. Reading from and appending to a single table can result in all kinds of undesired or undefined behaviors in complex pipelines, unless additional steps are taken to ensure that the reader doesn't pick up newly written records.
