How to add sparse vectors after group by, using Spark SQL?


Problem description


I am doing a news recommendation system and I need to build a table of users and the news they read. My raw data looks like this:

001436800277225 ["9161492","9161787","9378531"]
009092130698762 ["9394697"]
010003000431538 ["9394697","9426473","9428530"]
010156461231357 ["9350394","9414181"]
010216216021063 ["9173862","9247870"]
010720006581483 ["9018786"]
011199797794333 ["9017977","9091134","9142852","9325464","9331913"]
011337201765123 ["9161294","9198693"]
011414545455156 ["9168185","9178348","9182782","9359776"]
011425002581540 ["9083446","9161294","9309432"]
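
For context, here is a minimal sketch of how such a file could be loaded into the DataFrame that getdf() below is assumed to return (the actual loading code is not part of the question; the file path and parsing logic are assumptions):

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def parse_line(line):
    # Each line is 'uuid ["newsId", ...]': split off the uuid, parse the rest as JSON
    uuid, news_json = line.split(None, 1)
    return uuid, json.loads(news_json)

# "user_news.txt" is a placeholder path; the result has a string column `uuid`
# and an array<string> column `news`
df = spark.sparkContext.textFile("user_news.txt").map(parse_line).toDF(["uuid", "news"])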

and I use Spark SQL to explode it and do one-hot encoding,

from pyspark.sql.functions import explode
from pyspark.ml.feature import StringIndexer, OneHotEncoder

df = getdf()
# One row per (uuid, news) pair
df1 = df.select('uuid', explode('news').alias('news'))
# Map each news id to a numeric index, then one-hot encode that index
stringIndexer = StringIndexer(inputCol="news", outputCol="newsIndex")
model = stringIndexer.fit(df1)
indexed = model.transform(df1)
encoder = OneHotEncoder(inputCol="newsIndex", outputCol="newsVec")
encoded = encoder.transform(indexed)
encoded.show(20, False)

After that, my data becomes:

+---------------+-------+---------+----------------------+
|uuid           |news   |newsIndex|newsVec               |
+---------------+-------+---------+----------------------+
|014324000386050|9398253|10415.0  |(105721,[10415],[1.0])|
|014324000386050|9428530|70.0     |(105721,[70],[1.0])   |
|014324000631752|654112 |1717.0   |(105721,[1717],[1.0]) |
|014324000674240|730531 |2282.0   |(105721,[2282],[1.0]) |
|014324000674240|694306 |1268.0   |(105721,[1268],[1.0]) |
|014324000674240|712016 |4766.0   |(105721,[4766],[1.0]) |
|014324000674240|672307 |7318.0   |(105721,[7318],[1.0]) |
|014324000674240|698073 |1241.0   |(105721,[1241],[1.0]) |
|014324000674240|728044 |5302.0   |(105721,[5302],[1.0]) |
|014324000674240|672256 |1619.0   |(105721,[1619],[1.0]) |
|014324000674240|730236 |2376.0   |(105721,[2376],[1.0]) |
|014324000674240|730235 |14274.0  |(105721,[14274],[1.0])|
|014324000674240|728509 |1743.0   |(105721,[1743],[1.0]) |
|014324000674240|704528 |10310.0  |(105721,[10310],[1.0])|
|014324000715399|774134 |8876.0   |(105721,[8876],[1.0]) |
|014324000725836|9357431|3479.0   |(105721,[3479],[1.0]) |
|014324000725836|9358028|15621.0  |(105721,[15621],[1.0])|
|014324000730349|812106 |4599.0   |(105721,[4599],[1.0]) |
|014324000730349|699237 |754.0    |(105721,[754],[1.0])  |
|014324000730349|748109 |4854.0   |(105721,[4854],[1.0]) |
+---------------+-------+---------+----------------------+

But one uuid has multiple rows, so I want to groupBy('uuid') and then add up these vectors. However, just using groupBy and then adding gives an error. How can I do that?
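
(For reference, a plausible reconstruction of the failing attempt, which is not shown in the question: Spark's built-in sum aggregate only supports numeric columns, so applying it to the VectorUDT column cannot be resolved.)

# Hypothetical reconstruction -- not taken from the question.
# `sum` requires a numeric type, so aggregating the vector column fails with an AnalysisException.
from pyspark.sql.functions import sum as sql_sum

encoded.groupBy("uuid").agg(sql_sum("newsVec"))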

Solution

Starting from indexed, we can collect the column newsIndex as a list and turn it into a SparseVector using a udf.

To declare a sparse vector, we need the number of features and a list of tuples containing the position and the value. Because we are dealing with a categorical variable, the value will always be 1.0, and the position will come from the column newsIndex:

from pyspark.sql.functions import udf, collect_list, max, lit
from pyspark.ml.linalg import Vectors, VectorUDT

def encode(arr, length):
    # One (position, 1.0) pair per collected newsIndex value
    vec_args = length, [(x, 1.0) for x in arr]
    return Vectors.sparse(*vec_args)

encode_udf = udf(encode, VectorUDT())
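
A quick check of the helper with made-up values (the indices and length here are only for illustration):

encode([2.0, 0.0, 4.0], 5)   # -> SparseVector(5, {0: 1.0, 2: 1.0, 4: 1.0})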

The number of features is max(newsIndex) + 1 (since StringIndexer starts at 0.0):

feats = indexed.agg(max(indexed["newsIndex"])).take(1)[0][0] + 1
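
Equivalently, since StringIndexer assigns one index per distinct news id, the model fitted in the question gives the same number (a small alternative, assuming model is still in scope):

feats = len(model.labels)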

Bringing it all together:

indexed.groupBy("uuid") \
       .agg(collect_list("newsIndex")
       .alias("newsArr")) \
       .select("uuid", 
               encode_udf("newsArr", lit(feats))
               .alias("OHE")) \
       .show(truncate = False)
+---------------+-----------------------------------------+
|uuid           |OHE                                      |
+---------------+-----------------------------------------+
|009092130698762|(24,[0],[1.0])                           |
|010003000431538|(24,[0,3,15],[1.0,1.0,1.0])              |
|010720006581483|(24,[11],[1.0])                          |
|010216216021063|(24,[10,22],[1.0,1.0])                   |
|001436800277225|(24,[2,12,23],[1.0,1.0,1.0])             |
|011425002581540|(24,[1,5,9],[1.0,1.0,1.0])               |
|010156461231357|(24,[13,18],[1.0,1.0])                   |
|011199797794333|(24,[7,8,17,19,20],[1.0,1.0,1.0,1.0,1.0])|
|011414545455156|(24,[4,6,14,21],[1.0,1.0,1.0,1.0])       |
|011337201765123|(24,[1,16],[1.0,1.0])                    |
+---------------+-----------------------------------------+
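
Note that the size 24 in the output above comes from running the pipeline on the 10-row sample from the question, which contains 24 distinct news ids; on the full data the same code would produce vectors of size 105721. A quick way to check (assuming indexed was built as above):

indexed.select("newsIndex").distinct().count()   # 24 for the sample, 105721 for the full data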
