Spark MLlib:为每个数据组构建分类器 [英] Spark MLlib: building classifiers for each data group

查看:70
本文介绍了Spark MLlib:为每个数据组构建分类器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经用某些组号标记了带有标记的向量(LabeledPoint-s).对于每个组,我需要创建一个单独的 Logistic回归分类器:

I have labeled vectors (LabeledPoint-s) taged by some group number. For every group I need to create a separate Logistic Regression classifier:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Scratch {

  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val trainRDD = sc.parallelize(train)
    val modelByGroup = trainRDD.groupByKey().map({case (group, iter) => 
                           (group, new LogisticRegressionWithLBFGS().run(iter))})
  }

}

LogisticRegressionWithLBFGS().run(iter)无法编译,因为runRDD一起使用,而不与groupBy返回的迭代器一起使用. 请建议如何构建与输入数据中的组(标签)一样多的分类器.

LogisticRegressionWithLBFGS().run(iter) does not compile because run works with RDD and not with iterator that groupBy returns. Please advise how to build as many classifiers as there are groups (tags) in the input data.

更新-演示嵌套的RDD迭代不起作用:

Update - demonstrates that nested RDD iteration does not work:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Scratch {

  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val trainRDD = sc.parallelize(train)
    val keys : RDD[Int] = trainRDD.map({case (key,_) => key}).distinct
    for (key <- keys) {
    // key is Int here!
      // Get train data for the current group (key):
      val groupTrain = trainRDD.filter({case (x, _) => x == key }).cache()

      /**
       * Which results in org.apache.spark.SparkException:
       * RDD transformations and actions can only be invoked by the driver,
       * not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid
       * because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
       * For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
       */
    }
  }
}

看起来没有办法在其他转换中使用转换,对吗?

Looks like there is no way to use transformations inside other transformations, correct?

推荐答案

如果在每个组上使用分类器,则不需要mllib. Mllib设计为与分布式集一起使用(您的集不是每个工作人员上都有大量本地集).您可以在地图功能的每个组上使用一些本地机器学习库,例如 weka .

If your using classifier on each group you don't need mllib. Mllib is designed to use with distributed sets (your sets are not you have butch of local sets on each worker). You can just use some local machine learning library like weka on each group in map function.

val keys = wholeRDD.map(_._1).distinct.collect

var models = List()
for (key <- keys) {
  val valuesForKey = wholeRDD.filter(_._1 == key)
  // train model
  ...
  models = model::models
}

这篇关于Spark MLlib:为每个数据组构建分类器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆