Training ML models in Spark per partition, such that there will be one trained model per DataFrame partition


Problem Description


How do I do parallel model training per partition in Spark using Scala? The solution given here is in PySpark; I'm looking for a solution in Scala: How can you efficiently build one ML model per partition in Spark with foreachPartition?

Answer

  1. Get the distinct partition values using the partition column
  2. Create a thread pool of, say, 100 threads
  3. Create a Future for each partition value and run it on the pool
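The thread pool from step 2 can be sketched as a bounded ExecutionContext. This is a minimal sketch of what a getExecutionContext helper might look like, assuming it simply wraps a fixed-size pool of daemon threads (the HasParallelism code in Spark linked below does something similar):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Minimal sketch: a named, fixed-size thread pool wrapped in an
// ExecutionContext so it can be passed to Future { ... }(ec).
def getExecutionContext(name: String, numThreads: Int): ExecutionContext = {
  val executor = Executors.newFixedThreadPool(numThreads, (r: Runnable) => {
    val t = new Thread(r, name)
    t.setDaemon(true) // daemon threads don't keep the JVM alive
    t
  })
  ExecutionContext.fromExecutor(executor)
}
```

Bounding the pool size caps how many partitions are trained concurrently, which matters because each future triggers its own Spark job.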

Sample code may look like this:

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    // Get an ExecutionContext backed by a pool of 100 threads
    val threadPoolExecutorService = getExecutionContext("name", 100)
    // check https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala#L50

    val uniquePartitionValues: List[String] = ... // getDistinctPartitionsUsingPartitionCol

    // Asynchronous invocation of training; the results are collected from the futures.
    val uniquePartitionValuesFutures = uniquePartitionValues.map { partitionValue =>
      Future[Double] {
        try {
          // Get the slice of the dataframe where partitionCol = partitionValue
          // (quote the value, since partitionCol holds strings)
          val partitionDF = mainDF.where(s"partitionCol='$partitionValue'")
          // do preprocessing and training using any algorithm with
          // partitionDF as input, and return the accuracy
          ...
        } catch {
          case e: Exception => ... // handle/log the failure for this partition
        }
      }(threadPoolExecutorService)
    }

    // Wait for all metrics to be computed
    val foldMetrics = uniquePartitionValuesFutures.map(Await.result(_, Duration.Inf))
    println(s"output::${foldMetrics.mkString("  ###  ")}")
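Since the snippet above depends on a SparkSession and a real DataFrame, the future-per-partition pattern itself can be illustrated with plain Scala futures. Here trainOnPartition is a hypothetical stand-in for the "filter to one partition, preprocess, train, return accuracy" step:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical stand-in for "filter the DataFrame to one partition,
// train a model on it, and return an accuracy metric".
def trainOnPartition(partitionValue: String): Double =
  partitionValue.length.toDouble // placeholder metric

// One Future per partition value, all scheduled on one bounded pool;
// blocks until every "model" is trained, then returns metrics in order.
def trainAll(partitionValues: List[String], parallelism: Int): List[Double] = {
  val executor = Executors.newFixedThreadPool(parallelism)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
  try {
    val futures = partitionValues.map(v => Future(trainOnPartition(v)))
    futures.map(Await.result(_, Duration.Inf))
  } finally executor.shutdown()
}
```

The same shape carries over to Spark: replace trainOnPartition with the filter-and-train body, and the bounded pool with the ExecutionContext created above.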

