如何将HDFS文件输入到R mapreduce中进行处理,并将结果存入HDFS文件 [英] How to input HDFS file into R mapreduce for processing and get the result into HDFS file
问题描述
我有一个类似于下面的链接的问题stackoverflow
我正在从HDFS中的位置 "/somnath/logreg_data/ds1.10.csv" 读取文件,将其列数从10减少到5,然后使用下面的
transfer.csvfile.hdfs.to.hdfs.reduced
函数写入HDFS中的另一个位置 "/somnath/logreg_data/reduced/ds1.10.reduced.csv"。
transfer.csvfile.hdfs.to.hdfs.reduced("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv", 5)
函数定义是
transfer.csvfile.hdfs.to.hdfs.reduced =
function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
#local.df = data.frame()
#hdfs.get(hdfsFilePath, local.df)
#to.dfs(local.df)
#r.file <- hdfs.file(hdfsFilePath,"r")
transfer.reduced.map =
function(.,M) {
label <- M[,dim(M)[2]]
reduced.predictors <- M[,1:reducedCols]
reduced.M <- cbind(reduced.predictors, label)
keyval(
1,
as.numeric(reduced.M))
}
reduced.values =
values(
from.dfs(
mapreduce(
input = from.dfs(hdfsFilePath),
input.format = "native",
map = function(.,M) {
label <- M[,dim(M)[2]]
print(label)
reduced.predictors <- M[,1:reducedCols]
reduced.M <- cbind(reduced.predictors, label)
keyval(
1,
as.numeric(reduced.M))}
)))
write.table(reduced.values, file="/root/somnath/reduced.values.csv")
w.file <- hdfs.file(hdfsWritePath,"w")
hdfs.write(reduced.values,w.file)
#to.dfs(reduced.values)
}
但是我收到一个错误
Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode == :
  cannot open the connection
Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
此外: 警告消息:
In file(fname, paste(if (is.read) "r" else "w", if (format$mode == :
  cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
执行暂停
OR
当我尝试使用以下命令从hdfs加载文件时,我得到了下面的错误:
> x < - hdfs.file(path =hdfs://10.5 .5.82:8020 / somnath / logreg_data / ds1.10.csv,mode =r)
hdfs.file错误(path =hdfs://10.5.5.82:8020 / somnath / logreg_data / ds1 .10.csv,:
尝试应用非函数
任何帮助都将高度赞赏
谢谢
基本上,我找到了上面所述问题的解决方案。
r.file <- hdfs.file(hdfsFilePath,"r")
from.dfs(
mapreduce(
input = as.matrix(hdfs.read.text.file(r.file)),
input.format = "csv",
map = ...
))
以下是整个修改过的函数:
transfer.csvfile.hdfs.to.hdfs.reduced =
function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
hdfs.init()
#local.df = data.frame()
#hdfs.get(hdfsFilePath, local.df)
#to.dfs(local.df)
r.file <- hdfs.file(hdfsFilePath,"r")
transfer.reduced.map =
function(.,M) {
numRows <- length(M)
M.vec.elems <- unlist(lapply(M,
function(x) strsplit(x, ",")))
M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
label <- M.matrix[,dim(M.matrix)[2]]
reduced.predictors <- M.matrix[,1:reducedCols]
reduced.M <- cbind(reduced.predictors, label)
keyval(
1,
as.numeric(reduced.M))
}
reduced.values =
values(
from.dfs(
mapreduce(
input = as.matrix(hdfs.read.text.file(r.file)),
input.format = "csv",
map = function(.,M) {
numRows <- length(M)
M.vec.elems <- unlist(lapply(M,
function(x) strsplit(x, ",")))
M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
label <- M.matrix[,dim(M.matrix)[2]]
reduced.predictors <- M.matrix[,1:reducedCols]
reduced.M <- cbind(reduced.predictors, label)
keyval(
1,
as.numeric(reduced.M)) }
)))
write.table(reduced.values, file="/root/somnath/reduced.values.csv")
w.file <- hdfs.file(hdfsWritePath,"w")
hdfs.write(reduced.values,w.file)
hdfs.close(r.file)
hdfs.close(w.file)
#to.dfs(reduced.values)
}
希望这会有所帮助,如果您觉得有用,请不要忘记给予积分。提前致谢
I have a question similar to the below link in stackoverflow
R+Hadoop: How to read CSV file from HDFS and execute mapreduce?
I am tring to read a file from location "/somnath/logreg_data/ds1.10.csv" in HDFS, reduce its number of columns from 10 to 5 and then write to another location "/somnath/logreg_data/reduced/ds1.10.reduced.csv" in HDFS using the below
transfer.csvfile.hdfs.to.hdfs.reduced
function.
transfer.csvfile.hdfs.to.hdfs.reduced("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv", 5)
The function definition is
# Reads a CSV file from HDFS, keeps the first `reducedCols` predictor
# columns plus the trailing label column, and writes the reduced data out.
# NOTE(review): this is the ORIGINAL (failing) version from the question;
# its defects are annotated below rather than fixed.
transfer.csvfile.hdfs.to.hdfs.reduced =
function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
#local.df = data.frame()
#hdfs.get(hdfsFilePath, local.df)
#to.dfs(local.df)
#r.file <- hdfs.file(hdfsFilePath,"r")
# NOTE(review): transfer.reduced.map is defined here but never used —
# its body is duplicated inline in the mapreduce() call below.
transfer.reduced.map =
function(.,M) {
# Last column of the chunk M is treated as the label.
label <- M[,dim(M)[2]]
# Keep only the first `reducedCols` predictor columns.
reduced.predictors <- M[,1:reducedCols]
reduced.M <- cbind(reduced.predictors, label)
keyval(
1,
as.numeric(reduced.M))
}
reduced.values =
values(
from.dfs(
# NOTE(review): this is the reported failure point. from.dfs() expects
# a dfs object holding rmr-native data, not a raw "hdfs://..." CSV URI;
# passing the URI here triggers the "cannot open the connection /
# No such file or directory" error shown below. The accepted fix reads
# the file via hdfs.file() + hdfs.read.text.file() with
# input.format = "csv" instead.
mapreduce(
input = from.dfs(hdfsFilePath),
input.format = "native",
map = function(.,M) {
label <- M[,dim(M)[2]]
print(label)
reduced.predictors <- M[,1:reducedCols]
reduced.M <- cbind(reduced.predictors, label)
keyval(
1,
as.numeric(reduced.M))}
)))
# Persist a local copy, then write the reduced values back to HDFS.
write.table(reduced.values, file="/root/somnath/reduced.values.csv")
w.file <- hdfs.file(hdfsWritePath,"w")
hdfs.write(reduced.values,w.file)
# NOTE(review): w.file is never closed here — see the corrected version.
#to.dfs(reduced.values)
}
But I am receiving an error
Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode == :
cannot open the connection
Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
In addition: Warning message:
In file(fname, paste(if (is.read) "r" else "w", if (format$mode == :
cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
Execution halted
OR
When I am trying to load a file from hdfs using the below commands, I am getting the below error:
> x <- hdfs.file(path="hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",mode="r")
Error in hdfs.file(path = "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", :
attempt to apply non-function
Any help will be highly appreciated
Thanks
Basically found a solution to the problem that I stated above.
r.file <- hdfs.file(hdfsFilePath,"r")
from.dfs(
mapreduce(
input = as.matrix(hdfs.read.text.file(r.file)),
input.format = "csv",
map = ...
))
Below is the entire modified function:
# Read a CSV file from HDFS, keep the first `reducedCols` predictor columns
# plus the label (last) column, and write the reduced data back to HDFS.
#
# Args:
#   hdfsFilePath  - full HDFS URI of the input CSV file
#   hdfsWritePath - full HDFS URI for the reduced output file
#   reducedCols   - number of leading predictor columns to keep (default 1)
#
# Side effects: also writes a local copy to /root/somnath/reduced.values.csv
# and opens/closes two HDFS file handles via the rhdfs API.
transfer.csvfile.hdfs.to.hdfs.reduced =
function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
  hdfs.init()
  r.file <- hdfs.file(hdfsFilePath, "r")

  # Mapper: M is a vector of raw CSV text lines. Split each line on ","
  # and rebuild a character matrix (one row per input line), then keep the
  # first `reducedCols` predictors plus the trailing label column.
  transfer.reduced.map =
    function(., M) {
      numRows <- length(M)
      M.vec.elems <- unlist(lapply(M, function(x) strsplit(x, ",")))
      M.matrix <- matrix(M.vec.elems, nrow = numRows, byrow = TRUE)
      label <- M.matrix[, dim(M.matrix)[2]]
      reduced.predictors <- M.matrix[, 1:reducedCols]
      reduced.M <- cbind(reduced.predictors, label)
      keyval(1, as.numeric(reduced.M))
    }

  # FIX: pass the named mapper to mapreduce(). The original defined
  # transfer.reduced.map and then duplicated its entire body inline,
  # leaving the named function as dead code.
  reduced.values =
    values(
      from.dfs(
        mapreduce(
          input = as.matrix(hdfs.read.text.file(r.file)),
          input.format = "csv",
          map = transfer.reduced.map
        )))

  # Persist a local copy for inspection, then write the reduced values
  # back to HDFS and release both file handles.
  write.table(reduced.values, file="/root/somnath/reduced.values.csv")
  w.file <- hdfs.file(hdfsWritePath, "w")
  hdfs.write(reduced.values, w.file)
  hdfs.close(r.file)
  hdfs.close(w.file)
}
Hope this helps and don't forget to give points if you find it useful. Thanks ahead
这篇关于如何将HDFS文件输入到R mapreduce中进行处理,并将结果存入HDFS文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!