Get topic from Kafka message in Spark
Question
In our Spark Streaming job we read messages from Kafka.
For this, we use the KafkaUtils.createDirectStream API, which returns a JavaPairInputDStream<String, String>.
The messages are read from Kafka (from three topics: test1, test2, test3) in the following way:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );
We want to handle messages from each topic in a different way, and in order to achieve this we need to know the topic name for each message, so we do the following:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
This is the implementation of SplitToLinesFunction:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}
The problem is that tuple2._1 is null. We assumed that tuple2._1 would contain some metadata, such as the name of the topic/partition the message came from; however, when we print tuple2._1, it's null.
Our question: is there a way to send the topic name in Kafka so that in the Spark Streaming code, tuple2._1 will contain it (and not be null)?
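One workaround the thread does not discuss (purely an assumption, not something the asker or answerer proposed) is to have the producer embed the topic name in the payload itself, so the consumer can recover it regardless of the consumer API. A minimal framing sketch, independent of Kafka; the NUL separator is a hypothetical choice and is only safe if payloads can never contain it:

```java
public class TopicPrefixSketch {
    // Hypothetical framing: "topic\u0000payload". Only valid if payloads
    // never contain the separator character.
    static final char SEP = '\u0000';

    // Producer side: prepend the topic name to the payload.
    static String encode(String topic, String payload) {
        return topic + SEP + payload;
    }

    // Consumer side: split the framed string back into {topic, payload}.
    static String[] decode(String framed) {
        int i = framed.indexOf(SEP);
        return new String[] { framed.substring(0, i), framed.substring(i + 1) };
    }

    public static void main(String[] args) {
        String[] parts = decode(encode("test1", "hello"));
        System.out.println(parts[0] + " / " + parts[1]); // test1 / hello
    }
}
```

The decode step would run inside the Spark map function instead of relying on tuple2._1.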
Note that we also tried to get the topic names from the DStream as mentioned in the Spark Streaming Kafka integration guide:
But it returns ALL the topics that were sent to KafkaUtils.createDirectStream, and not the specific topic the messages (that belong to the current RDD) arrived from, so it didn't help us identify the topic of the messages in the RDD.
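A common workaround here (not mentioned in the original thread, so treat it as an assumption) relies on the direct stream's one-to-one mapping between RDD partitions and Kafka topic-partitions: the i-th partition of the RDD corresponds to offsetRanges()[i], so inside foreachRDD the topic of every record can be looked up from the record's partition index (e.g. via rdd.mapPartitionsWithIndex). A minimal plain-Java model of that lookup, with a stub in place of Spark's OffsetRange class:

```java
public class TopicPerPartitionSketch {
    // Stub standing in for org.apache.spark.streaming.kafka.OffsetRange,
    // which also carries partition(), fromOffset(), and untilOffset().
    static class StubOffsetRange {
        final String topic;
        StubOffsetRange(String topic) { this.topic = topic; }
        String topic() { return topic; }
    }

    // With the direct stream, RDD partition i maps to offsetRanges()[i],
    // so all records in partition i came from offsetRanges[i].topic().
    static String topicForPartition(StubOffsetRange[] offsetRanges, int partitionIndex) {
        return offsetRanges[partitionIndex].topic();
    }

    public static void main(String[] args) {
        StubOffsetRange[] ranges = {
            new StubOffsetRange("test1"),
            new StubOffsetRange("test2"),
            new StubOffsetRange("test3")
        };
        // Records in RDD partition 1 all came from topic "test2".
        System.out.println(topicForPartition(ranges, 1)); // test2
    }
}
```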
EDIT
In response to David's answer, I tried using MessageAndMetadata like this:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String> {
    @Override
    public String call(MessageAndMetadata<String, String> v1) throws Exception {
        // nothing is printed here
        System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
        return v1.topic();
    }
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
    streamingContext,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    String.class,
    kafkaParams,
    topicAndPartition,
    new MessageAndMetadataFunction()
);
messages.foreachRDD(new VoidFunction() {
    @Override
    public void call(Object t) throws Exception {
        JavaRDD<String> rdd = (JavaRDD<String>) t;
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // here all the topics kafka listens to are printed, but that doesn't help
        for (OffsetRange offset : offsets) {
            System.out.println(offset.topic() + " " + offset.partition() + " "
                + offset.fromOffset() + " " + offset.untilOffset());
        }
    }
});
The problem is that nothing was printed in the MessageAndMetadataFunction.call method. What should I fix in order to get the relevant topic for that RDD inside the MessageAndMetadataFunction.call method?
Answer
Use one of the versions of createDirectStream that takes a messageHandler function as a parameter. Here's what I do:
val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte])](
  ssc,
  kafkaParams,
  getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1)).toMap,
  (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => (msg.topic, msg.message)
)
There's stuff there that doesn't mean anything to you -- the relevant part is
(msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => (msg.topic, msg.message)
If you are not familiar with Scala, all the function does is return a Tuple2 containing msg.topic and msg.message. Your function needs to return both of these in order for you to use them downstream. You could just return the entire MessageAndMetadata object instead, which gives you a couple of other interesting fields. But if you only want the topic and the message, then use the above.