Event-Time merge of two Kafka topics using Kafka Streams DSL


Question

I am looking for a way to merge two Kafka topics based on the event time.

For example, I have two topics with the following schema: {event-key} :: {event-time-as-value}

topic I -  { {1 :: 12:00pm} {2 :: 12:10pm} {3 :: 14:50pm} {4 :: 15:00pm} }
topic II - { {1 :: 13:00pm} {2 :: 13:10pm} {3 :: 15:50pm} {4 :: 16:00pm} }

The expected output should look like this:

{ {1 :: 12:00pm} {2 :: 12:10pm} {1 :: 13:00pm} {2 :: 13:10pm} {3 :: 14:50pm} {4 :: 15:00pm} {3 :: 15:50pm} {4 :: 16:00pm} }

Is there a way to do it using the Kafka Streams DSL?

A note: there is a good chance that the original topics are not ordered by event time, and that is fine. I would like the algorithm to always pick the earlier of the two events currently at the head of each topic (the same way the merge-two-sorted-arrays algorithm works).

Answer

Kafka Streams (as of version 2.1.0) implements the exact algorithm you describe. Hence, a simple:

import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;

StreamsBuilder builder = new StreamsBuilder();
builder
    // Subscribing to multiple topics merges them into one stream; at each step
    // the task picks the buffered record with the smallest timestamp.
    .stream(Arrays.asList("firstInputTopic", "secondInputTopic"))
    .to("outputTopicName");

should do what you want. Note that the program will merge data on a per-partition basis.

Also consider configuring max.task.idle.ms.
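max.task.idle.ms controls how long a task is willing to wait when one of its input partitions has no buffered data, instead of processing the other partition ahead of time; a larger value improves timestamp ordering when one topic lags behind the other. A minimal configuration sketch (the property constant is real; the application id, bootstrap server, and idle value are illustrative assumptions):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-time-merge");      // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // hypothetical broker
// Wait up to 5 seconds for data on all input partitions before choosing the
// next record, so the merge can always pick the smallest available timestamp.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 5000L);
```

The default of 0 means the task never waits, so out-of-order output is more likely whenever one topic is temporarily empty.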

For more details, read the corresponding KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

Additionally, you need to implement and configure a custom TimestampExtractor that gets the timestamp from the value.
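Since the schema above stores the event time as the record value, a custom extractor is needed so Kafka Streams orders by that field rather than the broker timestamp. A hedged sketch, assuming the value arrives as epoch milliseconds in a String (the class name and parsing logic are illustrative; the TimestampExtractor interface and the config key are real):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor: treats the record value as epoch millis.
public class ValueTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        try {
            return Long.parseLong(record.value().toString());
        } catch (RuntimeException e) {
            // Fall back to the current partition time on malformed values,
            // so one bad record does not stall the stream.
            return partitionTime;
        }
    }
}

// Registered via configuration:
// props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
//           ValueTimestampExtractor.class);
```

With this in place, the merge picks records according to the event time carried in the value, which is what produces the sorted output shown above.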

