Correct way to restart or shut down the stream using UncaughtExceptionHandler

Problem description

I have a Kafka Streams app with the driver code below for real-time message transformation.

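String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);

// Apply the custom transformation to every record and write the results out.
source.transform(() -> new MyTransformer()).to(...);

KafkaStreams streams = new KafkaStreams(builder, appConfig);
// Invoked when a stream thread dies from an exception it could not handle.
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        // Terminate the JVM so that monit can restart the app.
        System.exit(0);
    }
});

// Delete this application's local state directory before starting.
streams.cleanUp();
streams.start();

// Close the streams instance cleanly when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));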

After a few minutes of execution, the app throws the exception below and then stops progressing through the stream.

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

I tried flushing out the /tmp/kafka-streams/TRANSFORMATION-APP directory and restarting the app, but it throws the same exception again. One thing I noticed is that the app works fine until it has transformed all the backlog messages, but throws the exception after processing some of the new messages!

Sometimes it also throws the following uncaught exceptions:

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed to rebalance

After throwing (one of) these exceptions, the app is still running but no longer progresses through the stream.

What is the correct way to handle these errors? Is it possible to restart the stream programmatically, without killing the app? This app is under monit. In the worst case, I would prefer to terminate the app properly (without any message loss), so that monit can restart it.

The input topic has 100 partitions, and I have set num.stream.threads to 100 in the app configuration. The app runs on Kafka 0.10.1.1-cp1.

Recommended answer

Kafka 0.10.1.x has some bugs with regard to multi-threading. You can either upgrade to 0.10.2 (Apache Kafka released it today; Confluent Platform 3.2 should follow shortly) or apply the following workaround:

  • Use single-threaded execution only
  • If you need more threads, start more instances
  • Configure a different state directory for each instance
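
As a rough sketch of this workaround, assuming `appConfig` is built as plain `java.util.Properties`: every instance keeps the same `application.id` (so all instances join one consumer group and split the 100 partitions between them), runs a single stream thread, and gets its own `state.dir`. The `WorkaroundConfig` class, the `forInstance` helper, its `instanceId` parameter and the `/tmp/kafka-streams-<instanceId>` naming scheme are hypothetical; the config keys themselves are the standard Kafka Streams ones.

```java
import java.util.Properties;

// Hypothetical helper: per-instance settings for the single-threaded workaround.
final class WorkaroundConfig {
    private WorkaroundConfig() {}

    // All instances share application.id (one consumer group that splits the
    // partitions), but each gets its own state directory.
    // instanceId is a hypothetical per-instance suffix, e.g. a host name or index.
    static Properties forInstance(String bootstrapServers, String instanceId) {
        Properties props = new Properties();
        props.setProperty("application.id", "TRANSFORMATION-APP");
        props.setProperty("bootstrap.servers", bootstrapServers);
        // One stream thread per instance sidesteps the 0.10.1.x multi-threading bugs.
        props.setProperty("num.stream.threads", "1");
        // Distinct state directory per instance (hypothetical naming scheme).
        props.setProperty("state.dir", "/tmp/kafka-streams-" + instanceId);
        return props;
    }
}
```

Each process would then be started with its own ID and pass the result to `new KafkaStreams(builder, WorkaroundConfig.forInstance(...))`. A separate state directory matters mainly when several instances run on the same machine, since each instance locks its task directories under `state.dir` (the lock that fails in the stack trace above).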

You might also need to delete your local state directory (only once) before restarting, to get the application into an overall consistent state.
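
The question's code already calls `KafkaStreams#cleanUp()` before `start()`, which deletes this application's local state directory; because it runs unconditionally, every restart then has to rebuild all state from the changelog topics. If you would rather wipe the directory once, as a separate maintenance step while the app is stopped, a plain `java.nio` sketch could look like this (`StateDirCleaner` is a hypothetical helper, not part of Kafka Streams):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper for a one-off wipe of a local state directory.
final class StateDirCleaner {
    private StateDirCleaner() {}

    // Recursively deletes e.g. /tmp/kafka-streams/TRANSFORMATION-APP.
    // Run it only while no instance is using the directory.
    // Returns false if the directory did not exist.
    static boolean deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        Path[] ordered;
        try (Stream<Path> paths = Files.walk(dir)) {
            // A child path sorts after its parent, so reverse order puts files
            // and subdirectories before the directories that contain them.
            ordered = paths.sorted(Comparator.reverseOrder()).toArray(Path[]::new);
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
        return true;
    }
}
```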

In any case, there will be no data loss. Kafka Streams guarantees at-least-once processing semantics, even in case of failure. This applies to your local stores too: after you delete the local state directory, the stores are recreated on startup from the underlying Kafka changelog topics (although this is an expensive operation).
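
To make the restore step more concrete, here is a much-simplified model of it (an illustration only, not Kafka Streams' actual restore code): a key-value store's changelog is a sequence of key/value updates in offset order, where a `null` value is a tombstone meaning the key was deleted. Replaying the sequence from the beginning reproduces the store's latest contents, which is also why restoring a large store takes a while. `ChangelogReplay` and `restore` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of rebuilding a key-value store from its changelog.
final class ChangelogReplay {
    private ChangelogReplay() {}

    // Replays changelog records in offset order; the last write per key wins.
    static Map<String, String> restore(List<Map.Entry<String, String>> changelog) {
        Map<String, String> store = new HashMap<>();
        for (Map.Entry<String, String> record : changelog) {
            if (record.getValue() == null) {
                store.remove(record.getKey());                  // tombstone: key deleted
            } else {
                store.put(record.getKey(), record.getValue());  // newer value overwrites
            }
        }
        return store;
    }
}
```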

The UncaughtExceptionHandler only provides a way to find out that a thread died. It does not (directly) help to restart your application. To recover dead threads, you need to close the KafkaStreams instance completely and create/start a new one. We hope to add better support for this in the future.
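
One way to apply this advice is a small supervisor that, when the handler fires, closes the whole instance and builds a fresh one. This is a sketch under two assumptions. First, `RestartableApp`, `AppFactory` and `StreamsSupervisor` are hypothetical types, used so the logic can be shown without a Kafka dependency; in the real app the factory would create `new KafkaStreams(builder, appConfig)`, register an `UncaughtExceptionHandler` that calls `onThreadDeath.run()` before `start()`, and wrap the instance's `start()`/`close()`. Second, the restart is handed to a separate thread on the assumption that calling `close()` from inside the handler is unsafe: the handler runs on the dying stream thread, and `close()` waits for the stream threads to finish.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaStreams: only the two calls the supervisor needs.
interface RestartableApp {
    void start();
    void close();
}

// Hypothetical factory: builds a fresh, not-yet-started app whose
// uncaught-exception handler must call onThreadDeath.run().
interface AppFactory {
    RestartableApp create(Runnable onThreadDeath);
}

final class StreamsSupervisor {
    private final AppFactory factory;
    private final int maxRestarts;
    private final Runnable giveUp;
    // Restarts run on this thread, never on the dying stream thread itself.
    private final ExecutorService restarter = Executors.newSingleThreadExecutor();

    private RestartableApp current;
    private int generation = 0;
    private int restarts = 0;
    private boolean stopped = false;

    StreamsSupervisor(AppFactory factory, int maxRestarts, Runnable giveUp) {
        this.factory = factory;
        this.maxRestarts = maxRestarts;
        this.giveUp = giveUp;
    }

    synchronized void start() {
        launch();
    }

    // Creates and starts a new instance tagged with a fresh generation number.
    private synchronized void launch() {
        final int gen = ++generation;
        current = factory.create(() -> requestRestart(gen));
        current.start();
    }

    // Called from the handler: hand the work to the restarter thread and return.
    private void requestRestart(int gen) {
        try {
            restarter.execute(() -> restart(gen));
        } catch (RejectedExecutionException shuttingDown) {
            // shutdown() has begun, so there is nothing left to restart.
        }
    }

    private void restart(int gen) {
        boolean giveUpNow;
        synchronized (this) {
            // Several threads of one instance may die; only the first report counts.
            if (stopped || gen != generation) return;
            current.close();
            current = null;
            giveUpNow = restarts >= maxRestarts;
            if (giveUpNow) {
                stopped = true;
            } else {
                restarts++;
                launch();
            }
        }
        // Run outside the lock, so a giveUp that calls System.exit cannot
        // deadlock with shutdown() running in a JVM shutdown hook.
        if (giveUpNow) giveUp.run();
    }

    // Stops restarting, waits (bounded) for a pending restart, then closes the app.
    void shutdown() {
        restarter.shutdown();
        try {
            restarter.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (this) {
            stopped = true;
            if (current != null) {
                current.close();
                current = null;
            }
        }
    }
}
```

The generation counter ensures that several threads of the same instance dying at once trigger only one restart, and `maxRestarts` stops a restart loop when the failure is persistent (the state-directory locking errors above may well reappear after every restart). At that point `giveUp` could, for example, log the error and call `System.exit` with a non-zero status so that monit restarts the whole process. The question's shutdown hook would then call `supervisor.shutdown()` instead of `streams.close()`; because `shutdown()` only waits a bounded time for the restarter, it should not hang indefinitely in that case.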
