Sparklyr:如何计算2个Spark表之间的相关系数? [英] Sparklyr: how to calculate correlation coefficient between 2 Spark tables?

查看:354
本文介绍了Sparklyr:如何计算2个Spark表之间的相关系数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有这两个Spark表:

I have these 2 Spark tables:

simx
x0: num 1.00 2.00 3.00 ...
x1: num 2.00 3.00 4.00 ...
...
x788: num 2.00 3.00 4.00 ...

simy
y0: num 1.00 2.00 3.00 ...

在这两个表中,每列具有相同数量的值。两个表 x y 保存到句柄 simX_tbl 中,分别为 simY_tbl 。实际数据大小相当大,可能达到40GB。

In both tables, each column has the same number of values. Both table x and y are saved into handle simX_tbl and simY_tbl respectively. The actual data size is quite big and may reach 40GB.

我想计算 simx simy (让我们说像 cor(x0,y0,'pearson'))。

I want to calculate the correlation coefficient of each column in simx with simy (let's say like cor(x0, y0, 'pearson') ).

我在任何地方搜索,我认为没有任何可以使用的 cor 函数,所以我正在考虑使用相关性公式本身(就像在这里提到的)。

I searched everywhere and I don't think there's any ready-to-use cor function, so I'm thinking about using the correlation formula itself (just like mentioned in here).

根据我以前的问题

Based on a good explanation in my previous question, I think using mutate_all or mutate_each is not very efficient and gives a C stack error for a bigger data size, so I consider to use invoke instead to call functions from Spark directly.

到目前为止我管理了直到这里:

So far I managed to get until here:

exprs <- as.list(paste0("sum(", colnames(simX_tbl),")"))

corr_result <- simX_tbl%>%  
  spark_dataframe() %>% 
  invoke("selectExpr", exprs) %>% 
  invoke("toDF", as.list(colnames(simX_tbl))) %>% 
  sdf_register("corr_result")

计算 simx 中每列的总和。但是,我意识到,我还需要计算 simy 表,我不知道如何将两个表交互在一起(例如,访问 simy 同时操作 simx )。

to calculate the sum of each column in simx. But then, I realize that I also need to calculate the simy table and I don't know how to interact the two tables together (like, accessing simy while manipulating simx).

有没有办法计算相关性一个更好的方法?或者也许只是与其他Spark表进行交互。

Is there any way to calculate the correlation in a better way? Or maybe just how to interact with other Spark table.

我的Spark版本是1.6.0

My Spark version is 1.6.0

编辑:
我尝试使用结合函数从 dplyr

xy_df <- simX_tbl %>% 
  as.data.frame %>%
  combine(as.data.frame(simY_tbl)) %>%
  # convert both table to dataframe, then combine. 
  # It will become list, so need to convert to dataframe again
  as.data.frame 

xydata <- copy_to(sc, xy_df, "xydata") #copy the dataframe into Spark table

但是我不知道这是否是一个很好的解决方案,因为: p>

But I'm not sure if this is a good solution because:


  1. 需要加载到R中的数据框,我认为对于大尺寸数据来说非实用

  2. 当尝试句柄 xydata 时,列名称将成为所有值的一个值

  1. Need to load into dataframe inside of R, which I consider non-practical for big size data
  2. When trying to head the handle xydata, the column name becomes a concat of all values

xydata %>% head
Source:   query [6 x 790]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

c_1_67027262134984_2_44919662134984_1_85728542134984_1_49317262134984_


1 1.670273

2 2.449197

3 1.857285

4 1.493173

5 1.576857

6 -5.672155

c_1_67027262134984_2_44919662134984_1_85728542134984_1_49317262134984_
1 1.670273
2 2.449197
3 1.857285
4 1.493173
5 1.576857
6 -5.672155


推荐答案

个人我可以通过回到输入数据集。只是为了记录输入数据已使用CSV读取器加载:

Personally I would solve it by going back to the input dataset. Just for the record the input data has been loaded using CSV reader:

df <- spark_read_csv(
  sc, path = path, name = "simData", delimiter = " ", 
  header = "false", infer_schema = "false"
) %>% rename(y = `_c0`, xs = `_c1`)

,看起来或多或少是这样的:

and looks more or less like this:

      y                                                   xs
  <chr>                                                <chr>
1 21.66     2.643227,1.2698358,2.6338573,1.8812188,3.8708665
2 35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823
3 15.22  2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842

现在,不要将数据分解成多个表,让我们将两部分进行处理:

Now instead of splitting data into mutlitple tables let's process both part together:

exprs <- lapply(
 0:(n - 1), 
 function(i) paste("CAST(xs[", i, "] AS double) AS x", i, sep=""))

df %>% 
  # Convert to native Spark
  spark_dataframe() %>%
  # Split and select xs, but retain y
  invoke("selectExpr", list("y", "split(xs, ',') AS  xs")) %>%
  invoke("selectExpr", c("CAST(y AS DOUBLE)", exprs)) %>%
  # Register table so we can access it from dplyr
  invoke("registerTempTable", "exploded_df")

并应用 summarize_each

tbl(sc, "exploded_df") %>% summarize_each(funs(corr(., y)), starts_with("x"))



Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

         x0         x1        x2         x3         x4
      <dbl>      <dbl>     <dbl>      <dbl>      <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

快速理智检查( y x0 y x4 ):

A quick sanity check (correlation between y and x0, y and x4):

cor(c(21.66, 35.15, 15.22), c(2.643227, 3.422151, 2.8302398))



[1] 0.8503358



cor(c(21.66, 35.15, 15.22), c(3.8708665, 4.0771823, 4.6600842))



[1] -0.5571591

你可以课程中心数据:

exploded <- tbl(sc, "exploded_df")

avgs <- summarize_all(exploded, funs(mean)) %>% as.data.frame()
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))

transmute_(exploded, .dots = setNames(center_exprs, colnames(exploded))) %>% 
  summarize_each(funs(corr(., y)), starts_with("x"))

,但不会影响结果

Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

         x0         x1        x2         x3         x4
      <dbl>      <dbl>     <dbl>      <dbl>      <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

如果两个转换_ summarize_each 导致一些问题,我们可以将集中和相关直接推入Spark:

If both the transmute_ and summarize_each causes some issue we can push the centering and correlation directly into Spark:

#Centering
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))

exploded %>%  
  spark_dataframe() %>% 
  invoke("selectExpr", center_exprs) %>% 
  invoke("toDF", as.list(colnames(exploded))) %>%
  invoke("registerTempTable", "centered")

centered <- tbl(sc, "centered")

#Correlation
corr_exprs <- lapply(
  0:(n - 1), 
  function(i) paste("corr(y, x", i, ") AS x", i, sep=""))

centered %>% 
  spark_dataframe() %>% 
  invoke("selectExpr", corr_exprs) %>% 
  invoke("registerTempTable", "corrs")

 tbl(sc, "corrs")



Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

         x0         x1        x2         x3         x4
      <dbl>      <dbl>     <dbl>      <dbl>      <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

中间表当然不是必需的,这可以在我们从数组中提取数据的同时应用。

Intermediate table is of course not necessary and this could be applied at the same time as we extract data from arrays.

这篇关于Sparklyr:如何计算2个Spark表之间的相关系数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆