如何在 kafka 0.9.0 中使用多线程消费者? [英] How to use multi-thread consumer in kafka 0.9.0?

查看:17
本文介绍了如何在 kafka 0.9.0 中使用多线程消费者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

kafka 的文档给出了一个方法,如下描述:

The doc of kafka give an approach about with following describes:

每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者 > 实例.

One Consumer Per Thread:A simple option is to give each thread its own consumer > instance.

我的代码:

public class KafkaConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CloudKafkaConsumer consumer;
    private final String topicName;

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
        this.consumer = consumer;
        this.topicName = topicName;
    }

    @Override
    public void run() {
        try {
            this.consumer.subscribe(topicName);
            ConsumerRecords<String, String> records;
            while (!closed.get()) {
                synchronized (consumer) {
                    records = consumer.poll(100);
                }
                for (ConsumerRecord<String, String> tmp : records) {
                    System.out.println(tmp.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            System.out.println(e);
            //if (!closed.get()) throw e;
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    public static void main(String[] args) {
        CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
                .withBootstrapServers("172.31.1.159:9092")
                .withGroupId("test")
                .build();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
        executorService.shutdown();
    }
}

但它不起作用并抛出异常:

but it doesn't work and throws an exception:

java.util.ConcurrentModificationException:KafkaConsumer 对于多线程访问不安全

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

此外,我阅读了Flink(一个分布式流和批量数据处理的开源平台)的源码.使用多线程消费者的 Flink 和我的类似.

Furthermore, I read the source of Flink (an open source platform for distributed stream and batch data processing). Flink using multi-thread consumer is similar to mine.

long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
pollLoop: while (running) {
    ConsumerRecords<byte[], byte[]> records;
    //noinspection SynchronizeOnNonFinalField
    synchronized (flinkKafkaConsumer.consumer) {
        try {
            records = flinkKafkaConsumer.consumer.poll(pollTimeout);
        } catch (WakeupException we) {
            if (running) {
                throw we;
            }
            // leave loop
            continue;
        }
    }

多线程flink代码

怎么了?

推荐答案

Kafka 消费者不是线程安全的.正如您在问题中指出的那样,该文件指出

Kafka consumer is not thread safe. As you pointed out in your question, the document stated that

一个简单的选择是给每个线程一个自己的消费者实例

A simple option is to give each thread its own consumer instance

但是在您的代码中,您拥有由不同的 KafkaConsumerRunner 实例包装的相同消费者实例.因此,多个线程正在访问同一个消费者实例.kafka 文档有明确的说明

But in your code, you have the same consumer instance wrapped by different KafkaConsumerRunner instances. Thus multiple threads are accessing the same consumer instance. The kafka documentation clearly stated

Kafka 消费者不是线程安全的.所有网络 I/O 都发生在进行调用的应用程序的线程.它的责任是用户确保多线程访问正确同步.非同步访问将导致并发修改异常.

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.

这正是您收到的例外情况.

That's exactly the exception you received.

这篇关于如何在 kafka 0.9.0 中使用多线程消费者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆