Zookeeper如何从__consumer_offsets主题中检索消费者偏移量? [英] How does Zookeeper retrive the consumer offsets from __consumer_offsets topic?

查看:133
本文介绍了Zookeeper如何从__consumer_offsets主题中检索消费者偏移量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是对 zookeeper在哪里执行的后续问题存储Kafka群集和相关信息?根据Armando Ballaci提供的答案.

This is a followup question to "Where do zookeeper store Kafka cluster and related information?" based on the answer provided by Armando Ballaci.

现在,很明显,消费者偏移量存储在Kafka集群中的一个名为__consumer_offsets的特殊主题中.很好,我只是想知道如何获取这些偏移量.

Now it's clear that consumer offsets are stored in the Kafka cluster in a special topic called __consumer_offsets. That's fine, I am just wondering how does the retrieval of these offsets work.

主题与RDBS不同,RDBS可以根据特定谓词查询任意数据.例如-如果数据存储在RDBMS中,则类似如下的查询将为某个消费者组的特定消费者获取主题的特定分区的消费者偏移量.

Topics are not like RDBS over which we can query for arbitrary data based on a certain predicate. Ex - if the data is stored in an RDBMS, probably a query like below will get the consumer offset for a particular partition of a topic for a particular consumer of some consumer group.

select consumer_offset__read, consumer_offset__commited from consumer_offset_table where consumer-grp-id="x" and partitionid="y"

但是很明显,这种检索是不可能的.n.Kafka Topics.那么主题检索机制如何工作?有人可以详细说明吗?

But clearly this kind of retrieval is not possible o.n Kafka Topics. So how does the retrieval mechanism from topic work? Could someone elaborate?

(来自Kafka分区的数据是在FIFO中读取的,如果遵循Kafka消费者模型来检索特定的偏移量,则必须处理许多其他数据,而且速度会很慢.所以我想知道是否在某些情况下完成了数据其他方式...)

(Data from Kafka partitions is read in FIFO, and if Kafka consumer model is followed to retrieve a particular offset, a lot of additional data has to be processed and it's going to be slow. So am wondering if it's done in some other way...)

推荐答案

当我在日常工作中偶然发现此问题时,我可以在网上找到一些有关此问题的描述,如下所示:

Some description I could find on web regarding the same when I stumbled upon this for my day job is as follows:

在Kafka到0.8.1.1的发行版中,消费者将其偏移量提交给ZooKeeper.当存在大量偏移量(即消费者计数*分区计数)时,ZooKeeper的伸缩性不是很好(特别是写操作).幸运的是,Kafka现在提供了一种用于存储消费者偏移量的理想机制.消费者可以通过在Kafka中将其写入持久(复制)和高度可用的主题来实现其抵消.消费者可以通过阅读本主题来获取偏移量(尽管我们提供了内存中的偏移量缓存以加快访问速度).也就是说,偏移量提交是常规的生产者请求(价格便宜),而偏移量提取是快速的内存查找.

In Kafka releases through 0.8.1.1, consumers commit their offsets to ZooKeeper. ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count). Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets. Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although we provide an in-memory offsets cache for faster access). i.e., offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory look ups.

Kafka官方文档描述了该功能的工作原理以及如何将偏移量从ZooKeeper迁移到Kafka.该Wiki提供了示例代码,展示了如何使用新的基于Kafka的偏移量存储机制.

The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample code that shows how to use the new Kafka-based offset storage mechanism.

try {
        BlockingChannel channel = new BlockingChannel("localhost", 9092,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        final String MY_GROUP = "demoGroup";
        final String MY_CLIENTID = "demoClientId";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
        channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
 
        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different, from the above channel's host then reconnect
            channel.disconnect();
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          5000 /* read timeout in millis */);
            channel.connect();
        } else {
            // retry (after backoff)
        }
    }
    catch (IOException e) {
        // retry the query (after backoff)
    }

这篇关于Zookeeper如何从__consumer_offsets主题中检索消费者偏移量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆