Pyspark dataframe: how to apply scipy.optimize function by group

Question

I have a piece of code which works well but uses pandas dataframe groupby processing. However, because the file is large (> 70 million groups), I need to convert the code to use a PySpark dataframe. Here is the original code using a pandas dataframe with small example data:

import pandas as pd
import numpy as np
from scipy.optimize import minimize

df = pd.DataFrame({
'y0': np.random.randn(20),
'y1': np.random.randn(20),
'x0': np.random.randn(20), 
'x1': np.random.randn(20),
'grpVar': ['a', 'b'] * 10})

# Starting values
startVal = np.ones(2)*(1/2)

# Constraint: sum of coefficients = 1 (i.e. 1 - sum(x) = 0)
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})

# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

# Define a function to calculate sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])**2)

# Define a function to call minimize function 
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
    bounds=bnds, constraints = cons, args=(data))
    return ResultByGrp.x

# Do the calculation by applying the function by group:
# Create GroupBy object
grp_grpVar = df.groupby('grpVar')

Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons)
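
For reference, Results here is a pandas Series indexed by grpVar, one coefficient array per group. As a quick sanity check (added for illustration, not part of the original code), each group's coefficients should sum to 1 because of the equality constraint:

print(Results)
print(Results.apply(lambda a: np.isclose(a.sum(), 1.0)))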

Now I am trying to use a PySpark dataframe. To test the code, I convert the pandas dataframe to a PySpark dataframe:

sdf = sqlContext.createDataFrame(df)
type(sdf)
#  <class 'pyspark.sql.dataframe.DataFrame'>

# Create GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')

# Redefine functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)

def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                       bounds=bnds, constraints = cons, args=(data))
    return ResultByGrp.x

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

udf = UserDefinedFunction(sRunMinimize , StringType())

Results = Sgrp_grpVar.agg(sRunMinimize()) 

However, when I tried to define the user-defined function udf, I got the error below. Any help correcting my errors or suggesting an alternative approach is highly appreciated.

    udf = UserDefinedFunction(sRunMinimize, StringType())
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in __init__
        self._judf = self._create_judf(name)
    ...

Answer

You're trying to write a User Defined Aggregate Function, which can't be done in pyspark; see https://stackoverflow.com/a/40030740.

What you can write instead is a UDF on the data within each group collected as a list:

First, the setup:

import pandas as pd 
import numpy as np 
from scipy.optimize import minimize
import pyspark.sql.functions as psf
from pyspark.sql.types import *

df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20), 
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})
sdf = sqlContext.createDataFrame(df)

# Starting values
startVal = np.ones(2)*(1/2)
# Constraint: sum of coefficients = 1 (i.e. 1 - sum(x) = 0)
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})
# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

We'll broadcast these variables since we need them for every row of the aggregated dataframe; broadcasting copies the values to every node so the executors don't have to fetch them from the driver:

sc.broadcast(startVal)
sc.broadcast(bnds)
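
Note that sc.broadcast returns a Broadcast object; as written above, the handles are discarded and the closures below simply capture startVal and bnds directly, which also works. A more explicit variant (a sketch, with hypothetical names) keeps the handles and reads them through .value inside the function:

startVal_bc = sc.broadcast(startVal)
bnds_bc = sc.broadcast(bnds)
# ... and inside sRunMinimize refer to startVal_bc.value and bnds_bc.value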

Let's aggregate the data using collect_list. We'll change the structure of the data so that we only have one column (you could collect each column into a distinct column instead, but then you'd have to modify the way you pass data to the function; see the sketch after the schema below):

Sgrp_grpVar = sdf\
    .groupby('grpVar')\
    .agg(psf.collect_list(psf.struct("y0", "y1", "x0", "x1")).alias("data"))
Sgrp_grpVar.printSchema()

    root
     |-- grpVar: string (nullable = true)
     |-- data: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- y0: double (nullable = true)
     |    |    |-- y1: double (nullable = true)
     |    |    |-- x0: double (nullable = true)
     |    |    |-- x1: double (nullable = true)


We can now create our UDF. The data type returned by minimize is a numpy array, which pyspark does not support, so we need to change it a bit (hence the .tolist() at the end):

def sSumSqDif(a, data):
    return np.sum(
        (data['y0'] - a[0]*data['x0'])**2 \
        + (data['y1'] - a[1]*data['x1'])**2)

def sRunMinimize(data, startVal=startVal, bnds=bnds, cons=cons):
    # data arrives as a list of Rows; transpose it into columns before building the DataFrame
    data = pd.DataFrame({k: list(v) for k, v in zip(["y0", "y1", "x0", "x1"], zip(*data))})
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                       bounds=bnds, constraints = cons, args=(data))
    return ResultByGrp.x.tolist()

sRunMinimize_udf = lambda startVal, bnds, cons: psf.udf(
    lambda data: sRunMinimize(data, startVal, bnds, cons), 
    ArrayType(DoubleType())
)
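
The outer lambda only binds startVal, bnds and cons at the time the UDF is built. Since those are already the defaults of sRunMinimize, an equivalent shortcut (hypothetical, not from the original answer) would be:

sRunMinimize_default_udf = psf.udf(sRunMinimize, ArrayType(DoubleType()))
# applied as sRunMinimize_default_udf("data")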

We can now apply this function to the collected data in each group:

Results = Sgrp_grpVar.select(
    "grpVar", 
    sRunMinimize_udf(startVal, bnds, cons)("data").alias("res")
)
Results.show(truncate=False)

    +------+-----------------------------------------+
    |grpVar|res                                      |
    +------+-----------------------------------------+
    |b     |[0.4073139282953772, 0.5926860717046227] |
    |a     |[0.8275186444565927, 0.17248135554340727]|
    +------+-----------------------------------------+
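
If separate coefficient columns are more convenient downstream, the returned array can be split out with getItem (a sketch; the column names a0 and a1 are made up here):

Coefs = Results.select(
    "grpVar",
    Results["res"].getItem(0).alias("a0"),
    Results["res"].getItem(1).alias("a1"))
Coefs.show()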

But I don't think pyspark is the right tool for this.
