Why does df.limit keep changing in Pyspark?


Problem description

I am using

rdd = df.limit(10000).rdd

This operation takes quite some time (why actually? can it not short-cut after 10000 rows?), so I assume I have a new RDD now.

However, when I now work on rdd, it is different rows every time I access it. As if it resamples over again. Caching the RDD helps a bit, but surely that's not safe?

What is the reason behind this?

Update: Here is a reproduction on Spark 1.5.2

from operator import add
from pyspark.sql import Row

rdd = sc.parallelize([Row(i=i) for i in range(1000000)], 100)
rdd1 = rdd.toDF().limit(1000).rdd
for _ in range(3):
    # the same action on the same RDD returns a different sum each time
    print(rdd1.map(lambda row: row.i).reduce(add))

The output is

499500
19955500
49651500
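
For reference, only the first of those three sums matches what a stable limit(1000) over the first 1000 rows (i = 0 .. 999) would produce:

sum(range(1000))  # 499500 -- the sum of i over rows 0..999

The later runs clearly pulled different rows.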

I'm surprised that .rdd doesn't fix the data.

To show that it gets trickier than just a re-execution issue, here is a single action which produces incorrect results on Spark 2.0.0.2.5.0:

from pyspark.sql import Row

rdd = sc.parallelize([Row(i=i) for i in range(1000000)], 200)
rdd1 = rdd.toDF().limit(12345).rdd
rdd2 = rdd1.map(lambda x: (x, x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join

Basically, whenever you use limit your results may be wrong. I don't mean "just one of many possible samples", but really incorrect (since in this case the result should always be 12345).

Recommended answer

Because Spark is distributed, in general it's not safe to assume deterministic results. Your example is taking the "first" 10,000 rows of a DataFrame. Here, there's ambiguity (and hence non-determinism) in what "first" means. That will depend on the internals of Spark. For example, it could be the first partition that responds to the driver. That partition could change with networking, data locality, etc.
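
For example, something along these lines (a sketch, assuming a sortable column such as the i column from the reproduction above) should give a deterministic subset, at the cost of a full sort:

from pyspark.sql import functions as F

# Sorting first gives "first" a well-defined meaning, so the same
# 10,000 rows should come back on every evaluation (assuming the
# sort key is unique).
stable_rdd = df.orderBy(F.col("i")).limit(10000).rdd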

Even once you cache the data, I still wouldn't rely on getting the same data back every time, though I certainly would expect it to be more consistent than reading from disk.
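
If repeated runs really need to see identical rows, a sketch like the following (materializing the limited data once, using the same df as above) is more dependable than calling limit repeatedly, though still not a hard guarantee:

rdd1 = df.limit(10000).rdd.cache()
rdd1.count()  # force evaluation so later actions reuse the cached partitions
# Note: an evicted partition would be recomputed, and limit may then pick
# different rows, so writing the limited rows out (e.g. with
# df.limit(10000).write.parquet(...)) and reading them back is a more
# permanent way to pin the sample.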

