flink: Flink Shell throws NullPointerException


Problem description

  1. I am using the Flink Interactive Shell to execute a WordCount job. It works for a file size of 10 MB, but for a 100 MB file the shell throws a NullPointerException:

java.lang.NullPointerException
    at org.apache.flink.api.common.accumulators.SerializedListAccumulator.deserializeList(SerializedListAccumulator.java:93)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:549)
    at .<init>(<console>:22)

at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601)
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598)
at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104)
at scala.reflect.io.File.applyReader(File.scala:82)
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598)
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130)
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135)
at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596)
at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660)
at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659)
at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653)
at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659)
at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712)
at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)

I work on a Linux system (16MB RAM). What could be the problem there?

My code (adapted from https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html):

    val filename = "<myFileName>"  // placeholder path from the original post
    val text = env.readTextFile(filename)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
    val result = counts.collect()

  2. I also noticed that Flink executes the program on only one core. After setting the parallelism with env.getConfig.setParallelism(4) and running the program again, another exception occurred:

Part 1:

    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
    at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:68)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at .<init>(<console>:28)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598)
    at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104)
    at scala.reflect.io.File.applyReader(File.scala:82)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135)
    at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653)
    at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
    at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84)
    at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54)
    at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)

Part 2:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (CHAIN DataSource (at .<init>(<console>:26) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at .<init>(<console>:27)) -> Map (Map at .<init>(<console>:27)) -> Combine(SUM(1)) (2/4)) @ (unassigned) - [SCHEDULED] > with groupID < fc507fbb50fea681c726ca1d824c7577 > in sharing group < SlotSharingGroup [fc507fbb50fea681c726ca1d824c7577, fb90f780c9d5a4a9dbf983cb06bec946, 52b8abe5a21ed808f0473a599d89f046] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:250)
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:126)
    at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:271)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:430)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:307)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:508)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:606)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
    ... 18 more

Does that refer to taskmanager.numberOfTaskSlots? In my flink-conf.yaml this key is set to 4. But how can I set it in the shell?

Answer

You have raised two questions:

  1. Why does print() not work for big DataSets?

When you use count(), collect(), or print() on a DataSet, all the data partitioned across the task managers has to be transferred through the job manager to the client. It is best to use these methods only for testing or for materializing small DataSets. For large data, use one of the sinks provided by Apache Flink, e.g. writeAsText(..). One output file is then created per parallel task.
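A minimal sketch of the sink-based variant in the Scala shell, reusing `env` and the `counts` DataSet from the question (the output path is a placeholder, not from the original post):

```scala
// Write one output file per parallel task instead of
// shipping everything through the JobManager to the client
counts.writeAsText("/tmp/wordcount-result")

// DataSet sinks are lazy: trigger the job explicitly
env.execute("WordCount to file")
```

With parallelism 4 this produces a directory `/tmp/wordcount-result` containing one part file per parallel subtask.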

If you still want to transfer all the data to the client, you can do so by increasing the frame size of Akka, the message-passing library Flink uses under the hood. To do so, set akka.framesize in flink-conf.yaml. The default is 10485760 bytes (10 MB); akka.framesize: 100mb increases it to 100 MB.
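For example, the entry in flink-conf.yaml would look like this (the cluster needs to be restarted for the change to take effect):

```yaml
# flink-conf.yaml — raise the Akka frame size so larger
# collect()/print() results fit into a single message
akka.framesize: 100mb
```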

For Apache Flink 1.0, some committers have considered removing this limit, and there is already a pull request that uses another means of transport for large materialized DataSets.

  2. What are task slots, and how do they relate to parallelism?

Flink's default configuration starts one task slot per task manager. When you start the Scala shell in local mode, it starts only one task manager, so the total number of task slots is one. When you set the parallelism to N, you need at least N task slots to execute the operation in parallel. So either increase the number of task slots in flink-conf.yaml or start additional task managers. If you just run locally, I would advise simply increasing the number of task slots. For more information, see the Flink documentation at http://flink.apache.org.
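The relevant flink-conf.yaml entry is the same key mentioned in the question; to support setParallelism(4), it needs at least 4 slots per task manager:

```yaml
# flink-conf.yaml — allow up to 4 parallel subtasks per TaskManager
taskmanager.numberOfTaskSlots: 4
```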

Edit: If you run the Scala shell, an embedded Flink cluster is started with only one task manager. You can start a local cluster using ./bin/start-local.sh and then connect to it using the Scala shell's host and port parameters (host: localhost, port: 6123).
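The workflow might then look like this, run from the Flink installation directory (a sketch for the Flink 0.9-era scripts referenced above; the exact name and arguments of the shell launcher may differ between versions):

```shell
# Start a local Flink cluster (one JobManager plus one TaskManager,
# picking up the slot count from conf/flink-conf.yaml)
./bin/start-local.sh

# Connect the Scala shell to that cluster instead of the embedded one
./bin/start-scala-shell.sh localhost 6123
```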
