Get topic from kafka message in Spark


Question

In our Spark Streaming job we read messages from Kafka as a stream.

For this, we use the KafkaUtils.createDirectStream API, which returns a JavaPairInputDStream.

The messages are read from Kafka (from three topics - test1, test2, test3) in the following way:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

// Direct stream over the three topics; each record arrives as a (key, value)
// Tuple2, where the key is the Kafka message key.
JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet);

We want to handle messages from each topic in a different way, and in order to achieve this we need to know the topic name for each message.

So we do the following:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

This is the implementation of SplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        // tuple2._1 is the Kafka message key, tuple2._2 the message payload
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

The problem is that tuple2._1 is null; we assumed that tuple2._1 would contain some metadata, such as the name of the topic/partition the message came from.

However, when we print tuple2._1, it's null.

Our question: is there a way to send the topic name in Kafka so that, in the Spark Streaming code, tuple2._1 will contain it (and not be null)?

Note that we also tried to get the topic names from the DStream as mentioned in the spark-streaming kafka-integration tutorial:
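A sketch of that pattern, based on the guide's Java example (the names follow our code above):

// From the integration guide: the offset ranges of each micro-batch RDD
// carry the topic and partition each range belongs to.
messages.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    @Override
    public void call(JavaPairRDD<String, String> rdd) {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange o : offsetRanges) {
            System.out.println(o.topic() + " " + o.partition() + " "
                    + o.fromOffset() + " " + o.untilOffset());
        }
    }
});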

But it returns ALL the topics that were passed to KafkaUtils.createDirectStream, and not the specific topic from which the messages (that belong to the current RDD) arrived.

So it didn't help us identify the name of the topic from which the messages in the RDD were sent.

EDIT

In response to David's answer, I tried using MessageAndMetadata like this:

Map<TopicAndPartition, Long> topicAndPartition = new HashMap<>();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String> {
    @Override
    public String call(MessageAndMetadata<String, String> v1) throws Exception {
        // nothing is printed here
        System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
        return v1.topic();
    }
}

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        kafkaParams,
        topicAndPartition,
        new MessageAndMetadataFunction());

messages.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // here all the topics kafka listens to are printed, but that doesn't help
        for (OffsetRange offset : offsets) {
            System.out.println(offset.topic() + " " + offset.partition() + " "
                    + offset.fromOffset() + " " + offset.untilOffset());
        }
    }
});

The problem is that nothing was printed in the MessageAndMetadataFunction.call method. What should I fix in order to get the relevant topic for that RDD inside the MessageAndMetadataFunction.call method?

Recommended answer

Use one of the versions of createDirectStream that takes a messageHandler function as a parameter. Here's what I do:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte])](
  ssc,
  kafkaParams,
  getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1)).toMap,
  (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => { (msg.topic, msg.message) }
)

There's stuff there that doesn't mean anything to you -- the relevant part is

(msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => { (msg.topic, msg.message) }

If you are not familiar with Scala, all the function does is return a Tuple2 containing msg.topic and msg.message. Your function needs to return both of these in order for you to use them downstream. You could just return the entire MessageAndMetadata object instead, which gives you a couple of other interesting fields. But if you only wanted the topic and the message, then use the above.
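Since the question's code is in Java, here is a rough Java equivalent of that handler (a sketch, untested, against the same Spark 1.x Kafka 0.8 direct-stream API used in the question's edit; topicAndPartition is the fromOffsets map from there):

Function<MessageAndMetadata<String, String>, Tuple2<String, String>> messageHandler =
        new Function<MessageAndMetadata<String, String>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> call(MessageAndMetadata<String, String> mmd) {
                // keep both topic and payload so both survive into the stream
                return new Tuple2<>(mmd.topic(), mmd.message());
            }
        };

@SuppressWarnings("unchecked")
Class<Tuple2<String, String>> recordClass =
        (Class<Tuple2<String, String>>) (Class<?>) Tuple2.class;

JavaInputDStream<Tuple2<String, String>> messages = KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        recordClass,
        kafkaParams,
        topicAndPartition,
        messageHandler);

Downstream, each record's _1() is the topic and _2() the payload, so per-topic handling can branch on tuple._1().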

