Kafka Java Consumer 已经关闭 [英] Kafka Java Consumer already closed

查看:52
本文介绍了Kafka Java Consumer 已经关闭的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我刚刚开始使用 Kafka.我正面临着消费者的一个小问题.我用Java编写了一个消费者.

I have just started using Kafka. I am facing a small issue with the consumer. I have written a consumer in Java.

我收到此异常 - IllegalStateException 此使用者已关闭.

I get this exception - IllegalStateException This consumer has already been closed.

我在以下行中遇到异常:

I get exception on the following line :

ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);

这是在我的消费者因某些异常而崩溃后开始发生的,当我再次尝试运行它时,它给了我这个异常.

This started happening after my consumer crashed with some exception and when I tried running it again it gave me this exception.

完整代码如下:

package StreamApplicationsTest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class StreamAppConsumer {

public static void main(String[] args){
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");

    Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    while(true){
        ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
        while(iterator.hasNext()){
            i++;
            ConsumerRecord<String,String> consumerRecord = iterator.next();
            String key = consumerRecord.key();
            String value = consumerRecord.value();
            if(key=="exit" || value=="exit")
                break;
            System.out.println("Key="+key+"\tValue="+value);
        }

        System.out.println("Messages processed = "+Integer.toString(i));
        consumer.close();

    }
}
}

我只是被这个问题困住了,任何形式的帮助都会有用.

I am just stuck with this issue any sort of help will be useful.

推荐答案

发生这种情况是因为您在无限循环结束时关闭使用者,因此当它第二次轮询时,使用者已关闭.为了处理眼前的问题,我将整个 while(true) 循环包装在 try-catch 中,并在 catch 或 finally 块中处理使用者 close.

This is happening because you are closing the consumer at the end of your infinite loop so when it polls a second time the consumer has been closed. To handle the immediate problem I'd wrap the entire while(true) loop in a try-catch and handle the consumer close in the catch or finally block.

但是,如果 Kafka 消费者没有仔细处理不同的关闭信号,您将面临丢失数据的风险.我建议查看 Confluent 的优雅消费者关闭示例 此处.在你的情况下,因为你在主线程中运行,它看起来像这样......

However if different shutdown signals aren't handled carefully with a Kafka consumer you run the risk of losing data. I'd recommend looking at Confluent's example for graceful consumer shutdown here. In your case since you're running in the main thread it'd look something like this ...

public static void main(String[] args) {
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");

    Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    Runtime.getRuntime().addShutdownHook(new Thread()
    {
      public void run() {
        consumer.wakeup();
      }
    });

    try {
      while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
          i++;
          ConsumerRecord<String, String> consumerRecord = iterator.next();
          String key = consumerRecord.key();
          String value = consumerRecord.value();
          if (key == "exit" || value == "exit")
            break;
          System.out.println("Key=" + key + "\tValue=" + value);
        }
        System.out.println("Messages processed = " + Integer.toString(i));
      }
    } catch (WakeupExection e) {
      // Do Nothing
    } finally {
      consumer.close();
    }
  }
}

基本上运行 consumer.wakeup() 是消费者中唯一的线程安全方法,因此它是唯一可以在 Java 的关闭钩子内运行的方法.由于在调用唤醒时消费者没有睡着,它会触发唤醒执行,从而优雅地关闭消费者.

basically running consumer.wakeup() is the only threadsafe method in the consumer so it's the only one than can be ran inside of Java's shutdown hook. Since the consumer is not asleep when wakeup is called it trips the wakeupexection which falls through to gracefully shutting down the consumer.

这篇关于Kafka Java Consumer 已经关闭的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆