Performance decrease for a huge number of columns (PySpark)


Problem description

I ran into a problem processing a wide Spark dataframe (about 9000 columns, sometimes more).
Task:

  1. Create the wide DF via groupBy and pivot.
  2. Convert the columns to a vector and feed it to KMeans from pyspark.ml.

So I built the wide frame and tried to create a vector with VectorAssembler, cached it, and trained KMeans on it.
For a frame of roughly 500x9000 on my PC in standalone mode, assembling took about 11 minutes and KMeans took about 2 minutes across 7 different cluster counts. The same processing in pandas (pivot the df and iterate over the 7 cluster counts) takes less than a minute.
Obviously I understand there is overhead and a performance penalty for standalone mode, caching, and so on, but it really discourages me.
Could somebody explain how I can avoid this overhead?
How do people work with wide DFs instead of using VectorAssembler and taking the performance hit?
A more formal question (for SO rules) would be: how can I speed up this code?

%%time
from pyspark.ml.feature import VectorAssembler

# Pivot into a wide frame: one column per distinct ObjectPath,
# max PropertyFlagValue per cell, missing values filled with 0.
tmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')
       .groupBy('User')
       .pivot('ObjectPath')
       .agg({'PropertyFlagValue':'max'})
       .fillna(0))
ignore = ['User']
assembler = VectorAssembler(
    inputCols=[x for x in tmp.columns if x not in ignore],
    outputCol='features')
Wall time: 36.7 s

print(tmp.count(), len(tmp.columns))
552, 9378

%%time
transformed = assembler.transform(tmp).select('User', 'features').cache()
Wall time: 10min 45s

%%time
from pyspark.ml.clustering import KMeans

# Cost (inertia) for k = 3..13 clusters.
lst_levels = []
for num in range(3, 14):
    kmeans = KMeans(k=num, maxIter=50)
    model = kmeans.fit(transformed)
    lst_levels.append(model.computeCost(transformed))
# rs[k] is the drop in cost when going from k+3 to k+4 clusters; stop at the
# first point where the drop is less than twice the following drop ("elbow")
# and refit KMeans with that cluster count.
rs = [i-j for i,j in list(zip(lst_levels, lst_levels[1:]))]
for i, j in zip(rs, rs[1:]):
    if i - j < j:
        print(rs.index(i))
        kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)
        model = kmeans.fit(transformed)
        break
Wall time: 1min 32s

Config:

.config("spark.sql.pivotMaxValues", "100000") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config("spark.sql.shuffle.partitions", "4") \
.config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \

Answer

The solution was actually found in working with a map of values per row instead of a wide dataframe:

  • First of all, we create a map of values for each row.
  • We also extract all distinct names.
  • In the penultimate step, we look up each of those names in the row's map and return the value, or 0 if nothing is found.
  • Run a vector assembler on the result.
  • Advantages:

    1. You do not have to create a wide dataframe with a huge number of columns, so you avoid that overhead. (The runtime dropped from 11 minutes to about one.)
    2. You still work on the cluster and execute the code in Spark's paradigm.

    Code example: Scala implementation (a PySpark sketch of the same idea follows below).
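
Since only a Scala implementation is referenced, below is a minimal PySpark sketch of the same idea, assuming the df_states columns from the question (ObjectPath, User, PropertyFlagValue) and Spark 2.4+ for map_from_entries; variable names other than those from the question are illustrative.

from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans

# 1. One map of ObjectPath -> max(PropertyFlagValue) per user,
#    instead of pivoting into ~9000 physical columns.
per_key = (df_states
           .groupBy('User', 'ObjectPath')
           .agg(F.max('PropertyFlagValue').alias('value')))
maps = (per_key
        .groupBy('User')
        .agg(F.map_from_entries(
            F.collect_list(F.struct('ObjectPath', 'value'))).alias('props')))

# 2. All distinct names, in a fixed order, define the vector layout.
names = [r['ObjectPath'] for r in df_states.select('ObjectPath').distinct().collect()]

# 3. Look up each name in the row's map, falling back to 0, and build the
#    feature vector directly: no wide dataframe, no VectorAssembler.
def to_features(row):
    props = row['props']
    return (row['User'], Vectors.dense([float(props.get(n, 0)) for n in names]))

transformed = maps.rdd.map(to_features).toDF(['User', 'features']).cache()

# 4. KMeans as in the question.
model = KMeans(k=7, maxIter=50).fit(transformed)

If most values are zero (the pivoted frame is mostly fillna(0)), building the features with Vectors.sparse instead of Vectors.dense would reduce memory further.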
