Pyspark: applying kmeans on different groups of a dataframe
Question
Using Pyspark I would like to apply kmeans separately to groups of a dataframe, not to the whole dataframe at once. At the moment I use a for loop that iterates over each group, applies kmeans and appends the result to another table. But having a lot of groups makes this time consuming. Could anyone help me please? Thanks a lot!
for customer in customer_list:
    temp_df = togroup.filter(col("customer_id") == customer)
    df = assembler.transform(temp_df)
    k = 1
    mtric = 0
    while k < 5 and mtric < width:
        k += 1
        kmeans = KMeans(k=k, seed=5, maxIter=20, initSteps=5)
        model = kmeans.fit(df)
        mtric = 1 - model.computeCost(df) / ttvar
    a = model.transform(df).select(cols)
    allcustomers = allcustomers.union(a)
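The loop above depends on names defined elsewhere (togroup, assembler, width, ttvar, cols). As a self-contained sketch of the same serial pattern, here is a pure-Python analog that loops over groups held in a dict and fits one sklearn KMeans per group; the customer names and values are made up for illustration:

```python
import numpy as np
from sklearn.cluster import KMeans

# Hypothetical per-customer data; stands in for the filtered Spark dataframes.
groups = {
    "cust_a": [1.0, 1.2, 0.9, 10.0, 10.5],
    "cust_b": [5.0, 5.1, 20.0, 20.2, 19.8],
}

results = {}
for customer, values in groups.items():
    X = np.array(values).reshape(-1, 1)
    model = KMeans(n_clusters=2, random_state=0, n_init=10).fit(X)
    results[customer] = model.labels_.tolist()

# Each customer gets its own independently fitted model, one group at a time --
# exactly the serial behaviour that becomes slow with many groups.
```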
I came up with a second solution which I think is slightly better than the last one. The idea is to use groupby() together with collect_list() and write a udf that takes a list as input and generates the clusters. Continuing with the df_spark from the other solution, we write:
df_flat = df_spark.groupby('cat').agg(F.collect_list('val').alias('val_list'))
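What groupby('cat').agg(F.collect_list('val')) produces is one row per category holding all of that category's values in a list. A stdlib-only sketch of the same reshaping, using the cat/val column names from the answer and made-up rows:

```python
from collections import defaultdict

# Flat (cat, val) rows, as df_spark might hold them.
rows = [("a", 1.0), ("b", 9.0), ("a", 1.5), ("b", 9.5), ("a", 0.5)]

collected = defaultdict(list)
for cat, val in rows:
    collected[cat].append(val)

# Mirrors df_flat: one record per category with a val_list column.
df_flat_like = [{"cat": c, "val_list": vs} for c, vs in sorted(collected.items())]
```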
Now we write the udf function:
import numpy as np
import pyspark.sql.functions as F
from sklearn.cluster import KMeans
from pyspark.sql.types import ArrayType, IntegerType

def skmean(x):
    kmeans = KMeans(n_clusters=2, random_state=0)
    X = np.array(x).reshape(-1, 1)
    kmeans.fit(X)
    clusters = kmeans.predict(X).tolist()
    return clusters

clustering_udf = F.udf(lambda arr: skmean(arr), ArrayType(IntegerType()))
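Since skmean only needs numpy and sklearn, the udf body can be exercised outside Spark. A quick check on a made-up list with two obvious clusters:

```python
import numpy as np
from sklearn.cluster import KMeans

def skmean(x):
    # Same body as the udf: cluster a flat list of values into two groups.
    kmeans = KMeans(n_clusters=2, random_state=0, n_init=10)
    X = np.array(x).reshape(-1, 1)
    kmeans.fit(X)
    return kmeans.predict(X).tolist()

labels = skmean([1.0, 1.1, 0.9, 10.0, 10.2, 9.8])
# The first three values share one label and the last three share the other;
# which cluster gets id 0 is an implementation detail.
```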
Then apply the udf to the flattened dataframe (note the column is named val_list after the aggregation):
df = df_flat.withColumn('clusters', clustering_udf(F.col('val_list')))
Then you can use F.explode() to turn each list back into one row per element.
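F.explode produces one output row per list element. In pure Python, zipping each val_list with its clusters list and flattening gives the same shape the exploded dataframe would have (column names assumed from the answer; the rows are made up):

```python
# Hypothetical flattened rows: each carries a value list and its cluster labels.
df_like = [
    {"cat": "a", "val_list": [1.0, 1.5, 9.0], "clusters": [0, 0, 1]},
    {"cat": "b", "val_list": [2.0, 8.0], "clusters": [1, 0]},
]

# Analog of exploding both arrays back into one row per element.
exploded = [
    {"cat": row["cat"], "val": v, "cluster": c}
    for row in df_like
    for v, c in zip(row["val_list"], row["clusters"])
]
```

In Spark itself, exploding two arrays in lockstep needs something like F.arrays_zip before F.explode, since F.explode takes a single array column.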