Sparklyr: how to center a Spark table based on column?


Problem description


I have a Spark table:

simx
x0: num 1.00 2.00 3.00 ...
x1: num 2.00 3.00 4.00 ...
...
x788: num 2.00 3.00 4.00 ...

and a handle named simX_tbl in the R environment that is connected to this simx table.

I want to center this table, that is, subtract from each column its own mean. For example, calculating x0 - mean(x0), and so on.
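To make the target concrete, here is a minimal local sketch of centering in base R on a toy data frame. The values and the names `df`, `col_means` and `centered` are illustrative assumptions, not the actual simx table, and the code runs in local memory rather than on Spark.

```r
# Toy local data frame (illustrative values, not the real simx table)
df <- data.frame(x = c(1, 2, 3), y = c(-4, 5, 6), z = c(42, 42, 42))

# One mean per column
col_means <- colMeans(df)

# Subtract each column's mean from that column (MARGIN = 2 means "by column")
centered <- sweep(df, 2, col_means, FUN = "-")

# Each column of `centered` now has mean zero, up to floating-point error
print(centered)
```

The same result is what any Spark-side approach should reproduce, only without pulling the rows into R.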

So far my best effort is:

meanX <- simX_tbl %>% summarise_all(funs("mean")) %>% collect()

x_centered <-  simX_tbl

for(i in 1:789) {
  colName <- paste0("x", i-1)
  colName2 <- lazyeval::interp(~ a - b, a = as.name(colName), b = as.double(meanX[i]))
  x_centered <- x_centered %>% mutate_(.dots = setNames( list(colName2) , colName) )
}

This actually works when I limit the for loop to a few iterations (1:5): the x_centered %>% head result is correct. But when I run all 789 iterations, this error comes out when I try to head it:

Error: C stack usage  7969412 is too close to the limit

Below are the output methods I've already tried, all of which show the C stack usage error:

x_centered %>% head #show first 6 rows

x_centered %>% select_("x0") #select first column only

x_centered %>% sdf_register("x_centered") #register as table

x_centered %>% spark_dataframe() %>% tbl(sc, "x_centered") #also register as table

spark_write_csv(x_centered, path = "hdfs/path/here") #write as csv

Later on I need to calculate the correlation coefficient for each column, but I don't think I can output anything while this error occurs.

Is there any way to do this centering correctly / efficiently? I read this question about raising the Cstack limit, but I don't think it's a solution because the data is quite large and there's a risk of hitting the limit again with bigger data. The actual data is 40GB+ and the data I'm currently using is just a small sample (789 columns x 10000 rows).

Spark version is 1.6.0

EDIT: make title clearer, add tried output methods

Solution

You can just use mutate_each / mutate_all:

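library(sparklyr)
library(dplyr)

# Assumes a local Spark connection, as suggested by the printed output below
sc <- spark_connect(master = "local")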

df <- data.frame(x=c(1, 2, 3), y = c(-4, 5, 6), z = c(42, 42, 42))
sdf <- copy_to(sc, df, overwrite=TRUE)

mutate_all(sdf, funs(. - mean(.)))

Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

      x         y     z
  <dbl>     <dbl> <dbl>
1    -1 -6.333333     0
2     0  2.666667     0
3     1  3.666667     0

However, this appears to expand into a very inefficient window function application (one Window operator per column, as the execution plans below show), which is unacceptable for large datasets. You may be better off with a more verbose solution:

avgs <- summarize_all(sdf, funs(mean)) %>% as.data.frame()

exprs <- as.list(paste(colnames(sdf),"-", avgs))

sdf %>%  
  spark_dataframe() %>% 
  invoke("selectExpr", exprs) %>% 
  invoke("toDF", as.list(colnames(sdf))) %>% 
  invoke("registerTempTable", "centered")

tbl(sc, "centered")

Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

      x         y     z
  <dbl>     <dbl> <dbl>
1    -1 -6.333333     0
2     0  2.666667     0
3     1  3.666667     0

It is not as pretty as the dplyr approach, but unlike the former it does a sensible thing: the column means are computed once and then subtracted as constants.

If you want to skip all the invokes, you can use dplyr to do the same thing:
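One detail of the expression strings may matter on real data: `paste()` writes each mean with about 15 significant digits (note `2.33333333333333` in the plans further down), and a negative mean produces text such as `x - -4`, which relies on the spacing to stay valid. The sketch below is a stricter builder under stated assumptions: `build_center_exprs` is a hypothetical helper, not part of sparklyr, and it assumes column names contain no backticks and no mean is NA.

```r
# Hypothetical helper: build one "column - mean" expression per column.
build_center_exprs <- function(col_names, col_means) {
  stopifnot(length(col_names) == length(col_means))
  # "%.17g" prints enough digits for a double to round-trip exactly
  literals <- sprintf("%.17g", as.numeric(col_means))
  # Backticks quote the column name; parentheses guard negative means
  sprintf("`%s` - (%s)", col_names, literals)
}

exprs_strict <- build_center_exprs(c("x", "y"), c(2, -4))
print(exprs_strict)
```

The resulting character vector can be wrapped in `as.list()` and passed to `invoke("selectExpr", ...)` in the same way as `exprs` above.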

transmute_(sdf, .dots = setNames(exprs, colnames(sdf)))

Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

      x         y     z
  <dbl>     <dbl> <dbl>
1    -1 -6.333333     0
2     0  2.666667     0
3     1  3.666667     0

Execution plans:

A helper function:

optimizedPlan <- function(df) {
  df %>% 
    spark_dataframe() %>%
    invoke("queryExecution") %>%
    invoke("optimizedPlan")
}

dplyr version:

mutate_all(sdf, funs(. - mean(.))) %>% optimizedPlan()

<jobj[190]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [x#2877, y#2878, (z#1123 - _we0#2894) AS z#2879]
+- Window [avg(z#1123) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2894]
   +- Project [x#2877, (y#1122 - _we0#2892) AS y#2878, z#1123]
      +- Window [avg(y#1122) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2892]
         +- Project [(x#1121 - _we0#2890) AS x#2877, z#1123, y#1122]
            +- Window [avg(x#1121) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2890]
               +- Project [y#1122, z#1123, x#1121]
                  +- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
                     :  +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

Spark solution:

tbl(sc, "centered") %>% optimizedPlan()

<jobj[204]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [(x#1121 - 2.0) AS x#2339, (y#1122 - 2.33333333333333) AS y#2340, (z#1123 - 42.0) AS z#2341]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
   :  +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

dplyr optimized:

transmute_(sdf, .dots = setNames(exprs, colnames(sdf))) %>% optimizedPlan()

<jobj[272]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [(x#1121 - 2.0) AS x#4792, (y#1122 - 2.33333333333333) AS y#4793, (z#1123 - 42.0) AS z#4794]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
   :  +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

Notes:

Spark SQL is not that good at handling wide datasets. With core Spark you usually combine features into a single Vector column, and Spark provides a number of transformers that operate on Vector data.
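As a rough illustration of the Vector-based route, the sketch below assembles the columns into one Vector column and centers it with Spark's StandardScaler through sparklyr. This is an assumption-laden sketch rather than the answer's method: `center_with_scaler` is a hypothetical helper, the argument names follow sparklyr 0.8 or later (older releases used different names), and the Spark version must be one that the installed sparklyr supports.

```r
# Hypothetical helper; assumes sparklyr >= 0.8 argument names.
center_with_scaler <- function(sdf, cols = colnames(sdf)) {
  # Pack the selected numeric columns into a single Vector column
  assembled <- sparklyr::ft_vector_assembler(
    sdf, input_cols = cols, output_col = "features"
  )
  # Fit a StandardScaler and apply it: subtract means, keep the scale
  sparklyr::ft_standard_scaler(
    assembled,
    input_col = "features",
    output_col = "features_centered",
    with_mean = TRUE,   # subtract the per-feature mean
    with_std = FALSE    # do not divide by the standard deviation
  )
}

# Usage, given an open connection `sc` and a Spark table `sdf`:
# centered <- center_with_scaler(sdf)
```

Note that the result holds the centered values in one Vector column (`features_centered`) instead of separate columns, which is the form most Spark ML transformers expect.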
