Spark streaming on dataproc throws FileNotFoundException


Question

When I try to submit a Spark Streaming job to a Google Dataproc cluster, I get this exception:

16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
...
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@d7bffbc{HTTP/1.1}{0.0.0.0:4040}
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
        at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
        at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
...
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)

Full output here.

It seems this error happens when the Hadoop configuration is not correctly defined in spark-env.sh (link1, link2).

Is it configurable somewhere? Any pointers on how to resolve it?

Running the same code in local mode works fine:

sparkConf.setMaster("local[4]")

For additional context: the job was invoked like this:

gcloud dataproc jobs submit spark \
--cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false

This is the boilerplate setup code:

  lazy val conf = {
    val c = new SparkConf().setAppName(this.getClass.getName)
    c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString)

    if (isLocal) c.setMaster("local[4]")
    c.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    c.set("spark.streaming.blockInterval", "1s")
  }

  lazy val ssc = if (checkPointingEnabled) {
    StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext)
  } else {
    createStreamingContext()
  }

  private def getCheckPointDirectory: String = {
    if (isLocal) localCheckPointPath else checkPointPath
  }

  private def createStreamingContext(): StreamingContext = {
    val s = new StreamingContext(conf, Seconds(batchDurationSeconds))
    s.checkpoint(getCheckPointDirectory)
    s
  }

Thanks in advance

Solution

Is it possible that this wasn't the first time you ran the job with the given checkpoint directory, i.e. that the checkpoint directory already contained a checkpoint?

This happens because the checkpoint hard-codes the exact jarfile arguments used to submit the YARN application. When running on Dataproc, a --jars flag pointing to GCS is actually syntactic sugar: Dataproc automatically stages your jarfile from GCS into a local file path such as /tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar. That path is only used temporarily for the duration of a single job run, since Spark isn't able to invoke the jarfile directly out of GCS without staging it locally.

However, in a subsequent job, the previous tmp jarfile has already been deleted, yet the new job still tries to refer to that old location hard-coded into the checkpoint data.
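To confirm this diagnosis, it can help to check whether the checkpoint directory already holds a checkpoint before submitting. The following is a minimal sketch, not part of the original answer: `has_streaming_checkpoint` is a hypothetical helper, and it assumes Spark Streaming names its checkpoint files `checkpoint-<timestamp>` inside the checkpoint directory. It reads a directory listing on stdin, so it can be fed from whichever listing tool matches your checkpoint location.

```shell
#!/usr/bin/env bash
# Hypothetical helper (an assumption, not from the original answer).
# Reads a directory listing on stdin and succeeds if any entry looks like
# a Spark Streaming checkpoint file, assumed to be named checkpoint-<digits>.
has_streaming_checkpoint() {
  grep -Eq '(^|/)checkpoint-[0-9]+'
}

# Example usage, left commented out because it needs a cluster environment.
# CHECKPOINT_DIR is a placeholder for your own checkpoint path.
#
#   if hadoop fs -ls "$CHECKPOINT_DIR" | has_streaming_checkpoint; then
#     echo "Existing checkpoint found; a stale jar path may be recovered from it." >&2
#   fi
```

If the check succeeds, the job will probably recover from the old checkpoint rather than build a fresh context, which is the situation described above.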

The hard-coding in the checkpoint data causes other issues as well. For example, Dataproc uses YARN "tags" to track jobs, and it will conflict with YARN if an old Dataproc job's "tag" is reused in a new YARN application. To run your streaming application, first clear out your checkpoint directory, if possible, to start from a clean slate, and then:

  1. You must place the job jarfile somewhere on the master node before starting the job, and then your "--jars" flag must specify "file:///path/on/master/node/to/jarfile.jar".

When you specify a "file:///" path, Dataproc knows it's already on the master node, so it doesn't re-stage it into a /tmp directory. In that case it's safe for the checkpoint to point to some fixed local directory on the master.

You can do this either with an init action or you can submit a quick pig job (or just ssh into the master and download that jarfile):

# Use a quick pig job to download the jarfile to a local directory (for example /usr/lib/spark in this case)
gcloud dataproc jobs submit pig --cluster my-test-cluster \
    --execute "fs -cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar file:///usr/lib/spark/skyfall-assembly-0.0.1.jar"

# Submit the first attempt of the job
gcloud dataproc jobs submit spark --cluster my-test-cluster \
    --class com.company.skyfall.Skyfall \
    --jars file:///usr/lib/spark/skyfall-assembly-0.0.1.jar \
    --properties spark.ui.showConsoleProgress=false

  2. Dataproc relies on spark.yarn.tags under the hood to track YARN applications associated with jobs. However, the checkpoint holds a stale spark.yarn.tags value, which confuses Dataproc: new applications appear to be associated with old jobs.

For now, the agent only "cleans up" suspicious YARN applications as long as the recently killed job ID is held in memory, so restarting the Dataproc agent will fix this.

# Kill the job through the UI or something before the next step.
# Now use "pig sh" to restart the dataproc agent
gcloud dataproc jobs submit pig --cluster my-test-cluster \
    --execute "sh systemctl restart google-dataproc-agent.service"

# Re-run your job without needing to change anything else,
# it'll be fine now if you ever need to resubmit it and it
# needs to recover from the checkpoint again.

Keep in mind, though, that by the nature of checkpoints you won't be able to change the arguments you pass on subsequent runs, because checkpoint recovery clobbers your command-line settings.
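Since step 1 depends on the jar being referenced through a "file:///" path, a small pre-flight guard in a submit script can catch an accidental gs:// URI before it ends up baked into a checkpoint. This is only a sketch: `is_master_local_jar` and the `JAR_URI` variable are hypothetical names, and the check looks at the URI prefix only, not at whether the file actually exists on the master node.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check (an assumption, not from the original answer).
# Succeeds only when the jar URI uses the file:/// scheme, which per the
# answer above is the form Dataproc does not re-stage into /tmp.
is_master_local_jar() {
  case "$1" in
    file:///*) return 0 ;;
    *)         return 1 ;;
  esac
}

# JAR_URI is a placeholder; this value matches the path used in step 1.
JAR_URI="file:///usr/lib/spark/skyfall-assembly-0.0.1.jar"

if is_master_local_jar "$JAR_URI"; then
  echo "Jar URI is a fixed path on the master node: $JAR_URI"
else
  echo "Jar URI would be staged to a temporary path: $JAR_URI" >&2
fi
```

The guard does not submit anything itself; the gcloud command from step 1 would follow it in the success branch.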
