Spark Dataframe.cache() behavior for changing source

Problem Description

My use case:

  1. Create a dataframe from a Cassandra table.
  2. Create an output dataframe by filtering on a column and modifying that column's value.
  3. Write the output dataframe to Cassandra with a TTL set, so that all the modified records are deleted after a short time (2 seconds).
  4. Return the output dataframe to the caller, which writes it to the file system after a while. I can only return a dataframe to the caller and have no further control. Also, I am unable to increase the TTL.

By the time step 4 is executed, the output dataframe is empty. This is because Spark re-evaluates the dataframe on the action, and due to lineage the Cassandra query is executed again, which now yields no records.
To avoid this, I added a step after step 2:

2a) outputDataframe.cache()

This ensures that Cassandra is not queried during step 5, and I get the desired output records in my file as well. I have the following questions about this approach:

  1. Is it possible that, in cases where Spark doesn't find the cached data (the cache lookup fails), it will go up the lineage and run the Cassandra query? If yes, what is the way to avoid that in all cases?
  2. I have seen another way of doing the caching: df.rdd.cache(). Is this any different than calling cache() on the dataframe?

For reference, my current code looks as follows:

import org.apache.spark.sql.functions.lit  // needed for lit("Y") below

//1 - read the source dataframe from Cassandra
val dfOrig = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable", "pushdown" -> "true"))
      .load()
//2 - filter on del_flag and modify its value
val df = dfOrig.filter("del_flag = 'N'").withColumn("del_flag", lit("Y"))
//3 - write the modified records back to Cassandra with a TTL
df.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable", "spark.cassandra.output.ttl" -> "120"))
      .mode("append")
      .save()
//4
// <After quite some processing, mostly after the TTL, and in the calling code>
df.write.format("csv").save("some.csv")

Recommended Answer

Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query?

Yes, it is possible. Cached data can be removed by the cache cleaner (primarily in MEMORY_ONLY mode) and can be lost when the corresponding node is decommissioned (crashed, preempted, or released by dynamic allocation). Additionally, other options, like speculative execution, can affect cache behavior.

Finally, the data might not be fully cached in the first place.

If yes, what is the way to avoid that in all cases?

Don't use cache / persist if you require strong consistency guarantees - it wasn't designed with use cases like this one in mind. Instead, export the data to a persistent, reliable storage (like HDFS) and read it back from there.
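
A minimal sketch of that approach, reusing spark and df from the question's code (the snapshot path and the choice of Parquet are illustrative assumptions, not part of the original answer):

// materialize the filtered result once, then re-read it so later actions
// no longer depend on the Cassandra lineage
val snapshotPath = "hdfs:///tmp/mytable_snapshot"   // hypothetical location
df.write.mode("overwrite").parquet(snapshotPath)
val stableDf = spark.read.parquet(snapshotPath)
// steps 3 and 4 would then operate on stableDf instead of df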

You could also use checkpoint with HDFS checkpointDir.
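
A minimal sketch of the checkpoint variant, assuming a reachable HDFS directory (the path below is a placeholder):

spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // hypothetical directory
// checkpoint() writes the data out and returns a Dataset whose lineage is truncated there
val checkpointedDf = df.checkpoint()   // eager by default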

You might be tempted to use a more reliable caching mode like MEMORY_AND_DISK_2 - this might reduce the probability of recomputing the data, at the cost of keeping redundant copies of it.
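
For reference, that option would look like the following (a sketch only; it trades extra storage for replication and still gives no hard guarantee):

import org.apache.spark.storage.StorageLevel
// keep each cached partition on two nodes, spilling to disk when memory runs short
df.persist(StorageLevel.MEMORY_AND_DISK_2)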

df.rdd.cache(). Is this any different than calling cache() on the dataframe?

It is different (the primary difference is the serialization strategy), but not when it comes to the properties which are of interest in the scope of this question.
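
To make the distinction concrete (a sketch; the default storage levels noted in the comments are general Spark behavior, not something stated in the original answer):

df.cache()       // caches the Dataset in Spark SQL's in-memory columnar format (default MEMORY_AND_DISK)
df.rdd.cache()   // caches an RDD[Row] derived from the dataframe as plain JVM objects (default MEMORY_ONLY)
// either way the cached data can be evicted or lost, so neither form gives a hard guarantee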

Important:

Please note that caching behavior might not be the biggest issue in your code. Reading from and appending to a single table can result in all kinds of undesired or undefined behaviors in complex pipelines, unless additional steps are taken to ensure that the reader doesn't pick up newly written records.
