pyspark corr for each group in DF (more than 5K columns)


Question



I have a DF with 100 million rows and 5000+ columns. I am trying to find the correlation between colx and each of the remaining 5000+ columns.

from pyspark.sql import functions as func
from pyspark.sql.functions import mean, corr, broadcast

# per-group means for every (non-key) column
aggList1 = [mean(col).alias(col + '_m') for col in df.columns]  # exclude keys
df21 = df.groupBy('key1', 'key2', 'key3', 'key4').agg(*aggList1)

# subtract the group mean from every column
df = df.join(broadcast(df21), ['key1', 'key2', 'key3', 'key4'])
df = df.select([func.round(func.col(colmd) - func.col(colmd + '_m'), 8).alias(colmd)
                for colmd in all5Kcolumns])

# correlation of colx with every remaining column, per group
aggCols = [corr(colx, col).alias(col) for col in colsall5K]
df2 = df.groupBy('key1', 'key2', 'key3').agg(*aggCols)

Right now it is not working because of the Spark 64KB codegen issue (even on Spark 2.2), so I am looping over the columns 300 at a time and merging everything at the end. But it is taking more than 30 hours on a cluster with 40 nodes (10 cores each, 100 GB per node). Any help to tune this?
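A rough sketch of that chunked loop (the batch size, the checkpointing, and the final join-based merge are illustrative assumptions; colsall5K, colx, and the key columns come from the snippet above):

from functools import reduce

# hypothetical sketch of the 300-columns-at-a-time approach described above
batch_size = 300
partials = []
for i in range(0, len(colsall5K), batch_size):
    batch = colsall5K[i:i + batch_size]
    aggBatch = [corr(colx, c).alias(c) for c in batch]
    part = df.groupBy('key1', 'key2', 'key3').agg(*aggBatch)
    part = part.checkpoint()  # checkpoint each iteration, as listed below
    partials.append(part)

# merge all per-batch results on the key columns
df2 = reduce(lambda a, b: a.join(b, ['key1', 'key2', 'key3']), partials)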

Things already tried:

- Repartition the DF to 10,000 partitions
- Checkpoint in each loop
- Cache in each loop

Solution

You can try with a bit of NumPy and RDDs. First a bunch of imports:

from operator import itemgetter
import numpy as np
from pyspark.statcounter import StatCounter

Let's define a few variables:

keys = ["key1", "key2", "key3"] # list of key column names
xs = ["x1", "x2", "x3"]    # list of column names to compare
y = "y"                         # name of the reference column

And some helpers:

def as_pair(keys, y, xs):
    """ Given key names, y name, and xs names
    return a tuple of key, array-of-values"""
    key = itemgetter(*keys)
    value = itemgetter(y, *xs)  # Python 3 syntax

    def as_pair_(row):
        return key(row), np.array(value(row))
    return as_pair_

def init(x):
    """ Init function for combineByKey
    Initialize new StatCounter and merge first value"""
    return StatCounter().merge(x)

def center(means):
    """Center a row value given a 
    dictionary of mean arrays
    """
    def center_(row):
        key, value = row
        return key, value - means[key]
    return center_

def prod(arr):
    # first element is the y deviation, the rest are the x deviations;
    # their elementwise products feed the covariance estimate
    return arr[0] * arr[1:]

def corr(stddev_prods):
    """Scale the row to get 1 stddev 
    given a dictionary of stddevs
    """
    def corr_(row):
        key, value = row
        return key, value / stddev_prods[key]
    return corr_

and convert DataFrame to RDD of pairs:

pairs = df.rdd.map(as_pair(keys, y, xs))

Next let's compute statistics per group:

stats = (pairs
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
    .collectAsMap())

means = {k: v.mean() for k, v in stats.items()}

Note: With 5000 features and 7000 groups there should be no issue with keeping this structure in memory. With larger datasets you may have to keep the statistics as an RDD and join instead (a sketch follows the centering step below), but this will be slower.

Center the data:

centered = pairs.map(center(means))
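If the means dictionary is too large to collect to the driver (see the note above), the same centering could be done with a join instead; a rough sketch reusing the pairs RDD and the init helper from above (means_rdd is an illustrative name):

# keep the per-group means distributed instead of collecting them
means_rdd = (pairs
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
    .mapValues(StatCounter.mean))

# join on the group key and subtract the mean array, instead of a dict lookup
centered = (pairs
    .join(means_rdd)
    .mapValues(lambda value_and_mean: value_and_mean[0] - value_and_mean[1]))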

Compute covariance:

covariance = (centered
    .mapValues(prod)
    .combineByKey(init, StatCounter.merge, StatCounter.mergeStats)
    .mapValues(StatCounter.mean))

And finally correlation:

stddev_prods = {k: prod(v.stdev()) for k, v in stats.items()}

correlations = covariance.map(corr(stddev_prods))

Example data:

df = sc.parallelize([
    ("a", "b", "c", 0.5, 0.5, 0.3, 1.0),
    ("a", "b", "c", 0.8, 0.8, 0.9, -2.0), 
    ("a", "b", "c", 1.5, 1.5, 2.9, 3.6),
    ("d", "e", "f", -3.0, 4.0, 5.0, -10.0),
    ("d", "e", "f", 15.0, -1.0, -5.0, 10.0),
]).toDF(["key1", "key2", "key3", "y", "x1", "x2", "x3"])

Results with DataFrame:

from pyspark.sql import functions as F  # the local corr() helper shadows pyspark.sql.functions.corr

df.groupBy(*keys).agg(*[F.corr(y, x) for x in xs]).show()

+----+----+----+-----------+------------------+------------------+
|key1|key2|key3|corr(y, x1)|       corr(y, x2)|       corr(y, x3)|
+----+----+----+-----------+------------------+------------------+
|   d|   e|   f|       -1.0|              -1.0|               1.0|
|   a|   b|   c|        1.0|0.9972300220940342|0.6513360726920862|
+----+----+----+-----------+------------------+------------------+

and the method provided above:

correlations.collect()

[(('a', 'b', 'c'), array([ 1.        ,  0.99723002,  0.65133607])),
 (('d', 'e', 'f'), array([-1., -1.,  1.]))]

This solution, while a bit involved, is quite elastic and can be easily adjusted to handle different data distributions. It should also be possible to get a further boost with JIT compilation.
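As for the JIT remark, a minimal sketch assuming Numba is available on the executors (not part of the original answer; whether it pays off depends on array sizes and serialization overhead) would be to compile the per-row kernel:

from numba import njit

@njit
def prod(arr):
    # elementwise product of the y deviation with each x deviation
    return arr[0] * arr[1:]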
