火花accumulableCollection不与mutable.Map工作 [英] Spark accumulableCollection does not work with mutable.Map

查看:1110
本文介绍了火花accumulableCollection不与mutable.Map工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用星火办雇员记录的积累和我使用的Spark的累加器。我使用的地图[EMPID,EMP作为accumulableCollection这样我可以通过自己的ID搜索的员工。我曾尝试一切,但它不工作。可以的,如果有一个与我使用accumulableCollection或不支持地图的方式有任何逻辑问题,有人点。以下是我的code

 包演示进口org.apache.spark {SparkContext,SparkConf,记录}进口org.apache.spark.SparkContext._
进口scala.collection.mutable
反对MapAccuApp扩展应用程序日志记录{
  案例类员工(ID:字符串,名称:字符串,部门:字符串)  VAL的conf =新SparkConf()。setAppName(员工)setMaster(本地[4])
  VAL SC =新SparkContext(CONF)  隐高清empMapToSet(empIdToEmp:mutable.Map [字符串,雇员]):mutable.MutableList [雇员] = {
    empIdToEmp.foldLeft(mutable.MutableList [雇员]()){(L,E)=> L + = e._2}
  }  VAL empAccu = sc.accumulableCollection [mutable.Map [字符串,雇员],雇员](mutable.Map [字符串,雇员]())  VAL员工=名单(
    员工(10001,汤姆,工程),
    员工(10002,罗杰,销售),
    员工(10003,拉斐尔,销售),
    员工(10004,国宝,销售),
    员工(10005,摩尔,销售),
    员工(10006,曙光,销售),
    员工(10007,梭哈,营销),
    员工(10008,布朗,质量保证)
  )  的System.out.println(员工数+ employees.size)
  sc.parallelize(员工).foreach(E => {
    empAccu + = E
  })  的System.out.println(empAccumulator大小+ empAccu.value.size)
}


解决方案

使用 accumulableCollection 似乎矫枉过正您的问题,如下所示:

 进口org.apache.spark {AccumulableParam,蓄积,SparkContext,SparkConf}进口scala.collection.mutable案例类员工(ID:字符串,名称:字符串,部门:字符串)VAL的conf =新SparkConf()。setAppName(员工)setMaster(本地[4])
VAL SC =新SparkContext(CONF)隐高清mapAccum =
    新AccumulableParam [mutable.Map [字符串,雇员],雇员]
{
  高清addInPlace(T1:mutable.Map [字符串,雇员],
                 T2:mutable.Map [字符串,雇员])
      :mutable.Map [字符串,雇员] = {
    T1 + T2 =
    T1
  }
  高清addAccumulator(T1:mutable.Map [字符串,雇员],E:员工)
      :mutable.Map [字符串,雇员] = {
    T1 + =(e.id - > E)
    T1
  }
  DEF零(T:mutable.Map [字符串,雇员])
      :mutable.Map [字符串,雇员] = {
    mutable.Map [字符串,雇员]()
  }
}VAL empAccu = sc.accumulable(mutable.Map [字符串,雇员]())VAL员工=名单(
  员工(10001,汤姆,工程),
  员工(10002,罗杰,销售),
  员工(10003,拉斐尔,销售),
  员工(10004,国宝,销售),
  员工(10005,摩尔,销售),
  员工(10006,曙光,销售),
  员工(10007,梭哈,营销),
  员工(10008,布朗,质量保证)
)的System.out.println(员工数+ employees.size)sc.parallelize(员工).foreach(E => {
  empAccu + = E
})的println(empAccumulator大小+ empAccu.value.size)
empAccu.value.foreach(进入= GT;
  的println(EMP ID =+ entry._1 +NAME =+ entry._2.name))

虽然这是记录不完整,现在,请查看星火codeBase的相关测试是相当启发性。

编辑:事实证明,使用 accumulableCollection 确实的具有价值:你不需要定义 AccumulableParam 及以下的作品。我要离开这两个解决方案的情况下,他们是有用的人。

 案例类员工(ID:字符串,名称:字符串,部门:字符串)VAL的conf =新SparkConf()。setAppName(员工)setMaster(本地[4])
VAL SC =新SparkContext(CONF)VAL empAccu = sc.accumulableCollection(mutable.HashMap [字符串,雇员]())VAL员工=名单(
  员工(10001,汤姆,工程),
  员工(10002,罗杰,销售),
  员工(10003,拉斐尔,销售),
  员工(10004,国宝,销售),
  员工(10005,摩尔,销售),
  员工(10006,曙光,销售),
  员工(10007,梭哈,营销),
  员工(10008,布朗,质量保证)
)的System.out.println(员工数+ employees.size)sc.parallelize(员工).foreach(E => {
  //注意这是从previous解决方案不同
  empAccu + = e.id - > Ë
})的println(empAccumulator大小+ empAccu.value.size)
empAccu.value.foreach(进入= GT;
  的println(EMP ID =+ entry._1 +NAME =+ entry._2.name))

使用火花1.0.2测试这两个解决方案。

I am using Spark to do employee record accumulation and for that I use Spark's accumulator. I am using Map[empId, emp] as accumulableCollection so that I can search employee by their ids. I have tried everything but it does not work. Can someone point if there is any logical issues with the way I am using accumulableCollection or Map is not supported. Following is my code

package demo

import org.apache.spark.{SparkContext, SparkConf, Logging}

import org.apache.spark.SparkContext._
import scala.collection.mutable


object MapAccuApp extends App with Logging {
  case class Employee(id:String, name:String, dept:String)

  val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]")
  val sc = new SparkContext(conf)

  implicit def empMapToSet(empIdToEmp: mutable.Map[String, Employee]): mutable.MutableList[Employee] = {
    empIdToEmp.foldLeft(mutable.MutableList[Employee]()) { (l, e) => l += e._2}
  }

  val empAccu = sc.accumulableCollection[mutable.Map[String, Employee], Employee](mutable.Map[String,Employee]())

  val employees = List(
    Employee("10001", "Tom", "Eng"),
    Employee("10002", "Roger", "Sales"),
    Employee("10003", "Rafael", "Sales"),
    Employee("10004", "David", "Sales"),
    Employee("10005", "Moore", "Sales"),
    Employee("10006", "Dawn", "Sales"),
    Employee("10007", "Stud", "Marketing"),
    Employee("10008", "Brown", "QA")
  )

  System.out.println("employee count " + employees.size)


  sc.parallelize(employees).foreach(e => {
    empAccu += e
  })

  System.out.println("empAccumulator size " + empAccu.value.size)
}

解决方案

Using accumulableCollection seems like overkill for your problem, as the following demonstrates:

import org.apache.spark.{AccumulableParam, Accumulable, SparkContext, SparkConf}

import scala.collection.mutable

case class Employee(id:String, name:String, dept:String)

val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]")
val sc = new SparkContext(conf)

implicit def mapAccum =
    new AccumulableParam[mutable.Map[String,Employee], Employee]
{
  def addInPlace(t1: mutable.Map[String,Employee],
                 t2: mutable.Map[String,Employee])
      : mutable.Map[String,Employee] = {
    t1 ++= t2
    t1
  }
  def addAccumulator(t1: mutable.Map[String,Employee], e: Employee)
      : mutable.Map[String,Employee] = {
    t1 += (e.id -> e)
    t1
  }
  def zero(t: mutable.Map[String,Employee])
      : mutable.Map[String,Employee] = {
    mutable.Map[String,Employee]()
  }
}

val empAccu = sc.accumulable(mutable.Map[String,Employee]())

val employees = List(
  Employee("10001", "Tom", "Eng"),
  Employee("10002", "Roger", "Sales"),
  Employee("10003", "Rafael", "Sales"),
  Employee("10004", "David", "Sales"),
  Employee("10005", "Moore", "Sales"),
  Employee("10006", "Dawn", "Sales"),
  Employee("10007", "Stud", "Marketing"),
  Employee("10008", "Brown", "QA")
)

System.out.println("employee count " + employees.size)

sc.parallelize(employees).foreach(e => {
  empAccu += e
})

println("empAccumulator size " + empAccu.value.size)
empAccu.value.foreach(entry =>
  println("emp id = " + entry._1 + " name = " + entry._2.name))

While this is poorly documented right now, the relevant test in the Spark codebase is quite illuminating.

Edit: It turns out that using accumulableCollection does have value: you don't need to define an AccumulableParam and the following works. I'm leaving both solutions in case they're useful to people.

case class Employee(id:String, name:String, dept:String)

val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]")
val sc = new SparkContext(conf)

val empAccu = sc.accumulableCollection(mutable.HashMap[String,Employee]())

val employees = List(
  Employee("10001", "Tom", "Eng"),
  Employee("10002", "Roger", "Sales"),
  Employee("10003", "Rafael", "Sales"),
  Employee("10004", "David", "Sales"),
  Employee("10005", "Moore", "Sales"),
  Employee("10006", "Dawn", "Sales"),
  Employee("10007", "Stud", "Marketing"),
  Employee("10008", "Brown", "QA")
)

System.out.println("employee count " + employees.size)

sc.parallelize(employees).foreach(e => {
  // notice this is different from the previous solution
  empAccu += e.id -> e
})

println("empAccumulator size " + empAccu.value.size)
empAccu.value.foreach(entry =>
  println("emp id = " + entry._1 + " name = " + entry._2.name))

Both solutions tested using Spark 1.0.2.

这篇关于火花accumulableCollection不与mutable.Map工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆