Optimal way to create an ML Pipeline in Apache Spark for a dataset with a high number of columns


Question

I am working with Spark 2.1.1 on a dataset with ~2000 features and trying to create a basic ML Pipeline, consisting of some Transformers and a Classifier.

Let's assume for the sake of simplicity that the Pipeline I am working with consists of a VectorAssembler, a StringIndexer and a Classifier, which would be a fairly common use case.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

If the pipeline steps are separated into a transformer pipeline (VectorAssembler + StringIndexer) and a second classifier pipeline, and if the unnecessary columns are dropped in between both pipelines, training succeeds. This means that, in order to reuse the models, two PipelineModels have to be saved after training and an intermediate preprocessing step has to be introduced.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

The (imho) much cleaner solution would be to merge all pipeline stages into one pipeline.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

However, putting all PipelineStages into one Pipeline leads to the following exception, probably due to the issue this PR will eventually solve:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF

The reason for this is that the VectorAssembler effectively doubles (in this example) the amount of data in the DataFrame, as there is no transformer that could drop the unnecessary columns. (See spark pipeline vector assembler drop other columns)

The example works on the golub dataset, and the following preprocessing steps are necessary:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

As I am new to Spark, I am not sure what would be the best way to solve this issue. Would you suggest...

  1. to create a new Transformer that can drop columns and that can be incorporated into the pipeline?
  2. to split both pipelines and introduce an intermediary preprocessing step, or
  3. anything else? :)

Or am I missing anything important (pipeline steps, PR, etc.) that would solve this issue?

I implemented a new Transformer, DroppingVectorAssembler, which drops unnecessary columns; however, the same exception is thrown.
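
The DroppingVectorAssembler itself is not shown here. As a rough illustration only, a minimal sketch of a custom Transformer that keeps a given whitelist of columns (hypothetical class name, assuming the Spark 2.x ML API; this is not the actual DroppingVectorAssembler) could look like this:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch; not the DroppingVectorAssembler mentioned above
class ColumnDropper(override val uid: String, val columnsToKeep: Array[String]) extends Transformer {

  def this(columnsToKeep: Array[String]) =
    this(Identifiable.randomUID("columnDropper"), columnsToKeep)

  override def transform(dataset: Dataset[_]): DataFrame = {
    // Drop every column that is not explicitly whitelisted
    val toDrop = dataset.columns.filterNot(columnsToKeep.contains)
    dataset.drop(toDrop: _*)
  }

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields.filter(f => columnsToKeep.contains(f.name)))

  override def copy(extra: ParamMap): ColumnDropper =
    new ColumnDropper(uid, columnsToKeep)
}

Note that persisting a Pipeline containing such a hand-rolled stage would additionally require implementing ML persistence (MLWritable), which is omitted in this sketch.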

Besides that, setting spark.sql.codegen.wholeStage to false does not solve the issue.
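
For completeness, that setting is usually applied on an existing SparkSession like this (shown only for reference; as stated, it did not help here):

// Disable whole-stage code generation (did not resolve the constant-pool issue in this case)
spark.conf.set("spark.sql.codegen.wholeStage", "false")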

Answer

The janino error is due to the number of constant variables created during the optimizer process. The maximum number of constant variables allowed in the JVM is ((2^16) - 1). If this limit is exceeded, you get the Constant pool for class ... has grown past JVM limit of 0xFFFF error.

The JIRA that will fix this issue is SPARK-18016, but it's still in progress at this time.

Your code is most likely failing during the VectorAssembler stage, where it has to process thousands of columns within a single optimization task.

The workaround that I developed for this problem is to create a "vector of vectors" by working against subsets of the columns and then bringing the results together at the end to create a single feature vector. This prevents any single optimization task from exceeding the JVM constant-pool limit. It's not elegant, but I've used it on datasets reaching into the 10k-column range.

This method also allows you to still keep a single pipeline, though it requires some additional steps to make it work (creating the sub-vectors). After you've created the feature vector from the sub-vectors, you can drop the original source columns if desired.

Example code:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)
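
If the original source columns and the intermediate sub-vectors are no longer needed after assembling, they can be dropped at this point, for example by keeping only the identifier and the final vector:

// Keep only the identifier and the assembled feature vector
val slimDF = featuresDF.select("uid", "features")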

(Note: The method of creating the column lists should really be done programmatically, but I've kept this example simple for the sake of understanding the concept.)
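
As a rough sketch of that programmatic variant (the group size of 5 and the sub-vector names are illustrative assumptions), the per-slice assemblers could be generated from the column list instead of being written out by hand:

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.VectorAssembler

// Build one VectorAssembler per slice of feature columns, then a final assembler over the sub-vectors
val featureCols = exampleDF.columns.filter(_ != "uid")
val groupSize = 5  // illustrative; pick a size that keeps each assembler far below the codegen limits

val subAssemblers = featureCols.sliding(groupSize, groupSize).zipWithIndex.map {
  case (cols, i) => new VectorAssembler().setInputCols(cols).setOutputCol(s"colList${i + 1}_vec")
}.toArray

val finalAssembler = new VectorAssembler()
  .setInputCols(subAssemblers.map(_.getOutputCol))
  .setOutputCol("features")

val stages: Array[PipelineStage] = subAssemblers.map(a => a: PipelineStage) :+ finalAssembler
val pipelineProgrammatic = new Pipeline().setStages(stages)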
