如何在彼此独立的同一个盒子上运行多个 kafka 消费者? [英] How to run multiple kafka consumers on the same box independent of each other?

查看:32
本文介绍了如何在彼此独立的同一个盒子上运行多个 kafka 消费者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个 Kafka 消费者 ConsumerAConsumerB.我想在同一台机器上运行这两个相互独立的 kafka 消费者.他们之间根本没有任何关系.这两个 kafka 消费者将在同一台机器上处理不同的主题.

I have two Kafka consumer ConsumerA and ConsumerB. I want to run these two kafka consumers independent of each other on the same machine. There is no relation between them at all. These two kafka consumer will work on different topics on the same machine.

  • 每个使用者都应该有一个不同的 Properties 对象.
  • 每个使用者都应该有不同的线程池配置,因为它们可以在需要时独立于其他使用者以多线程方式(使用者组)运行.

以下是我的设计:

消费者类(抽象):

 public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();
    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}

ConsumerA 类:

public class ConsumerA extends Consumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer<byte[], byte[]> consumer;

    public ConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());

        Map<String, Object> config = new HashMap<>();
        config.put(Config.URLS, TEST_URL);
        GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);

        try {
            while (!closed.get()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord payload = decoder.decode(record.value());
                    // extract data from payload
                    System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s
",
                                      record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException ex) {
            // Ignore exception if closing
            System.out.println("error= ", ex);
            if (!closed.get()) throw e;             
        } catch (Exception ex) {
            System.out.println("error= ", ex);      
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

消费者 A B 类:

// similar to `ConsumerA` but with specific details of B

ConsumerHandler 类:

public final class ConsumerHandler {
  private final ExecutorService executorServiceConsumer;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }
  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceConsumer.shutdown();
        try {
          executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

下面是我的一个项目中的主类,如果我启动我的服务器,调用将首先自动出现,然后我从这个地方启动我的所有 kafka 消费者,在那里我执行我的 ConsumerA消费者B.一旦调用了关闭,我就会通过对所有 Kafka 消费者调用关闭来释放所有资源.

Below is my main class in one of my project where if I start my server, calls will come first automatically and from this place I start my all kafka consumers where I execute my ConsumerA and ConsumerB. And as soon as shutdown is called, I release all the resources by calling shutdown on all my Kafka consumers.

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;

@Singleton
@DependencyInjectionInitializer
public class Initializer {
  private ConsumerHandler consumerHandlerA;
  private ConsumerHandler consumerHandlerB;

  @PostConstruct
  public void init() {
    consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA", getConsumerPropsA()), 3);
    consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB", getConsumerPropsB()), 3);
  }

  @PreDestroy
  public void shutdown() {
    consumerHandlerA.shutdown();
    consumerHandlerB.shutdown();
  }
}

对于我想在同一个盒子上运行多个 kafka 消费者的这类问题,这是正确的设计吗?让我知道是否有更好更有效的方法来解决这个问题.一般来说,我会在同一个盒子上最多运行三到四个 Kafka 消费者,如果需要,每个消费者都可以有自己的消费者组.

Is this the right design for this kind of problem where I want to run multiple kafka consumers on the same box? Let me know if there is any better and efficient way to solve this problem. In general I will be running three or four Kafka consumers max on the same box and each consumer can have their own consumer group if needed.

这是KafkaConsumer<的Javadoc/a> 我在我的消费者中都使用了它.并以此为基础 文章 我已经创建了我的消费者,只是我使用了抽象类来扩展它.在该链接中搜索Putting it all Together".

Here is the Javadoc for KafkaConsumer which I am using in both my consumer. And basis on this article I have created my consumer, it is just that I have used abstract class to extend it. Search for "Putting it all Together" in that link.

在文档中提到消费者不是线程安全的,但看起来我的代码正在为池中的每个线程重用相同的消费者实例.

In the docs it is mentioned that Consumers are not Thread-Safe but it looks like my code is reusing the same consumer instance for each thread in the pool.

public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }

解决此线程安全问题并仍然实现相同功能的最佳方法是什么?

What is the best way to solve this thread safety issue and still achieve the same features?

推荐答案

最简单的解决方案解决这个线程安全问题的最佳方法是什么,并且仍然实现相同的功能?":

Easiest Solution to solve "What is the best way to solve this thread safety issue and still achieve the same features?" :

不要实现多线程(Thread API/Executor Service),而是将每个消费者作为单个消费者在其自己独立的 JVM 进程中使用和运行,因此如果您在同一台机器上需要 4 个消费者并且您不想处理多线程问题,然后让您的 kafka 消费者代码 JAR 在其自己的 4 个独立 Java 进程中运行.

这篇关于如何在彼此独立的同一个盒子上运行多个 kafka 消费者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆