Spark Streaming - DStream does not have distinct()

Question

I want to count the distinct values of some type of IDs represented as an RDD.

In the non-streaming case, it's fairly straightforward. Say IDs is an RDD of IDs read from a flat file.

    print ("number of unique IDs %d" %  (IDs.distinct().count()))

But I can't seem to do the same thing in the streaming case. Say streamIDs is a DStream of IDs read from the network.

    print ("number of unique IDs from stream %d" %  (streamIDs.distinct().count()))

gives me this error:

    AttributeError: 'TransformedDStream' object has no attribute 'distinct'

What am I doing wrong? How do I print out the number of distinct IDs that showed up during this batch?

Answer

With RDDs you have a single result, but with DStreams you have a series of results, one per micro batch. So you cannot print the number of unique IDs just once; instead, you have to register an action that prints the unique IDs for each micro batch, each of which is an RDD on which you can use distinct:

    streamIDs.foreachRDD(rdd => println(rdd.distinct().count()))
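
That snippet is Scala, while the question uses PySpark. Below is a minimal sketch of the same idea in Python; the socket source, host, port, and 5-second batch interval are illustrative assumptions rather than anything from the original question:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="DistinctIDs")
    ssc = StreamingContext(sc, 5)  # 5-second micro batches (assumed)

    # Assumed source: newline-separated IDs arriving over a socket.
    streamIDs = ssc.socketTextStream("localhost", 9999)

    def print_distinct_count(rdd):
        # Each micro batch is a plain RDD, so distinct() is available here.
        print("number of unique IDs from stream %d" % rdd.distinct().count())

    # foreachRDD registers this action to run once per micro batch.
    streamIDs.foreachRDD(print_distinct_count)

    ssc.start()
    ssc.awaitTermination()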

Remember that you can use window to create a transformed DStream with bigger batches:

    streamIDs.window(Duration(1000)).foreachRDD(rdd => println(rdd.distinct().count()))
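
In the Python API the window length is given in seconds rather than as a Duration, and it should be a multiple of the batch interval. A sketch of the windowed variant under the same assumptions as above (the 30-second window is an arbitrary choice):

    # Count distinct IDs seen over the last 30 seconds, recomputed each batch.
    streamIDs.window(30).foreachRDD(print_distinct_count)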
