MessageHandler in KafkaUtils010 SparkStreaming

Question
I want to group per topic, or to know which topic a message came from, when applying:
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](
    Array(topicConfig.srcTopic),
    kafkaParameters(BOOTSTRAP_SERVERS, "kafka_test_group_id")
  )
)
However, the latest API (kafka010) does not seem to support a message handler the way previous versions did. Any idea how to get the topic?
My goal is to consume from N topics, process them (in different ways depending on the topic), and then push the results back to another N topics in a 1:1 mapping of the topics:
SrcTopicA --> Process --> DstTopicA
SrcTopicB --> Process --> DstTopicB
SrcTopicC --> Process --> DstTopicC
But some attributes need to be shared (and they change a lot, so a broadcast variable is not an option). Therefore all the topics need to be consumed in the same Spark job.
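For context, all N source topics can be consumed by a single direct stream by passing them together to Subscribe. A minimal sketch, assuming placeholder topic names and an already-built kafkaParams map:

```scala
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical topic names: one stream consumes all of them in the same job.
val srcTopics = Array("SrcTopicA", "SrcTopicB", "SrcTopicC")

val multiTopicStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](srcTopics, kafkaParams)
)
```

Each record in the resulting stream still knows which topic it came from, which is what the answer below relies on.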
Answer
When you use createDirectStream in 0.10, you get back a ConsumerRecord. This record has a topic value, so you can create a tuple of the topic and the value:
val stream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val res: DStream[(String, String)] = stream.map(record => (record.topic(), record.value()))
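Since each ConsumerRecord carries its source topic, the single stream can be demultiplexed per topic and written back to the matching destination topic. A sketch of that pattern follows; the SrcTopicX -> DstTopicX name mapping, the processA/processB helpers, and the producer settings are all assumptions, not part of the original answer:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One producer per partition; bootstrap address is an assumption.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    records.foreach { record =>
      // Process differently depending on the source topic.
      val processed = record.topic() match {
        case "SrcTopicA" => processA(record.value()) // hypothetical helper
        case "SrcTopicB" => processB(record.value()) // hypothetical helper
        case _           => record.value()
      }
      // Assumed 1:1 naming convention: SrcTopicX -> DstTopicX.
      val dstTopic = record.topic().replace("Src", "Dst")
      producer.send(new ProducerRecord[String, String](dstTopic, record.key(), processed))
    }
    producer.close()
  }
}
```

Creating the producer inside foreachPartition (rather than on the driver) keeps it out of the closure that Spark would otherwise try to serialize and ship to executors.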