How to process a KStream in a batch of max size or fallback to a time window?


Question

I would like to create a Kafka Streams-based application that processes a topic and takes messages in batches of size X (e.g. 50), but if the stream flow is low, gives me whatever the stream has within Y seconds (e.g. 5).

So, instead of processing messages one by one, I process a List[Record] where the size of the list is 50 (or maybe less).

This is to make some I/O-bound processing more efficient.

I know that this can be implemented with the classic Kafka API, but I was looking for a stream-based implementation that can also handle offset committing natively, taking errors/failures into account. I couldn't find anything related in the docs or by searching around, and was wondering if anyone has a solution to this problem.

Answer

The simplest way might be to use a stateful transform() operation. Each time you receive a record, you put it into a state store. Once you have received 50 records, you do your processing, emit output, and delete the records from the store.

To enforce processing when you have not reached the batch-size limit within a certain amount of time, you can register a wall-clock punctuation.
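A minimal sketch of what that could look like, assuming the Transformer API with a key-value store; the class name BatchingTransformer, the store name "batch-store", the batch size of 50, the 5-second flush interval, and the String types are all illustrative assumptions, not part of the original answer:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class BatchingTransformer implements Transformer<String, String, KeyValue<String, List<String>>> {

    private static final int MAX_BATCH_SIZE = 50;                         // X
    private static final Duration FLUSH_INTERVAL = Duration.ofSeconds(5); // Y

    private ProcessorContext context;
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, String>) context.getStateStore("batch-store");
        // Wall-clock punctuation: flush whatever is buffered every FLUSH_INTERVAL,
        // even if fewer than MAX_BATCH_SIZE records have arrived.
        context.schedule(FLUSH_INTERVAL, PunctuationType.WALL_CLOCK_TIME, timestamp -> flush());
    }

    @Override
    public KeyValue<String, List<String>> transform(String key, String value) {
        // Buffer the record under a unique key (partition + offset) so records
        // that share a message key do not overwrite each other in the store.
        store.put(context.partition() + "-" + context.offset(), value);
        // approximateNumEntries() is exact for in-memory stores but only an
        // estimate for RocksDB; a production version might track the count itself.
        if (store.approximateNumEntries() >= MAX_BATCH_SIZE) {
            flush();
        }
        return null; // output is emitted from flush() via context.forward()
    }

    private void flush() {
        List<String> batch = new ArrayList<>();
        List<String> keysToDelete = new ArrayList<>();
        try (KeyValueIterator<String, String> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, String> entry = it.next();
                batch.add(entry.value);
                keysToDelete.add(entry.key);
            }
        }
        keysToDelete.forEach(store::delete);
        if (!batch.isEmpty()) {
            // Do the I/O-bound work on the whole batch here, then forward it downstream.
            context.forward("batch", batch);
        }
    }

    @Override
    public void close() { }
}

The store still has to be registered on the topology and attached to the operation, e.g. builder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("batch-store"), Serdes.String(), Serdes.String())) followed by stream.transform(BatchingTransformer::new, "batch-store"). Because the buffer lives in a changelog-backed state store and offsets are committed by Kafka Streams as usual, failures are handled by the framework's normal at-least-once (or exactly-once) guarantees rather than by manual offset management.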
