Interrupted while joining ioThread / Error during disposal of stream operator in flink application


Problem description

I have a Flink-based streaming application which uses Apache Kafka sources and sinks. For some days now I have been getting exceptions at random times during development, and I have no clue where they're coming from.

I am running the app within IntelliJ using the mainRunner class, and I am feeding it messages via Kafka. Sometimes the first message will trigger the error, sometimes it happens only after a few messages.

Here is what it looks like:

16:31:01.935 ERROR o.a.k.c.producer.KafkaProducer      - Interrupted while joining ioThread
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_51]
    at java.lang.Thread.join(Thread.java:1253) [na:1.8.0_51]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:168) [flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:662) [flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) [flink-core-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.1.jar:1.6.1]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
16:31:01.936 ERROR o.a.f.s.runtime.tasks.StreamTask    - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:168) ~[flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:662) ~[flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-core-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.1.jar:1.6.1]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
Caused by: java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_51]
    at java.lang.Thread.join(Thread.java:1253) [na:1.8.0_51]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031) ~[kafka-clients-0.11.0.2.jar:na]
    ... 10 common frames omitted
16:31:01.938 ERROR o.a.k.c.producer.KafkaProducer      - Interrupted while joining ioThread
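The InterruptedException in these traces is typically a secondary symptom rather than the root cause: when a task fails, Flink cancels it by interrupting the task thread, and any blocking call made during cleanup, such as KafkaProducer.close() joining its io thread, then throws InterruptedException. A minimal, Flink-free Java sketch of that mechanism (all class and method names here are illustrative, not Flink internals):

```java
public class InterruptedJoinDemo {
    // Start a long-running "io thread", then join it from a thread whose
    // interrupt flag is already set -- join() throws InterruptedException
    // immediately, which is exactly what KafkaProducer.close() logs.
    static boolean joinWhileInterrupted() throws Exception {
        Thread ioThread = new Thread(() -> {
            try {
                Thread.sleep(60_000); // stands in for the producer's io loop
            } catch (InterruptedException ignored) {
                // woken up during the demo's cleanup below
            }
        });
        ioThread.start();

        final boolean[] sawInterrupt = {false};
        Thread taskThread = new Thread(() -> {
            // Simulate Flink cancelling the task: the interrupt flag is set
            // before the operator's dispose/close code runs.
            Thread.currentThread().interrupt();
            try {
                ioThread.join();
            } catch (InterruptedException e) {
                sawInterrupt[0] = true; // "Interrupted while joining ioThread"
            }
        });
        taskThread.start();
        taskThread.join();

        ioThread.interrupt(); // clean up the demo thread
        ioThread.join();
        return sawInterrupt[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println("interrupted during join: " + joinWhileInterrupted());
    }
}
```

So the stack traces above point at the cleanup path, not at the operator that actually failed; the primary exception may be hidden elsewhere in the logs.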

I get around 10-20 of those, and then it seems like Flink recovers the app, it becomes usable again, and I can successfully process messages.

What could possibly cause this? Or how can I analyze it further to track this down?

I am using Flink 1.6.1 with Scala 2.11 on a Mac, with IntelliJ version 2018.3.2.

Answer

I was able to resolve it. It turned out that one of my stream operators (a map function) was throwing an exception because of an invalid array index.

It was not possible to see this in the logs; only when I tore the application down step by step into smaller pieces did I finally get this exception in the logs, and after fixing the obvious bug in the array access, the above-mentioned exceptions (java.lang.InterruptedException and org.apache.kafka.common.KafkaException) went away.
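The actual map function is not shown in the answer, but a hypothetical sketch of the kind of defensive guard that avoids such failures (field extraction from a delimited Kafka message; names and format are assumptions for illustration):

```java
public class SafeFieldExtractor {
    // Hypothetical stand-in for the map logic that failed: pick one
    // comma-separated field out of a message by index.
    static String extractField(String message, int index) {
        String[] parts = message.split(",");
        if (index < 0 || index >= parts.length) {
            // Malformed record: return null (or route it to a side output /
            // dead-letter topic) instead of letting an
            // ArrayIndexOutOfBoundsException kill the whole task.
            return null;
        }
        return parts[index];
    }

    public static void main(String[] args) {
        System.out.println(extractField("a,b,c", 1)); // b
        System.out.println(extractField("a,b", 5));   // null
    }
}
```

Handling malformed records explicitly keeps a single bad message from failing the task, which in turn avoids the noisy cancellation/disposal traces seen above.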
