How to feed an HDFS file into an R mapreduce job for processing and write the result back to an HDFS file

I have a question similar to this one on Stack Overflow:

R + Hadoop: how to read CSV file from HDFS and execute mapreduce?

I am trying to read a file from the location "/somnath/logreg_data/ds1.10.csv" in HDFS, reduce its number of columns from 10 to 5, and then write it to another HDFS location, "/somnath/logreg_data/reduced/ds1.10.reduced.csv", using the function transfer.csvfile.hdfs.to.hdfs.reduced defined below.

    transfer.csvfile.hdfs.to.hdfs.reduced(
        "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",
        "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv",
        5)

Function Definition

    transfer.csvfile.hdfs.to.hdfs.reduced = function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
        #local.df = data.frame()
        #hdfs.get(hdfsFilePath, local.df)
        #to.dfs(local.df)
        #r.file <- hdfs.file(hdfsFilePath, "r")
        transfer.reduced.map = function(., M) {
            label <- M[, dim(M)[2]]
            reduced.predictors <- M[, 1:reducedCols]
            reduced.M <- cbind(reduced.predictors, label)
            keyval(1, as.numeric(reduced.M))
        }
        reduced.values = values(
            from.dfs(
                mapreduce(
                    input = from.dfs(hdfsFilePath),
                    input.format = "native",
                    map = function(., M) {
                        label <- M[, dim(M)[2]]
                        print(label)
                        reduced.predictors <- M[, 1:reducedCols]
                        reduced.M <- cbind(reduced.predictors, label)
                        keyval(1, as.numeric(reduced.M))
                    })))
        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
        w.file <- hdfs.file(hdfsWritePath, "w")
        hdfs.write(reduced.values, w.file)
        #to.dfs(reduced.values)
    }

But I get the following error:

    Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
      cannot open the connection
    Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
    In addition: Warning message:
    In file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
      cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
    Execution halted
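For reference, from.dfs() in rmr2 expects data in rmr2's native serialized format (typically something written earlier by to.dfs()), so pointing it at a raw CSV path cannot work. A minimal sketch of the usual rmr2 alternative, assuming rmr2 is installed and the path above exists, is to pass the HDFS path straight to mapreduce() with a csv input format:

    # Sketch only: a plain CSV already in HDFS is described to mapreduce() via
    # make.input.format("csv"); no from.dfs() round-trip is needed.
    library(rmr2)

    csv.format <- make.input.format("csv", sep = ",")   # map calls receive data-frame chunks

    out <- mapreduce(
        input        = "/somnath/logreg_data/ds1.10.csv",
        input.format = csv.format,
        map          = function(k, v) {
            # v is a chunk of rows; keep the first 5 predictors plus the last column as label
            keyval(1, cbind(v[, 1:5], label = v[, ncol(v)]))
        })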

OR

When I try to open the file from HDFS using the command below, I get the following error:

    > x <- hdfs.file(path="hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", mode="r")
    Error in hdfs.file(path = "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",  :
      attempt to apply non-function
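As an aside, the "attempt to apply non-function" error is commonly seen when rhdfs has not been loaded and initialized before the first hdfs.* call, which matches the hdfs.init() added in the answer below. A minimal sketch, where the two environment-variable paths are assumptions to be adjusted to your installation:

    # Sketch only; the HADOOP_CMD and HADOOP_STREAMING locations are assumed.
    Sys.setenv(HADOOP_CMD = "/usr/bin/hadoop")
    Sys.setenv(HADOOP_STREAMING = "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar")
    library(rhdfs)
    hdfs.init()   # must run before hdfs.file() and other hdfs.* calls

    x <- hdfs.file("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "r")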

Any help would be greatly appreciated. Thanks.

1 answer

Basically, I found a solution to the problem I mentioned above. The trick was to open the file with hdfs.file, read its text through hdfs.read.text.file, and hand that to mapreduce with a csv input format, instead of passing the HDFS path to from.dfs:

    r.file <- hdfs.file(hdfsFilePath, "r")
    from.dfs(
        mapreduce(
            input = as.matrix(hdfs.read.text.file(r.file)),
            input.format = "csv",
            map = ...
        ))
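One caveat worth noting: hdfs.read.text.file() pulls the file through the R client, so the whole CSV passes through local memory before the job sees it. That is fine for modestly sized files; for large inputs, passing the HDFS path with a csv input format, as sketched in the question section above, keeps the read distributed.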

Below is the whole modified function:

    transfer.csvfile.hdfs.to.hdfs.reduced = function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
        hdfs.init()
        #local.df = data.frame()
        #hdfs.get(hdfsFilePath, local.df)
        #to.dfs(local.df)
        r.file <- hdfs.file(hdfsFilePath, "r")
        transfer.reduced.map = function(., M) {
            numRows <- length(M)
            # split each CSV line into its fields and rebuild a row-major matrix
            M.vec.elems <- unlist(lapply(M, function(x) strsplit(x, ",")))
            M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
            label <- M.matrix[, dim(M.matrix)[2]]
            reduced.predictors <- M.matrix[, 1:reducedCols]
            reduced.M <- cbind(reduced.predictors, label)
            keyval(1, as.numeric(reduced.M))
        }
        reduced.values = values(
            from.dfs(
                mapreduce(
                    input = as.matrix(hdfs.read.text.file(r.file)),
                    input.format = "csv",
                    map = function(., M) {
                        numRows <- length(M)
                        M.vec.elems <- unlist(lapply(M, function(x) strsplit(x, ",")))
                        M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                        label <- M.matrix[, dim(M.matrix)[2]]
                        reduced.predictors <- M.matrix[, 1:reducedCols]
                        reduced.M <- cbind(reduced.predictors, label)
                        keyval(1, as.numeric(reduced.M))
                    })))
        # keep a local copy, then write the reduced data back to HDFS
        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
        w.file <- hdfs.file(hdfsWritePath, "w")
        hdfs.write(reduced.values, w.file)
        hdfs.close(r.file)
        hdfs.close(w.file)
        #to.dfs(reduced.values)
    }
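For completeness, a hypothetical usage sketch; hdfs.exists() and hdfs.ls() are standard rhdfs calls for checking that the output landed:

    # Run the modified function, then confirm the reduced file exists in HDFS.
    transfer.csvfile.hdfs.to.hdfs.reduced(
        "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",
        "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv",
        5)

    hdfs.exists("/somnath/logreg_data/reduced/ds1.10.reduced.csv")   # expect TRUE
    hdfs.ls("/somnath/logreg_data/reduced")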

Hope this helps, and don't forget to upvote if you find it useful. Thanks in advance!
