How to Combine two Dstreams using Pyspark (similar to .zip on normal RDD)

Question

I know that we can combine two RDDs in PySpark (like cbind in R), as below:

rdd3 = rdd1.zip(rdd2)
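
For example, a quick sanity check on plain RDDs (a minimal sketch, assuming an existing SparkContext named sc):

rdd1 = sc.parallelize(["a", "b", "c"])
rdd2 = sc.parallelize([1, 2, 3])
# zip pairs up elements positionally; both RDDs must have the same
# number of partitions and the same number of elements per partition.
rdd3 = rdd1.zip(rdd2)
print(rdd3.collect())  # [('a', 1), ('b', 2), ('c', 3)]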

I want to perform the same for two DStreams in PySpark. Is it possible, or is there any alternative?

In fact, I am using an MLlib random forest model to predict with Spark Streaming. In the end, I want to combine the feature DStream and the prediction DStream together for further downstream processing.

Thanks.

-Obaid

Answer

At the end, I used the approach below.

The trick is using a "native Python map" along with the "Spark Streaming transform". It may not be an elegant way, but it works :).

def predictScore(texts, modelRF):
    # texts: a DStream of raw text records; modelRF: a trained RandomForestModel
    predictions = texts \
        .map(lambda txt: (txt, getFeatures(txt))) \
        .map(lambda pair: (pair[0], pair[1].split(','))) \
        .map(lambda pair: (pair[0], [float(i) for i in pair[1]])) \
        .transform(lambda rdd: sc.parallelize(
            zip(modelRF.predict(rdd.map(lambda pair: pair[1])).collect(),
                rdd.map(lambda pair: pair[0]).collect())))
    # Inside transform, pair[0] is the text and pair[1] is the feature vector;
    # predict() must run on the driver, so both sides are collected and re-zipped.
    # Each element of the returned DStream is a (score, original text) tuple.
    return predictions
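
A minimal usage sketch of the above, assuming a socket text source whose lines are already comma-separated feature strings; the getFeatures stub, model path, host, and port below are illustrative placeholders, not from the original post:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.tree import RandomForestModel

sc = SparkContext(appName="StreamingRFScoring")
ssc = StreamingContext(sc, batchDuration=10)  # 10-second batches

# Placeholder: the original post does not show this helper; assume it
# returns a comma-separated string of numeric features for one record.
def getFeatures(txt):
    return txt  # e.g., input lines already look like "0.1,0.2,0.3"

modelRF = RandomForestModel.load(sc, "path/to/model")  # hypothetical path
texts = ssc.socketTextStream("localhost", 9999)

scored = predictScore(texts, modelRF)  # DStream of (score, original text)
scored.pprint()

ssc.start()
ssc.awaitTermination()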

Hope it will help somebody who is facing the same problem. If anybody has a better idea, please post it here.

-Obaid

Note: I also posted the problem on the Spark user list and posted my answer there as well.
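
One alternative worth noting, not part of the original answer: the PySpark DStream API exposes transformWith, which applies a function to the corresponding RDDs of two DStreams, so .zip can be lifted from RDDs to DStreams directly. A minimal sketch, with hypothetical DStream names, assuming the batches of both streams line up:

# feature_dstream and prediction_dstream are hypothetical DStreams.
combined = feature_dstream.transformWith(
    lambda rdd_a, rdd_b: rdd_a.zip(rdd_b),  # same zip constraints as on RDDs
    prediction_dstream)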
