如何在同一个盒子上独立运行多个kafka消费者? [英] How to run multiple kafka consumers on the same box independent of each other?

查看:384
本文介绍了如何在同一个盒子上独立运行多个kafka消费者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个Kafka消费者 ConsumerA ConsumerB 。我想在相同的机器上运行这两个kafka消费者彼此独立。他们之间根本没有关系。

I have two Kafka consumer ConsumerA and ConsumerB. I want to run these two kafka consumers independent of each other on the same machine. There is no relation between them at all. These two kafka consumer will work on different topics on the same machine.


  • 每个消费者都应该有一个不同的Properties对象。

  • 每个消费者应该有一个不同的线程池配置,因为如果需要独立于其他消费者,它们可以以多线程方式运行(消费者组)。

以下是我的设计:

消费者类(抽象):

 public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();
    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}

ConsumerA类: / p>

ConsumerA class:

public class ConsumerA extends Consumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer<byte[], byte[]> consumer;

    public ConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());

        Map<String, Object> config = new HashMap<>();
        config.put(Config.URLS, TEST_URL);
        GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);

        try {
            while (!closed.get()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord payload = decoder.decode(record.value());
                    // extract data from payload
                    System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                                      record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException ex) {
            // Ignore exception if closing
            System.out.println("error= ", ex);
            if (!closed.get()) throw e;             
        } catch (Exception ex) {
            System.out.println("error= ", ex);      
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

strong> ConsumerA B类:

ConsumerA B class:

// similar to `ConsumerA` but with specific details of B

ConsumerHandler类:

public final class ConsumerHandler {
  private final ExecutorService executorServiceConsumer;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }
  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceConsumer.shutdown();
        try {
          executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

下面是我的一个项目的主要课程如果我启动我的服务器,电话会自动先来,并从这个地方开始我的所有kafka消费者,我执行我的 ConsumerA ConsumerB 。一旦关闭被调用,我通过在所有Kafka消费者上调用shutdown关闭所有资源。

Below is my main class in one of my project where if I start my server, calls will come first automatically and from this place I start my all kafka consumers where I execute my ConsumerA and ConsumerB. And as soon as shutdown is called, I release all the resources by calling shutdown on all my Kafka consumers.

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;

@Singleton
@DependencyInjectionInitializer
public class Initializer {
  private ConsumerHandler consumerHandlerA;
  private ConsumerHandler consumerHandlerB;

  @PostConstruct
  public void init() {
    consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA", getConsumerPropsA()), 3);
    consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB", getConsumerPropsB()), 3);
  }

  @PreDestroy
  public void shutdown() {
    consumerHandlerA.shutdown();
    consumerHandlerB.shutdown();
  }
}

这是这种问题的正确设计我想在同一个盒子上运行多个kafka消费者?让我知道如果有更好和更有效的方式来解决这个问题。一般来说,我将在同一个盒子上运行三到四个卡夫卡消费者,每个消费者可以根据需要拥有自己的消费群体。

Is this the right design for this kind of problem where I want to run multiple kafka consumers on the same box? Let me know if there is any better and efficient way to solve this problem. In general I will be running three or four Kafka consumers max on the same box and each consumer can have their own consumer group if needed.

这是Javadoc for KafkaConsumer 消费者。并且基于这个 article 我创建了我的消费者,只是我使用抽象类来扩展它。

Here is the Javadoc for KafkaConsumer which I am using in both my consumer. And basis on this article I have created my consumer, it is just that I have used abstract class to extend it. Search for "Putting it all Together" in that link.

在文档中,提到消费者不是线程安全的,但它看起来像我的代码对池中的每个线程重用相同的消费者实例。

In the docs it is mentioned that Consumers are not Thread-Safe but it looks like my code is reusing the same consumer instance for each thread in the pool.

public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }

解决此线程安全问题的最佳方法是什么仍然实现相同的功能?

What is the best way to solve this thread safety issue and still achieve the same features?

推荐答案

一个快速的建议,如果你已经知道了,那么道歉。类级别变量从不线程安全。如果您需要为每个线程使用不同的Properties对象,则可以在方法级别更好地声明它们,并将它们作为您需要访问Properties对象的其他方法的参数。

A quick suggestion, apologies if you know about it already. Class level variables are never thread safe. If you need to have a different Properties object for every thread, better declare them at the method level and provide them as parameter to other methods where you need to access Properties object.

这篇关于如何在同一个盒子上独立运行多个kafka消费者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆