Transformed DStream in pyspark gives error when pprint called on it
Question
I'm exploring Spark Streaming through PySpark, and hitting an error when I try to use the transform function with take.
I can successfully use sortBy against the DStream via transform and pprint the result:
author_counts_sorted_dstream = author_counts_dstream.transform(
    lambda foo: foo
        .sortBy(lambda x: x[0].lower())
        .sortBy(lambda x: x[1], ascending=False)
)
author_counts_sorted_dstream.pprint()
But if I use take following the same pattern and try to pprint it:
top_five = author_counts_sorted_dstream.transform(
    lambda rdd: rdd.take(5)
)
top_five.pprint()
the job fails with:
Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'
You can see the full code and output in the notebook here.
What am I doing wrong?
Answer
The function you pass to transform should transform from RDD to RDD. If you use an action, like take, you have to convert the result back to an RDD:
sc: SparkContext = ...
author_counts_sorted_dstream.transform(
    lambda rdd: sc.parallelize(rdd.take(5))
)
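Applied to the names from the question (a sketch, assuming sc is the SparkContext already available in the notebook), the failing snippet would become:

top_five = author_counts_sorted_dstream.transform(
    lambda rdd: sc.parallelize(rdd.take(5))  # take() returns a plain Python list; parallelize() turns it back into an RDD
)
top_five.pprint()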
In contrast, the RDD.sortBy used here is a transformation (it returns an RDD), so no further parallelization is needed.
As a side note, the following function:
lambda foo: foo \
    .sortBy(lambda x: x[0].lower()) \
    .sortBy(lambda x: x[1], ascending=False)
doesn't make much sense. Remember that Spark sorts by shuffle, so the sort is not stable. If you want to sort by multiple fields, you should use a composite key like:
lambda x: (x[0].lower(), -x[1])
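For example, applied to the DStream from the question (a minimal sketch, with names taken from the question's code), a single sortBy with that composite key would be:

author_counts_sorted_dstream = author_counts_dstream.transform(
    lambda rdd: rdd.sortBy(lambda x: (x[0].lower(), -x[1]))  # author name (case-insensitive) ascending, then count descending
)
author_counts_sorted_dstream.pprint()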