Spark RDD equivalent to Scala collections partition

Problem Description

This is a minor issue with one of my Spark jobs which doesn't seem to cause any problems -- yet it annoys me every time I see it and fail to come up with a better solution.

Say I have a Scala collection like this:

val myStuff = List(Try(2/2), Try(2/0))

I can partition this list into successes and failures with partition:

val (successes, failures) = myStuff.partition(_.isSuccess)

Which is nice. The implementation of partition only traverses the source collection once to build the two new collections. However, using Spark, the best equivalent I have been able to devise is this:

val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }

Aside from the difference of unpacking the Try (which is fine), this also requires traversing the data twice, which is annoying.

Is there any better Spark alternative that can split an RDD without multiple traversals? i.e. having a signature something like this where partition has the behaviour of Scala collections partition rather than RDD partition:

val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)

For reference, I previously used something like the below to solve this. The potentially failing operation is deserializing some data from a binary format, and the failures have become interesting enough that they need to be processed and saved as an RDD rather than just logged.

def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
  // Deserialize one record, logging and swallowing any failure.
  try {
    Some(deserialize(data))
  } catch {
    case e: MyDeserializationError =>
      logger.error(e)
      None
  }
}
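This Option-based version would typically be wired into the job with flatMap, so failed records are simply dropped (a minimal sketch, assuming the sourceRDD from above; the variable name parsed is illustrative):

// Sketch: flatMap flattens the Options (via the implicit Option-to-Iterable
// conversion), so only successes survive. Failures are logged inside
// someOperationThatMayFail but never become an RDD of their own, which is
// exactly the limitation described above.
val parsed = sourceRDD.flatMap(data => someOperationThatMayFail(data))  // RDD[MyDataType]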

Solution

There might be other solutions, but here you go:

Setup:

import scala.util._
val myStuff = List(Try(2/2), Try(2/0))
val myStuffInSpark = sc.parallelize(myStuff)

Execution:

val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](), List[Try[Int]]()))(
  (accum, curr) => if (curr.isSuccess) (curr :: accum._1, accum._2) else (accum._1, curr :: accum._2),
  (first, second) => (first._1 ++ second._1, first._2 ++ second._2))
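Note that aggregate is an action: the first function prepends each element to one of the two lists within a partition, the second function concatenates the per-partition pairs, and the result is a pair of plain Scala Lists materialized on the driver, not a pair of RDDs. Unpacking it looks roughly like this (a sketch, given the two-element list above):

// Both halves are local Lists on the driver.
val (successes, failures) = myStuffInSparkPartitioned
// successes: List[Try[Int]] = List(Success(1))
// failures : List[Try[Int]] = List(Failure(java.lang.ArithmeticException: / by zero))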

Let me know if you need an explanation
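A related sketch, in case the two halves need to stay as RDDs (as the question asks) rather than driver-side lists: an alternative, not used in the answer above, is to cache the RDD and derive the two sides with the PartialFunction overload of collect. The upstream computation then runs only once, although Spark still scans the cached data twice:

// Cache so the upstream work (e.g. sourceRDD.map(someOperationThatMayFail) in the
// question) is evaluated only once; the two collect(PartialFunction) transformations
// then each read from the cached partitions.
val attempts = myStuffInSpark.cache()
val successRDD = attempts.collect { case Success(v)  => v }   // RDD[Int]
val failureRDD = attempts.collect { case Failure(ex) => ex }  // RDD[Throwable]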
