Case class equality in Apache Spark

Problem description

Why does pattern matching in Spark not work the same as in Scala? See the example below... The function f() tries to pattern match on a class, which works in the Scala REPL but fails in Spark, where every element comes back as "???". f2() is a workaround that gets the desired result in Spark using .isInstanceOf(), but I understand that to be bad form in Scala.

Any help on the correct way to pattern match in this scenario in Spark would be greatly appreciated.

abstract class a extends Serializable { val a: Int }
case class b(a: Int) extends a
case class bNull(a: Int = 0) extends a

val x: List[a] = List(b(0), b(1), bNull())
val xRdd = sc.parallelize(x)

An attempt at pattern matching that works in the Scala REPL but fails in Spark:

def f(x: a) = x match {
    case b(n) => "b"
    case bNull(n) => "bnull"
    case _ => "???"
}

A workaround that functions in Spark, but is bad form (I think):

def f2(x: a) = {
    if (x.isInstanceOf[b]) {
        "b"
    } else if (x.isInstanceOf[bNull]) {
        "bnull"
    } else {
        "???"
    }
}
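
For what it's worth, type patterns are the idiomatic Scala alternative to isInstanceOf chains; they compile down to the same instance checks, so a sketch like f3 below (this variant and its name are my addition, not part of the original question) should behave like f2 in the Spark REPL:

def f3(x: a) = x match {
    case _: b     => "b"      // type pattern: a plain instance check, no extractor call
    case _: bNull => "bnull"
    case _        => "???"
}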

View the results:

xRdd.map(f).collect                   // does not work in Spark
                                      // result: Array("???", "???", "???")
xRdd.map(f2).collect                  // works in Spark
                                      // result: Array("b", "b", "bnull")
x.map(f(_))                           // works in Scala REPL
                                      // result: List("b", "b", "bnull")

Versions used: the Spark results were run in spark-shell (Spark 1.6 on AWS EMR-4.3); the Scala REPL results come from SBT 0.13.9 (Scala 2.10.5).

Recommended answer

This is a known issue with the Spark REPL. You can find more details in SPARK-2620. It affects multiple operations in the Spark REPL, including most transformations on PairwiseRDDs. For example:

case class Foo(x: Int)

val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2))
foos.distinct.size
// Int = 2

val foosRdd = sc.parallelize(foos, 4)
foosRdd.distinct.count
// Long = 4  

foosRdd.map((_, 1)).reduceByKey(_ + _).collect
// Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1))

foosRdd.first == foos.head
// Boolean = false

// unapply compares the extracted field values (Option[Int] here),
// not the case class instances themselves, so this comparison succeeds
Foo.unapply(foosRdd.first) == Foo.unapply(foos.head)
// Boolean = true

What makes it even worse is that the results depend on the data distribution:

sc.parallelize(foos, 1).distinct.count
// Long = 2

sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect
// Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2))

The simplest thing you can do is to define and package the required case classes outside the REPL. Any code submitted directly with spark-submit should work as well.
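
For illustration, a minimal sketch of that approach (the file name, package name, and build commands here are my own assumptions, not part of the original answer):

// CaseClasses.scala -- compile outside the REPL, e.g.:
//   scalac CaseClasses.scala -d caseclasses.jar
// then start the shell with the jar on the classpath:
//   spark-shell --jars caseclasses.jar
package myclasses

abstract class a extends Serializable { val a: Int }
case class b(a: Int) extends a
case class bNull(a: Int = 0) extends a

After import myclasses._ in the shell, the original f with constructor patterns should match as expected.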

In Scala 2.11+ you can create a package directly in the REPL with :paste -raw:

scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

package bar

case class Bar(x: Int)


// Exiting paste mode, now interpreting.

scala> import bar.Bar
import bar.Bar

scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect
res1: Array[bar.Bar] = Array(Bar(1), Bar(2))
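
Continuing the session (this check is mine, not part of the original answer), constructor patterns on the packaged class now behave just as they do in the plain Scala REPL:

scala> sc.parallelize(Seq(Bar(1), Bar(2))).map {
     |   case Bar(1) => "one"
     |   case _      => "other"
     | }.collect
res2: Array[String] = Array(one, other)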

