Consume the latest value from a topic for each key

Question

I have a Kafka producer that produces messages at a high rate (the message key is, say, a username and the value is that user's current score in a game). The Kafka consumer is relatively slow at processing the messages it consumes. My requirement is to show the most up-to-date score and avoid showing stale data, with the tradeoff that some scores may never be shown.

Essentially, for each username I may have hundreds of messages in the same partition, but I always want to read only the latest one.

A crude solution that has been implemented works like this: the producer sends only the key in each message and writes the actual value to a database that is shared with the consumer. The consumer reads each key from the queue and then reads the value from the database. The goal of always reading the latest value is achieved because the producer overwrites the value in the database, so a consumer reading a given key always gets the latest value. But this solution has drawbacks caused by the high number of reads and updates (slowness, race conditions, etc.).

I am looking for a more natural way of solving this in Kafka or Kafka Streams, where I can somehow declare "get the latest value for each key" over the stream of data. Thanks!

Recommended answer

The code below helped:

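// Reading the topic as a KTable treats each record as an update to its key.
// Note: KStreamBuilder is the older API, deprecated since Kafka 1.0 in favor of StreamsBuilder.
KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> dataTable = builder.table("input-topic");
// client (the downstream HTTP client) and config (the streams configuration) are defined elsewhere.
dataTable.toStream().foreach((key, message) -> client.post(message));
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();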
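To make the "latest value per key" idea concrete without a running Kafka cluster, here is a minimal plain-Java sketch. It is not how Kafka Streams is implemented; LatestValueBuffer is a hypothetical helper that only illustrates the effect: a fast writer overwrites unread values, so a slow reader only ever sees the newest score for each user.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not a Kafka class): keeps only the newest unread value per key. */
class LatestValueBuffer<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>();

    /** Called by the fast producer side; overwrites any unread value for the key. */
    public synchronized void put(K key, V value) {
        pending.put(key, value);
    }

    /** Called by the slow consumer side; returns everything pending and clears it. */
    public synchronized Map<K, V> drain() {
        Map<K, V> snapshot = new LinkedHashMap<>(pending);
        pending.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        LatestValueBuffer<String, Integer> scores = new LatestValueBuffer<>();
        scores.put("alice", 10);
        scores.put("bob", 7);
        scores.put("alice", 25);            // replaces the unread 10; that score is never shown
        System.out.println(scores.drain()); // prints {alice=25, bob=7}
        System.out.println(scores.drain()); // prints {}
    }
}
```

The intermediate score of 10 is dropped, which matches the tradeoff accepted in the question: some scores may never be shown, but nothing stale is.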

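What makes the KTable approach work in practice is in-memory compaction of the incoming stream: Kafka Streams caches records and collapses consecutive updates to the same key before forwarding them downstream. We can control how aggressively this happens with the parameters cache.max.bytes.buffering and commit.interval.ms. Cached updates are flushed when the cache fills up or when the commit interval elapses, whichever comes first, so a larger cache and a longer interval drop more intermediate values.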
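As a rough sketch of how those two parameters might be set, the snippet below builds the configuration as plain java.util.Properties with string keys, so it compiles without the Kafka libraries. The application id, broker address, and the specific cache size and interval are illustrative assumptions, not values from the original answer; StreamsCacheConfig is a hypothetical class name.

```java
import java.util.Properties;

/** Hypothetical holder for example Kafka Streams settings; all values are placeholders. */
class StreamsCacheConfig {
    static Properties build() {
        Properties config = new Properties();
        config.setProperty("application.id", "latest-score-app");  // assumed name
        config.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        // Total bytes available for record caching; a larger cache can absorb
        // more updates to the same key before anything is forwarded downstream.
        config.setProperty("cache.max.bytes.buffering", String.valueOf(10L * 1024 * 1024));
        // Cached records are also flushed on every commit, so this interval
        // bounds how long an update can wait before the consumer sees it.
        config.setProperty("commit.interval.ms", "5000");
        return config;
    }

    public static void main(String[] args) {
        Properties config = build();
        System.out.println(config.getProperty("cache.max.bytes.buffering")); // prints 10485760
        System.out.println(config.getProperty("commit.interval.ms"));        // prints 5000
    }
}
```

The resulting Properties object would be passed as config when constructing KafkaStreams. Setting cache.max.bytes.buffering to 0 disables the cache, in which case every update is forwarded and the slow consumer sees all intermediate scores again.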
