Kafka Java Consumer已经关闭 [英] Kafka Java Consumer already closed

查看:1020
本文介绍了Kafka Java Consumer已经关闭的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我刚刚开始使用Kafka.我面临着消费者的一个小问题.我已经用Java编写了一个使用者.

I have just started using Kafka. I am facing a small issue with the consumer. I have written a consumer in Java.

我收到此异常-IllegalStateException此使用者已经关闭.

I get this exception - IllegalStateException This consumer has already been closed.

我在以下行出现异常:

ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);

这开始发生在我的用户崩溃并出现一些异常之后,当我再次尝试运行它时,它给了我这个异常.

This started happening after my consumer crashed with some exception and when I tried running it again it gave me this exception.

这是完整的代码:

package StreamApplicationsTest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class StreamAppConsumer {

public static void main(String[] args){
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");

    Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    while(true){
        ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
        while(iterator.hasNext()){
            i++;
            ConsumerRecord<String,String> consumerRecord = iterator.next();
            String key = consumerRecord.key();
            String value = consumerRecord.value();
            if(key=="exit" || value=="exit")
                break;
            System.out.println("Key="+key+"\tValue="+value);
        }

        System.out.println("Messages processed = "+Integer.toString(i));
        consumer.close();

    }
}
}

我只是被这个问题困扰,任何帮助都是有用的.

I am just stuck with this issue any sort of help will be useful.

推荐答案

之所以发生这种情况,是因为您要在无限循环结束时关闭使用者,因此当它轮询第二次关闭使用者时.为了解决眼前的问题,我将整个while(true)循环包装在try-catch中,并在catch或finally块中处理使用者关闭.

This is happening because you are closing the consumer at the end of your infinite loop so when it polls a second time the consumer has been closed. To handle the immediate problem I'd wrap the entire while(true) loop in a try-catch and handle the consumer close in the catch or finally block.

但是,如果Kafka用户未正确处理不同的关闭信号,则可能会丢失数据.我建议您查看Confluent的示例以实现正常的消费者关机

However if different shutdown signals aren't handled carefully with a Kafka consumer you run the risk of losing data. I'd recommend looking at Confluent's example for graceful consumer shutdown here. In your case since you're running in the main thread it'd look something like this ...

public static void main(String[] args) {
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");

    Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    Runtime.getRuntime().addShutdownHook(new Thread()
    {
      public void run() {
        consumer.wakeup();
      }
    });

    try {
      while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
          i++;
          ConsumerRecord<String, String> consumerRecord = iterator.next();
          String key = consumerRecord.key();
          String value = consumerRecord.value();
          if (key == "exit" || value == "exit")
            break;
          System.out.println("Key=" + key + "\tValue=" + value);
        }
        System.out.println("Messages processed = " + Integer.toString(i));
      }
    } catch (WakeupExection e) {
      // Do Nothing
    } finally {
      consumer.close();
    }
  }
}

基本上运行consumer.wakeup()是使用者中唯一的线程安全方法,因此它是唯一可以在Java的shutdown钩子内部运行的方法.由于调用唤醒时消费者未处于睡眠状态,因此它将触发唤醒执行,该唤醒失败后将正常关闭用户.

basically running consumer.wakeup() is the only threadsafe method in the consumer so it's the only one than can be ran inside of Java's shutdown hook. Since the consumer is not asleep when wakeup is called it trips the wakeupexection which falls through to gracefully shutting down the consumer.

这篇关于Kafka Java Consumer已经关闭的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆