Correct way to restart or shutdown the stream using UncaughtExceptionHandler

Question

I have a streams app with the driver code below for real-time message transformation.

String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);

source.transform(() -> new MyTransformer()).to(...);

KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        System.exit(0);
    }
});


streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

After a few minutes of execution, the app throws the exception below and then stops making progress through the stream.

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

I tried clearing out the /tmp/kafka-streams/TRANSFORMATION-APP directory and restarting the app, but it throws the same exception again. One thing I noticed is that the app works fine until it has transformed all the backlog messages, but it throws the exception after processing some of the new messages.

Sometimes it also throws the uncaught exceptions below.

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed to rebalance

After throwing one of these exceptions, the app is still running but no longer makes progress through the stream.

What is the correct way to handle these errors? Is it possible to restart the stream programmatically, without killing the app? This app runs under monit. In the worst case, I would prefer to terminate the app properly (without any message loss), so that monit can restart it.

The input topic has 100 partitions, and I have set num.stream.threads to 100 in the app configuration. The app is on Kafka 0.10.1.1-cp1.

Recommended answer

Kafka 0.10.1.x has some bugs with regard to multithreading. You can either upgrade to 0.10.2 (Apache Kafka 0.10.2 was released today; Confluent Platform 3.2 should follow shortly) or apply the following workaround:

  • use single-threaded execution only
  • if you need more threads, start more instances
  • configure a different state directory for each instance
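
As a minimal sketch of the workaround above, the per-instance settings could be built like this. The config keys `application.id`, `num.stream.threads`, and `state.dir` are standard Kafka Streams settings; the `InstanceConfig` helper, the `instanceId` parameter, and the `/var/lib/kafka-streams` base path are assumptions made for illustration, and other required settings such as `bootstrap.servers` are left out.

```java
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical helper: builds settings for one single-threaded instance. */
final class InstanceConfig {

    // instanceId and baseStateDir are illustrative parameters, not Kafka settings.
    static Properties forInstance(String applicationId, int instanceId, String baseStateDir) {
        Properties props = new Properties();
        // Every instance shares the same application.id so they share the work.
        props.setProperty("application.id", applicationId);
        // One stream thread per instance; scale out by starting more instances.
        props.setProperty("num.stream.threads", "1");
        // Give each instance its own state directory so instances on one host do not collide.
        props.setProperty("state.dir",
                Paths.get(baseStateDir, "instance-" + instanceId).toString());
        return props;
    }

    public static void main(String[] args) {
        Properties props = forInstance("TRANSFORMATION-APP", 0, "/var/lib/kafka-streams");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` would then be merged with the rest of the app configuration before constructing `KafkaStreams`.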

You might also need to delete your local state directory (only once) before restarting, to get into an overall consistent application state.

In any case, there will be no data loss. Kafka Streams guarantees at-least-once processing semantics even in case of failure. This applies to your local stores too: after you delete the local state directory, the state is recreated on startup from the underlying Kafka changelog topics (an expensive operation, though).

The UncaughtExceptionHandler only gives you a way to find out that a thread died. It does not (directly) help you restart your application. To recover dead threads, you need to close the KafkaStreams instance completely and create and start a new one. We hope to add better support for this in the future.
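
The close-and-recreate approach could be sketched as a small supervising loop. To keep the example self-contained, `StreamsLike` is a hypothetical stand-in for the three `KafkaStreams` methods involved (`setUncaughtExceptionHandler`, `start`, `close`); `StreamsSupervisor`, `supervise`, and `maxRestarts` are likewise names invented here. The handler only signals the supervising thread, which then does the closing. That is a design assumption of this sketch, made because the handler runs on the dying stream thread and `close()` may wait for stream threads to finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

/** Hypothetical stand-in for the KafkaStreams methods used in this sketch. */
interface StreamsLike {
    void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler);
    void start();
    void close();
}

final class StreamsSupervisor {

    /**
     * Starts an instance from the factory and waits until one of its threads dies,
     * then closes that instance completely and starts a fresh one.
     * Allows up to maxRestarts restarts and returns the number of instances started.
     */
    static int supervise(Supplier<StreamsLike> factory, int maxRestarts) {
        int started = 0;
        while (started <= maxRestarts) {
            StreamsLike streams = factory.get();
            CountDownLatch threadDied = new CountDownLatch(1);
            // The handler only signals; closing happens on the supervising thread.
            streams.setUncaughtExceptionHandler((thread, error) -> threadDied.countDown());
            streams.start();
            started++;

            boolean interrupted = false;
            try {
                threadDied.await(); // blocks for as long as the instance stays healthy
            } catch (InterruptedException e) {
                interrupted = true;
            }
            streams.close(); // close the old instance before creating a new one
            if (interrupted) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                break;
            }
        }
        return started;
    }
}
```

With the real library, the factory would build a new `KafkaStreams` from the topology and configuration on every call. Once `maxRestarts` is exhausted, the caller can exit with a non-zero status so that an external monitor such as monit restarts the process.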
