Perform a typed join in Scala with Spark Datasets


Problem description


I like Spark Datasets as they give me analysis errors and syntax errors at compile time and also allow me to work with getters instead of hard-coded names/numbers. Most computations can be accomplished with Dataset's high-level APIs. For example, it's much simpler to perform agg, select, sum, avg, map, filter, or groupBy operations by accessing a Dataset's typed objects than by using the data fields of RDD rows.


However, the join operation is missing from this. I read that I can do a join like this:

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")


But that is not what I want, as I would prefer to do it via the case class interface, so something more like this:

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")


The best alternative for now seems to be to create an object next to the case class with functions that provide me with the right column name as a String. So I would use the first line of code but put a function instead of a hard-coded column name. But that doesn't feel elegant enough.
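
A rough sketch of what I mean (Record, RecordColumns and joinOnKey are just illustrative names):

import org.apache.spark.sql.Dataset

case class Record(key: Int, value: String)

// Object kept next to the case class so the column name lives in one place
// instead of being hard-coded at every call site.
object RecordColumns {
  val key = "key"
}

def joinOnKey(ds1: Dataset[Record], ds2: Dataset[Record]): Dataset[(Record, Record)] =
  ds1.joinWith(ds2, ds1.toDF().col(RecordColumns.key) === ds2.toDF().col(RecordColumns.key), "inner")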


Can someone advise me on other options here? The goal is to have an abstraction from the actual column names and, preferably, to work via the getters of the case class.


I'm using Spark 1.6.1 and Scala 2.10

Answer


Observation

Spark SQL can optimize a join only if the join condition is based on the equality operator. This means we can consider equijoins and non-equijoins separately.
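
A quick way to see the difference (a sketch, assuming a Spark 2.x session; the DataFrames and column names below are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val left  = Seq((1, "a"), (2, "b")).toDF("key", "l")
val right = Seq((1, "x"), (3, "y")).toDF("key", "r")

// Equality condition: the optimizer can pick a hash or sort-merge join.
left.join(right, left("key") === right("key")).explain()

// Non-equality condition: typically falls back to a nested-loop style plan.
left.join(right, left("key") < right("key")).explain()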


An equijoin can be implemented in a type-safe manner by mapping both Datasets to (key, value) tuples, performing the join based on the keys, and reshaping the result:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

// f and g extract the join key from each side; the implicit Encoders are
// needed for the intermediate (key, value) tuples and the final pair.
def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  // Key both Datasets, join on the synthetic _1 column, then drop the keys.
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}


Non-equijoin

Can be expressed using relational algebra operators as R ⋈θ S = σθ(R × S) and converted directly to code.


Enable crossJoin and use joinWith with a trivially true predicate:

spark.conf.set("spark.sql.crossJoin.enabled", true)

import org.apache.spark.sql.functions.lit

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  // Cross join on a trivially true condition, then filter with the typed predicate.
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}

Spark 2.1

Use the crossJoin method:

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}

Examples

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

// toDS assumes spark.implicits._ is in scope
val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
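
Worked out by hand from the sample data above, each call should return a single matching pair (a sketch; the exact rendering of the collected output is approximate):

safeEquiJoin(points1, categories)(_.label, _.name).collect()
// expected: Array((LabeledPoint(foo,1.0,2.0), Category(1,foo)))

safeNonEquiJoin(points1, points2)(_.x > _.x).collect()
// expected: only 1.0 > -1.0 holds, so
// Array((LabeledPoint(foo,1.0,2.0), LabeledPoint(foo,-1.0,3.0)))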

Notes


    • It should be noted that these methods are qualitatively different from a direct joinWith application, and require expensive DeserializeToObject / SerializeFromObject transformations (whereas a direct joinWith can use logical operations on the data).


      This is similar to the behavior described in Spark 2.0 Dataset vs DataFrame.


      If you're not limited to the Spark SQL API, frameless provides interesting type-safe extensions for Datasets (as of today it supports only Spark 2.0):

      import frameless.TypedDataset
      
      val typedPoints1 = TypedDataset.create(points1)
      val typedPoints2 = TypedDataset.create(points2)
      
      typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
      


    • The Dataset API is not stable in 1.6, so I don't think it makes sense to use it there.


      Of course this design and the descriptive names are not necessary. You can easily use a type class to add these methods implicitly to Dataset, and there is no conflict with the built-in signatures, so both can be called joinWith; a sketch of that follows below.
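
As a sketch of that last point, one simple way is an implicit extension class (shown here instead of a full type class; TypedJoinSyntax and TypedJoinOps are illustrative names, not part of the original answer):

import org.apache.spark.sql.{Dataset, Encoder}

object TypedJoinSyntax {
  // Adds a key-based joinWith overload; its parameter lists differ from the
  // built-in joinWith(Dataset, Column[, String]), so the two do not clash.
  implicit class TypedJoinOps[T](ds1: Dataset[T]) {
    def joinWith[U, K](ds2: Dataset[U])(f: T => K, g: U => K)(implicit
        e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] = {
      val ds1_ = ds1.map(x => (f(x), x))
      val ds2_ = ds2.map(x => (g(x), x))
      ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
    }
  }
}

// Usage with the example data above:
// import TypedJoinSyntax._
// points1.joinWith(categories)(_.label, _.name)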

