对记录分组后的单词计数(第2部分) [英] Counting words after grouping records (Part 2)
问题描述
尽管我对于我想要实现的目标有一个答案,但问题在于它的速度很慢.数据集不是很大.它总共为50GB,但是受影响的部分可能只是5到10GB的数据.但是,以下是我所需要的,但是它是减慢速度的方法.减慢速度是指它运行了一个小时并且没有终止.
Although I am having an answer for what I want to achieve, the problem is that it's way to slow. The data set is not very large. It's 50GB in total but the affected part is probably just between 5 to 10GB of data. However, the following is what I require, but it's way to slow And by slow I mean it was running for an hour and it didn't terminate.
df_ = spark.createDataFrame([
('1', 'hello how are are you today'),
('1', 'hello how are you'),
('2', 'hello are you here'),
('2', 'how is it'),
('3', 'hello how are you'),
('3', 'hello how are you'),
('4', 'hello how is it you today')
], schema=['label', 'text'])
tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
tokens = tokenizer.transform(df_)
token_counts.groupby('label')\
.agg(F.collect_list(F.struct(F.col('token'), F.col('count'))).alias('text'))\
.show(truncate=False)
哪个给我每个标签的令牌计数:
Which gives me the token count for each label:
+-----+----------------------------------------------------------------+
|label|text |
+-----+----------------------------------------------------------------+
|3 |[[are,2], [how,2], [hello,2], [you,2]] |
|1 |[[today,1], [how,2], [are,3], [you,2], [hello,2]] |
|4 |[[hello,1], [how,1], [is,1], [today,1], [you,1], [it,1]] |
|2 |[[hello,1], [are,1], [you,1], [here,1], [is,1], [how,1], [it,1]]|
+-----+----------------------------------------------------------------+
但是,我认为对 explode()
的调用对此太昂贵了.
However, I think the call to explode()
is way too expensive for this.
我不知道,但是在每个"dokument"中计算令牌并随后将其合并到 groupBy()
中可能更快:
I don't know but it might be faster to count the tokens in each "dokument" and later merge it in a groupBy()
:
df_.select(['label'] + [udf_get_tokens(F.col('text')).alias('text')])\
.rdd.map(lambda x: (x[0], list(Counter(x[1]).items()))) \
.toDF(schema=['label', 'text'])\
.show()
给出计数:
+-----+--------------------+
|label| text|
+-----+--------------------+
| 1|[[are,2], [hello,...|
| 1|[[are,1], [hello,...|
| 2|[[are,1], [hello,...|
| 2|[[how,1], [it,1],...|
| 3|[[are,1], [hello,...|
| 3|[[are,1], [hello,...|
| 4|[[you,1], [today,...|
+-----+--------------------+
是否有一种更有效的方式合并这些令牌计数的方法?
Is there a way to merge those token counts in a more efficient way?
推荐答案
如果由 id
定义的组较大,则显而易见的改进目标是改组大小.改组标签,而不是改组文本.首先矢量化输入
If groups defined by id
are largish the obvious target for improvement is shuffle size. Instead of shuffling text, shuffle labels. First vectorize input
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline
pipeline_model = Pipeline(stages=[
Tokenizer(inputCol='text', outputCol='tokens'),
CountVectorizer(inputCol='tokens', outputCol='vectors')
]).fit(df_)
df_vec = pipeline_model.transform(df_).select("label", "vectors")
然后合计:
from pyspark.ml.linalg import SparseVector, DenseVector
from collections import defaultdict
def seq_func(acc, v):
if isinstance(v, SparseVector):
for i in v.indices:
acc[int(i)] += v[int(i)]
if isinstance(v, DenseVector):
for i in len(v):
acc[int(i)] += v[int(i)]
return acc
def comb_func(acc1, acc2):
for k, v in acc2.items():
acc1[k] += v
return acc1
aggregated = rdd.aggregateByKey(defaultdict(int), seq_func, comb_func)
并映射回所需的输出:
vocabulary = pipeline_model.stages[-1].vocabulary
def f(x, vocabulary=vocabulary):
# For list of tuples use [(vocabulary[i], float(v)) for i, v in x.items()]
return {vocabulary[i]: float(v) for i, v in x.items()}
aggregated.mapValues(f).toDF(["id", "text"]).show(truncate=False)
# +---+-------------------------------------------------------------------------------------+
# |id |text |
# +---+-------------------------------------------------------------------------------------+
# |4 |[how -> 1.0, today -> 1.0, is -> 1.0, it -> 1.0, hello -> 1.0, you -> 1.0] |
# |3 |[how -> 2.0, hello -> 2.0, are -> 2.0, you -> 2.0] |
# |1 |[how -> 2.0, hello -> 2.0, are -> 3.0, you -> 2.0, today -> 1.0] |
# |2 |[here -> 1.0, how -> 1.0, are -> 1.0, is -> 1.0, it -> 1.0, hello -> 1.0, you -> 1.0]|
# +---+-------------------------------------------------------------------------------------+
仅当文本部分很大时才值得尝试-否则,在 DataFrame
和Python对象之间进行的所有必需转换可能比 collecting_list
昂贵.
This worth trying only if text part is considerably large - otherwise all required transformations between DataFrame
and Python objects might be more expensive than collecting_list
.
这篇关于对记录分组后的单词计数(第2部分)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!