Interrupted while joining ioThread / Error during disposal of stream operator in Flink application


Problem description

I have a Flink-based streaming application that uses Apache Kafka sources and sinks. For the past few days I have been getting exceptions at random times during development, and I have no clue where they are coming from.

I run the app from within IntelliJ using the mainRunner class and feed it messages via Kafka. Sometimes the first message triggers the errors; sometimes they appear only after a few messages.

This is what it looks like:

16:31:01.935 ERROR o.a.k.c.producer.KafkaProducer      - Interrupted while joining ioThread
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_51]
    at java.lang.Thread.join(Thread.java:1253) [na:1.8.0_51]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:168) [flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:662) [flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) [flink-core-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.1.jar:1.6.1]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
16:31:01.936 ERROR o.a.f.s.runtime.tasks.StreamTask    - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:168) ~[flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:662) ~[flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-core-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.1.jar:1.6.1]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
Caused by: java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_51]
    at java.lang.Thread.join(Thread.java:1253) [na:1.8.0_51]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031) ~[kafka-clients-0.11.0.2.jar:na]
    ... 10 common frames omitted
16:31:01.938 ERROR o.a.k.c.producer.KafkaProducer      - Interrupted while joining ioThread

I get around 10-20 of these, and then Flink seems to recover the app: it becomes usable again and I can process messages successfully.

What could possibly cause this? Or how can I analyze it further to track it down?

I am using Flink 1.6.1 with Scala 2.11 on a Mac, with IntelliJ 2018.3.2.

Recommended answer

I was able to resolve it. It turned out that one of my stream operators (a map function) was throwing an exception because of an invalid array index.

This was not visible in the logs. Only when I broke the application down step by step into smaller pieces did that exception finally show up in the logs. After fixing the obvious bug in the array access, the exceptions mentioned above (java.lang.InterruptedException and org.apache.kafka.common.KafkaException) went away.
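One way to make such a hidden failure show up earlier is to log the exception, together with the offending input, inside the operator itself before rethrowing it. The following is only a minimal sketch in plain Java, not the code from the original application: the class LoggingMapSketch, the helper logFailures and the buggy thirdField function are hypothetical names made up for illustration, and java.util.function.Function stands in for the body of a Flink map function. The stack traces above appear consistent with the Kafka producer being closed while its task was shut down after a failure elsewhere in the job, so the record-level log line is what would point at the real cause.

```java
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingMapSketch {
    private static final Logger LOG = Logger.getLogger(LoggingMapSketch.class.getName());

    /**
     * Wraps a per-record function so that any runtime failure is logged
     * with the offending input before being rethrown unchanged.
     * (Hypothetical helper; in a Flink job the try/catch would sit inside map().)
     */
    static <I, O> Function<I, O> logFailures(String operatorName, Function<I, O> body) {
        return input -> {
            try {
                return body.apply(input);
            } catch (RuntimeException e) {
                LOG.log(Level.SEVERE,
                        "Operator '" + operatorName + "' failed on input: " + input, e);
                throw e; // still fail the task; we only make the cause visible
            }
        };
    }

    /** Hypothetical buggy logic: assumes at least three comma-separated fields. */
    static String thirdField(String line) {
        return line.split(",")[2];
    }

    public static void main(String[] args) {
        Function<String, String> guarded =
                logFailures("third-field-map", LoggingMapSketch::thirdField);

        // A well-formed record passes through untouched.
        System.out.println(guarded.apply("a,b,c"));

        // A short record triggers the array-index bug; it is logged, then rethrown.
        try {
            guarded.apply("a,b");
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("root cause surfaced: " + e.getClass().getSimpleName());
        }
    }
}
```

The wrapper deliberately rethrows, so the job still fails and restarts as before. The only difference is that the log now contains the array-index error and the record that caused it, ahead of any follow-up errors raised during shutdown.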
