Counting words after grouping records
Question
Note: Although the provided answer is working, it can get rather slow on larger data sets. Take a look at this for a faster solution.
I have a data frame consisting of labelled documents, such as this one:
df_ = spark.createDataFrame([
    ('1', 'hello how are are you today'),
    ('1', 'hello how are you'),
    ('2', 'hello are you here'),
    ('2', 'how is it'),
    ('3', 'hello how are you'),
    ('3', 'hello how are you'),
    ('4', 'hello how is it you today')
], schema=['label', 'text'])
What I want is to group the data frame by label and make a simple word count for each group. My problem is that I'm not sure how to do this in PySpark. As a first step, I would split the text and get each document as a list of tokens:
from collections import Counter
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

def get_tokens(text):
    if text is None:
        return list()
    return text.lower().split()

udf_get_tokens = F.udf(get_tokens, ArrayType(StringType()))

df_.select(['label'] + [udf_get_tokens(F.col('text')).alias('text')])\
    .show()
which gives
+-----+--------------------+
|label| text|
+-----+--------------------+
| 1|[hello, how, are,...|
| 1|[hello, how, are,...|
| 2|[hello, are, you,...|
| 2|[hello, how, is, it]|
| 3|[hello, how, are,...|
| 3|[hello, how, are,...|
| 4|[hello, how, is, ...|
+-----+--------------------+
I know how to make a word count over the entire data frame, but I don't know how to proceed with groupby() or reduceByKey().
I was thinking about partially counting the words in the data frame:
df_.select(['label'] + [udf_get_tokens(F.col('text')).alias('text')])\
    .rdd.map(lambda x: (x[0], list(Counter(x[1]).items())))\
    .toDF(schema=['label', 'text'])\
    .show()
which gives:
+-----+--------------------+
|label| text|
+-----+--------------------+
| 1|[[are,2], [hello,...|
| 1|[[are,1], [hello,...|
| 2|[[are,1], [hello,...|
| 2|[[how,1], [it,1],...|
| 3|[[are,1], [hello,...|
| 3|[[are,1], [hello,...|
| 4|[[you,1], [today,...|
+-----+--------------------+
But how do I aggregate it?
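The aggregation being asked about (merging each label's partial (word, count) lists) can be sketched in plain Python with collections.Counter. This is not Spark code; it only illustrates the associative merge that a reduceByKey() over (label, counts) pairs would perform, using hypothetical partial counts shaped like the rows above:

```python
from collections import Counter

# Hypothetical per-row partial counts, mirroring the rows shown above for label '1'.
partial = [
    ('1', [('are', 2), ('hello', 1), ('how', 1), ('you', 1), ('today', 1)]),
    ('1', [('are', 1), ('hello', 1), ('how', 1), ('you', 1)]),
]

def merge_counts(a, b):
    # The associative merge that reduceByKey would apply within each label.
    return Counter(dict(a)) + Counter(dict(b))

merged = {}
for label, counts in partial:
    merged[label] = merge_counts(merged.get(label, []), counts)

print(sorted(merged['1'].items()))
# [('are', 3), ('hello', 2), ('how', 2), ('today', 1), ('you', 2)]
```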
Answer
You should use pyspark.ml.feature.Tokenizer to split the text instead of using a udf. (Also, depending on what you are doing, you may find StopWordsRemover useful.)
For example:
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokens = tokenizer.transform(df_)
tokens.show(truncate=False)
+-----+---------------------------+----------------------------------+
|label|text |tokens |
+-----+---------------------------+----------------------------------+
|1 |hello how are are you today|[hello, how, are, are, you, today]|
|1 |hello how are you |[hello, how, are, you] |
|2 |hello are you here |[hello, are, you, here] |
|2 |how is it |[how, is, it] |
|3 |hello how are you |[hello, how, are, you] |
|3 |hello how are you |[hello, how, are, you] |
|4 |hello how is it you today |[hello, how, is, it, you, today] |
+-----+---------------------------+----------------------------------+
Then you can explode() the tokens and do a groupBy() to get the count for each word:
import pyspark.sql.functions as f

token_counts = tokens.select("label", f.explode("tokens").alias("token"))\
    .groupBy("label", "token").count()\
    .orderBy("label", "token")

token_counts.show(truncate=False, n=10)
+-----+-----+-----+
|label|token|count|
+-----+-----+-----+
|1 |are |3 |
|1 |hello|2 |
|1 |how |2 |
|1 |today|1 |
|1 |you |2 |
|2 |are |1 |
|2 |hello|1 |
|2 |here |1 |
|2 |how |1 |
|2 |is |1 |
+-----+-----+-----+
only showing top 10 rows
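As a quick sanity check (plain Python, not Spark), the counts shown for label 1 can be reproduced directly from the two documents in that group:

```python
from collections import Counter

# The two documents labelled '1' in the example data frame.
docs = ['hello how are are you today', 'hello how are you']

counts = Counter()
for doc in docs:
    counts.update(doc.lower().split())

print(sorted(counts.items()))
# [('are', 3), ('hello', 2), ('how', 2), ('today', 1), ('you', 2)]
```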
If you want all of the tokens and counts on one row per label, just do another groupBy() with pyspark.sql.functions.collect_list() and concatenate the token and count columns using pyspark.sql.functions.struct():
tokens.select("label", f.explode("tokens").alias("token"))\
    .groupBy("label", "token")\
    .count()\
    .groupBy("label")\
    .agg(f.collect_list(f.struct(f.col("token"), f.col("count"))).alias("text"))\
    .orderBy("label")\
    .show(truncate=False)
+-----+----------------------------------------------------------------+
|label|text |
+-----+----------------------------------------------------------------+
|1 |[[hello,2], [how,2], [are,3], [today,1], [you,2]] |
|2 |[[you,1], [hello,1], [here,1], [are,1], [it,1], [how,1], [is,1]]|
|3 |[[are,2], [you,2], [how,2], [hello,2]] |
|4 |[[today,1], [hello,1], [it,1], [you,1], [how,1], [is,1]] |
+-----+----------------------------------------------------------------+
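If you then collect() this result to the driver, each row's struct list converts naturally into an ordinary dict. The rows below are hypothetical stand-ins for two collected rows, assuming they come back as (label, [(token, count), ...]) tuples:

```python
# Hypothetical shape of two rows collected from the result above:
# each row is (label, [(token, count), ...]).
rows = [
    ('3', [('are', 2), ('you', 2), ('how', 2), ('hello', 2)]),
    ('4', [('today', 1), ('hello', 1), ('it', 1), ('you', 1), ('how', 1), ('is', 1)]),
]

# Convert to a plain {label: {token: count}} mapping for downstream use.
counts_by_label = {label: dict(pairs) for label, pairs in rows}

print(counts_by_label['3']['are'])
# 2
```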