How to run multiple k-means clusterings and use groupBy in pyspark

Problem description

I have a dataset like so:

|Seq_key|   |Class_id|  |value|
Seq_key 1   Class_id 1  value 1
Seq_key 1   Class_id 2  value 2
Seq_key 1   Class_id 3  value 3
Seq_key 1   Class_id 4  value 4
Seq_key 1   Class_id 5  value 5
Seq_key 1   Class_id 6  value 6
Seq_key 2   Class_id 1  value 1
Seq_key 2   Class_id 2  value 2
Seq_key 2   Class_id 3  value 3
Seq_key 2   Class_id 4  value 4
Seq_key 2   Class_id 5  value 5
Seq_key 2   Class_id 6  value 6
Seq_key 2   Class_id 7  value 7
Seq_key 3   Class_id 1  value 1
Seq_key 3   Class_id 2  value 2
Seq_key 3   Class_id 3  value 3
Seq_key 3   Class_id 4  value 4
Seq_key 3   Class_id 5  value 5
Seq_key 3   Class_id 6  value 6
Seq_key 3   Class_id 7  value 7
Seq_key 3   Class_id 8  value 8
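
For reference, a toy version of this data can be built as a Spark DataFrame roughly as sketched below (the SparkSession name spark, the DataFrame name df and the literal values are placeholders, not from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the real data: one row per (Seq_key, Class_id) pair
df = spark.createDataFrame(
    [
        ('Seq_key 1', 'Class_id 1', 1.0),
        ('Seq_key 1', 'Class_id 2', 2.0),
        ('Seq_key 2', 'Class_id 1', 1.0),
        ('Seq_key 2', 'Class_id 2', 2.0),
    ],
    ['Seq_key', 'Class_id', 'value'],
)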

The Class_ids and values for each Seq_key are mutually exclusive. I apply k-means clustering to each Seq_key and find the optimal number of clusters, centroids, etc., so that the output for each Seq_key looks like this:

|Seq_key|   |Class_id|  |Cluster|  |Centroid|
Seq_key 1   Class_id 1     1         128
Seq_key 1   Class_id 2     2         56
Seq_key 1   Class_id 3     3         100
Seq_key 1   Class_id 4     1         128
Seq_key 1   Class_id 5     1         128
Seq_key 1   Class_id 6     4         72
Seq_key 2   Class_id 1     1         5.5
Seq_key 2   Class_id 2     1         5.5
Seq_key 2   Class_id 3     2         3.4
Seq_key 2   Class_id 4     3         1.7
Seq_key 2   Class_id 5     1         5.5
Seq_key 2   Class_id 6     2         3.4
Seq_key 2   Class_id 7     2         3.4
Seq_key 3   Class_id 1     4         500
Seq_key 3   Class_id 2     1         700
Seq_key 3   Class_id 3     3         274
Seq_key 3   Class_id 4     2         189
Seq_key 3   Class_id 5     2         189
Seq_key 3   Class_id 6     4         500
Seq_key 3   Class_id 7     1         700
Seq_key 3   Class_id 8     3         274

Currently, I am looping through each Seq_key manually and applying the k-means algorithm from the pyspark.ml.clustering library. This is clearly inefficient as the number of Seq_keys grows into the tens of thousands, and it does not properly exploit Spark's distributed computing.
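
Roughly, the manual loop looks like the sketch below (an illustration rather than the asker's actual code; the feature column, k=4 and the seed are assumptions). Each iteration triggers its own small Spark job, so the work is effectively serial.

from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

assembler = VectorAssembler(inputCols=['value'], outputCol='features')

results = []
# One sequential k-means fit per Seq_key: the cluster sits mostly idle
# while tens of thousands of keys are processed one at a time
for row in df.select('Seq_key').distinct().collect():
    subset = assembler.transform(df.filter(col('Seq_key') == row['Seq_key']))
    model = KMeans(k=4, seed=1).fit(subset)
    results.append(model.transform(subset))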

The Seq_keys are mutually exclusive, so they cannot be clustered together with other Seq_keys. Is there a way to achieve my output via a groupBy-like method in the ml libraries? Even just calculating the centroids grouped by Seq_key would suffice. Is this possible?

Recommended answer

You might be able to improve the runtime with horizontal parallelism, i.e. running multiple Spark jobs in parallel, like so:

from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count

from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import BisectingKMeans

def run_kmeans(seqid, data=sens):
    # sens is the full input DataFrame; keep only the rows for this Seq_key
    df_tmp = data.filter(col('SEQ_KEY') == seqid) \
        .select('SEQ_KEY', 'CLASS_ID', 'value')
    # Cast the feature columns (FEATURE_COLS) to float so they can be vectorized
    for c in df_tmp.columns:
        if c in FEATURE_COLS:
            df_tmp = df_tmp.withColumn(c, df_tmp[c].cast("float"))
    df_tmp = df_tmp.na.drop()
    # Assemble the feature columns into a single vector column
    vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    vector_df = vecAssembler.transform(df_tmp)
    # Fit bisecting k-means on this Seq_key's data only
    bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
    model = bkm.fit(vector_df)
    cluster = model.transform(vector_df).drop('features')

    return cluster

# fleets holds the distinct Seq_key values; each call is its own Spark job,
# and the thread pool keeps up to cpu_count() of them running concurrently
pool = ThreadPool(cpu_count())
fleet_clusters = pool.map(run_kmeans, fleets)
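
If a groupBy-style solution is preferred over driver-side threading, one possibility is groupBy().applyInPandas() (Spark 3.0+), which runs an ordinary k-means fit once per Seq_key on the executors. The sketch below is not part of the answer above and makes several assumptions: scikit-learn is available on the workers, the input DataFrame is named df with the columns shown in the question, value is numeric, and k is fixed at 4.

import pandas as pd
from sklearn.cluster import KMeans

def cluster_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Fit k-means on this Seq_key's values only; cap k at the group size
    k = min(4, len(pdf))
    km = KMeans(n_clusters=k, random_state=1).fit(pdf[['value']])
    pdf['Cluster'] = km.labels_ + 1                       # 1-based cluster ids
    pdf['Centroid'] = km.cluster_centers_[km.labels_, 0]  # centroid of each row's cluster
    return pdf

result = (
    df.groupBy('Seq_key')
      .applyInPandas(
          cluster_group,
          schema='Seq_key string, Class_id string, value double, Cluster long, Centroid double')
)

Spark then executes one group per task, so the per-key fits are distributed across the cluster instead of being looped over on the driver.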
