Can a Kafka consumer (0.8.2.2) read messages in batch?


Question

As per my understanding, a Kafka consumer reads messages from an assigned partition sequentially.

We are planning to have multiple Kafka consumers (Java) with the same group id. If each one reads sequentially from its assigned partition, how can we achieve high throughput? For example, the producer publishes 40 messages per second while a consumer processes only 1 message per second; we can have multiple consumers, but we can't have 40, right? Correct me if I'm wrong.

And in our case the consumer has to commit the offset only after a message is processed successfully; otherwise the message will be reprocessed. Is there any better solution?

Answer

Based on your question clarifications:

A Kafka consumer can read multiple messages at a time. But a Kafka consumer doesn't really read messages; it's more correct to say a consumer reads a certain number of bytes, and the size of the individual messages then determines how many messages are read. Reading through the Kafka Consumer Configs, you're not allowed to specify how many messages to fetch; instead you specify the max/min data size that a consumer can fetch. However many messages fit inside that range is how many you will get. You will always get messages sequentially, as you have pointed out.

Relevant consumer configs (for 0.9.0.0 and up); see the sketch after this list:

  • fetch.min.bytes
  • max.partition.fetch.bytes
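
As an illustration only, here is a minimal configuration sketch for a 0.9.x Java consumer. The broker address, group id, and byte values are made-up placeholders, not taken from the question; disabling auto-commit is added because the question requires committing only after successful processing.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("group.id", "my-group");                // placeholder group id
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    // Don't return a fetch until at least this many bytes are available.
    props.put("fetch.min.bytes", "1024");
    // Upper bound on the data returned per partition in one fetch; however
    // many messages fit inside these bounds is how many a fetch carries.
    props.put("max.partition.fetch.bytes", "1048576");
    // Disable auto-commit so offsets are committed only after successful
    // processing (see the committing discussion below).
    props.put("enable.auto.commit", "false");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);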

Update

Using your example in the comments, "my understanding is if I specify in config to read 10 bytes and if each message is 2 bytes the consumer reads 5 messages at a time." That is true. Your next statement, "that means the offsets of these 5 messages were random within the partition", is false. Reading sequentially doesn't mean one by one; it just means the messages remain ordered. You are able to batch items and have them remain sequential/ordered. Take the following examples.

In a Kafka log, if there are 10 messages (each 2 bytes) with the following offsets, [0,1,2,3,4,5,6,7,8,9]:

  • If you read 10 bytes, you'll get a batch containing the messages at offsets [0,1,2,3,4].
  • If you read 6 bytes, you'll get a batch containing the messages at offsets [0,1,2].
  • If you read 6 bytes, then another 6 bytes, you'll get two batches containing the messages [0,1,2] and [3,4,5].
  • If you read 8 bytes, then 4 bytes, you'll get two batches containing the messages [0,1,2,3] and [4,5].

Update: Clarifying Committing

I'm not 100% sure how committing works; I've mainly worked with Kafka from a Storm environment, where the provided KafkaSpout automatically commits Kafka messages.

But looking through the 0.9.0.1 Consumer APIs (which I'd recommend you do too), there seem to be three methods in particular that are relevant to this discussion:

  • poll(long timeout)
  • commitSync()
  • commitSync(java.util.Map offsets)

The poll method retrieves messages; it could return only 1, it could return 20. For your example, let's say 3 messages were returned, [0,1,2]. You now have those three messages, and it's up to you to determine how to process them. You could process them 0 => 1 => 2, or 1 => 0 => 2, or 2 => 0 => 1; it just depends. However you process them, after processing you'll want to commit, which tells the Kafka server you're done with those messages.

Using commitSync() commits everything returned by the last poll; in this case it would commit offsets [0,1,2].
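
As a rough sketch of how poll and commitSync fit together, reusing the consumer from the configuration sketch above; the topic name "my-topic" is a made-up placeholder:

    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    consumer.subscribe(Arrays.asList("my-topic")); // placeholder topic name

    while (true) {
        // One poll returns whatever the fetch produced: it could be 1 record,
        // it could be 20, and they arrive in offset order per partition.
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            // ... your actual processing goes here ...
        }
        // Commit everything returned by the last poll, but only after all of
        // it has been processed successfully.
        consumer.commitSync();
    }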

On the other hand, if you choose to use commitSync(java.util.Map offsets), you can manually specify which offsets to commit. If you're processing them in order, you can process offset 0 then commit it, process offset 1 then commit it, and finally process offset 2 and commit it.
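
A sketch of that per-message pattern, iterating the same records batch as in the previous sketch. The + 1 is there because Kafka's committed offset names the next message to read, not the last one processed:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    for (ConsumerRecord<String, String> record : records) {
        // ... process this single message successfully first ...
        // Committed offsets point at the *next* message to read, so after
        // processing offset N you commit N + 1.
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
    }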

All in all, Kafka gives you the freedom to process messages as you desire: you can process them sequentially or in an entirely arbitrary order, at your choosing.
