Apache Kafka order windowed messages based on their value

Problem description

I'm trying to find a way to re-order messages within a topic partition and send ordered messages to a new topic.

I have a Kafka publisher that sends String messages in the following format: {system_timestamp}-{event_name}?{parameters}

For example:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
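Every reordering approach has to read {system_timestamp} out of the value first. A minimal parsing sketch, assuming the first `-` always ends the numeric timestamp and the first `?` after it starts the parameters. `EventMessage` is a hypothetical helper, not a Kafka class. Note that the two sample lines above are already out of order: the first one carries the larger timestamp.

```java
import java.util.Objects;

/**
 * Hypothetical helper that splits a message value of the form
 *   {system_timestamp}-{event_name}?{parameters}
 * into its three parts. Not part of any Kafka API.
 */
final class EventMessage {
    final long timestamp;     // {system_timestamp}, epoch milliseconds
    final String eventName;   // {event_name}
    final String parameters;  // raw {parameters}, "" when there is no '?'

    private EventMessage(long timestamp, String eventName, String parameters) {
        this.timestamp = timestamp;
        this.eventName = eventName;
        this.parameters = parameters;
    }

    /** Assumes the first '-' ends the numeric timestamp and the first '?' after it starts the parameters. */
    static EventMessage parse(String value) {
        Objects.requireNonNull(value, "value");
        int dash = value.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("no {system_timestamp} prefix: " + value);
        }
        // Throws NumberFormatException if the prefix is not a number.
        long timestamp = Long.parseLong(value.substring(0, dash));
        int question = value.indexOf('?', dash + 1);
        String eventName = question < 0 ? value.substring(dash + 1) : value.substring(dash + 1, question);
        String parameters = question < 0 ? "" : value.substring(question + 1);
        return new EventMessage(timestamp, eventName, parameters);
    }
}
```

For example, `EventMessage.parse("1494002667893-client.message?chatName=1c&messageBody=hello").timestamp` is `1494002667893L`.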

Also, we add a message key to each message so that it is sent to the corresponding partition.

What I want to do is reorder events based on the {system_timestamp} part of the message, within a 1-minute window, because our publishers don't guarantee that messages will be sent in {system_timestamp} order.

For example, a message with a larger {system_timestamp} value may be delivered to the topic first.

I've investigated the Kafka Streams API and found some examples of message windowing and aggregation:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String, String> groupedStream = stream.groupByKey(); // groups events by message key

/* Commented out: I don't think I need any aggregation, but I guess time windowing can't be used without it.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
        () -> "",                                     // initial value
        (aggKey, value, aggregate) -> aggregate + "", // aggregating value
        TimeWindows.of(1000),                         // window size in milliseconds
        Serdes.String(),                              // serde for the aggregated value
        "test-store"
);*/

But what should I do next with this grouped stream? I don't see any sort((e1, e2) -> e1.compareTo(e2)) method available. Windows can be applied to methods like aggregate(), reduce(), and count(), but I don't think I need to manipulate the message data.

How can I reorder messages within a 1-minute window and send them to another topic?

Recommended answer

Here is an outline:

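Create a Processor implementation:

In the process() method, for each message:

  • read the timestamp from the message value
  • insert it into a KeyValueStore, using the (timestamp, message key) pair as the key and the message value as the value. Note that this also provides de-duplication. You need to provide a custom Serde for this key that puts the timestamp first, byte-wise, so that range queries return entries ordered by timestamp first.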
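The custom key Serde mentioned above mostly comes down to the byte layout. A minimal sketch of that layout, assuming non-negative epoch-millisecond timestamps: an 8-byte big-endian timestamp followed by the UTF-8 message key, so that unsigned byte-wise comparison (the order used by RocksDB's default bytewise comparator) sorts by timestamp first. `TimestampedKeyCodec` is a hypothetical name; wrapping it in Kafka's `Serializer`/`Deserializer` interfaces is left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical codec for the composite store key (timestamp, message key). */
final class TimestampedKeyCodec {

    /** 8-byte big-endian timestamp, then the UTF-8 key bytes. */
    static byte[] serialize(long timestamp, String key) {
        if (timestamp < 0) {
            // Two's-complement negatives would sort after positives under unsigned comparison.
            throw new IllegalArgumentException("negative timestamp: " + timestamp);
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + keyBytes.length)
                .putLong(timestamp) // ByteBuffer writes big-endian by default
                .put(keyBytes)
                .array();
    }

    static long timestampOf(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    static String keyOf(byte[] bytes) {
        return new String(bytes, Long.BYTES, bytes.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    /** Unsigned lexicographic comparison, i.e. plain byte-wise key order. */
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b); // Java 9+
    }
}
```

Big-endian order matters here: with little-endian bytes, 256 would compare smaller than 1. Comparing bytes as unsigned matters too: otherwise 128 (0x80) would compare smaller than 1.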

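In the punctuate() method:

  • read the store using a range query from 0 to timestamp - 60'000 (= 1 minute), where timestamp is the argument passed to punctuate()
  • send the fetched messages in order using context.forward() and delete them from the store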
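The ordering logic of this outline can be checked without a Kafka cluster. In the sketch below, a `TreeMap` sorted by (timestamp, key) stands in for the KeyValueStore, `process()` buffers messages, and `flush(streamTime)` plays the role of `punctuate()`. `ReorderBuffer` and its methods are hypothetical names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of the process()/punctuate() outline. */
final class ReorderBuffer {
    static final long WINDOW_MS = 60_000L; // the 1-minute reordering window

    /** Store key (timestamp, message key). */
    static final class StoreKey {
        final long timestamp;
        final String key;

        StoreKey(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    // Stands in for the KeyValueStore: sorted by timestamp first, then by message key.
    private final TreeMap<StoreKey, String> store = new TreeMap<>((a, b) -> {
        int byTime = Long.compare(a.timestamp, b.timestamp);
        return byTime != 0 ? byTime : a.key.compareTo(b.key);
    });

    /** process(): buffer the value; the same (timestamp, key) pair overwrites, which de-duplicates. */
    void process(String key, long timestamp, String value) {
        store.put(new StoreKey(timestamp, key), value);
    }

    /** punctuate(): emit and delete, oldest first, every entry with timestamp <= streamTime - WINDOW_MS. */
    List<String> flush(long streamTime) {
        long cutoff = streamTime - WINDOW_MS;
        List<String> forwarded = new ArrayList<>();
        Iterator<Map.Entry<StoreKey, String>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<StoreKey, String> entry = it.next();
            if (entry.getKey().timestamp > cutoff) {
                break; // entries are sorted, so no later one can qualify
            }
            forwarded.add(entry.getValue()); // in a real Processor: context.forward(key, value)
            it.remove();                     // in a real Processor: store.delete(key)
        }
        return forwarded;
    }

    int size() {
        return store.size();
    }
}
```

One detail to watch when porting this to a real store: `KeyValueStore.range(from, to)` includes both bounds and compares whole composite keys, so an upper bound built from timestamp - 60'000 plus an empty message key would skip entries that have exactly that timestamp. Iterating in key order and stopping at the first entry past the cutoff, as `flush()` does, avoids picking that bound by hand.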

The problem with this approach is that punctuate() is not triggered if no new messages arrive to advance the "stream time". If this is a risk in your case, you can create an external scheduler that sends periodic "tick" messages to each(!) partition of your topic. Your processor should simply ignore them, but they will cause punctuate() to trigger in the absence of "real" messages. KIP-138 addresses this limitation by adding explicit support for system-time punctuation (it was released in Kafka 1.0): https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
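The tick workaround needs two small pieces: something that periodically writes one tick to every partition (each partition may be processed by a different task, so every one needs its own tick), and a check that lets process() skip ticks. A sketch under these assumptions: the hypothetical `TickScheduler` takes the partition count and a `(partition, value)` sender callback, which in practice could wrap `producer.send(new ProducerRecord<>(topic, partition, key, value))` for each partition returned by `producer.partitionsFor(topic)`. The `"tick"` event name is also an assumption.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

/** Hypothetical external scheduler that sends one tick to each partition per period. */
final class TickScheduler {
    static final String TICK_EVENT = "tick"; // hypothetical event name that the processor ignores

    private final int partitionCount;
    private final BiConsumer<Integer, String> sender; // (partition, value), e.g. a producer send
    private final LongSupplier clock;                 // current time in epoch milliseconds

    TickScheduler(int partitionCount, BiConsumer<Integer, String> sender, LongSupplier clock) {
        this.partitionCount = partitionCount;
        this.sender = sender;
        this.clock = clock;
    }

    /** Sends one tick, in the {system_timestamp}-{event_name} format, to each(!) partition. */
    void tickOnce() {
        String value = clock.getAsLong() + "-" + TICK_EVENT;
        for (int partition = 0; partition < partitionCount; partition++) {
            sender.accept(partition, value);
        }
    }

    /** Calls tickOnce() at a fixed rate on a background thread; shut the executor down to stop. */
    ScheduledExecutorService start(long periodMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(this::tickOnce, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return executor;
    }

    /** Processor side: ticks exist only to advance stream time, so process() should skip them. */
    static boolean isTick(String value) {
        int dash = value.indexOf('-');
        return dash > 0 && value.substring(dash + 1).equals(TICK_EVENT);
    }
}
```

On Kafka 1.0 and later the scheduler is not needed: the processor can register a wall-clock punctuation in init() with `context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, punctuator)`.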