Pyspark: applying kmeans on different groups of a dataframe

Problem description

Using Pyspark I would like to apply kmeans separately to groups of a dataframe, not to the whole dataframe at once. At the moment I use a for loop that iterates over each group, applies kmeans, and appends the result to another table. But having a lot of groups makes it time-consuming. Could anyone help me please? Thanks a lot!

from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col

for customer in customer_list:
    # Restrict to one customer and build its feature vectors
    temp_df = togroup.filter(col("customer_id") == customer)
    df = assembler.transform(temp_df)
    # Increase k until the quality metric reaches the target width (or k hits 5)
    k = 1
    mtrc = 0
    while k < 5 and mtrc < width:
        k += 1
        kmeans = KMeans(k=k, seed=5, maxIter=20, initSteps=5)
        model = kmeans.fit(df)
        mtrc = 1 - model.computeCost(df) / ttvar
        a = model.transform(df).select(cols)
        allcustomers = allcustomers.union(a)
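
The snippet references several names the question never defines (togroup, assembler, customer_list, width, ttvar, cols, allcustomers). For anyone trying to reproduce it, a purely hypothetical minimal setup, not taken from the question, might look like:

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative stand-ins for the names the question leaves undefined
togroup = spark.createDataFrame(
    [(1, 0.5), (1, 1.6), (1, 7.0), (2, 3.0), (2, 3.2), (2, 9.9)],
    ['customer_id', 'val'],
)
assembler = VectorAssembler(inputCols=['val'], outputCol='features')
customer_list = [r[0] for r in togroup.select('customer_id').distinct().collect()]
cols = ['customer_id', 'prediction']  # columns kept from each fitted model
width, ttvar = 0.8, 10.0              # stopping threshold and total variance
# Empty accumulator with the schema of the per-customer output
allcustomers = spark.createDataFrame([], 'customer_id bigint, prediction int')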

Solution

I came up with a second solution which I think is slightly better than the previous one. The idea is to use groupby() together with collect_list() and to write a udf that takes a list as input and generates the clusters. Continuing with the df_spark from the other solution, we write:

df_flat = df_spark.groupby('cat').agg(F.collect_list('val').alias('val_list'))
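
The original answer never shows df_spark itself; from the snippet one can infer a grouping column cat and a numeric column val. A purely hypothetical toy input could be built like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: two groups, each holding a few 1-D values
df_spark = spark.createDataFrame(
    [('a', 1.0), ('a', 1.2), ('a', 8.0), ('b', 3.0), ('b', 3.1), ('b', 9.5)],
    ['cat', 'val'],
)

After the groupby, df_flat has one row per cat, with that group's values collected into the val_list array.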

Now we write the udf:

import numpy as np
import pyspark.sql.functions as F
from sklearn.cluster import KMeans
from pyspark.sql.types import ArrayType, IntegerType

def skmean(x):
    # Reshape the group's values into a 2-D array of single-feature
    # samples, fit a 2-cluster k-means, and return one label per value
    kmeans = KMeans(n_clusters=2, random_state=0)
    X = np.array(x).reshape(-1, 1)
    kmeans.fit(X)
    return kmeans.predict(X).tolist()

clustering_udf = F.udf(skmean, ArrayType(IntegerType()))
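
Note that the udf runs scikit-learn inside a regular Python worker, so scikit-learn has to be installed on every executor, and each group's collected list must fit in a single task's memory.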

Then apply the udf to the flattened dataframe:

df = df_flat.withColumn('clusters', clustering_udf(F.col('val_list')))

Then you can use F.explode() to turn each list back into one row per element.
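
If you want each value paired with its cluster label, one option (not from the original answer, and assuming Spark 2.4+ for F.arrays_zip) is to zip the two arrays before exploding, so the element-wise pairing is preserved:

import pyspark.sql.functions as F

# Pair val_list[i] with clusters[i], then emit one row per pair
df_long = (
    df.withColumn('pair', F.explode(F.arrays_zip('val_list', 'clusters')))
      .select(
          'cat',
          F.col('pair.val_list').alias('val'),
          F.col('pair.clusters').alias('cluster'),
      )
)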
