pyspark - aggregate (sum) vector element-wise


Question

I have what seems like a simple problem, but I keep banging my head against the wall with no success. I am essentially trying to do the same thing as this post, except that I don't care about the "group by" aspect of that post; I just want to sum over all rows.

To paraphrase the linked post, the DataFrame looks like:

ID,Vec
1,[0,0,5]
2,[3,3,4]
3,[0,8,1]
....

I would like to element-wise sum the vectors.

The desired output of the above example would be a single row:

SumOfVectors
[3,11,10]

The other big difference is that I'm using pyspark, not Scala. I tried getting rdd.fold() to work, but either it doesn't work the same or I can't figure out the syntax in pyspark.

One final caveat is that I'm doing this on a dataframe of ~1MM rows and a vector of length ~10k so this has to be fairly efficient.

Thanks for any help! A reproducible toy dataframe is below, per comments.

import numpy as np
from pyspark.ml.linalg import Vectors

n_rows = 100

# one ID column plus three random feature columns, stacked column-wise
pdf = np.column_stack([np.array(range(n_rows)), np.random.randn(n_rows), 3*np.random.randn(n_rows)+2, 6*np.random.randn(n_rows)-2])
# turn each row into an (ID, DenseVector) tuple
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), pdf)

df = spark.createDataFrame(dff, schema=["ID", "Vec"])
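
For reference, a couple of quick checks on the toy dataframe (assuming the snippet above ran against an active SparkSession named spark):

# schema object and the dataframe's default repr
print(df.schema)
print(df)

# peek at a few rows
df.show(3, truncate=False)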

df.schema should look like StructType(List(StructField(ID,LongType,true),StructField(Vec,VectorUDT,true)))

Just printing df gives me DataFrame[ID: bigint, Vec: vector]

Probably also important: I'm using Spark 2.4

$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_191
Branch HEAD
Compiled by user ec2-user on 2018-12-07T19:51:27Z
Revision bab859f34a291cb7b3f4e724b59e1b48af69016b
Url git@aws157git.com:/pkg/Aws157BigTop
Type --help for more information.

Answer

I eventually figured this out (I'm lying, one of my coworkers figured it out for me) so I'll post the answer here in case anyone has the same issue.

You can use fold, similar to how it's done in the Scala example linked in the original question. The syntax in pyspark is like so:

# find out how long the vectors are so we can size the zero accumulator below
vec_df = df.select('Vec')
num_cols = len(vec_df.first().Vec)

# fold over the rows, summing the vectors element-wise
vec_sums = vec_df.rdd.fold([0]*num_cols, lambda a,b: [x + y for x, y in zip(a, b)])

Brief explanation: rdd.fold() takes two arguments. The first is the zero value, in this case [0]*num_cols, which is just a list of 0's the same length as the vectors. The second is the function used to fold each row into the running total (and to merge the per-partition results): for each row it does lambda a,b: [x + y for x, y in zip(a, b)], which just adds that row element-wise to what has been accumulated so far.
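
If the fold semantics are still unclear, a minimal, self-contained sketch of the same pattern on plain Python lists, using the three example rows from the question, would look something like this (the local SparkSession setup is just for the illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("fold-demo").getOrCreate()

# three "rows", each already a plain Python list
rows = spark.sparkContext.parallelize([[0, 0, 5], [3, 3, 4], [0, 8, 1]])

# zero value: a list of 0's as long as a row;
# combine function: element-wise addition of two lists
totals = rows.fold([0, 0, 0], lambda a,b: [x + y for x, y in zip(a, b)])

print(totals)  # [3, 11, 10]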

You can use my code in the original question to generate a toy dataframe to test this on. Hope that's helpful to someone.
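
As a quick sanity check on the toy dataframe only (collecting ~1MM rows of ~10k-wide vectors would not be a good idea), something like the following should show the fold result agreeing with a plain numpy sum over the collected vectors; it assumes df and vec_sums from above are in scope:

import numpy as np

# collect() is fine here because the toy dataframe only has 100 rows
expected = np.sum([row.Vec.toArray() for row in df.select("Vec").collect()], axis=0)

print(vec_sums)
print(expected)  # the summed values should match vec_sums element-wise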
