KMeans clustering in PySpark

Problem description

I have a Spark dataframe 'mydataframe' with many columns. I am trying to run kmeans on only two of them: lat and long (latitude & longitude), using them as simple values. I want to extract 7 clusters based on just those 2 columns, and then I want to attach the cluster assignment to my original dataframe. I've tried:

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd  # needs to be an RDD
data_rdd.cache()

# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")

But I am getting an error after a while:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)

I've tried to detach and re-attach the cluster. Same result. What am I doing wrong?

Recommended answer

Since, based on another recent question of yours, I guess you are in your very first steps with Spark clustering (you are even importing sqrt & array without ever using them, probably because it is like that in the docs example), let me offer advice at a more general level rather than on the specific question you are asking here (hopefully also saving you from subsequently opening 3-4 more questions, trying to get your cluster assignments back into your dataframe)...

Since:

  1. you have your data already in a dataframe

  2. you want to attach the cluster membership back into your initial dataframe

you have no reason to revert to an RDD and use the (soon to be deprecated) MLlib package; you will do your job much more easily, elegantly, and efficiently using the (now recommended) ML package, which works directly with dataframes.

Step 0 - make some toy data resembling yours:

spark.version
# u'2.2.0'

df = spark.createDataFrame([[0, 33.3, -17.5],
                            [1, 40.4, -20.5],
                            [2, 28., -23.9],
                            [3, 29.5, -19.0],
                            [4, 32.8, -18.84]],
                           ["other", "lat", "long"])

df.show()
# +-----+----+------+
# |other| lat|  long|
# +-----+----+------+
# |    0|33.3| -17.5|
# |    1|40.4| -20.5| 
# |    2|28.0| -23.9|
# |    3|29.5| -19.0|
# |    4|32.8|-18.84|
# +-----+----+------+

Step 1 - assemble your features

In contrast to most ML packages out there, Spark ML requires your input features to be gathered in a single column of your dataframe, usually named features; and it provides a specific method for doing this, VectorAssembler:

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+ 
# |other| lat|  long|     features|
# +-----+----+------+-------------+
# |    0|33.3| -17.5| [33.3,-17.5]|
# |    1|40.4| -20.5| [40.4,-20.5]|
# |    2|28.0| -23.9| [28.0,-23.9]| 
# |    3|29.5| -19.0| [29.5,-19.0]|
# |    4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+ 

As you may have already guessed, the argument inputCols serves to tell VectorAssembler which particular columns in our dataframe are to be used as features.

Step 2 - fit your KMeans model

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))

select('features') here serves to tell the algorithm which column of the dataframe to use for clustering - remember that, after Step 1 above, your original lat & long features are no longer used directly.
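
(Incidentally, and as an aside on my part rather than something required here: since the featuresCol parameter of KMeans defaults to 'features', fitting on the full new_df should work just as well - the select merely makes the intent explicit.)

# equivalent call, relying on the default featuresCol='features'
model = kmeans.fit(new_df)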

Step 3 - transform your initial dataframe to include cluster assignments

transformed = model.transform(new_df)
transformed.show()    
# +-----+----+------+-------------+----------+ 
# |other| lat|  long|     features|prediction|
# +-----+----+------+-------------+----------+
# |    0|33.3| -17.5| [33.3,-17.5]|         0| 
# |    1|40.4| -20.5| [40.4,-20.5]|         1|
# |    2|28.0| -23.9| [28.0,-23.9]|         0|
# |    3|29.5| -19.0| [29.5,-19.0]|         0|
# |    4|32.8|-18.84|[32.8,-18.84]|         0|
# +-----+----+------+-------------+----------+

The last column of the transformed dataframe, prediction, shows the cluster assignment - in my toy case, I have ended up with 4 records in cluster #0 and 1 record in cluster #1.
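
If you want to check those counts programmatically rather than by eye, a simple groupBy on the prediction column does the job (a minimal sketch, continuing with the transformed dataframe from above):

# how many records ended up in each cluster
transformed.groupBy('prediction').count().show()
# +----------+-----+
# |prediction|count|
# +----------+-----+
# |         0|    4|
# |         1|    1|
# +----------+-----+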

You can further manipulate the transformed dataframe with select statements, or even drop the features column (which has now fulfilled its function and may be no longer necessary)...
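
For instance (a minimal sketch, still on the toy data above), either of the following leaves you with the original columns plus the cluster assignment:

# keep only the columns of interest, plus the assignment
transformed.select('other', 'lat', 'long', 'prediction').show()

# or simply drop the features column, which is no longer needed
transformed.drop('features').show()
# +-----+----+------+----------+
# |other| lat|  long|prediction|
# +-----+----+------+----------+
# |    0|33.3| -17.5|         0|
# |    1|40.4| -20.5|         1|
# |    2|28.0| -23.9|         0|
# |    3|29.5| -19.0|         0|
# |    4|32.8|-18.84|         0|
# +-----+----+------+----------+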

Hopefully you are much closer now to what you actually wanted to achieve in the first place. For extracting cluster statistics etc., another recent answer of mine might be helpful...
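
As a minimal starting point (using only the fitted KMeansModel from Step 2, nothing beyond what was already computed), the cluster centres and sizes are available directly from the model:

# the fitted cluster centres, one numpy array per cluster
for center in model.clusterCenters():
    print(center)

# cluster sizes from the training summary (available in Spark >= 2.1)
print(model.summary.clusterSizes)  # e.g. [4, 1] for the toy data above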
