How does ZooKeeper retrieve the consumer offsets from the __consumer_offsets topic?

Question

This is a follow-up to the question "Where do zookeeper store Kafka cluster and related information?", based on the answer provided by Armando Ballaci.

It is now clear that consumer offsets are stored in the Kafka cluster, in a special topic called __consumer_offsets. That's fine; I am just wondering how the retrieval of these offsets works.

Topics are not like an RDBMS, where we can query arbitrary data with a predicate. For example, if the data were stored in an RDBMS, a query like the one below would probably return the consumer offsets for a particular partition of a topic for a particular consumer group.

select consumer_offset__read, consumer_offset__committed from consumer_offset_table where consumer_grp_id = 'x' and partition_id = 'y';

But clearly this kind of retrieval is not possible on Kafka topics. So how does the retrieval mechanism from the topic work? Could someone elaborate?

(Data in a Kafka partition is read in FIFO order. If offsets were retrieved by following the normal consumer model, a lot of unrelated data would have to be processed and it would be slow, so I am wondering whether it is done some other way.)

Recommended answer

Here is a description I found on the web when I ran into the same question at my day job:

In Kafka releases through 0.8.1.1, consumers commit their offsets to ZooKeeper. ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count). Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets. Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although we provide an in-memory offsets cache for faster access). i.e., offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory look ups.
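To make the "append to a topic, answer from memory" idea concrete, here is a minimal sketch in plain Java. It is only a toy model of the description above, not Kafka's implementation: the class `OffsetCacheSketch`, its `Key` type, and the list standing in for the offsets topic are all hypothetical names invented for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustrative model only: these are NOT Kafka classes.
class OffsetCacheSketch {

    // Hypothetical record key: (group, topic, partition).
    static final class Key {
        final String group;
        final String topic;
        final int partition;

        Key(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return partition == k.partition && group.equals(k.group) && topic.equals(k.topic);
        }

        @Override public int hashCode() {
            return Objects.hash(group, topic, partition);
        }
    }

    // Stand-in for the offsets topic: an append-only list of (key, offset) records.
    final List<Map.Entry<Key, Long>> log = new ArrayList<>();
    // In-memory cache holding only the latest offset per key.
    private final Map<Key, Long> cache = new HashMap<>();

    // Commit = append a record to the log, then update the cache.
    void commit(String group, String topic, int partition, long offset) {
        Key key = new Key(group, topic, partition);
        log.add(new SimpleImmutableEntry<>(key, Long.valueOf(offset)));
        cache.put(key, offset);
    }

    // Fetch = one hash lookup; the log is not scanned. Returns null if nothing was committed.
    Long fetch(String group, String topic, int partition) {
        return cache.get(new Key(group, topic, partition));
    }

    // Rebuild the cache by replaying a log once, e.g. after a restart; later records win.
    static OffsetCacheSketch replay(List<Map.Entry<Key, Long>> records) {
        OffsetCacheSketch rebuilt = new OffsetCacheSketch();
        for (Map.Entry<Key, Long> r : records) {
            Key k = r.getKey();
            rebuilt.commit(k.group, k.topic, k.partition, r.getValue());
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        OffsetCacheSketch offsets = new OffsetCacheSketch();
        offsets.commit("demoGroup", "demoTopic", 0, 42L);
        offsets.commit("demoGroup", "demoTopic", 0, 57L); // newer commit for the same key
        offsets.commit("demoGroup", "demoTopic", 1, 7L);
        System.out.println(offsets.fetch("demoGroup", "demoTopic", 0)); // prints 57
        System.out.println(replay(offsets.log).fetch("demoGroup", "demoTopic", 1)); // prints 7
    }
}
```

The point of the sketch is that the log is read sequentially only when the cache has to be rebuilt; an ordinary fetch is a keyed lookup, which is roughly what the SQL-style predicate in the question was asking for.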

The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample code that shows how to use the new Kafka-based offset storage mechanism.

// Imports for the old Scala client classes (Kafka 0.8.2 era) used by this snippet.
import java.io.IOException;
import kafka.api.ConsumerMetadataRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.network.BlockingChannel;

try {
    // Connect to any broker first; it is only used to look up the offset manager.
    BlockingChannel channel = new BlockingChannel("localhost", 9092,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in millis */);
    channel.connect();
    final String MY_GROUP = "demoGroup";
    final String MY_CLIENTID = "demoClientId";
    int correlationId = 0;
    // Partitions whose offsets would be committed or fetched in a later step.
    final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
    final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
    // Ask which broker is the offset manager (coordinator) for this group.
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        Broker offsetManager = metadataResponse.coordinator();
        // The coordinator may be a different host from the one above, so reconnect to it.
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
    } else {
        // retry (after backoff)
    }
} catch (IOException e) {
    // retry the query (after backoff)
}
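The snippet above asks a broker which node is the offset manager for the group. As far as I understand it, the broker can answer without scanning anything because each group id maps to exactly one partition of __consumer_offsets, and the leader of that partition acts as the group's offset manager. The sketch below illustrates that mapping under my assumptions: the hashing rule (non-negative `hashCode` modulo the partition count) and the default of 50 partitions are my recollection of broker internals and of the `offsets.topic.num.partitions` setting, not something stated in the quoted text, and the class and method names are hypothetical.

```java
// Illustrative sketch; the class and method names are hypothetical, not Kafka API.
class OffsetsPartitionSketch {

    // Assumed default of the broker setting offsets.topic.num.partitions.
    static final int DEFAULT_OFFSETS_PARTITIONS = 50;

    // Map a group id to a partition of the offsets topic.
    static int partitionFor(String groupId, int numPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so treat that case as 0.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        // Every commit and fetch for "demoGroup" would go to this one partition.
        System.out.println(partitionFor("demoGroup", DEFAULT_OFFSETS_PARTITIONS));
    }
}
```

If this is right, all offsets of one group live in a single partition, so the broker leading that partition only has to load that partition into its cache to serve the group's fetches.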
