flink: Flink Shell throws NullPointerException


Problem description

  1. I am executing WordCount with the Flink interactive shell. It works for a 10 MB input file, but with a 100 MB file the shell throws a NullPointerException:

    java.lang.NullPointerException
    at org.apache.flink.api.common.accumulators.SerializedListAccumulator.deserializeList(SerializedListAccumulator.java:93)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:549)
    at .<init>(<console>:22)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598)
    at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104)
    at scala.reflect.io.File.applyReader(File.scala:82)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135)
    at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653)
    at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
    at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84)
    at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54)
    at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)

I work on a Linux system (16 MB RAM). What could be the problem there?

My code (adapted from https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html):

    val filename = "<myFileName>"  // placeholder for the input file path
    val text = env.readTextFile(filename)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
    val result = counts.collect()

  2. I also noticed that Flink executes the program on only one core. After setting the parallelism with env.getConfig.setParallelism(4) and running the program again, another exception occurred:

Part 1:

    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
    at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:68)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at .<init>(<console>:28)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:601)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ILoop.scala:598)
    at scala.reflect.io.Streamable$Chars$class.applyReader(Streamable.scala:104)
    at scala.reflect.io.File.applyReader(File.scala:82)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(ILoop.scala:598)
    at scala.tools.nsc.interpreter.ILoop.savingReplayStack(ILoop.scala:130)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$interpretAllFrom$1.apply(ILoop.scala:597)
    at scala.tools.nsc.interpreter.ILoop.savingReader(ILoop.scala:135)
    at scala.tools.nsc.interpreter.ILoop.interpretAllFrom(ILoop.scala:596)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:660)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$loadCommand$1.apply(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop.withFile(ILoop.scala:653)
    at scala.tools.nsc.interpreter.ILoop.loadCommand(ILoop.scala:659)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands$7.apply(ILoop.scala:262)
    at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:712)
    at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
    at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
    at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:84)
    at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:54)
    at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)

Part 2:

    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (CHAIN DataSource (at .<init>(<console>:26) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at .<init>(<console>:27)) -> Map (Map at .<init>(<console>:27)) -> Combine(SUM(1)) (2/4)) @ (unassigned) - [SCHEDULED] > with groupID < fc507fbb50fea681c726ca1d824c7577 > in sharing group < SlotSharingGroup [fc507fbb50fea681c726ca1d824c7577, fb90f780c9d5a4a9dbf983cb06bec946, 52b8abe5a21ed808f0473a599d89f046] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:250)
    at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:126)
    at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:271)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:430)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:307)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:508)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:606)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
    ... 18 more

Does that refer to taskmanager.numberOfTaskSlots? In my flink-conf.yaml this key is set to 4, but how can I set it in the shell?

Answer

You asked two questions:

  1. Why does print() not work for big DataSets?

When you use count(), collect(), or print() on a DataSet, all the data that is partitioned across the task managers has to be transferred through the job manager to the client. It is best to use these methods only for testing or for materializing small DataSets. For large data, use one of the sinks provided by Apache Flink, e.g. writeAsText(...). One output file will then be created for each parallel task.
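
As an illustration (a minimal sketch, not from the original answer), the WordCount from the question could be finished with a file sink instead of collect(); "<outputPath>" is a placeholder, and the explicit env.execute(...) call is what triggers the sink:

    // Writes one output file per parallel task instead of shipping
    // all data through the job manager to the client.
    counts.writeAsText("<outputPath>")  // "<outputPath>" is a placeholder
    env.execute("WordCount with file sink")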

If you still want to transfer all the data to the client, you can do so by increasing the frame size of Akka, the message-passing library that Flink uses under the hood. To do so, set akka.framesize in flink-conf.yaml. The default is 10485760 bytes (10 MB); akka.framesize: 100mb will increase it to 100 MB.
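
The corresponding flink-conf.yaml entry would look like this (a sketch; pick a value large enough for the results you expect to collect):

    # flink-conf.yaml
    akka.framesize: 100mb   # default: 10485760 bytes (10 MB)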

For Apache Flink 1.0, some committers have considered removing this limit, and there is already a pull request that uses another means of transport for large materialized DataSets.

  2. What are task slots, and how do they relate to parallelism?

Flink's default configuration starts one task slot per task manager. When you start the Scala shell in local mode, it starts only one task manager, so the total number of task slots is one. When you change the parallelism to N, you need at least N task slots to execute the operation in parallel. So either increase the number of task slots in flink-conf.yaml or start additional task managers. If you just run locally, I would advise simply increasing the number of task slots. For more information, see the Flink documentation at http://flink.apache.org.
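
For example, to allow a parallelism of 4 on a single task manager, flink-conf.yaml would contain the key already mentioned in the question (a sketch):

    # flink-conf.yaml
    taskmanager.numberOfTaskSlots: 4   # default is 1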

Edit: If you run the Scala shell, an embedded Flink cluster is started with only one task manager. You can start a local cluster using ./bin/start-local.sh and then connect to it using the Scala shell's host and port parameters (host: localhost, port: 6123).
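
Put together, a possible sequence looks like this (a sketch; the exact Scala shell arguments may differ between Flink versions):

    # Start a local cluster; it picks up taskmanager.numberOfTaskSlots
    # from conf/flink-conf.yaml.
    ./bin/start-local.sh

    # Connect the Scala shell to the running cluster.
    ./bin/start-scala-shell.sh localhost 6123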
