Transformed DStream in pyspark gives error when pprint called on it


Question

I'm exploring Spark Streaming through PySpark, and hitting an error when I try to use the transform function with take.

I can successfully use sortBy against the DStream via transform and pprint the result:

author_counts_sorted_dstream = author_counts_dstream.transform(
    lambda foo: foo
    .sortBy(lambda x: x[0].lower())
    .sortBy(lambda x: x[1], ascending=False))
author_counts_sorted_dstream.pprint()

But if I use take following the same pattern and try to pprint it:

top_five = author_counts_sorted_dstream.transform(
    lambda rdd: rdd.take(5))
top_five.pprint()

the job fails with:

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'
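The traceback points at the root cause: the streaming wrapper calls `._jrdd` on whatever your transform function returns, and only real RDDs carry that attribute, while `take` returns a plain Python list. A minimal stand-alone illustration of that contract (`FakeRDD` is a hypothetical stand-in; no Spark needed):

```python
class FakeRDD:
    """Hypothetical stand-in for a PySpark RDD, which carries a _jrdd handle."""
    def __init__(self):
        self._jrdd = object()

# The streaming wrapper effectively evaluates `r._jrdd` on the value
# returned by the transform function:
print(hasattr(FakeRDD(), "_jrdd"))   # an RDD-like object has the attribute
print(hasattr([1, 2, 3], "_jrdd"))   # a list (what take(5) returns) does not
```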

You can see the full code and output in the notebook here.

What am I doing wrong?

Answer

The function you pass to transform should transform an RDD into another RDD. If you use an action, like take, you have to convert the result back to an RDD:

sc: SparkContext = ...

author_counts_sorted_dstream.transform(
  lambda rdd: sc.parallelize(rdd.take(5))
)

In contrast, the RDD.sortBy used above is a transformation (it returns an RDD), so no further parallelization is needed.

On a side note, the following function:
lambda foo: foo \
    .sortBy(lambda x: x[0].lower()) \
    .sortBy(lambda x: x[1], ascending=False)

doesn't make much sense. Remember that Spark sorts by shuffle, so the sort is not stable. If you want to sort by multiple fields, you should use a composite key such as:

lambda x: (x[0].lower(), -x[1])
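The effect of that composite key can be checked without Spark using Python's built-in sorted, which accepts a key function with the same semantics as RDD.sortBy (a stand-alone sketch with made-up sample data):

```python
# Hypothetical author counts for illustration.
author_counts = [("Bob", 3), ("alice", 5), ("bob", 7)]

# Sort case-insensitively by name; ties fall back to count descending,
# because negating x[1] inverts the numeric order.
ordered = sorted(author_counts, key=lambda x: (x[0].lower(), -x[1]))
print(ordered)  # [('alice', 5), ('bob', 7), ('Bob', 3)]
```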

