SparkR foreach loop


Problem description

In the Java/Scala/Python implementations of Spark, one can simply call the foreach method of the RDD or DataFrame types in order to parallelize the iterations over a dataset.

In SparkR I can't find such an instruction. What would be the proper way to iterate over the rows of a DataFrame?

I could only find the gapply and dapply functions, but I don't want to calculate new column values; I just want to do something by taking one element from a list, in parallel.

My previous attempt was using lapply:

# read the CSV into a SparkDataFrame and register it as a temp view
inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")
createOrReplaceTempView(inputDF, 'inputData')

# distinct IDs, computed in Spark
distinctM <- sql('SELECT DISTINCT(ID_M) FROM inputData')

# pull the distinct IDs back to the driver as a plain R vector
collected <- collect(distinctM)[[1]]

problemSolver <- function(idM) {
  # filter the Spark DataFrame captured from the driver, one ID at a time
  filteredDF <- filter(inputDF, inputDF$ID_M == idM)
}

spark.lapply(c(collected), problemSolver)

But I get this error:

Error in handleErrors(returnStatus, conn) : 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "col", c) : 
  Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed.
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethod

What would be the solution provided by R to solve such problems?

Recommended answer

I had a similar problem as well. Collecting a DataFrame pulls it into R as a regular data.frame, and from there you can get at each row as you normally would in plain R. In my opinion, that is a poor pattern for processing data, because you lose the parallel processing Spark provides. Instead of collecting the data and then filtering, use the built-in SparkR functions: select, filter, etc. If you want row-wise operations, the built-in SparkR functions will generally do this for you; otherwise, I have found selectExpr or expr to be very useful when the underlying Spark functions are designed to work on a single value (think: from_unixtime).
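For instance, here is a minimal sketch of the selectExpr/expr idea, assuming a SparkDataFrame df with hypothetical columns ID_M and ts (a Unix timestamp); the names are only for illustration:

library(SparkR)

# evaluate a Spark SQL expression per row, in parallel, without collecting
withReadable <- SparkR::selectExpr(df, "ID_M", "from_unixtime(ts) AS ts_readable")

# the same idea with expr(), which builds a Column from a SQL expression string
withReadable2 <- SparkR::withColumn(df, "ts_readable", expr("from_unixtime(ts)"))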

So, to get what you want, I would try something like this (I'm on SparkR 2.0+):

First, read in the data as you have done:

inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")

Then make this a SparkDataFrame (createDataFrame converts a local R data.frame; with read.df as above, inputDF is already a SparkDataFrame): inputSparkDF <- SparkR::createDataFrame(inputDF)

Next, isolate only the distinct/unique values (I'm using magrittr for piping, which works in SparkR):

distinctSparkDF <- SparkR::select(inputSparkDF, "ID_M") %>% SparkR::distinct()

From here, you can apply filtering while still living in Spark's world:

filteredSparkDF <- SparkR::filter(distinctSparkDF, distinctSparkDF$ID_M == "value")

After Spark has filtered that data for you, it makes sense to collect the subset into base R as the last step in the workflow:

myRegularRDataframe <- SparkR::collect(filteredSparkDF)
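If the goal is really to run arbitrary R code for each ID_M group in parallel (rather than just filtering), a hedged sketch with the built-in gapply, which the question already mentions, might look like the following; it assumes inputDF is the SparkDataFrame returned by read.df, that ID_M is a string column, and the per-group body and result schema are placeholders:

# result schema for the rows returned by the per-group function
resultSchema <- structType(structField("ID_M", "string"),
                           structField("n_rows", "integer"))

perGroup <- gapply(inputDF,
                   "ID_M",
                   function(key, groupDF) {
                     # groupDF is a plain R data.frame holding the rows of one ID_M;
                     # replace this body with whatever per-group work is needed
                     data.frame(ID_M = key[[1]],
                                n_rows = nrow(groupDF),
                                stringsAsFactors = FALSE)
                   },
                   resultSchema)

head(perGroup)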

I hope this helps. Best of luck. --nate
