Not able to poll / fetch all records from Kafka topic

Question

I am trying to poll data from a specific topic. Kafka is receiving about 100 records/s, but most of the time my consumer does not fetch all the records. I am using a timeout of 5000 ms, and I am calling this method every 100 ms. Note: I am subscribing to the specific topic too.

    @Scheduled(fixedDelayString = "100")
    public void pollRecords() {
        // poll() expects a Duration; the String argument in the original snippet does not compile
        ConsumerRecords<String, String> records = leadConsumer.poll(Duration.ofMillis(5000));
        // ... process records ...
    }

How can I fetch all the data from Kafka?

Answer

The maximum number of records returned from poll() is specified with the max.poll.records consumer config parameter (default: 500). Also, there are other consumer config parameters that limit the maximum amount of data returned from the server: fetch.max.bytes and max.partition.fetch.bytes.
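For illustration, here is a minimal sketch of a consumer with these limits raised. The class name, bootstrap address, group id, and the chosen values are assumptions for the example, not values from the question:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class TunedConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "lead-group");              // assumed group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Allow more records per poll() than the default 500
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");
            // Byte limits per fetch request and per partition
            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");           // the default
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10485760"); // 10x the default

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // ... subscribe and poll as usual, then close ...
            consumer.close();
        }
    }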

On the other hand, on the broker side there is another size limit, called message.max.bytes.
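If individual record batches are bumping into that limit, the topic-level counterpart max.message.bytes can be raised as well. A hedged sketch using the Kafka AdminClient; the class name, topic name, address, and the 2 MB value are assumptions:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class RaiseTopicLimitSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed address

            try (AdminClient admin = AdminClient.create(props)) {
                // Topic-level counterpart of the broker's message.max.bytes
                ConfigResource topic =
                        new ConfigResource(ConfigResource.Type.TOPIC, "lead-topic"); // assumed topic
                AlterConfigOp raise = new AlterConfigOp(
                        new ConfigEntry("max.message.bytes", "2097152"), // 2 MB, an assumed value
                        AlterConfigOp.OpType.SET);
                Map<ConfigResource, Collection<AlterConfigOp>> update =
                        Collections.singletonMap(topic, Collections.singletonList(raise));
                admin.incrementalAlterConfigs(update).all().get();
            }
        }
    }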

So you should set these parameters properly to get more messages.

From Kafka docs (link):

max.poll.records: The maximum number of records returned in a single call to poll(). (default: 500)

fetch.max.bytes: The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. (default: 52428800)

message.max.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. This can be set per topic with the topic level max.message.bytes config. (default: 1000012)

max.partition.fetch.bytes: The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size. (default: 1048576)
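One more point implied by max.poll.records: a single poll() returns at most one capped batch of records, so reading everything from a topic means polling repeatedly. A minimal sketch of such a loop; the class and topic names are assumptions, and the consumer is assumed to be configured as in the earlier sketch:

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class DrainLoopSketch {
        static void drain(KafkaConsumer<String, String> consumer) {
            consumer.subscribe(Collections.singletonList("lead-topic")); // assumed topic name
            while (true) {
                // Each poll() returns at most max.poll.records records,
                // so draining the topic requires repeated polls
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }

Raising the config parameters above increases how much each iteration of this loop can return per poll.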
