Apply a function to groupBy data with pyspark

Question

I'm trying to get word counts from a csv when grouping on another column. My csv has three columns: id, message and user_id. I read this in and then split the message and store a list of unigrams:

+-----------------+--------------------+--------------------+
|               id|             message|             user_id|
+-----------------+--------------------+--------------------+
|10100720363468236|[i'm, sad, to, mi...|dceafb541a1b8e894...|
|10100718944611636|[what, does, the,...|dceafb541a1b8e894...|
|10100718890699676|[at, the, oecd, w...|dceafb541a1b8e894...|
+-----------------+--------------------+--------------------+

Next, given my dataframe df, I want to group by user_id and then get counts for each of the unigrams. As a simple first pass I tried grouping by user_id and getting the length of the grouped message field:

from collections import Counter
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf

df = self.session.read.csv(self.corptable, header=True,
        mode="DROPMALFORMED",)

# split my messages ....
# message is now ArrayType(StringType())

grouped = df.groupBy(df["user_id"])
counter = udf(lambda l: len(l), ArrayType(StringType()))
grouped.agg(counter(df["message"]))
print(grouped.collect())

I get the following error:

pyspark.sql.utils.AnalysisException: "expression '`message`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"

Not sure how to get around this error. In general how does one apply a function to one column when grouping another? Do I always have to create a User Defined Function? Very new to Spark.
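
A minimal sketch of one way around the error, assuming the df from above: whatever you want to compute from a non-grouped column has to happen inside agg(), for example by collecting the message arrays per user and then measuring the result with pyspark's built-in size():

from pyspark.sql import functions as F

# collect all message arrays per user, then count how many messages each user has
lengths = (df.groupBy("user_id")
             .agg(F.collect_list("message").alias("messages"))
             .withColumn("n_messages", F.size("messages")))
lengths.show()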

Edit: Here is how I solved this, given a tokenizer in a separate Python file:

from collections import Counter
from operator import add

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

group_field = "user_id"
message_field = "message"

context = SparkContext()
session = SparkSession\
        .builder\
        .appName("dlastk")\
        .getOrCreate()

# add tokenizer (tokenizer_path points at the separate tokenizer module)
context.addPyFile(tokenizer_path)
from tokenizer import Tokenizer
tokenizer = Tokenizer()
spark_tokenizer = udf(tokenizer.tokenize, ArrayType(StringType()))

df = session.read.csv("myFile.csv", header=True)
df = df[group_field, message_field]

# tokenize the message field
df = df.withColumn(message_field, spark_tokenizer(df[message_field]))

# create ngrams from tokenized messages and count them per group
n = 1
grouped = df.rdd.map(lambda row: (row[0], Counter([" ".join(x) for x in zip(*[row[1][i:] for i in range(n)])]))).reduceByKey(add)

# flatten the rdd so that each row contains (group_id, ngram, count, relative frequency)
flat = grouped.flatMap(lambda row: [[row[0], x, y, y / sum(row[1].values())] for x, y in row[1].items()])

# rdd -> DF
flat = flat.toDF()
flat.write.csv("myNewCSV.csv")
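
The n-gram line above uses the usual zip-of-shifted-slices idiom; a standalone plain-Python illustration (with a hypothetical token list) of what it produces:

from collections import Counter

tokens = ["at", "the", "oecd", "with"]   # hypothetical tokenized message
n = 2
# zip(*[tokens[i:] for i in range(n)]) yields overlapping windows of length n
ngrams = [" ".join(x) for x in zip(*[tokens[i:] for i in range(n)])]
print(ngrams)            # ['at the', 'the oecd', 'oecd with']
print(Counter(ngrams))   # Counter({'at the': 1, 'the oecd': 1, 'oecd with': 1})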

The data looks like this:

# after read
+--------------------+--------------------+
|             user_id|             message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|To the douchebag ...|
|00035fb0dcfbeaa8b...|   T minus 1 week...|
|00035fb0dcfbeaa8b...|Last full day of ...|
+--------------------+--------------------+

# after tokenize
+--------------------+--------------------+
|             user_id|             message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|[to, the, doucheb...|
|00035fb0dcfbeaa8b...|[t, minus, 1, wee...|
|00035fb0dcfbeaa8b...|[last, full, day,...|
+--------------------+--------------------+

# grouped: after 1grams extracted and Counters added
[('00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', Counter({'!': 545, '.': 373, 'the': 306, '"': 225, ...

# flat: after calculating sum and relative frequency for each 1gram
[['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'face', 3, 0.000320547066994337], ['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'was', 26, 0.002778074580617587] ....

# after flat RDD to DF
+--------------------+---------+---+--------------------+
|                  _1|       _2| _3|                  _4|
+--------------------+---------+---+--------------------+
|00035fb0dcfbeaa8b...|     face|  3| 3.20547066994337E-4|
|00035fb0dcfbeaa8b...|      was| 26|0.002778074580617587|
|00035fb0dcfbeaa8b...|      how| 22|0.002350678491291...|
+--------------------+---------+---+--------------------+

Answer

A natural approach could be to group the words into one list, and then use the Python function Counter() to generate word counts. For both steps we'll use udfs. First, one that flattens the nested list resulting from collect_list() over multiple arrays:

# Depending on your Spark version you may need to declare the return type
# explicitly, e.g. udf(..., ArrayType(StringType())), so that the next udf
# receives a list rather than its string representation.
unpack_udf = udf(
    lambda l: [item for sublist in l for item in sublist]
)
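
To see what the flattening does, a plain-Python check on a hypothetical nested list (the shape collect_list() produces from several array columns):

nested = [["what", "sad", "to", "me"], ["what", "what", "does", "the"]]
print([item for sublist in nested for item in sublist])
# ['what', 'sad', 'to', 'me', 'what', 'what', 'does', 'the']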

Second, one that generates the word count tuples, or in our case structs:

from pyspark.sql.types import *
from collections import Counter

# We need to specify the schema of the return object
schema_count = ArrayType(StructType([
    StructField("word", StringType(), False),
    StructField("count", IntegerType(), False)
]))

count_udf = udf(
    lambda s: Counter(s).most_common(), 
    schema_count
)
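
Locally, most_common() returns (word, count) tuples in descending count order, which Spark maps onto the word/count struct fields declared in schema_count; a quick check in plain Python:

from collections import Counter

print(Counter(["what", "what", "does", "the"]).most_common())
# [('what', 2), ('does', 1), ('the', 1)]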

Putting it all together:

from pyspark.sql.functions import collect_list

(df.groupBy("id")
 .agg(collect_list("message").alias("message"))
 .withColumn("message", unpack_udf("message"))
 .withColumn("message", count_udf("message"))).show(truncate = False)
+-----------------+------------------------------------------------------+
|id               |message                                               |
+-----------------+------------------------------------------------------+
|10100718890699676|[[oecd,1], [the,1], [with,1], [at,1]]                 |
|10100720363468236|[[what,3], [me,1], [sad,1], [to,1], [does,1], [the,1]]|
+-----------------+------------------------------------------------------+

Data:

df = sc.parallelize([(10100720363468236,["what", "sad", "to", "me"]),
                     (10100720363468236,["what", "what", "does", "the"]),
                     (10100718890699676,["at", "the", "oecd", "with"])]).toDF(["id", "message"])
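
As a possible follow-up (not part of the original answer), the array of structs can be exploded into one row per (id, word, count) if a flat table like the asker's is preferred; a sketch reusing the udfs above:

from pyspark.sql.functions import collect_list, explode, col

# explode each (word, count) struct into its own row, then pull out the fields
word_counts = (df.groupBy("id")
               .agg(collect_list("message").alias("message"))
               .withColumn("message", unpack_udf("message"))
               .withColumn("message", count_udf("message"))
               .withColumn("wc", explode("message"))
               .select("id", col("wc.word").alias("word"), col("wc.count").alias("count")))
word_counts.show()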
