How to input HDFS file into R mapreduce for processing and get the result into HDFS file


Question


I have a question similar to the one in the Stack Overflow post linked below:

R+Hadoop: How to read CSV file from HDFS and execute mapreduce?

I am trying to read a file from the location "/somnath/logreg_data/ds1.10.csv" in HDFS, reduce its number of columns from 10 to 5, and then write it to another location "/somnath/logreg_data/reduced/ds1.10.reduced.csv" in HDFS, using the transfer.csvfile.hdfs.to.hdfs.reduced function below.

transfer.csvfile.hdfs.to.hdfs.reduced("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv", 5)

The function definition is:

transfer.csvfile.hdfs.to.hdfs.reduced =
                function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                        #local.df = data.frame()
                        #hdfs.get(hdfsFilePath, local.df)
                        #to.dfs(local.df)
                        #r.file <- hdfs.file(hdfsFilePath,"r")
                        transfer.reduced.map =
                                        function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))
                                        }
                        reduced.values =
                             values(
                                     from.dfs(
                                        mapreduce(
                                          input = from.dfs(hdfsFilePath),
                                          input.format = "native",
                                          map = function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                print(label)
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))}
                        )))
                        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                        w.file <- hdfs.file(hdfsWritePath,"w")
                        hdfs.write(reduced.values,w.file)
                        #to.dfs(reduced.values)
                }

But I am receiving this error:

Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open the connection
Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
In addition: Warning message:
In file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
Execution halted

OR

Alternatively, when I try to load the file from HDFS using the command below, I get this error:

> x <- hdfs.file(path="hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",mode="r")
Error in hdfs.file(path = "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",  :
  attempt to apply non-function

Any help will be highly appreciated.

Thanks

Solution

Basically, I found a solution to the problem I stated above: open the HDFS file with hdfs.file and pass its contents to mapreduce via hdfs.read.text.file.

r.file <- hdfs.file(hdfsFilePath,"r")
from.dfs(
    mapreduce(
         input = as.matrix(hdfs.read.text.file(r.file)),
         input.format = "csv",
         map = ...
))
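One detail worth calling out (my inference from the errors above; the original answer only shows it implicitly in the code below): rhdfs must be initialized with hdfs.init() before any hdfs.* call, and the "attempt to apply non-function" error from hdfs.file() is typical when that initialization is missing. A minimal setup sketch, assuming the standard RHadoop packages rmr2 and rhdfs:

library(rhdfs)   # hdfs.init, hdfs.file, hdfs.read.text.file, hdfs.write, hdfs.close
library(rmr2)    # mapreduce, from.dfs, keyval, values

hdfs.init()      # must run before hdfs.file(), or it fails as in the question
r.file <- hdfs.file("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "r")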

Below is the entire modified function:

transfer.csvfile.hdfs.to.hdfs.reduced =
        function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                hdfs.init()                             # initialize rhdfs before any hdfs.* call
                r.file <- hdfs.file(hdfsFilePath, "r")  # open a read handle on the HDFS file
                reduced.values =
                        values(
                                from.dfs(
                                        mapreduce(
                                                input = as.matrix(hdfs.read.text.file(r.file)),
                                                input.format = "csv",
                                                map = function(., M) {
                                                        # M arrives as a vector of raw CSV lines:
                                                        # split each line on "," and rebuild a matrix
                                                        numRows <- length(M)
                                                        M.vec.elems <- unlist(lapply(M,
                                                                function(x) strsplit(x, ",")))
                                                        M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                        # keep the first reducedCols predictor columns
                                                        # plus the label in the last column
                                                        label <- M.matrix[, dim(M.matrix)[2]]
                                                        reduced.predictors <- M.matrix[, 1:reducedCols]
                                                        reduced.M <- cbind(reduced.predictors, label)
                                                        keyval(1, as.numeric(reduced.M))
                                                }
                        )))
                # keep a local copy, then write the reduced data back to HDFS
                write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                w.file <- hdfs.file(hdfsWritePath, "w")
                hdfs.write(reduced.values, w.file)
                hdfs.close(r.file)
                hdfs.close(w.file)
        }
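A usage sketch, reusing the call from the question (the host, port, and paths are the asker's; substitute your own):

# Keep the first 5 predictor columns plus the label column of ds1.10.csv
# and write the reduced data back to HDFS.
transfer.csvfile.hdfs.to.hdfs.reduced(
        "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",
        "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv",
        5)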

Hope this helps, and don't forget to give points if you find it useful. Thanks ahead.
