Executing HDFS commands from inside a Scala script


Problem description

I'm trying to execute an HDFS-specific command from inside a Scala script executed by Spark in cluster mode. Below is the command:

import scala.sys.process._  // required for the !! operator

val cmd = Seq("hdfs", "dfs", "-copyToLocal", "/tmp/file.dat", "/path/to/local")
val result = cmd.!!  // runs the command, returns its stdout, and throws on a non-zero exit

The job fails at this stage with the error below:

java.io.FileNotFoundException: /var/run/cloudera-scm-agent/process/2087791-yarn-NODEMANAGER/log4j.properties (Permission denied)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at java.io.FileInputStream.<init>(FileInputStream.java:93)
        at sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
        at sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
        at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
        at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
        at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
        at org.apache.log4j.Logger.getLogger(Logger.java:104)
        at org.apache.commons.logging.impl.Log4JLogger.getLogger(Log4JLogger.java:262)
        at org.apache.commons.logging.impl.Log4JLogger.<init>(Log4JLogger.java:108)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

However, when I run the same command separately in the Spark shell, it executes just fine and the file is copied as well.

scala> val cmd = Seq("hdfs","dfs","-copyToLocal","/tmp/file_landing_area/file.dat","/tmp/local_file_area")
cmd: Seq[String] = List(hdfs, dfs, -copyToLocal, /tmp/file_landing_area/file.dat, /tmp/local_file_area)

scala> val result = cmd.!!
result: String = ""

I don't understand the permission-denied error, although it surfaces as a FileNotFoundException. Totally confusing.

Any ideas?

Recommended answer

As per the error, it is reading from the /var folder, which I suspect is a configuration issue, or it is not pointing to the correct location. Executing HDFS commands by shelling out with Seq is not good practice: it is handy in the Spark shell, but using the same approach in application code is not advisable. Instead, try the Hadoop FileSystem API (org.apache.hadoop.fs) to move data to or from HDFS. Please check the sample code below, just for reference; it might help you.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hadoop configuration picked up from the classpath (core-site.xml, hdfs-site.xml)
val conf = new Configuration()

val hdfspath = new Path("hdfs:///user/nikhil/test.csv")
val localpath = new Path("file:///home/cloudera/test/")

// Resolve the filesystem from the HDFS path and copy the file down to the local path
val fs = hdfspath.getFileSystem(conf)
fs.copyToLocalFile(hdfspath, localpath)
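
For the reverse direction, a minimal sketch under the same assumptions (the paths below are hypothetical examples, not from the original post): the same API uploads a local file into HDFS via copyFromLocalFile.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
// FileSystem.get returns the default filesystem configured for the cluster (HDFS here)
val fs = FileSystem.get(conf)

// Hypothetical example paths
val localSrc = new Path("file:///home/cloudera/test/report.csv")
val hdfsDst = new Path("hdfs:///user/nikhil/report.csv")

fs.copyFromLocalFile(localSrc, hdfsDst)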

Please refer to the Hadoop FileSystem API documentation for more reference.

