Sparklyr: how to center a Spark table based on column?
Question
I have a Spark table:
simx
x0: num 1.00 2.00 3.00 ...
x1: num 2.00 3.00 4.00 ...
...
x788: num 2.00 3.00 4.00 ...
and a handle named simX_tbl in the R environment that is connected to this simx table.
I want to center this table, i.e. subtract each column's mean from that column. For example, calculating x0 - mean(x0), and so on.
So far my best effort is:
meanX <- simX_tbl %>% summarise_all(funs("mean")) %>% collect()  # column means, collected into R
x_centered <- simX_tbl

for (i in 1:789) {
  colName  <- paste0("x", i - 1)
  # build the expression x_i - mean(x_i) and add it as a lazily evaluated mutate step
  colName2 <- lazyeval::interp(~ a - b, a = as.name(colName), b = as.double(meanX[i]))
  x_centered <- x_centered %>% mutate_(.dots = setNames(list(colName2), colName))
}
This actually works when I limit the for loop to a few iterations (1:5): the x_centered %>% head result is correct. But when I do this for all 789 iterations, this error comes out when I try to head it:
Error: C stack usage 7969412 is too close to the limit
Below are the output methods I've already tried that show the C stack usage error:
x_centered %>% head #show first 6 rows
x_centered %>% select_("x0") #select first column only
x_centered %>% sdf_register("x_centered") #register as table
x_centered %>% spark_dataframe() %>% tbl(sc, "x_centered") #also register as table
spark_write_csv(x_centered, path = "hdfs/path/here") #write as csv
Later on I need to calculate the correlation coefficient for each column, but I don't think I can produce any output with this error.
Is there any way to do this centering correctly / efficiently? I read this question about raising the Cstack limit, but I don't think it's a solution because the data is quite large and there's a risk of exceeding the limit again with bigger data. The actual data is 40GB+ and the data I'm currently using is just a small sample (789 columns x 10000 rows).
Spark version is 1.6.0
Answer
You can just use mutate_each / mutate_all:
library(sparklyr)
library(dplyr)

# sc is assumed to be an existing spark_connect() connection
df <- data.frame(x = c(1, 2, 3), y = c(-4, 5, 6), z = c(42, 42, 42))
sdf <- copy_to(sc, df, overwrite = TRUE)

mutate_all(sdf, funs(. - mean(.)))
Source: query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0
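As an aside, funs() has since been superseded in dplyr; assuming dplyr >= 1.0 and a dbplyr backend that translates across() (an assumption, not part of the original answer), the same call could be sketched like this. It still expands to one window function per column, so the efficiency caveat below applies either way:
# A sketch only: the modern spelling of mutate_all(sdf, funs(. - mean(.))).
# Each column still becomes avg(col) OVER (), i.e. one window function per column.
sdf %>% mutate(across(everything(), ~ .x - mean(.x)))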
but it looks like it is expanded to a really inefficient (unacceptable for large datasets) window function application. You could be better off with a more verbose solution:
# collect the column means once, then build "col - mean" Spark SQL expression strings
avgs  <- summarize_all(sdf, funs(mean)) %>% as.data.frame()
exprs <- as.list(paste(colnames(sdf), "-", avgs))

sdf %>%
  spark_dataframe() %>%                        # drop down to the underlying Java object
  invoke("selectExpr", exprs) %>%              # SELECT col - mean ... via Spark SQL
  invoke("toDF", as.list(colnames(sdf))) %>%   # restore the original column names
  invoke("registerTempTable", "centered")      # expose the result as a temporary table

tbl(sc, "centered")
Source: query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0
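For clarity, exprs here is nothing more than a list of Spark SQL expression strings built from the collected column means, which selectExpr parses directly; with the toy df above it would look roughly like this:
exprs
[[1]]
[1] "x - 2"

[[2]]
[1] "y - 2.33333333333333"

[[3]]
[1] "z - 42"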
It is not as pretty as the dplyr approach, but unlike the former it does a sensible thing.
If you want to skip all the invokes you can use dplyr to do the same thing:
transmute_(sdf, .dots = setNames(exprs, colnames(sdf)))
Source: query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0
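Note that transmute_() and the .dots interface are deprecated in current dplyr; assuming a reasonably recent dplyr/rlang (an assumption, not part of the original answer), the same idea can be sketched with parsed expressions and splicing:
library(rlang)

# exprs holds strings such as "x - 2"; parse them into expressions and splice
# them in under the original column names. dbplyr translates the literal
# arithmetic just like the selectExpr version above.
transmute(sdf, !!!set_names(parse_exprs(unlist(exprs)), colnames(sdf)))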
Execution plans:
A helper function (see also dbplyr::remote_query for the physical plan):
optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%
    invoke("queryExecution") %>%
    invoke("optimizedPlan")
}
dplyr version:
mutate_all(sdf, funs(. - mean(.))) %>% optimizedPlan()
<jobj[190]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [x#2877, y#2878, (z#1123 - _we0#2894) AS z#2879]
+- Window [avg(z#1123) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2894]
+- Project [x#2877, (y#1122 - _we0#2892) AS y#2878, z#1123]
+- Window [avg(y#1122) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2892]
+- Project [(x#1121 - _we0#2890) AS x#2877, z#1123, y#1122]
+- Window [avg(x#1121) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2890]
+- Project [y#1122, z#1123, x#1121]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>
Spark solution:
tbl(sc, "centered") %>% optimizedPlan()
<jobj[204]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [(x#1121 - 2.0) AS x#2339, (y#1122 - 2.33333333333333) AS y#2340, (z#1123 - 42.0) AS z#2341]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>
dplyr optimized:
transmute_(sdf, .dots = setNames(exprs, colnames(sdf))) %>% optimizedPlan()
<jobj[272]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [(x#1121 - 2.0) AS x#4792, (y#1122 - 2.33333333333333) AS y#4793, (z#1123 - 42.0) AS z#4794]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>
Note:
Spark SQL is not that good at handling wide datasets. With core Spark you usually combine features into a single Vector Column, and Spark provides a number of transformers which can be used to operate on Vector data.
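A hedged sketch of that route using sparklyr's feature transformers, assuming a sparklyr version that provides ft_vector_assembler and ft_standard_scaler (the "features" column names are illustrative, and the scaler estimator is fit and applied when given a tbl_spark):
library(sparklyr)
library(dplyr)

# Assemble all numeric columns into a single Vector column, then let the
# StandardScaler subtract the per-column means; with_std = FALSE keeps the
# original scale, so this is pure centering on the Vector column.
sdf %>%
  ft_vector_assembler(input_cols = colnames(sdf), output_col = "features") %>%
  ft_standard_scaler(
    input_col  = "features",
    output_col = "features_centered",
    with_mean  = TRUE,
    with_std   = FALSE
  )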