如何完成kafka消费者安全?(在shutdownHook内部调用thread#join是否有意义?) [英] How to finish kafka consumer safety?(Is there meaning to call thread#join inside shutdownHook ? )

查看:94
本文介绍了如何完成kafka消费者安全?(在shutdownHook内部调用thread#join是否有意义?)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在阅读

这里是完成消费者线程的代码:

And here code to finish the consumer thread:

Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("Starting exit...");
                consumer.wakeup(); 1
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

据我所知,ShutdownHook 在所有非守护进程线程完成但进程被操作系统销毁之前调用.
1. 在我看来 mainThread.join() 是没有用的.主线程总是会在 ShutdownHook 执行的那一刻结束.这是正确的还是我误解了什么?
2. 其实我不明白为什么我们需要等待主线程?我们需要等待关闭方法的执行吗?

As I understand ShutdownHook invokes when all non-daemon threads have finished but before the process was destroyed by OS.
1. From my point of view mainThread.join() is useless. Main thread always will finished to the moment of ShutdownHook execution. Is it correct or I misunderstood something?
2. Actually I don't understand why do we need to await main thred? we need to await close method execution?

本书提供了以下主要方法代码:

Book provides following main method code:

try {
            // looping until ctrl-c, the shutdown hook will cleanup on exit
            while (true) {
                ConsumerRecords<String, String> records =
                  movingAvg.consumer.poll(1000);
                System.out.println(System.currentTimeMillis() + "--  waiting for data...");
                for (ConsumerRecord<String, String> record :
                  records) {
                    System.out.printf("offset = %d, key = %s,
                      value = %s\n",
                      record.offset(), record.key(),
                      record.value());
                }
                for (TopicPartition tp: consumer.assignment())
                    System.out.println("Committing offset at position:" + consumer.position(tp));
                movingAvg.consumer.commitSync();
            }
        } catch (WakeupException e) {
            // ignore for shutdown 2
        } finally {
            consumer.close(); 3
            System.out.println("Closed consumer and we are done");
        }
    }

推荐答案

您执行 consumer.wakeup() 来中断当前消费者的操作(可能是长时间运行的(例如轮询)甚至阻塞(在 beginningOffsets(...) 的情况下会发生什么.

You do consumer.wakeup() to interrupt current consumer's operation (that might be long-running (e.g. a poll) or even blocked (what could happen in case of beginningOffsets(...).

mainThread.join() 放在里面是为了确保主线程真正完成,并且在唤醒后的处理过程中不会被关闭.请记住,shutdownHook 也负责处理中断,而不仅仅是普通的程序关闭.

The mainThread.join() is put in there to ensure that main thread actually finishes and is not shut down in middle of processing after wakeup. Please remember that shutdownHook is responsible for handling interrupts as well, not only ordinary program shutdown.

所以如果你打断例如ctrl-C:

So if you interrupt with e.g. ctrl-C:

1. shutdown hook gets called
2. main thread is still running, most often waiting for data in `poll`
3. shutdown hook `wakeup`-s the main thread
4. main thread enters the exception handler, breaks the loop
5. main thread closes the consumer with `.close()`
6. shutdown hook waits for 5. and finishes

如果没有等待,您可能没有执行步骤 4 中的消费者关闭步骤 &5.

Without waiting you might have not performed the consumer shutdown steps in steps 4 & 5.

这篇关于如何完成kafka消费者安全?(在shutdownHook内部调用thread#join是否有意义?)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆