Combining Two Spark Streams On Key


Problem Description

I have two Kafka streams that contain the results of two parallel operations. I need a way to combine both streams so I can process the results in a single Spark transform. Is this possible? (Illustration below.)

Stream 1 {id:1,result1:True}
Stream 2 {id:1,result2:False}
       JOIN(Stream 1, Stream 2, On "id") -> Output Stream {id:1,result1:True,result2:False}

Code that currently does not work:

    kvs1 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_1": 1})
    kvs2 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_2": 1})

    messages_RDDstream1 = kvs1.map(lambda x: x[1])
    messages_RDDstream2 = kvs2.map(lambda x: x[1])

    messages_RDDstream_Final = messages_RDDstream1.join(messages_RDDstream2)

When I pass two sample JSONs with the same ID field to each Kafka queue, nothing is returned in my final RDD stream. I imagine I am missing the stage of converting my Kafka JSON string messages into tuples?

I have also tried the following:

kvs1.map(lambda (key, value): json.loads(value))

kvs1.map(lambda x: json.loads(x))

to no avail.

Cheers,

Adam

Recommended Answer

A simple lookup in Spark's documentation would have given you the answer.

You can use the join operation.

join(otherStream, [numTasks]):

When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.

For example: val streamJoined = stream1.join(stream2)
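
Since the question is in PySpark, here is a minimal sketch of the same idea in Python, using the same receiver-based KafkaUtils API as the question. The Zookeeper address, application name, and consumer group names below are placeholders; the topic names and the "id" field come from the question. It illustrates keying both streams before joining, not a drop-in implementation.

    import json

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="stream_join_example")
    ssc = StreamingContext(sc, batchDuration=5)

    # Placeholder Zookeeper quorum; replace with your own.
    ZOOKEEPER = "localhost:2181"

    kvs1 = KafkaUtils.createStream(ssc, ZOOKEEPER, "join_group_1", {"test_join_1": 1})
    kvs2 = KafkaUtils.createStream(ssc, ZOOKEEPER, "join_group_2", {"test_join_2": 1})

    # Parse the JSON payload (x[1]) and re-key each record by its "id" field,
    # turning both streams into (K, V) pair DStreams that join() can match on.
    pairs1 = kvs1.map(lambda x: json.loads(x[1])).map(lambda d: (d["id"], d))
    pairs2 = kvs2.map(lambda x: json.loads(x[1])).map(lambda d: (d["id"], d))

    # Yields (id, (record_from_stream1, record_from_stream2)) for every id
    # present in both streams within the same batch.
    joined = pairs1.join(pairs2)

    # Merge the two records into one dict, as in the illustration above.
    merged = joined.map(lambda kv: dict(kv[1][0], **kv[1][1]))
    merged.pprint()

    ssc.start()
    ssc.awaitTermination()

Note that a DStream join only matches records that land in the same batch. If the two results for an id can arrive in different batches, you would need a windowed join or stateful logic (e.g. updateStateByKey) to buffer the unmatched side.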
