Kafka Streams API: KStream to KTable


Question


I have a Kafka topic where I send location events (key=user_id, value=user_location). I am able to read and process it as a KStream:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted for clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

That works well, but I'd like to have a KTable with the last known position of each user. How could I do it?

I am able to do it by writing to and reading from an intermediate topic:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");

Is there a simple way to obtain a KTable from a KStream? This is my first app using Kafka Streams, so I'm probably missing something obvious.

Solution

Update:

In Kafka 2.5, a new method KStream#toTable() will be added that provides a convenient way to transform a KStream into a KTable. For details, see: https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
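As a rough sketch of what that looks like with the newer StreamsBuilder API (the topic name and LocationSerde are taken from the question above; the serde wiring via Consumed and Materialized is an assumption, so adjust it to your setup):

StreamsBuilder builder = new StreamsBuilder();

// read the location events as before (serdes are assumed, not given in the question)
KStream<String, Location> locations = builder
        .stream("location_topic", Consumed.with(Serdes.String(), new LocationSerde()));

// Kafka 2.5+: materialize the stream directly as a table of the latest value per key
KTable<String, Location> table = locations.toTable(
        Materialized.with(Serdes.String(), new LocationSerde()));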

Original Answer:

There is no straightforward way to do this at the moment. Your approach is absolutely valid, as discussed in the Confluent FAQ: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This is the simplest approach with regard to the code. However, it has the disadvantages that (a) you need to manage an additional topic and that (b) it results in additional network traffic because data is written to and re-read from Kafka.

There is one alternative, using a "dummy-reduce":

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            // ignore the old aggregate and keep the newest value ("last write wins")
            return newValue;
        }
    },
    "dummy-aggregation-store");

This approach is somewhat more complex with regard to the code compared to option 1 but has the advantage that (a) no manual topic management is required and (b) re-reading the data from Kafka is not necessary.

Overall, you need to decide for yourself which approach you like better:

In option 2, Kafka Streams will create an internal changelog topic (named <application.id>-dummy-aggregation-store-changelog by convention) to back up the KTable for fault tolerance. Thus, both approaches require some additional storage in Kafka and result in additional network traffic. Overall, it's a trade-off between slightly more complex code in option 2 versus manual topic management in option 1.
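As a side note, the store name passed to reduce() also makes the resulting table queryable via interactive queries once the application is running. A minimal sketch, assuming the builder from the dummy-reduce snippet and a streamsConfig defined elsewhere:

// start the topology (streamsConfig is assumed to be configured elsewhere)
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();

// once the store is ready, look up the latest value for a key from local state
ReadOnlyKeyValueStore<String, Long> store =
        streams.store("dummy-aggregation-store", QueryableStoreTypes.keyValueStore());
Long latestValue = store.get("some-key");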
