MessageHandler in KafkaUtils010 SparkStreaming


Problem Description

I wanted to group per topic, or to know which topic a message came from, when applying:

val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](
      Array(topicConfig.srcTopic),
      kafkaParameters(BOOTSTRAP_SERVERS, "kafka_test_group_id")
    )
  )

However, the latest API, kafka010, does not seem to support a message handler as previous versions did. Any idea how to get the topic?

My goal is to consume from N topics, process them (in different ways depending on the topic), and then push the results back to another N topics in a 1:1 mapping of the topics:

SrcTopicA--> Process --> DstTopicA
SrcTopicB--> Process --> DstTopicB
SrcTopicC--> Process --> DstTopicC

But some attributes need to be shared (and they change frequently, so a broadcast variable is not an option), so all the topics need to be consumed in the same Spark job.
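Since the 1:1 mapping is by name, a single job can subscribe to all N source topics and derive each destination topic from the record's source topic. A minimal sketch, assuming the SrcTopicX → DstTopicX naming shown above and a hypothetical sendToKafka producer helper (not part of any Kafka API):

```scala
// Derive the destination topic from the source topic.
// Assumes the SrcTopicX -> DstTopicX naming convention above.
def dstTopicFor(srcTopic: String): String =
  srcTopic.replaceFirst("^Src", "Dst")

// Streaming side, sketched as comments (needs a running cluster):
// stream.foreachRDD { rdd =>
//   rdd.foreachPartition { records =>
//     records.foreach { r =>
//       // per-topic processing would go here
//       sendToKafka(dstTopicFor(r.topic()), r.value()) // hypothetical helper
//     }
//   }
// }
```

Routing by name keeps the topology in one job, so the shared, fast-changing attributes stay in one place instead of being broadcast.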

Recommended Answer

When you use createDirectStream in 0.10 you get back a stream of ConsumerRecord. Each record carries its source topic via topic(). You can create a tuple of the topic and value:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val stream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

// Pair each record's source topic with its value:
val res: DStream[(String, String)] = stream.map(record => (record.topic(), record.value()))
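The topic in each tuple can then drive the per-topic processing the question asks for. A sketch, with hypothetical placeholder transformations standing in for the real per-topic logic:

```scala
// Hypothetical per-topic transformations; substitute real logic.
def process(topic: String, value: String): String = topic match {
  case "SrcTopicA" => value.toUpperCase // A-specific handling
  case "SrcTopicB" => value.reverse     // B-specific handling
  case _           => value             // pass through unchanged
}

// Applied to the (topic, value) tuples from the stream above:
// val processed: DStream[(String, String)] =
//   res.map { case (topic, value) => (topic, process(topic, value)) }
```

Because the topic travels with each record, no separate message handler is needed: the match on topic() replaces what the old messageHandler parameter used to do.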
