如何在 Spark ML 中为分类创建正确的数据框 [英] How to create correct data frame for classification in Spark ML

查看:25
本文介绍了如何在 Spark ML 中为分类创建正确的数据框的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 Spark ML api 但我在创建正确的数据框输入到管道时遇到问题.

I am trying to run random forest classification by using Spark ML api but I am having issues with creating right data frame input into pipeline.

这是示例数据:

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"

agehours_per_week 是整数,而包括标签 salaryRange 在内的其他特征是分类的(字符串)

age and hours_per_week are integers while other features including label salaryRange are categorical (String)

加载这个 csv 文件(我们称之为 sample.csv)可以通过 Spark csv 库 像这样:

Loading this csv file (lets call it sample.csv) can be done by Spark csv library like this:

val data = sqlContext.csvFile("/home/dusan/sample.csv")

默认情况下,所有列都以字符串形式导入,因此我们需要将age"和hours_per_week"更改为 Int:

By default all columns are imported as string so we need to change "age" and "hours_per_week" to Int:

val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))

只是为了检查架构现在的样子:

Just to check how schema looks now:

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)

然后让我们设置交叉验证器和管道:

Then lets set the cross validator and pipeline:

val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf)) 
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

运行此行时出现错误:

val cmModel = cv.fit(dataFixed)

java.lang.IllegalArgumentException:字段功能"不存在.

可以在 RandomForestClassifier 中设置标签列和特征列,但是我有 4 列作为预测变量(特征)而不仅仅是一个.

It is possible to set label column and feature column in RandomForestClassifier ,however I have 4 columns as predictors (features) not only one.

我应该如何组织我的数据框,以便正确组织标签和特征列?

为了您的方便,这里是完整的代码:

For your convenience here is full code :

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}


object SampleClassification {

  def main(args: Array[String]): Unit = {

    //set spark context
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import com.databricks.spark.csv._

    //load data by using databricks "Spark CSV Library" 
    val data = sqlContext.csvFile("/home/dusan/sample.csv")

    //by default all columns are imported as string so we need to change "age" and  "hours_per_week" to Int
    val toInt    = udf[Int, String]( _.toInt)
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))


    val rf = new RandomForestClassifier()

    val pipeline = new Pipeline().setStages(Array(rf))

    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

    // this fails with error
    //java.lang.IllegalArgumentException: Field "features" does not exist.
    val cmModel = cv.fit(dataFixed) 
  }

}

感谢您的帮助!

推荐答案

您只需要确保您的数据框中有一个 "features" 列,该列属于 VectorUDF 如下图所示:

You simply need to make sure that you have a "features" column in your dataframe that is of type VectorUDF as show below:

scala> val df2 = dataFixed.withColumnRenamed("age", "features")
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]

scala> val cmModel = cv.fit(df2) 
java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType.
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
    at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
    at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
    at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
    at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
    at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)

EDIT1

本质上,您的数据框中需要有两个字段用于特征向量的特征"和用于实例标签的标签".实例必须是 Double 类型.

Essentially there need to be two fields in your data frame "features" for feature vector and "label" for instance labels. Instance must be of type Double.

要使用 Vector 类型创建功能"字段,首先创建一个 udf 如下所示:

To create a "features" fields with Vector type first create a udf as show below:

val toVec4    = udf[Vector, Int, Int, String, String] { (a,b,c,d) => 
  val e3 = c match {
    case "hs-grad" => 0
    case "bachelors" => 1
    case "masters" => 2
  }
  val e4 = d match {case "male" => 0 case "female" => 1}
  Vectors.dense(a, b, e3, e4) 
}

现在还要对label"字段进行编码,创建另一个udf,如下所示:

Now to also encode the "label" field, create another udf as shown below:

val encodeLabel    = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )

现在我们使用这两个udf来转换原始数据帧:

Now we transform original dataframe using these two udf:

val df = dataFixed.withColumn(
  "features",
  toVec4(
    dataFixed("age"),
    dataFixed("hours_per_week"),
    dataFixed("education"),
    dataFixed("sex")
  )
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")

请注意,数据框中可能存在额外的列/字段,但在这种情况下,我仅选择了 featureslabel:

Note that there can be extra columns / fields present in the dataframe, but in this case I have selected only features and label:

scala> df.show()
+-------------------+-----+
|           features|label|
+-------------------+-----+
|[38.0,40.0,0.0,0.0]|  0.0|
|[28.0,40.0,1.0,1.0]|  0.0|
|[52.0,45.0,0.0,0.0]|  1.0|
|[31.0,50.0,2.0,1.0]|  1.0|
|[42.0,40.0,1.0,0.0]|  1.0|
+-------------------+-----+

现在由您来为您的学习算法设置正确的参数以使其工作.

Now its upto you to set correct parameters for your learning algorithm to make it work.

这篇关于如何在 Spark ML 中为分类创建正确的数据框的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆