KMeans clustering in PySpark
Problem Description
I have a Spark dataframe 'mydataframe' with many columns. I am trying to run kmeans on only two of them: lat and long (latitude & longitude), using them as simple values. I want to extract 7 clusters based on just those 2 columns, and then attach the cluster assignment to my original dataframe. I've tried:
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd # needs to be an RDD
data_rdd.cache()
# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
But after a while I am getting an error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
I've tried to detach and re-attach the cluster. Same result. What am I doing wrong?
Thank you very much!
Recommended Answer
Since, based on another recent question of yours, I guess you are taking your very first steps with Spark clustering (you even import sqrt & array without ever using them, probably because that is how the docs example looks), let me offer advice at a more general level rather than on the specific question you are asking here (hopefully also saving you from subsequently opening 3-4 more questions trying to get your cluster assignments back into your dataframe)...
Since:

- you have your data already in a dataframe
- you want to attach the cluster membership back into your initial dataframe

you have no reason to revert to an RDD and use the (soon to be deprecated) MLlib package; you will do your job much more easily, elegantly, and efficiently using the (now recommended) ML package, which works directly with dataframes.
Step 0 - make some toy data resembling yours:
spark.version
# u'2.2.0'
df = spark.createDataFrame([[0, 33.3, -17.5],
                            [1, 40.4, -20.5],
                            [2, 28., -23.9],
                            [3, 29.5, -19.0],
                            [4, 32.8, -18.84]],
                           ["other", "lat", "long"])
df.show()
# +-----+----+------+
# |other| lat| long|
# +-----+----+------+
# | 0|33.3| -17.5|
# | 1|40.4| -20.5|
# | 2|28.0| -23.9|
# | 3|29.5| -19.0|
# | 4|32.8|-18.84|
# +-----+----+------+
Step 1 - assemble your features

In contrast to most ML packages out there, Spark ML requires your input features to be gathered in a single column of your dataframe, usually named features; and it provides a specific method for doing this, VectorAssembler:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+
# |other| lat| long| features|
# +-----+----+------+-------------+
# | 0|33.3| -17.5| [33.3,-17.5]|
# | 1|40.4| -20.5| [40.4,-20.5]|
# | 2|28.0| -23.9| [28.0,-23.9]|
# | 3|29.5| -19.0| [29.5,-19.0]|
# | 4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+
As you may have already guessed, the argument inputCols serves to tell VectorAssembler which particular columns in our dataframe are to be used as features.
Step 2 - fit your KMeans model
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1) # 2 clusters here
model = kmeans.fit(new_df.select('features'))
select('features') here serves to tell the algorithm which column of the dataframe to use for clustering - remember that, after Step 1 above, your original lat & long features are no longer used directly.
Step 3 - transform your initial dataframe to include cluster assignments
transformed = model.transform(new_df)
transformed.show()
# +-----+----+------+-------------+----------+
# |other| lat| long| features|prediction|
# +-----+----+------+-------------+----------+
# | 0|33.3| -17.5| [33.3,-17.5]| 0|
# | 1|40.4| -20.5| [40.4,-20.5]| 1|
# | 2|28.0| -23.9| [28.0,-23.9]| 0|
# | 3|29.5| -19.0| [29.5,-19.0]| 0|
# | 4|32.8|-18.84|[32.8,-18.84]| 0|
# +-----+----+------+-------------+----------+
The last column of the transformed dataframe, prediction, shows the cluster assignment - in my toy case, I have ended up with 4 records in cluster #0 and 1 record in cluster #1.
You can further manipulate the transformed dataframe with select statements, or even drop the features column (which has now fulfilled its function and may no longer be necessary)...
Hopefully you are now much closer to what you actually wanted to achieve in the first place. For extracting cluster statistics etc., another recent answer of mine might be helpful...