SparkR foreach loop
Problem Description
In the Java/Scala/Python implementations of Spark, one can simply call the foreach method of the RDD or DataFrame types in order to parallelize iterations over a dataset.
In SparkR I can't find such an instruction. What would be the proper way to iterate over the rows of a DataFrame?
I could only find the gapply and dapply functions, but I don't want to calculate new column values; I just want to do something with each element taken from a list, in parallel.
My previous attempt was to use lapply:
inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")
createOrReplaceTempView(inputDF,'inputData')
distinctM <- sql('SELECT DISTINCT(ID_M) FROM inputData')
collected <- collect(distinctM)[[1]]
problemSolver <- function(idM) {
  filteredDF <- filter(inputDF, inputDF$ID_M == idM)
}
spark.lapply(c(collected), problemSolver)
But I get this error:
Error in handleErrors(returnStatus, conn) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
Error in callJMethod(x@sdf, "col", c) :
Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed.
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethod
What would be the solution provided by R to solve such problems?
Recommended Answer
I had a similar problem as well. Collecting a DataFrame puts it into R as a data frame. From there, you can get at each row as you normally would in regular old R. In my opinion, this is a horrible motif for processing data, because you lose the parallel processing Spark provides. Instead of collecting the data and then filtering, use the built-in SparkR functions select, filter, etc. If you wish to do row-wise operations, the built-in SparkR functions will generally do this for you; otherwise, I have found selectExpr or expr to be very useful when the original Spark functions are designed to work on a single value (think: from_unix_timestamp).
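For instance, a minimal selectExpr sketch (the unix_ts column and the event_time alias are assumed examples here, not columns from the question's data):

# Sketch: derive a timestamp column via a SQL expression, while the data
# stays in Spark; unix_ts is an assumed column name for illustration only.
withTime <- SparkR::selectExpr(inputDF, "*",
                               "from_unixtime(unix_ts) AS event_time")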
So, to get what you want, I would try something like this (I'm on SparkR 2.0+):
First, read in the data as you have done:
inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")
Then make this an RDD:
inputSparkDF <- SparkR::createDataFrame(inputDF)
Next, isolate only the distinct/unique values (I'm using magrittr for piping, which works in SparkR):
distinctSparkDF <- SparkR::select(inputSparkDF, "variable") %>% SparkR::distinct()
From here, you can apply filtering while still living in Spark's world:
filteredSparkDF <- SparkR::filter(distinctSparkDF, distinctSparkDF$variable == "value")
After Spark has filtered that data for you, it makes sense to collect the subset into base R as the last step in the workflow:
myRegularRDataframe <- SparkR::collect(filteredSparkDF)
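As a follow-up, the same workflow can be sketched as a single magrittr pipe (using the placeholder column "variable" and value "value" from the steps above), so nothing leaves Spark until the final collect:

library(magrittr)

# Sketch only: "variable" and "value" are the same placeholders used above.
myRegularRDataframe <- SparkR::read.df(csvPath, "csv", header = "true",
                                       inferSchema = "true", na.strings = "") %>%
  SparkR::select("variable") %>%
  SparkR::distinct() %>%
  SparkR::filter("variable = 'value'") %>%
  SparkR::collect()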
I hope this helps. Best of luck. --nate