在 Apache Spark 中为具有大量列的数据集创建 ml 管道的最佳方法 [英] Optimal way to create a ml pipeline in Apache Spark for dataset with high number of columns

查看:27
本文介绍了在 Apache Spark 中为具有大量列的数据集创建 ml 管道的最佳方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Spark 2.1.1 处理具有约 2000 个特征的数据集,并尝试创建一个基本的 ML 管道,其中包含一些 Transformer 和一个分类器.

I am working with Spark 2.1.1 on a dataset with ~2000 features and trying to create a basic ML Pipeline, consisting of some Transformers and a Classifier.

为了简单起见,让我们假设我正在使用的流水线由一个 VectorAssembler、StringIndexer 和一个分类器组成,这将是一个相当常见的用例.

Let's assume for the sake of simplicity that the Pipeline I am working with consists of a VectorAssembler, StringIndexer and a Classifier, which would be a fairly common usecase.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

如果管道步骤被分成一个转换器管道(VectorAssembler + StringIndexer)和第二个分类器管道,并且如果不需要的列被丢弃在两个管道之间,则训练成功.这意味着要重用模型,必须在训练后保存两个 PipelineModel,并且必须引入中间预处理步骤.

If the pipeline steps are separated into a transformer pipeline (VectorAssembler + StringIndexer) and a second classifier pipeline, and if the unnecessary columns are dropped in between both pipelines, training succeeds. This means for reusing the models, two PipelineModels have to be saved after training and an intermediary preprocessing step has to be introduced.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

(恕我直言)更简洁的解决方案是将所有管道阶段合并为一个管道.

The (imho) much cleaner solution would be to merge all pipeline stages into one pipeline.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

但是,将所有 PipelineStages 放入一个 Pipeline 会导致以下异常,这可能是由于 this PR 最终会解决:

However, putting all PipelineStages into one Pipeline leads to the following exception, probably due to the issue this PR will eventually solve:

错误代码生成器:无法编译:org.codehaus.janino.JaninoRuntimeException:类 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection 的常量池已超过 JVM 限制 0xFFFF

ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF

这样做的原因是 VectorAssembler 有效地将 DataFrame 中的数据量加倍(在本例中),因为没有可以删除不必要列的转换器.(参见 spark 管道矢量汇编器删除其他列)

The reason for this is that the VectorAssembler effectively doubles (in this example) the amount of data in the DataFrame, as there is no transformer that could drop the unnecessary columns. (See spark pipeline vector assembler drop other columns)

示例适用于 golub 数据集 和以下预处理步骤是必要的:

To the example works on the golub dataset and the following preprocessing steps are necessary:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

由于我是 Spark 的新手,我不确定解决此问题的最佳方法是什么.你会建议...

As I am new to Spark, I am not sure what would be the best way to solve this issue. Would you suggest...

  1. 创建一个新的转换器,它删除列并且可以合并到管道中?
  2. 拆分两个管道并引入中间步骤
  3. 还有什么吗?:)

或者我是否遗漏了可以解决此问题的任何重要内容(管道步骤、PR 等)?

Or am I missing anything important (pipeline steps, PR, etc.) that would solve this issue?

我实现了一个新的 Transformer DroppingVectorAssembler,它删除了不必要的列,但是,抛出了相同的异常.

I implemented a new Transformer DroppingVectorAssembler, which drops unnecessary columns, however, the same exception is thrown.

除此之外,将 spark.sql.codegen.wholeStage 设置为 false 并不能解决问题.

Besides that, setting spark.sql.codegen.wholeStage to false does not solve the issue.

推荐答案

janino 错误是由于优化程序过程中创建的常量变量的数量造成的.JVM 中允许的常量变量的最大限制为 ((2^16) -1).如果超过此限制,则您将获得 Constant pool for class ... 已超过 JVM 的 0xFFFF 限制

The janino error is due the number of constant variables created during the optimizer process. The maximum limit of constant variables allowed in the JVM is ((2^16) -1). If this limit is exceeded, then you get the Constant pool for class ... has grown past JVM limit of 0xFFFF

将解决此问题的 JIRA 是 SPARK-18016,但是目前仍在进行中.

The JIRA that will fix this issue is SPARK-18016, but it's still in progress at this time.

您的代码很可能在 VectorAssembler 阶段失败,因为它必须在单个优化任务中对数千列执行.

Your code is most likely failing during the VectorAssembler stage, when it has to perform against thousands of columns during a single optimization task.

我针对这个问题开发的解决方法是通过对列的子集进行处理来创建向量向量",然后在最后将结果组合在一起以创建单一特征向量.这可以防止任何单个优化任务超出 JVM 常量限制.它并不优雅,但我已经将它用于达到 10k 列范围的数据集.

The workaround that I developed for this problem is to create a "vector of vectors" by working against subsets of the columns and then bringing the results together at the end to create a singular feature vector. This prevents any single optimization task from exceeding the JVM constant limit. It's not elegant, but I've used it on datasets reaching into the 10k columns range.

此方法还允许您仍然保留单个管道,尽管它需要一些额外的步骤才能使其工作(创建子向量).从子向量创建特征向量后,您可以根据需要删除原始源列.

This method also allows you to still keep a single pipeline, though it requires some additional steps to make it work (creating the sub-vectors). After you've created the feature vector from the sub-vectors, you can drop the original source columns if desired.

示例代码:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)

(注意:创建列列表的方法确实应该以编程方式完成,但为了理解这个概念,我让这个例子保持简单.)

(Note: The method of creating the column lists should really be done programatically, but I've kept this example simple for the sake of understanding the concept.)

这篇关于在 Apache Spark 中为具有大量列的数据集创建 ml 管道的最佳方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆