来自 kafka 消费者的 InstanceAlreadyExistsException [英] InstanceAlreadyExistsException coming from kafka consumer

查看:145
本文介绍了来自 kafka 消费者的 InstanceAlreadyExistsException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在与 Kafka 合作并尝试通过遵循此文章.唯一的区别是我创建了自己的抽象类处理程序以简化设计.

I am working with Kafka and trying to setup consumer group by follwing this article. The only difference is I have created my own abstract class, handler to make design simpler.

下面是我的抽象类:

public abstract class Consumer implements Runnable {
  private final Properties consumerProps;
  private final String consumerName;

  public Consumer(String consumerName, Properties consumerProps) {
    this.consumerName = consumerName;
    this.consumerProps = consumerProps;
  }

  protected abstract void shutdown();

  protected abstract void run(String consumerName, Properties consumerProps);

  @Override
  public final void run() {
    run(consumerName, consumerProps);
  }
}

下面是我的 KafkaConsumerA,它扩展到抽象类之上:

Below is my KafkaConsumerA which extends above abstract class:

public class KafkaConsumerA extends Consumer {
  private KafkaConsumer<byte[], DataHolder> consumer;

  public KafkaConsumerA(String consumerName, Properties consumerProps) {
    super(consumerName, consumerProps);
  }

  @Override
  public void shutdown() {
    consumer.wakeup();
  }

  @Override
  protected void run(String consumerName, Properties consumerProps) {
    // exception comes from below line from two of the threads and the remaining one thread works fine.
    consumer = new KafkaConsumer<>(consumerProps);
    List<String> topics = getTopicsBasisOnConsumerName(consumerName);
    try {
      consumer.subscribe(topics);
      // Setup the schema config
      Map<String, Object> config = new HashMap<>();
      config.put("urls", "https://abc.qa.host.com");

      GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
      while (true) {
        ConsumerRecords<byte[], DataHolder> records = consumer.poll(200);
        for (ConsumerRecord<byte[], DataHolder> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("partition", record.partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out
              .println((Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value()));
        }
      }
    } catch (WakeupException ex) {
      ex.printStackTrace();
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      consumer.close();
    }
  }
}

下面是我的处理程序类:

And below is my Handler class:

// looks like something is wrong in this class
public final class ConsumerHandler {
  private final ExecutorService executorServiceProcess;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      consumers.add(consumer);
      executorServiceProcess.submit(consumer);
    }
  }

  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceProcess.shutdown();
        try {
          executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

这里是我从主类开始消费组中的所有消费者:

And here is I am starting all my consumers in the consumer group from the main class:

  public static void main(String[] args) {
    ConsumerHandler handlerA =
        new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3);
    // run KafkaConsumerB here

     handlerA.shutdown();
     // shutdown KafkaConsumerB here
  }

因此 - 我的计划是在 KafkaConsumerA 中建立一个包含三个消费者的消费者组,并且所有三个消费者都订阅了相同的主题.

So with this - my plan is to setup a consumer group with three consumers in KafkaConsumerA and all three subscribed to same topics.

错误:-

每当我运行它时,看起来消费者组中只有一个消费者有效,其他两个无效.我在控制台上看到了这两个异常:

Whenever I run this, looks like only one consumer in the consumer group works and other two doesn't work. And I see this exception on the console from those two:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na]

我在这里做错了什么?getConsumerProps() 方法返回具有 client.idgroup.id 的属性对象,该对象对于该消费者组中的所有三个消费者具有相同的值.

What is wrong I am doing here? getConsumerProps() method return properties object which has client.id and group.id in it with same value for all three consumers in that consumer group.

以下是我的设计细节:

  • 我的 KafkaConsumerA 将在一个消费者组中有三个消费者,每个消费者都将在 topicA 上工作.
  • 我的 KafkaConsumerB(类似于 KafkaConsumerA)将在不同的消费者组中有两个消费者,每个消费者都将在 topicB 上工作.
  • My KafkaConsumerA will have three consumers in a consumer group and each consumer will work on topicA.
  • My KafkaConsumerB (similar to KafkaConsumerA) will have two consumers in a different consumer group and each of those consumer will work on topicB.

并且这两个消费者KafkaConsumerAKafkaConsumerB将运行在同一个盒子上,不同的消费者组彼此独立.

And these two consumers KafkaConsumerA and KafkaConsumerB will be running on same box with different consumer group independent of each other.

推荐答案

Kafka 正在尝试注册 MBeans 用于应用程序监控,并使用 client.id 来执行此操作.正如您所说,您在抽象类中注入了属性,并在 Aclient.id 和 group.id>.但是,您有不同的客户端,因此您应该给它们自己的 client.id,但保持相同的 group.id.这将在同一消费者组中注册不同的客户端/消费者并使它们协同工作,但不会在 MBean 注册上发生冲突.

Kafka is trying to register MBeans for application monitoring and is using the client.id to do so. As you said, you have the properties injected in your abstract class and inject for every consumer the same client.id and group.id in group A. However, you have different clients, so you should give them their own client.id, but keep the same group.id. This will register the different client/consumers in the same consumer group and make them work together, but not clash on the MBeans registration.

这篇关于来自 kafka 消费者的 InstanceAlreadyExistsException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆