LibRdKafka: committed_offset always at -1001


Problem Description

When I run my consumer group I always get this kind of statistics:

2016-10-15 13:56:17.925: "STATS": { "name": "debian-meox#producer-2", "type": "producer", "ts":16768436761, "time":1476532577, "replyq":0, "msg_cnt":75428, "msg_size":29314007, "msg_max":100000, "msg_size_max":4096000000, "simple_cnt":0, "brokers":{ "localhost:9092/bootstrap": { "name":"localhost:9092/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989561, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10064, "max":10064, "avg":10064, "sum":10064, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "localhost:9093/bootstrap": { "name":"localhost:9093/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989603, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10078, "max":10078, "avg":10078, "sum":10078, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "localhost:9094/bootstrap": { "name":"localhost:9094/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989343, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10075, "max":10075, "avg":10075, "sum":10075, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "debian-meox:9094/2": { "name":"debian-meox:9094/2", "nodeid":2, "state":"UP", "stateage":4959041, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, 
"req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":472, "max":472, "avg":472, "sum":472, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "debian-meox:9093/1": { "name":"debian-meox:9093/1", "nodeid":1, "state":"UP", "stateage":4958754, "outbuf_cnt":28, "outbuf_msg_cnt":63922, "waitresp_cnt":3, "waitresp_msg_cnt":7488, "tx":1121, "txbytes":587723174, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":633, "rxbytes":24173, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":1248, "max":205146, "avg":22323, "sum":14130815, "cnt":633 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":632 }, "toppars":{ "test_topic": { "topic":"test_topic", "partition":1} } } , "debian-meox:9092/0": { "name":"debian-meox:9092/0", "nodeid":0, "state":"UP", "stateage":4958760, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":328, "max":328, "avg":328, "sum":328, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ "test_topic": { "topic":"test_topic", "partition":0} } } }, "topics":{ "test_topic": { "topic":"test_topic", "metadata_age":4933, "partitions":{ "0": { "partition":0, "leader":0, "desired":false, "unknown":false, "msgq_cnt":0, "msgq_bytes":0, "xmit_msgq_cnt":0, "xmit_msgq_bytes":0, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":0, "txbytes":0, "msgs": 0, "rx_ver_drops": 0 } , "1": { "partition":1, "leader":1, "desired":false, "unknown":false, "msgq_cnt":1572, 
"msgq_bytes":840177, "xmit_msgq_cnt":2474, "xmit_msgq_bytes":1322717, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":1357083, "txbytes":613073020, "msgs": 1361129, "rx_ver_drops": 0 } , "-1": { "partition":-1, "leader":-1, "desired":false, "unknown":false, "msgq_cnt":0, "msgq_bytes":0, "xmit_msgq_cnt":0, "xmit_msgq_bytes":0, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":0, "txbytes":0, "msgs": 17604, "rx_ver_drops": 0 } } } } }

As you can see, all offsets are set to -1001:

"committed_offset":-1001,
"eof_offset":-1001,
"lo_offset":-1001,
"hi_offset":-1001,

This is my setup (also for default_topic):

"auto.commit.enable", "true"
"offset.store.method", "broker"
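For context, these two properties would typically be applied through the librdkafka C++ configuration API roughly as follows. This is a minimal sketch, not code from the question: the group name `my_consumer_group` and the error handling around `set()` are illustrative assumptions.

```cpp
#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

int main() {
  std::string errstr;

  // Global configuration: enable periodic auto-commit and store
  // committed offsets on the broker rather than in local files.
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  if (conf->set("group.id", "my_consumer_group", errstr) != RdKafka::Conf::CONF_OK ||
      conf->set("auto.commit.enable", "true", errstr) != RdKafka::Conf::CONF_OK ||
      conf->set("offset.store.method", "broker", errstr) != RdKafka::Conf::CONF_OK) {
    std::cerr << "Config error: " << errstr << std::endl;
    return 1;
  }

  RdKafka::KafkaConsumer *consumer =
      RdKafka::KafkaConsumer::create(conf, errstr);
  // ... subscribe and consume ...
}
```

Note that valid configuration alone does not advance the offsets; as the answer below explains, the partitions also have to be assigned.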

And sometimes, even if there are a lot of messages in the topic, the consumer group is not able to fetch them. Any idea?

Accepted Answer

All the partitions are in "fetch_state": "none", which typically means they are not assign():ed and thus not eligible for consumption (it could also be that there is no leader for them, but that's not the case here: "leader": 1). The offsets are only updated, and committed (if enable.auto.commit is true, the default), when messages are fetched from the broker and consumed by your application.

Did you register a RebalanceCb? If so, you must call assign(partitions...) on ERR__ASSIGN_PARTITIONS and unassign() on ERR__REVOKE_PARTITIONS.

Example:

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
  void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
      consumer->assign(partitions);   // start fetching the assigned partitions
    else
      consumer->unassign();           // ERR__REVOKE_PARTITIONS: stop fetching
  }
};

Full example: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consumer_example.cpp
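For the callback to take effect, it has to be registered on the configuration object before the consumer is created. The wiring below is a sketch under assumptions: the topic name `test_topic` is taken from the question's stats, while the group name and the minimal poll loop are illustrative, not from the original post.

```cpp
#include <iostream>
#include <string>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

// ... ExampleRebalanceCb as defined above ...

int main() {
  std::string errstr;
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  conf->set("group.id", "my_consumer_group", errstr);

  // Register the rebalance callback; it must outlive the consumer.
  ExampleRebalanceCb rebalance_cb;
  conf->set("rebalance_cb", &rebalance_cb, errstr);

  RdKafka::KafkaConsumer *consumer =
      RdKafka::KafkaConsumer::create(conf, errstr);
  if (!consumer) {
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
    return 1;
  }

  // subscribe() triggers the group rebalance; once ERR__ASSIGN_PARTITIONS
  // is delivered and assign() is called, fetch_state leaves "none" and
  // the offset fields in the stats become meaningful.
  consumer->subscribe({"test_topic"});
  while (true) {
    RdKafka::Message *msg = consumer->consume(1000 /* timeout ms */);
    if (msg->err() == RdKafka::ERR_NO_ERROR)
      std::cout << "Got message at offset " << msg->offset() << std::endl;
    delete msg;
  }
}
```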

