How to filter a DStream using the transform operation and an external RDD?

Question

I used the transform method in a use case similar to the one described in the Transform Operation section of Transformations on DStreams (https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#transformations-on-dstreams):

spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
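To make the doc pattern concrete without a cluster, here is a plain-Python model. No Spark is involved: the `join` and `transform` helpers, the sample data and the "keep non-spam words" filter are hypothetical stand-ins, not PySpark APIs. The point it illustrates is that `DStream.transform` calls an RDD-to-RDD function once per micro-batch on the driver, which is why that function may call `join` against another RDD.

```python
# Plain-Python model of the Spark doc's transform + join pattern.
# Lists of (key, value) pairs stand in for RDDs; no Spark is required.

def join(left, right):
    """Inner join of two lists of (key, value) pairs, shaped like RDD.join output."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]


def transform(batches, func):
    """Model of DStream.transform: call an RDD-to-RDD function once per batch."""
    return [func(batch) for batch in batches]


# Hypothetical static "spam information": word -> is_spam flag.
spam_info = [("buy", True), ("free", True), ("hello", False)]

# Two hypothetical micro-batches of (word, count) pairs.
word_counts = [[("buy", 2), ("hello", 1)], [("free", 4), ("spark", 3)]]

# Join every batch with the static data, then keep only the non-spam words.
cleaned = transform(
    word_counts,
    lambda batch: [kv for kv in join(batch, spam_info) if not kv[1][1]],
)
print(cleaned)  # [[('hello', (1, False))], []]
```

Words missing from `spam_info` (here `spark`) disappear entirely because `join` is an inner join; a left outer join would keep them with `None` on the right.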

My code is as follows:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[4]", "myapp")
ssc = StreamingContext(sc, 5)  # 5-second batch interval
ssc.checkpoint('hdfs://localhost:9000/user/spark/checkpoint/')
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .reduceByKey(lambda a, b: a+b)
# Static "external" RDD of (word, flag) pairs used for filtering
filter_rdd = sc.parallelize([(u'A', 1), (u'B', 1)], 2)
filtered_count = counts.transform(
    # filter() passes ONE (word, (count, flag)) element, so index into it
    lambda rdd: rdd.join(filter_rdd).filter(lambda kv: kv[1][0] and not kv[1][1])
)
filtered_count.pprint()
ssc.start()
ssc.awaitTermination()
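Separately from the checkpoint error, the filter predicate deserves a look. `RDD.filter` calls its function with a single element, and after `join` each element is a `(word, (count, flag))` tuple, so the predicate must take one argument and unpack it (a two-parameter form such as `lambda k, (v1, v2): ...` cannot receive it, and Python 3 removed that tuple-parameter syntax anyway). Also, after an inner `join` every surviving element carries a flag from `filter_rdd`, so `v1 and not v2` is never true. The plain-Python sketch below assumes the intent is a blacklist (drop the words listed in `filter_rdd`) and models a left outer join, where unmatched words get `None`; in PySpark the corresponding calls would be `rdd.leftOuterJoin(filter_rdd)`, or simply `rdd.subtractByKey(filter_rdd)`. The sample batch is hypothetical.

```python
# Plain-Python model of the per-batch filtering; no Spark is required.

def keep(kv):
    """Predicate for RDD.filter: receives ONE (word, (count, flag)) element."""
    word, (count, flag) = kv
    return bool(count) and not flag


# Hypothetical micro-batch of (word, count) pairs and the blacklist from filter_rdd.
batch = [("A", 3), ("B", 2), ("C", 5)]
filter_pairs = {"A": 1, "B": 1}

# Inner join (like rdd.join): only matching words survive, and all carry a flag.
inner = [(w, (c, filter_pairs[w])) for w, c in batch if w in filter_pairs]
print(list(filter(keep, inner)))  # []

# Left outer join (like rdd.leftOuterJoin): unmatched words get None as the flag.
left_outer = [(w, (c, filter_pairs.get(w))) for w, c in batch]
print(list(filter(keep, left_outer)))  # [('C', (5, None))]
```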

But I get the following error:

It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

How should I be using my external RDD to filter elements out of a dstream?

Answer

The difference between the Spark doc example and your code is the use of ssc.checkpoint().
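As far as I can tell from PySpark's source, checkpointing matters because of serialization: with a checkpoint directory set, PySpark has to pickle the function passed to `transform` (with cloudpickle) so the DStream graph can be written out, and pickling a function also pickles whatever its closure captures, here `filter_rdd`. `pyspark.RDD` deliberately raises the SPARK-5063 message from `__getnewargs__` whenever something tries to pickle it; without a checkpoint the function is not pickled, so the same code runs. The class below is a hypothetical stand-in that imitates that guard with the standard `pickle` module; it is not PySpark code.

```python
import pickle


class UnpicklableRDD:
    """Hypothetical stand-in for pyspark.RDD, which refuses to be pickled."""

    def __getnewargs__(self):
        # pickle (protocol 2+) calls this while reducing the object; raising here
        # aborts serialization, the same trick PySpark uses for SPARK-5063.
        raise RuntimeError("It appears that you are attempting to broadcast an RDD "
                           "or reference an RDD from an action or transformation.")


# Anything holding a reference to the "RDD" (a closure, a dict, a tuple)
# drags it into the pickle, so serialization fails.
captured = {"filter_rdd": UnpicklableRDD()}

try:
    pickle.dumps(captured)
    error = None
except RuntimeError as exc:
    error = str(exc)

print(error)
```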

Although the specific code example you provided will work without checkpointing, I guess you actually need it. But the concept of introducing an external RDD into the scope of a checkpointed DStream is potentially invalid: when recovering from a checkpoint, the external RDD may have changed.
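One common workaround, sketched here under my own assumptions rather than taken from the answer: collect the small filter data to the driver once and capture it as a plain Python set, so the checkpointed function closes over ordinary data instead of an RDD. In PySpark that would be roughly `blacklist = frozenset(filter_rdd.keys().collect())` followed by `counts.filter(lambda kv: kv[0] not in blacklist)` (`RDD.keys`, `RDD.collect` and `DStream.filter` are real APIs; the name `blacklist` is hypothetical). The runnable part below only models the per-element predicate and shows that such a set survives a pickle round trip.

```python
import pickle

# Hypothetical filter data, as filter_rdd.keys().collect() would return it on the driver.
blacklist = frozenset(["A", "B"])


def not_blacklisted(kv, blacklist=blacklist):
    """Per-element predicate for DStream.filter: keep (word, count) pairs not in the set."""
    word, _count = kv
    return word not in blacklist


batch = [("A", 3), ("B", 2), ("C", 5)]  # hypothetical micro-batch
print([kv for kv in batch if not_blacklisted(kv)])  # [('C', 5)]

# The captured data is ordinary Python, so it survives the pickling a checkpoint needs.
restored = pickle.loads(pickle.dumps(blacklist))
assert restored == blacklist
```

The trade-off is the staleness the answer warns about, made explicit: the set is frozen into the checkpoint, so after recovery the stream keeps using the snapshot taken when the checkpoint was written. It also only suits filter data small enough to hold in driver memory.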

I tried to checkpoint the external RDD, but I had no luck with it either.
