如何将函数应用于SparkR中的每一行? [英] How to apply a function to each row in SparkR?

查看:81
本文介绍了如何将函数应用于SparkR中的每一行?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个CSV格式的文件,其中包含一个带有"id","timestamp","action","value"和"location"列的表. 我想对表的每一行应用一个函数,并且已经在R中编写了如下代码:

I have a file in CSV format which contains a table with column "id", "timestamp", "action", "value" and "location". I want to apply a function to each row of the table and I've already written the code in R as follows:

user <- read.csv(file_path,sep = ";")
num <- nrow(user)
curLocation <- "1"
for(i in 1:num) {
    row <- user[i,]
    if(user$action != "power")
        curLocation <- row$value
    user[i,"location"] <- curLocation
}

R脚本运行良好,现在我想将其应用SparkR.但是,我无法直接在SparkR中访问第ith行,也找不到任何函数来操作

The R script works fine and now I want to apply it SparkR. However, I couldn't access the ith row directly in SparkR and I couldn't find any function to manipulate every row in SparkR documentation.

我应该使用哪种方法来达到与R脚本相同的效果?

Which method should I use in order to achieve the same effect as in the R script?

此外,按照@chateaur的建议,我尝试使用dapply函数进行编码,如下所示:

In addition, as advised by @chateaur, I tried to code using dapply function as follows:

curLocation <- "1"
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string"))
setLocation <- function(row, curLoc) {
    if(row$Action != "power|battery|level"){
        curLoc <- row$Value
    }
    row$Location <- curLoc
}
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema)
head(bw)

然后我得到一个错误:

Then I got an error:

我查看了警告消息条件的长度> 1,并且仅将使用第一个元素,然后发现了https://stackoverflow.com/a/29969702/4942713 .这使我想知道 dapply函数中的 row 参数是否表示数据帧的整个分区,而不是单行?也许dapply函数不是理想的解决方案?

I looked up the warning message the condition has length > 1 and only the first element will be used and I found something https://stackoverflow.com/a/29969702/4942713. It made me wonder whether the row parameter in the dapply function represent an entire partition of my data frame instead of one single row? Maybe dapply function is not a desirable solution?

后来,我尝试按照@chateaur的建议修改功能.我没有使用 dapply ,而是使用了 dapplyCollect ,这省去了指定架构的工作.可行!

Later, I tried to modify the function as advised by @chateaur. Instead of using dapply, I used dapplyCollect which saves me the effort of specifying the schema. It works!

changeLocation <- function(partitionnedDf) {
    nrows <- nrow(partitionnedDf)
    curLocation <- "1"
    for(i in 1:nrows){
        row <- partitionnedDf[i,]
        if(row$action != "power") {
            curLocation <- row$value
        }
    partitionnedDf[i,"location"] <- curLocation
    }
    partitionnedDf
}

bw <- dapplyCollect(user, changeLocation)

推荐答案

Scorpion775,

Scorpion775,

您应该共享您的sparkR代码.不要忘记,R和sparkR中的数据操作方式不同.

You should share your sparkR code. Don't forget that data isn't manipulated the same way in R and sparkR.

来自: http://spark.apache.org/docs/latest/sparkr .html

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

然后,您可以在此处查看dapply函数: https://spark.apache.org/docs/2.1.0/api/R/dapply.html

Then you can look at dapply function here : https://spark.apache.org/docs/2.1.0/api/R/dapply.html

这是一个可行的示例:

changeLocation <- function(partitionnedDf) {
    nrows <- nrow(partitionnedDf)
    curLocation <- as.integer(1)

    # Loop over each row of the partitionned data frame
    for(i in 1:nrows){
        row <- partitionnedDf[i,]

        if(row[1] != "power") {
            curLocation <- row[2]
        }
        partitionnedDf[i,3] <- curLocation
    }

    # Return modified data frame
    partitionnedDf
}

# Load data
df <- read.df("data.csv", "csv", header="false", inferSchema = "true")

head(collect(df))

# Define schema of dataframe
schema <- structType(structField("action", "string"), structField("value", "integer"),
                     structField("location", "integer"))

# Change location of each row                    
df2 <- dapply(df, changeLocation, schema)

head(df2)

这篇关于如何将函数应用于SparkR中的每一行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆