PySpark align model predictions with untransformed data: best practice

Question

Using PySpark's ML module, the following steps often occur (after data cleaning, etc.):

  1. Run a feature and target transformation pipeline
  2. Create the model
  3. Generate predictions from the model
  4. Join the predictions and the original dataset together for business users and for model-validation purposes


Taking a boiled-down snippet of code:

predictions = model.transform(test_df)

This predictions dataframe will only have the predictions (and the probabilities and maybe a transformation of the predictions). But it will not contain the original dataset.

It is not obvious to me how I can combine that original dataset (or even the transformed test_df) and the predictions; there is no shared column to join on, and adding an index column seems quite tricky for large datasets.

For large datasets, like what I am working with, I have tried the suggestion here:

from pyspark.sql.types import StructType

# Zip the two RDDs row by row and rebuild a dataframe with the combined schema.
test_df = test_df.repartition(predictions.rdd.getNumPartitions())
joined_schema = StructType(test_df.schema.fields + predictions.schema.fields)
interim_rdd = test_df.rdd.zip(predictions.rdd).map(lambda x: x[0] + x[1])
full_data = spark.createDataFrame(interim_rdd, joined_schema)
full_data.write.parquet(my_predictions_path, mode="overwrite")


But I don't like this for 2 reasons:

  1. I am not entirely sure that the row order is preserved. The link suggests it should be, but I do not understand why.
  2. It sometimes crashes, even when I force a repartition as shown above, with the following error when I try to write the data via the last line above:

Caused by: org.apache.spark.SparkException: Can only zip RDDs with the same number of elements in each partition

I do not want to use the monotonically_increasing_id suggestion sometimes given because my dataset is too large to allow for this.
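
For context, that index-based workaround usually looks roughly like the sketch below (the window and the row_idx column name are illustrative, not from the original post). Because the window has no partitionBy, Spark pulls all rows into a single partition to number them, which is exactly what makes it impractical for large data:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Illustrative only: number the rows of both dataframes and join on that index.
# Window.orderBy without partitionBy collapses the data into one partition,
# so this does not scale, and the row order of the two dataframes must match.
w = Window.orderBy(F.monotonically_increasing_id())
test_indexed = test_df.withColumn("row_idx", F.row_number().over(w))
pred_indexed = predictions.withColumn("row_idx", F.row_number().over(w))
full_data = test_indexed.join(pred_indexed, on="row_idx", how="inner").drop("row_idx")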

It seems so fundamental: how can I report any model quality without being able to compare predictions with the original targets? How do others do this?

Answer

When calling model = <your ml-algorithm>.fit(df_train), the training dataset can have any number of additional columns. Only the columns that contain the features and the label (usually called features and label; the names are configurable) will be used for training the model, but additional columns can be present.

When calling predictions = model.transform(df_test) on the trained model in the next step, a dataframe is returned that has the additional columns prediction, probability and rawPrediction.

In particular, the original feature columns and the label column are still part of the dataframe. Furthermore, any column that was part of df_test is still available in the output and can be used to identify the rows.

prediction = model.transform(df_test)
prediction.printSchema()

prints

root
 |-- feature1: double (nullable = true)
 |-- feature2: double (nullable = true)
 |-- feature3: double (nullable = true)
 |-- label: double (nullable = true)
 |-- additional_data: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

if df_test contains not only the required column features but also the other columns, including label. By evaluating label and prediction, one could now, for example, create BinaryClassificationMetrics.

Calling model.transform is technically a Dataset.withColumn call.

An example based on the ML Pipeline example from the Spark docs: the Spark ML workflow usually starts with a dataframe containing the training data, the features and the labels (= target values). In this example, there is also an additional column present that is irrelevant for the ML process.

training_original = spark.createDataFrame([
    (0.0, 1.1, 0.1, 1.0, 'any random value that is not used to train the model'),
    (2.0, 1.0, -1.0, 0.0, 'another value'),
    (2.0, 1.3, 1.0, 0.0, 'value 3'),
    (0.0, 1.2, -0.5, 1.0, 'this value is also not used for training nor testing')],  
    ["feature1", "feature2", "feature3", "label", "additional_data"])

Then a transformer is used to combine the features into a single column. The easiest transformer for this task is a VectorAssembler:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features")
training_transformed = assembler.transform(training_original)
#+--------+--------+--------+-----+--------------------+--------------+          
#|feature1|feature2|feature3|label|     additional_data|      features|
#+--------+--------+--------+-----+--------------------+--------------+
#|     0.0|     1.1|     0.1|  1.0|any random value ...| [0.0,1.1,0.1]|
#| ...

The model can now be trained on this dataframe, using the columns features and label. The additional columns are present but will be ignored by the fit method.

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(training_transformed)

Now the model is tested against the test data. The preparation is the same as for the training data:

test_df = spark.createDataFrame([
    (-1.0, 1.5, 1.3, 1.0, 'test value 1'),
    (3.0, 2.0, -0.1, 0.0, 'another test value'),
    (0.0, 2.2, -1.5, 1.0, 'this is not important')],
    ["feature1", "feature2", "feature3", "label", "additional_data"])
test_df_transformed = assembler.transform(test_df)
#+--------+--------+--------+-----+--------------------+--------------+
#|feature1|feature2|feature3|label|     additional_data|      features|
#+--------+--------+--------+-----+--------------------+--------------+
#|    -1.0|     1.5|     1.3|  1.0|        test value 1|[-1.0,1.5,1.3]|
#| ...

Running the ML magic produces

prediction = model.transform(test_df_transformed)
#+--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+
#|feature1|feature2|feature3|label|     additional_data|      features|       rawPrediction|         probability|prediction|
#+--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+
#|    -1.0|     1.5|     1.3|  1.0|        test value 1|[-1.0,1.5,1.3]|[-6.5872014439355...|[0.00137599470692...|       1.0|
#| ...

This dataframe now contains the original input data (feature1 to feature3 and additional_data), the expected target values (label), the transformed features (features) and the result predicted by the model (prediction). This is the place where all input values, the target values and the predictions are available in one dataset. Here would be the place to evaluate the model and calculate the desired metrics. Applying the model on new data would give the same result (but without the label column, of course).
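
For the business-facing output the question asks about, it is enough to select the untransformed columns together with the model output from that same dataframe. A minimal sketch with the column names used above (my_predictions_path is the path variable from the question):

# Keep only the original, untransformed columns plus the model output,
# then persist the aligned result for business users and validation.
report = prediction.select("feature1", "feature2", "feature3",
                           "additional_data", "label", "prediction")
report.write.parquet(my_predictions_path, mode="overwrite")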
