如何检测Spring Kafka用户何时停止从1个分区获取消息? [英] How to detect when a spring kafka consumer stops getting messages from 1 partition?

查看:95
本文介绍了如何检测Spring Kafka用户何时停止从1个分区获取消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有3个春季kafka消费者(同一组)从3个分区中获取消息.我想检测这些使用者之一何时停止从1个分区读取(其他2个使用者继续从其他2个分区读取).到目前为止,这种情况已经发生过两次,当被检测到时,很容易通过重新启动所有使用者来解决问题,从而引起重新平衡.问题是在这两种情况下都应该早点知道.所以我尝试像这样使用ListenerContainerIdleEvent-

I have have 3 spring kafka consumers (same group) getting messages from 3 partitions. I want to detect when one of these consumers stops reading from 1 partition (other 2 consumers continue reading from the other 2 partitions). This has happened twice so far and when detected, it is easy to fix by restarting all consumers which causes a re-balance. The problem is on both occasions it would have been good to know earlier. So I tried using ListenerContainerIdleEvent like so -

@EventListener
public void eventHandler(ListenerContainerIdleEvent event) {
    LOG.info("idle event fired! listnerId=" + event.getListenerId());

    Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
    partitions.forEach(p ->
            LOG.info("partition: " + p.partition() + " topic:" + p.topic()));
}

这是我的测试结果-

1)1个消费者从1个分区中读取数据,此事件效果很好.

1) 1 consumer reading from 1 partition, this event works well.

2)1个消费者从3个分区中读取,此事件仅被调用什么时候所有3个分区上都没有消息.如果没有消息1个或2个分区,但第3个分区上有消息,这事件没有被调用.

2) 1 consumer reading from 3 partitions, this event gets called only when are no messages on all 3 partitions. If there are no messages on 1 or 2 partitions but there are messages on the 3rd partition, this event does not get called.

当使用者被分配到多个分区时,是否有一种方法可以从1个分区中读取消息(无论出于什么原因...消费者问题或无消息可从该分区读取)时得到通知?

Is there a way I can get notified when messages are not being read (for whatever reason...consumer issue or no messages available to be read from the partition) from 1 partition when a consumer is assigned to multiple partitions?

更新:2018年3月27日

我不确定是否应该问一个与此相关的新问题,因此请尝试首先扩展该问题.我有1个消费者在1个主题中使用3个分区.我已经设置了idleEventInterval = 30secs.每30秒,我会收到以下日志消息.

I am not sure if I should ask a new question related to this so trying to extend this question first. I have 1 consumer consuming from 1 topic with 3 partitions. I have set the idleEventInterval=30secs. Every 30 secs, I get the following log messages.

12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-30855毫秒内未收到任何消息12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-30845毫秒内未收到任何消息12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:1主题:test-topic12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:2主题:test-topic12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:0 topic:test-topic12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-30855毫秒内未收到任何消息12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-30845毫秒内未收到任何消息12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:1主题:test-topic12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:2主题:test-topic12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:0 topic:test-topic

12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30855 milliseconds 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30845 milliseconds 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30855 milliseconds 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30845 milliseconds 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic 12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic

12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-60977毫秒内未收到任何消息12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:1主题:test-topic12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:0 topic:test-topic12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-60977毫秒内未收到任何消息12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:1主题:test-topic12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:0 topic:test-topic12:13:21.632 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-60975毫秒内未收到任何消息12:13:21.632 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:2 topic:test-topic12:13:21.633 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-60975毫秒内未收到任何消息12:13:21.633 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler-分区:2 topic:test-topic

12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60977 milliseconds 12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic 12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic 12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60977 milliseconds 12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic 12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic 12:13:21.632 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60975 milliseconds 12:13:21.632 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic 12:13:21.633 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60975 milliseconds 12:13:21.633 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic

事件侦听器代码为-

  @EventListener
public void eventHandler(ListenerContainerIdleEvent event) {
    LOG.info("No messages received for " + event.getIdleTime() + " milliseconds");

Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
partitions.forEach(p ->
    LOG.info("partition: " + p.partition() + " topic:" + p.topic()));

}

1)为什么此事件每30秒调用4次?

1) Why does this event get called 4 times every 30 sec?

2)为什么每组消息的分区信息不一致?有时没有分区信息,有时在同一集合中重复分区,等等.

2) Why is the partition information not consistent for every set of messages? Sometimes there is no partition information, sometimes partitions are repeated in the same set, etc.

推荐答案

...没有可用的消息可以从分区中读取

... no messages available to be read from the partition

如果并发为1并且有3个分区,则所有三个分区将由同一使用者处理.如果为消费者分配了一个以上的主题,并且在一段时间内未收到来自特定主题的消息,则该框架中没有任何事件可以发布事件.

If the concurrency is 1 and there are 3 partitions, all three partitions will be handled by the same consumer. There is nothing currently in the framework that will publish events if a consumer is assigned more that one topic and has not received a message from a particular topic for some period of time.

如果将容器并发性增加到3,则将有3个使用者-每个分区一个.如果活动空闲,该事件将由每个使用者发布.侦听器必须必须是线程安全的,因为将有3个线程(通常是并发调用).

If you increase the container concurrency to 3, you will have 3 consumers - one per partition. The event will be published by each consumer if it goes idle. The listener must be thread-safe since there will be 3 threads calling it and often concurrently.

很明显,如果您有大量分区,那么这种扩展就不会很好.

Obviously, this won't scale well if you have a large number of partitions.

出于任何原因...消费者问题...

for whatever reason...consumer issue...

如果某个主题中有消息并且该主题所分配给的使用者未接收到消息,而该使用者正在主动从其他分区接收消息,这将是很奇怪的.那将需要卡夫卡人的帮助.

It would be rather strange if there are messages in a topic and they are not received by the consumer to which that topic is assigned, yet that consumer is actively receiving messages from other partitions. That would require help from the kafka folks.

这篇关于如何检测Spring Kafka用户何时停止从1个分区获取消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆