Consume the latest value from a topic for each key

Problem description

I have a Kafka producer that produces messages at a high rate (the message key is, say, a username and the value is that user's current score in a game). The Kafka consumer is relatively slow at processing the consumed messages. My requirement is to show the most up-to-date score and avoid showing stale data, with the tradeoff that some scores may never be shown.

Essentially, for each username I may have hundreds of messages in the same partition, but I always want to read only the latest one.

A crude solution that has been implemented works like this: the producer sends just the key as each message and writes the actual value to a database that is shared with the consumer. The consumer reads each key from the queue and the value from the database. Because the producer overwrites the value in the database, a consumer reading a given key always gets the latest value. But this solution has drawbacks caused by the high number of reads and updates (slowness, race conditions, etc.).

I am looking for a more natural way of solving this in Kafka or Kafka Streams, where I can somehow declare that I want the latest value for each key from the stream of data. Thanks!

Recommended answer

The following code helps:

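// Note: KStreamBuilder is the pre-1.0 API; newer Kafka Streams releases use StreamsBuilder and new KafkaStreams(builder.build(), config).
KStreamBuilder builder = new KStreamBuilder();
// Read the topic as a KTable: a changelog in which a newer record replaces the older value for the same key.
KTable<String, String> dataTable = builder.table("input-topic");
// Forward each update to the downstream client; `client` and `config` are defined elsewhere.
dataTable.toStream().foreach((key, message) -> client.post(message));
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();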
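
The snippet above leaves `config` undefined. As a rough sketch only, it might be built along the following lines. The property names are real Kafka Streams settings, but the application id, broker address, class name `LatestScoreConfig`, and the numeric values (which are the documented defaults) are assumptions for illustration. A real application would also need to configure String serdes for keys and values, and the name of that setting depends on the Kafka version in use.

```java
import java.util.Properties;

public class LatestScoreConfig {

    // Builds the Properties object that the answer's code passes to KafkaStreams as `config`.
    public static Properties buildConfig() {
        Properties config = new Properties();
        // Hypothetical application id and broker address; replace with real values.
        config.setProperty("application.id", "latest-score-app");
        config.setProperty("bootstrap.servers", "localhost:9092");
        // Total bytes available for record caches; a larger cache lets more
        // updates for the same key be collapsed before they go downstream.
        config.setProperty("cache.max.bytes.buffering", "10485760");
        // How often (in ms) the caches are flushed on commit; a longer interval
        // means fewer, more heavily compacted updates.
        config.setProperty("commit.interval.ms", "30000");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig());
    }
}
```

Raising either value should reduce how many intermediate scores reach the slow consumer, at the cost of each update arriving later.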

What makes this possible in practice is in-memory compaction of the incoming stream (the original answer links to a detailed explanation here). The pressure on the downstream client can be controlled with the parameters cache.max.bytes.buffering and commit.interval.ms.
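
To make the compaction idea concrete, here is a plain-Java sketch with no Kafka dependency. It is not how Kafka Streams implements its record cache; the class `LatestValueBuffer` and its methods are hypothetical and only model the behaviour described above: updates for the same key overwrite each other in memory, and only the surviving value is emitted when the buffer is flushed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestValueBuffer {

    // Pending updates, one entry per key, in order of first arrival.
    private final Map<String, String> pending = new LinkedHashMap<>();

    // A newer record for a key replaces the buffered value for that key.
    public void put(String key, String value) {
        pending.put(key, value);
    }

    // Emits one "key=value" entry per key and empties the buffer,
    // loosely analogous to a cache flush at commit time.
    public List<String> flush() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : pending.entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        pending.clear();
        return out;
    }

    public static void main(String[] args) {
        LatestValueBuffer buffer = new LatestValueBuffer();
        buffer.put("alice", "10");
        buffer.put("bob", "7");
        buffer.put("alice", "25");
        buffer.put("alice", "40");
        // Three updates for alice collapse into one.
        System.out.println(buffer.flush()); // prints [alice=40, bob=7]
    }
}
```

The slow consumer then handles one record per user per flush instead of every intermediate score, which matches the tradeoff accepted in the question.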