PySpark align model predictions with untransformed data: best practice


Using PySpark's ML module, the following steps often occur (after data cleaning, etc.):

  1. Perform feature and target transform pipeline
  2. Create model
  3. Generate predictions from the model
  4. Merge predictions and original dataset together for business users and for model validation purposes


Taking a boiled-down snippet of code:

predictions = model.transform(test_df)

This predictions dataframe will only have the predictions (and the probabilities and maybe a transformation of the predictions). But it will not contain the original dataset.

How Can I Combine Predictions with Original PySpark DataFrame?

It is not obvious to me how I can combine that original dataset (or even the transformed test_df) and the predictions; there is no shared column to join on, and adding an index column seems quite tricky for large datasets.

Current Solution:

For large datasets, like what I am working with, I have tried the suggestion here:

from pyspark.sql.types import StructType

test_df = test_df.repartition(predictions.rdd.getNumPartitions())
joined_schema = StructType(test_df.schema.fields + predictions.schema.fields)
# zip the two RDDs row by row and rebuild a dataframe over the combined schema
interim_rdd = test_df.rdd.zip(predictions.rdd).map(lambda x: x[0] + x[1])
full_data = spark.createDataFrame(interim_rdd, joined_schema)
full_data.write.parquet(my_predictions_path, mode="overwrite")


But I don't like this for 2 reasons:

  1. I am not completely certain that order is maintained. The link suggests that it should be, but I do not understand why.
  2. It sometimes crashes, even though I am forcing a repartitioning as shown above, with the following error when I try to write the data via that last line above:

Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition


I do not want to use the monotonically_increasing_id suggestion sometimes given because my dataset is too large to allow for this.


It seems so fundamental: how can I report any model quality without being able to compare predictions with the original targets? How do others do this?

Solution

When calling model = <your ml-algorithm>.fit(df_train), the training dataset can have any number of additional columns. Only the columns that contain the features and the label will be used for training the model (usually called features and label; both names are configurable), but additional columns can be present.
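
For instance, a minimal sketch (assuming LogisticRegression, which is also used later in this answer) showing that these column names are ordinary estimator parameters; the values given here are simply the defaults:

from pyspark.ml.classification import LogisticRegression

# The input and output column names can be set explicitly, so dataframes
# with differently named columns can be used without renaming anything.
lr = LogisticRegression(
    featuresCol="features",          # column holding the assembled feature vector
    labelCol="label",                # column holding the target values
    predictionCol="prediction",      # output column for the predicted class
    probabilityCol="probability",
    rawPredictionCol="rawPrediction")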

When calling predictions = model.transform(df_test) on the trained model in the next step, a dataframe is returned that has the additional columns prediction, probability and rawPrediction.

In particular, the original feature columns and the label column are still part of the dataframe. Furthermore, any column that was part of df_test is still available in the output and can be used to identify the row.

prediction = model.transform(df_test)
prediction.printSchema()

prints

root
 |-- feature1: double (nullable = true)
 |-- feature2: double (nullable = true)
 |-- feature3: double (nullable = true)
 |-- label: double (nullable = true)
 |-- additional_data: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

if df_test contains not only the required features column but also the other columns, including label. By evaluating label and prediction, one could now, for example, create BinaryClassificationMetrics.
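
As a minimal sketch of that evaluation, assuming a binary label and using the hard prediction column as the score:

from pyspark.mllib.evaluation import BinaryClassificationMetrics

# BinaryClassificationMetrics expects an RDD of (score, label) pairs,
# both of which are taken from the prediction dataframe shown above.
score_and_labels = prediction.select("prediction", "label").rdd \
    .map(lambda row: (float(row["prediction"]), float(row["label"])))
metrics = BinaryClassificationMetrics(score_and_labels)
print(metrics.areaUnderROC, metrics.areaUnderPR)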

Calling model.transform is technically a Dataset.withColumn call.


An example based on the ML Pipeline example from the Spark docs: the Spark ML workflow usually starts with a dataframe containing the training data, features and labels (= target values). In this example, there is also an additional column present that is irrelevant for the ML process.

training_original = spark.createDataFrame([
    (0.0, 1.1, 0.1, 1.0, 'any random value that is not used to train the model'),
    (2.0, 1.0, -1.0, 0.0, 'another value'),
    (2.0, 1.3, 1.0, 0.0, 'value 3'),
    (0.0, 1.2, -0.5, 1.0, 'this value is also not used for training nor testing')],  
    ["feature1", "feature2", "feature3", "label", "additional_data"])

Then a transformer is used to combine the features into a single column. The easiest transformer for this task is a VectorAssembler:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features")
training_transformed = assembler.transform(training_original)
#+--------+--------+--------+-----+--------------------+--------------+          
#|feature1|feature2|feature3|label|     additional_data|      features|
#+--------+--------+--------+-----+--------------------+--------------+
#|     0.0|     1.1|     0.1|  1.0|any random value ...| [0.0,1.1,0.1]|
#| ...

The model can now be trained on this dataframe, using the columns features and label. The additional columns are present but will be ignored by the fit method.

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(training_transformed)

Now the model is tested against the test data. The preparation is the same as for the training data:

test_df = spark.createDataFrame([
    (-1.0, 1.5, 1.3, 1.0, 'test value 1'),
    (3.0, 2.0, -0.1, 0.0, 'another test value'),
    (0.0, 2.2, -1.5, 1.0, 'this is not important')],
    ["feature1", "feature2", "feature3", "label", "additional_data"])
test_df_transformed = assembler.transform(test_df)
#+--------+--------+--------+-----+--------------------+--------------+
#|feature1|feature2|feature3|label|     additional_data|      features|
#+--------+--------+--------+-----+--------------------+--------------+
#|    -1.0|     1.5|     1.3|  1.0|        test value 1|[-1.0,1.5,1.3]|
#| ...

Running the ML magic produces

prediction = model.transform(test_df_transformed)
#+--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+
#|feature1|feature2|feature3|label|     additional_data|      features|       rawPrediction|         probability|prediction|
#+--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+
#|    -1.0|     1.5|     1.3|  1.0|        test value 1|[-1.0,1.5,1.3]|[-6.5872014439355...|[0.00137599470692...|       1.0|
#| ...

This dataframe now contains the original input data (feature1 to feature3 and additional_data), the expected target values (label), the transformed features (features) and the result predicted by the model (prediction). This is where all input values, target values and predictions are available in one dataset, and it is the place to evaluate the model and calculate the desired metrics. Applying the model to new data would give the same result (but without the label column, of course).
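
As a minimal sketch of the final step from the question (merging predictions and the original data for business users), assuming the columns above and a hypothetical output path, this reduces to a plain column selection and write:

# All original columns and the model output live in the same dataframe,
# so no join or zip is needed; just select the columns of interest.
report = prediction.select(
    "feature1", "feature2", "feature3", "additional_data",  # original input data
    "label",                                                 # original target
    "probability", "prediction")                             # model output
report.write.parquet("my_predictions_path", mode="overwrite")  # hypothetical path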
