Why does df.limit keep changing in Pyspark?


Question

I'm creating a data sample from some dataframe df with

rdd = df.limit(10000).rdd

This operation takes quite some time (why actually? can it not short-cut after 10000 rows?), so I assume I have a new RDD now.

However, when I now work on rdd, it returns different rows every time I access it, as if it resamples over and over again. Caching the RDD helps a bit, but surely that's not safe?
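
For reference, "caching" here means something along these lines (a minimal sketch, not a guaranteed fix):

rdd = df.limit(10000).rdd
rdd.cache()   # mark the RDD to be kept in memory
rdd.count()   # force materialization; later actions reuse the cached rows while they stay in memory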

What is the reason behind this?

Update: Here is a reproduction on Spark 1.5.2:

from operator import add
from pyspark.sql import Row

# 1,000,000 rows across 100 partitions; take 1000 of them, then sum that
# column three times; each pass can produce a different total.
rdd = sc.parallelize([Row(i=i) for i in range(1000000)], 100)
rdd1 = rdd.toDF().limit(1000).rdd
for _ in range(3):
    print(rdd1.map(lambda row: row.i).reduce(add))

The output is:

499500
19955500
49651500

I'm surprised that .rdd doesn't fix the data.

To show that it gets more tricky than the re-execution issue, here is a single action which produces incorrect results on Spark 2.0.0.2.5.0:

from pyspark.sql import Row

# Self-join the limited RDD inside a single action and count the pairs.
rdd = sc.parallelize([Row(i=i) for i in range(1000000)], 200)
rdd1 = rdd.toDF().limit(12345).rdd
rdd2 = rdd1.map(lambda x: (x, x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join

Basically, whenever you use limit, your results might be wrong. I don't mean "just one of many samples", but really incorrect (since in this case the result should always be 12345).

Answer

Because Spark is distributed, in general it's not safe to assume deterministic results. Your example is taking the "first" 10,000 rows of a DataFrame. Here, there's ambiguity (and hence non-determinism) in what "first" means. That will depend on the internals of Spark. For example, it could be the first partition that responds to the driver. That partition could change with networking, data locality, etc.
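
One way to remove that ambiguity (a sketch that goes beyond the original answer, assuming a column such as i that gives a unique ordering) is to sort before the limit, so that "first" is well defined:

sample = df.orderBy("i").limit(10000)   # "first" is now fixed by the sort order
rdd = sample.rdd

At the cost of a sort, the same rows come back on every execution.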

Even once you cache the data, I still wouldn't rely on getting the same data back every time, though I certainly would expect it to be more consistent than reading from disk.
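
If the same sample has to be reused across many actions, a more robust option (again just a sketch, not part of the original answer) is to materialize it once on the driver and re-parallelize it, assuming the 10,000 rows fit in driver memory:

sample_rows = df.limit(10000).collect()   # one arbitrary pick, executed exactly once
fixed_rdd = sc.parallelize(sample_rows)   # every later action sees exactly these rows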

