How to transform Scala nested map operation to Scala Spark operation?


Problem description


The code below calculates the Euclidean distance between two Lists in a dataset:

 val user1 = List("a", "1", "3", "2", "6", "9")  //> user1  : List[String] = List(a, 1, 3, 2, 6, 9)
  val user2 = List("b", "1", "2", "2", "5", "9")  //> user2  : List[String] = List(b, 1, 2, 2, 5, 9)

  val all = List(user1, user2)                    //> all  : List[List[String]] = List(List(a, 1, 3, 2, 6, 9), List(b, 1, 2, 2, 5,
                                                  //|  9))



  def euclDistance(userA: List[String], userB: List[String]) = {
    println("comparing "+userA(0) +" and "+userB(0))
    val zipped = userA.zip(userB)
    val lastElements = zipped match {
      case (h :: t) => t
    }
    val subElements = lastElements.map(m => ((m._1.toDouble - m._2.toDouble) * (m._1.toDouble - m._2.toDouble)))
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

    sqRoot
  }                                               //> euclDistance: (userA: List[String], userB: List[String])Double

  all.map(m => (all.map(m2 => euclDistance(m,m2))))
                                                  //> comparing a and a
                                                  //| comparing a and b
                                                  //| comparing b and a
                                                  //| comparing b and b
                                                  //| res0: List[List[Double]] = List(List(0.0, 1.4142135623730951), List(1.414213
                                                  //| 5623730951, 0.0))

But how can this be translated into a parallel Spark Scala operation?
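The question does not show how distAll was built. One plausible translation of the nested all.map above is sketched below; this is hypothetical, not the asker's actual code. It assumes a live SparkContext sc and the euclDistance definition above, and the name allRdd is chosen here.

```scala
// Hypothetical sketch: keep the driver-side list `all` and parallelize one
// side of the nested map. This yields distAll: RDD[List[Double]], which is
// compatible with the distAll.foreach(p => p.foreach(println)) call below.
val allRdd = sc.parallelize(all)
val distAll = allRdd.map(u => all.map(u2 => euclDistance(u, u2)))
```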

When I print the contents of distAll:

scala> distAll.foreach(p => p.foreach(println))
14/10/24 23:09:42 INFO SparkContext: Starting job: foreach at <console>:21
14/10/24 23:09:42 INFO DAGScheduler: Got job 2 (foreach at <console>:21) with 4
output partitions (allowLocal=false)
14/10/24 23:09:42 INFO DAGScheduler: Final stage: Stage 2(foreach at <console>:2
1)
14/10/24 23:09:42 INFO DAGScheduler: Parents of final stage: List()
14/10/24 23:09:42 INFO DAGScheduler: Missing parents: List()
14/10/24 23:09:42 INFO DAGScheduler: Submitting Stage 2 (ParallelCollectionRDD[1
] at parallelize at <console>:18), which has no missing parents
14/10/24 23:09:42 INFO MemoryStore: ensureFreeSpace(1152) called with curMem=115
2, maxMem=278019440
14/10/24 23:09:42 INFO MemoryStore: Block broadcast_2 stored as values in memory
 (estimated size 1152.0 B, free 265.1 MB)
14/10/24 23:09:42 INFO DAGScheduler: Submitting 4 missing tasks from Stage 2 (Pa
rallelCollectionRDD[1] at parallelize at <console>:18)
14/10/24 23:09:42 INFO TaskSchedulerImpl: Adding task set 2.0 with 4 tasks
14/10/24 23:09:42 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 8, lo
calhost, PROCESS_LOCAL, 1169 bytes)
14/10/24 23:09:42 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 9, lo
calhost, PROCESS_LOCAL, 1419 bytes)
14/10/24 23:09:42 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 10, l
ocalhost, PROCESS_LOCAL, 1169 bytes)
14/10/24 23:09:42 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 11, l
ocalhost, PROCESS_LOCAL, 1420 bytes)
14/10/24 23:09:42 INFO Executor: Running task 0.0 in stage 2.0 (TID 8)
14/10/24 23:09:42 INFO Executor: Running task 1.0 in stage 2.0 (TID 9)
14/10/24 23:09:42 INFO Executor: Running task 3.0 in stage 2.0 (TID 11)
a14/10/24 23:09:42 INFO Executor: Running task 2.0 in stage 2.0 (TID 10)

14/10/24 23:09:42 INFO Executor: Finished task 2.0 in stage 2.0 (TID 10). 585 by
tes result sent to driver
114/10/24 23:09:42 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 10)
in 16 ms on localhost (1/4)

314/10/24 23:09:42 INFO Executor: Finished task 0.0 in stage 2.0 (TID 8). 585 by
tes result sent to driver

214/10/24 23:09:42 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 8) i
n 16 ms on localhost (2/4)

6
9
14/10/24 23:09:42 INFO Executor: Finished task 1.0 in stage 2.0 (TID 9). 585 byt
es result sent to driver
b14/10/24 23:09:42 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 9) i
n 16 ms on localhost (3/4)

1
2
2
5
9
14/10/24 23:09:42 INFO Executor: Finished task 3.0 in stage 2.0 (TID 11). 585 by
tes result sent to driver
14/10/24 23:09:42 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 11) i
n 31 ms on localhost (4/4)
14/10/24 23:09:42 INFO DAGScheduler: Stage 2 (foreach at <console>:21) finished
in 0.031 s
14/10/24 23:09:42 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have
all completed, from pool
14/10/24 23:09:42 INFO SparkContext: Job finished: foreach at <console>:21, took
 0.037641021 s

Why are the distances not populated?
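In fact some values do appear in the capture above: the stray a, 1, 3, 2, 6, 9, b, 1, 2, 2, 5, 9 characters interleaved with the INFO lines are println output from the executor threads, and they match the original list elements, which suggests distAll holds the raw input rather than computed distances. Because foreach runs on the executors, a common way to inspect results is to bring them back to the driver first (a sketch, assuming distAll is an RDD whose elements are collections):

```scala
// Sketch, assuming `distAll` is an RDD of collections (e.g. RDD[List[Double]]).
// foreach runs on the executors, so its println output lands interleaved with
// executor log lines; collect() pulls the data back to the driver, where it
// can be printed in order.
distAll.collect().foreach(p => p.foreach(println))
```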

Update:

To get Eugene Zhulenev's answer below to work for me, I needed to make the following changes:

extend UserObject with java.io.Serializable

also rename User to UserObject.

Here is the updated code:

  val user1 = List("a", "1", "3", "2", "6", "9")  //> user1  : List[String] = List(a, 1, 3, 2, 6, 9)
  val user2 = List("b", "1", "2", "2", "5", "9")  //> user2  : List[String] = List(b, 1, 2, 2, 5, 9)

  case class User(name: String, features: Vector[Double])

  object UserObject extends java.io.Serializable {
    def fromList(list: List[String]): User = list match {
      case h :: tail => User(h, tail.map(_.toDouble).toVector)
    }
  }

  val all = List(UserObject.fromList(user1), UserObject.fromList(user2))

  val users = sc.parallelize(all.combinations(2).toSeq.map {
    case l :: r :: Nil => (l, r)
  })

  def euclDistance(userA: User, userB: User) = {
    println(s"comparing ${userA.name} and ${userB.name}")
    val subElements = (userA.features zip userB.features) map {
      m => (m._1 - m._2) * (m._1 - m._2)
    }
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

    println("value is " + sqRoot)
    sqRoot
  }

  users.foreach(t => euclDistance(t._1, t._2))

Update 2:

I've tried the code in maasg's answer but receive an error:

scala> val userDistanceRdd = usersRdd.map { case (user1, user2) => {
     |         val data = sc.broadcast.value
     |         val distance = euclidDistance(data(user1), data(user2))
     |         ((user1, user2),distance)
     |     }
     |     }
<console>:27: error: missing arguments for method broadcast in class SparkContext;
follow this method with `_' if you want to treat it as a partially applied function
               val data = sc.broadcast.value

Here is the entire code with my amendments:

type UserId = String
type UserData = Array[Double]

val users: List[UserId]= List("a" , "b")
val data: Map[UserId,UserData] = Map( ("a" , Array(3.0,4.0)),
("b" , Array(3.0,4.0)) )

def combinations[T](l: List[T]): List[(T,T)] = l match {
    case Nil => Nil
    case h::Nil => Nil
    case h::t => t.map(x=>(h,x)) ++ combinations(t)
}
val broadcastData = sc.broadcast(data)

val usersRdd = sc.parallelize(combinations(users))

val euclidDistance: (UserData, UserData) => Double = (x,y) => 
    math.sqrt((x zip y).map{case (a,b) => math.pow(a-b,2)}.sum)
val userDistanceRdd = usersRdd.map { case (user1, user2) => {
        val data = sc.broadcast.value
        val distance = euclidDistance(data(user1), data(user2))
        ((user1, user2),distance)
    }
    }

For maasg's code to work, I needed to add } to close the userDistanceRdd function, and use broadcastData.value instead of sc.broadcast.value.

Code:

type UserId = String
type UserData = Array[Double]

val users: List[UserId] = List("a" , "b")

val data: Map[UserId,UserData] = Map( ("a" , Array(3.0,4.0)),
("b" , Array(3.0,3.0)) )

def combinations[T](l: List[T]): List[(T,T)] = l match {
    case Nil => Nil
    case h::Nil => Nil
    case h::t => t.map(x=>(h,x)) ++ combinations(t)
}

val broadcastData = sc.broadcast(data)
val usersRdd = sc.parallelize(combinations(users))
val euclidDistance: (UserData, UserData) => Double = (x,y) => 
    math.sqrt((x zip y).map{case (a,b) => math.pow(a-b,2)}.sum)
val userDistanceRdd = usersRdd.map{ case (user1, user2) => {
        val data = broadcastData.value
        val distance = euclidDistance(data(user1), data(user2))
        ((user1, user2),distance)
    }
    }

userDistanceRdd.foreach(println)
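As a Spark-free sanity check, the combinations helper and euclidDistance above can be exercised locally with plain Scala. This sketch reuses the same names and data as the listing above; only the Spark calls (broadcast, parallelize) are dropped:

```scala
// Spark-free check of the pair generation and distance logic from the
// listing above. Same names and data; no SparkContext required.
object DistanceCheck {
  type UserId = String
  type UserData = Array[Double]

  val users: List[UserId] = List("a", "b")
  val data: Map[UserId, UserData] =
    Map("a" -> Array(3.0, 4.0), "b" -> Array(3.0, 3.0))

  // Every unordered pair exactly once, as in the answer.
  def combinations[T](l: List[T]): List[(T, T)] = l match {
    case Nil      => Nil
    case h :: Nil => Nil
    case h :: t   => t.map(x => (h, x)) ++ combinations(t)
  }

  val euclidDistance: (UserData, UserData) => Double = (x, y) =>
    math.sqrt((x zip y).map { case (a, b) => math.pow(a - b, 2) }.sum)

  def main(args: Array[String]): Unit = {
    val userDistances = combinations(users).map { case (u1, u2) =>
      ((u1, u2), euclidDistance(data(u1), data(u2)))
    }
    userDistances.foreach(println) // ((a,b),1.0)
  }
}
```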

Solution

First of all, I suggest moving from storing your user model in a list to a well-typed class. Also, I don't think you need to compute the distance between a user and itself, as in (a-a) and (b-b), and there is no reason to compute each distance twice, as in (a-b) and (b-a).

  import org.apache.spark.rdd.RDD

  val user1 = List("a", "1", "3", "2", "6", "9")
  val user2 = List("b", "1", "2", "2", "5", "9")

  case class User(name: String, features: Vector[Double])

  object User {
    def fromList(list: List[String]): User = list match {
      case h :: tail => User(h, tail.map(_.toDouble).toVector)
    }
  }

  def euclDistance(userA: User, userB: User) = {
    println(s"comparing ${userA.name} and ${userB.name}")
    val subElements = (userA.features zip userB.features) map {
      m => (m._1 - m._2) * (m._1 - m._2)
    }
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

    sqRoot
  }

  val all = List(User.fromList(user1), User.fromList(user2))


  val users: RDD[(User, User)] = sc.parallelize(all.combinations(2).toSeq.map {
    case l :: r :: Nil => (l, r)
  })

  users.foreach(t => euclDistance(t._1, t._2))
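The pieces of this solution can also be checked locally without Spark. The sketch below reuses the User model and distance logic from the answer and confirms both claims: all.combinations(2) yields each unordered pair exactly once, and the distance reproduces the 1.4142135623730951 value from the original worksheet.

```scala
// Spark-free check of the answer's model and distance function.
object SolutionCheck {
  case class User(name: String, features: Vector[Double])

  object User {
    def fromList(list: List[String]): User = list match {
      case h :: tail => User(h, tail.map(_.toDouble).toVector)
    }
  }

  def euclDistance(userA: User, userB: User): Double =
    math.sqrt((userA.features zip userB.features)
      .map { case (a, b) => (a - b) * (a - b) }.sum)

  def main(args: Array[String]): Unit = {
    val all = List(
      User.fromList(List("a", "1", "3", "2", "6", "9")),
      User.fromList(List("b", "1", "2", "2", "5", "9")))
    // combinations(2) yields each unordered pair once: just (a, b) here.
    all.combinations(2).foreach {
      case l :: r :: Nil => println(s"${l.name}-${r.name}: ${euclDistance(l, r)}")
      case _             => () // combinations(2) always yields lists of length 2
    }
  }
}
```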
