Combining Two Spark Streams On Key
Question
I have two Kafka streams that contain the results of two parallel operations. I need a way to combine both streams so that I can process the results in a single Spark transform. Is this possible? (Illustration below.)
Stream 1 {id:1,result1:True}
Stream 2 {id:1,result2:False}
JOIN(Stream 1, Stream 2, On "id") -> Output Stream {id:1,result1:True,result2:False}
Code that currently does not work:
from pyspark.streaming.kafka import KafkaUtils

# Create one receiver-based stream per Kafka topic
kvs1 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_1": 1})
kvs2 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_2": 1})

# Drop the Kafka message key, keeping only the JSON string payload
messages_RDDstream1 = kvs1.map(lambda x: x[1])
messages_RDDstream2 = kvs2.map(lambda x: x[1])

messages_RDDstream_Final = messages_RDDstream1.join(messages_RDDstream2)
When I pass two sample JSONs with the same ID field to each Kafka queue, nothing is returned in my final RDD stream. I imagine I am missing the stage of converting my Kafka JSON string messages into tuples?
I have also tried the following:
kvs1.map(lambda (key, value): json.loads(value))
and
kvs1.map(lambda x: json.loads(x))
to no avail.
Cheers,
Adam
Answer
A simple lookup in Spark's documentation would have given you the answer.
You can use the join operation.
join(otherStream, [numTasks]):
When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
For example (Scala): val streamJoined = stream1.join(stream2)
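Note that join matches on keys, so each DStream must first be turned into a DStream of (key, value) pairs; this is the step the question's code is missing, since it joins DStreams of raw JSON strings. A minimal Python sketch (the DStream part is commented out because it needs a running Spark context; variable names like kvs1/kvs2 follow the question's code):

```python
import json

def to_pair(message):
    """Parse a Kafka JSON string into an (id, record) tuple so join() can match on 'id'."""
    record = json.loads(message)
    return (record["id"], record)

# With the question's DStreams (sketch, not executed here):
# pairs1 = kvs1.map(lambda x: x[1]).map(to_pair)
# pairs2 = kvs2.map(lambda x: x[1]).map(to_pair)
# joined = pairs1.join(pairs2)  # DStream of (id, (record1, record2))

# The same join semantics demonstrated on plain Python lists:
stream1 = ['{"id": 1, "result1": true}']
stream2 = ['{"id": 1, "result2": false}']
pairs1 = dict(to_pair(m) for m in stream1)
pairs2 = dict(to_pair(m) for m in stream2)
joined = {k: (pairs1[k], pairs2[k]) for k in pairs1 if k in pairs2}
print(joined)  # {1: ({'id': 1, 'result1': True}, {'id': 1, 'result2': False})}
```

The join output pairs the two records under their shared id, matching the question's desired output stream.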