Caching intermediate results in Spark ML pipeline


Question

Lately I've been planning to migrate my standalone Python ML code to Spark. The ML pipeline in spark.ml turns out to be quite handy, with a streamlined API for chaining algorithm stages and hyper-parameter grid search.

Still, I found its support for one important feature poorly covered in the existing documentation: caching of intermediate results. The importance of this feature arises when the pipeline involves computation-intensive stages.

For example, in my case I use a huge sparse matrix to perform multiple moving averages on time-series data in order to form input features. The structure of the matrix is determined by some hyper-parameter. This step turns out to be a bottleneck for the entire pipeline because I have to construct the matrix at runtime.

During parameter search, I usually have other parameters to examine besides this "structure parameter". So if I can reuse the huge matrix while the "structure parameter" is unchanged, I can save tons of time. For this reason, I intentionally structured my code to cache and reuse these intermediate results.
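The manual approach described above can be sketched roughly as follows. This is only an illustration, not the asker's actual code: the helper `buildMovingAverageFeatures`, the parameter grids, and the estimator setup are all hypothetical placeholders.

```scala
// Hypothetical sketch: cache the expensive featurization once per
// "structure parameter" value, then sweep the remaining hyper-parameters
// against the cached DataFrame instead of recomputing it each time.
for (structureParam <- structureParamGrid) {          // hypothetical outer grid
  val features = buildMovingAverageFeatures(raw, structureParam) // hypothetical helper
    .cache()                                          // keep the huge matrix in memory

  for (regParam <- regParamGrid) {                    // hypothetical inner grid
    val model = new LogisticRegression()
      .setRegParam(regParam)
      .fit(features)                                  // reuses the cached features
    // ... evaluate model ...
  }

  features.unpersist()                                // free memory before the next value
}
```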

So my question is: can Spark's ML pipeline handle intermediate caching automatically? Or do I have to write code manually to do so? If so, is there any best practice to learn from?

P.S. I have looked into the official documentation and some other material, but none of it seems to discuss this topic.

Answer

So I ran into the same problem, and the way I solved it was to implement my own PipelineStage that caches the input Dataset and returns it as-is.

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// A no-op Transformer that caches the incoming Dataset and passes it through.
class Cacher(val uid: String) extends Transformer with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("CacherTransformer"))

  // Cache the input and return it unchanged.
  override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF.cache()

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

  // Caching does not alter the schema.
  override def transformSchema(schema: StructType): StructType = schema
}

To use it, you would then do something like this:

new Pipeline().setStages(Array(stage1, new Cacher(), stage2))
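For instance, here is a minimal sketch of a two-stage pipeline with the Cacher slotted between an expensive featurization stage and the estimator; the specific stages (HashingTF, LogisticRegression) are placeholders chosen for illustration, not part of the original answer.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.HashingTF

// Hypothetical stages: an expensive featurizer followed by an estimator.
val featurizer = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)

// The Cacher persists the featurized Dataset, so repeated fits
// (e.g. during a grid search) can reuse it instead of recomputing it.
val pipeline = new Pipeline().setStages(Array(featurizer, new Cacher(), lr))
```

One caveat worth noting: nothing unpersists the cached Dataset automatically, so a long-running job may want an explicit `unpersist()` (or a matching cleanup stage) once training is done.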
