Move file from one folder to another on HDFS in Scala / Spark


Question

I have two paths, one for a file and one for a folder. I would like to move the file into that folder on HDFS. How can I do that in Scala? I'm using Spark, too.

Bonus if the same code will work for Windows paths too, just like reading/writing files on HDFS, but that's not required.

I have tried the following:

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.moveFromLocalFile(something, something2)

And I get the following error:

线程主"中的异常java.lang.IllegalArgumentException:错误 FS:hdfs:/user/o/datasets/data.txt,预期:file:///

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs:/user/o/datasets/data.txt, expected: file:///

Same goes for moveToLocalFile(), because those methods are meant to transfer files between filesystems, not within one filesystem. I have also tried fs.rename(), but that did not do anything at all (no error or anything either).
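
For reference, a minimal sketch of how the silent fs.rename() can be diagnosed (the destination path below is a placeholder): FileSystem.rename reports failure by returning false rather than throwing, so checking its return value usually shows why nothing happened, for example when the destination's parent directory does not exist.

import org.apache.hadoop.fs.{FileSystem, Path}

// Same FileSystem handle as above, resolved from the Spark job's Hadoop configuration
val fs = FileSystem.get(sc.hadoopConfiguration)

// Both paths are on HDFS, so this is a metadata-only move
val src = new Path("/user/o/datasets/data.txt")
val dst = new Path("/user/o/processed/data.txt") // placeholder; its parent directory must already exist

// rename returns false instead of throwing when it cannot move the file
if (!fs.rename(src, dst)) {
  println(s"rename failed: $src -> $dst")
}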

I basically create files in one directory (writing to them with a stream), and once they are done they need to be moved into a different directory. This different directory is monitored by Spark Streaming, and I have had some issues when Spark Streaming tries to work with unfinished files.

Answer

Try the following Scala code.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)

val srcPath = new Path(srcFilePath)
val destPath = new Path(destFilePath)

hdfs.copyFromLocalFile(srcPath, destPath)

You should also check whether Spark has the HADOOP_CONF_DIR variable set in the conf/spark-env.sh file. This makes sure that Spark will find the Hadoop configuration settings.
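
This is also a likely cause of the "Wrong FS ... expected: file:///" error above: if the Hadoop configuration is not picked up, fs.defaultFS falls back to the local filesystem, and FileSystem.get(conf) returns a local FileSystem that rejects hdfs:// paths. As a sketch (the namenode host and port are placeholders), the filesystem can also be resolved explicitly from a URI, or from the path itself:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()

// Resolve the FileSystem for an explicit HDFS URI instead of relying on fs.defaultFS
val hdfs = FileSystem.get(new URI("hdfs://<namenode>:<port>"), conf)

// Or let the path itself decide which FileSystem implementation to use
val path = new Path("hdfs://<namenode>:<port>/user/o/datasets/data.txt")
val fsForPath = path.getFileSystem(conf)

Note that copyFromLocalFile, as used above, copies a file from the local filesystem into HDFS; for a move between two HDFS directories, rename on the HDFS FileSystem (as sketched in the question) is the corresponding operation.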

The dependencies for the build.sbt file:

libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.6.0"
libraryDependencies += "org.apache.commons" % "commons-io" % "1.3.2"
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

OR

You can use IOUtils from Apache Commons to copy data from an InputStream to an OutputStream:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.commons.io.IOUtils

val hadoopconf = new Configuration()
val fs = FileSystem.get(hadoopconf)

// Create output stream to the destination HDFS file
val outFileStream = fs.create(new Path("hdfs://<namenode>:<port>/output_path"))

// Open input stream from the source HDFS file
val inStream = fs.open(new Path("hdfs://<namenode>:<port>/input_path"))

IOUtils.copy(inStream, outFileStream)

// Close both streams
inStream.close()
outFileStream.close()
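
A small hardening of the snippet above, as a sketch reusing inStream and outFileStream: closing the streams in a finally block releases them even if the copy throws part-way.

try {
  IOUtils.copy(inStream, outFileStream)
} finally {
  // Close both streams even if the copy fails
  IOUtils.closeQuietly(inStream)
  IOUtils.closeQuietly(outFileStream)
}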
