Perform a typed join in Scala with Spark Datasets


Problem description

I like Spark Datasets because they give me analysis and syntax errors at compile time and also allow me to work with getters instead of hard-coded names/numbers. Most computations can be accomplished with the Dataset's high-level APIs. For example, it's much simpler to perform agg, select, sum, avg, map, filter, or groupBy operations on a Dataset's typed objects than to work with the data fields of RDD rows.

However, the join operation is missing from this. I have read that I can do a join like this:

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

But that is not what I want, as I would prefer to do it via the case-class interface, so something more like this:

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

The best alternative for now seems to be creating an object next to the case class that provides functions returning the right column names as Strings. I would then use the first snippet above but call a function instead of hard-coding a column name (see the sketch below). But that doesn't feel elegant enough.
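To make that workaround concrete, here is a rough sketch; the Record case class, its companion object, and the keyCol name are hypothetical, and ds1, ds2 stand for Dataset[Record] values from the surrounding context:

case class Record(key: Long, value: String)

// Hypothetical companion object that centralizes the column names,
// so the join condition no longer repeats string literals.
object Record {
  val keyCol   = "key"
  val valueCol = "value"
}

// Same joinWith call as above, but the name comes from the companion object.
ds1.joinWith(ds2, ds1.toDF().col(Record.keyCol) === ds2.toDF().col(Record.keyCol), "inner")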

Can someone advise me on other options here? The goal is to abstract away from the actual column names and work, preferably, via the getters of the case class.

I'm using Spark 1.6.1 and Scala 2.10.

Recommended answer

Observation

Spark SQL can optimize a join only if the join condition is based on the equality operator. This means we can consider equijoins and non-equijoins separately; a quick way to see the difference in the physical plans is sketched below.
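As a small, hedged illustration (assuming a spark-shell style session where spark and import spark.implicits._ are in scope; the toy frames and column names k, v, w are made up for this sketch):

val left  = Seq((1, "a"), (2, "b")).toDF("k", "v")
val right = Seq((1, "x"), (3, "y")).toDF("k", "w")

// Equality condition: Spark can pick an optimized equi-join strategy
// (e.g. broadcast hash join or sort-merge join).
left.join(right, left("k") === right("k")).explain()

// Non-equality condition: Spark falls back to a nested-loop style plan,
// effectively a cross product followed by a filter.
left.join(right, left("k") < right("k")).explain()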

An equijoin can be implemented in a type-safe manner by mapping both Datasets to (key, value) tuples, performing the join based on the keys, and reshaping the result:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  // Key both sides with the user-supplied extractor functions,
  // join on the generated "_1" (key) column, then drop the keys.
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}

Non-equijoin

It can be expressed using relational algebra operators as R ⋈θ S = σθ(R × S) and converted directly to code.

Enable crossJoin and use joinWith with a predicate that is trivially true:

spark.conf.set("spark.sql.crossJoin.enabled", true)

import org.apache.spark.sql.functions.lit

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  // Cross join via an always-true condition, then apply the arbitrary
  // predicate on the typed pairs.
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}

Spark 2.1

Use the crossJoin method:

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  // Wrap each side in Tuple1 so the untyped crossJoin result can be
  // read back as a typed Dataset[(T, U)] before filtering.
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}

Example

// Assumes a SparkSession named `spark`, with spark.implicits._ providing .toDS and the encoders.
import spark.implicits._

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
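For orientation only, working through these small inputs by hand (hand-derived expectations, not captured output; row order may differ):

safeEquiJoin(points1, categories)(_.label, _.name).show()
// expected single match: (LabeledPoint(foo,1.0,2.0), Category(1,foo)),
// since "foo" is the only label that also appears as a category name

safeNonEquiJoin(points1, points2)(_.x > _.x).show()
// expected single match: (LabeledPoint(foo,1.0,2.0), LabeledPoint(foo,-1.0,3.0)),
// because 1.0 > -1.0 holds while 1.0 > 3.0 does not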

Notes

  • It should be noted that these methods are qualitatively different from a direct joinWith application and require expensive DeserializeToObject / SerializeFromObject transformations (whereas a direct joinWith can use logical operations on the data).

      This is similar to the behavior described in Spark 2.0 Dataset vs DataFrame.

      If you're not limited to the Spark SQL API, frameless provides interesting type-safe extensions for Datasets (as of today it supports only Spark 2.0):

      import frameless.TypedDataset
      
      val typedPoints1 = TypedDataset.create(points1)
      val typedPoints2 = TypedDataset.create(points2)
      
      typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
      

  • The Dataset API is not stable in 1.6, so I don't think it makes sense to use it there.

      Of course, this design and the descriptive names are not necessary. You can easily use a type class to add these methods implicitly to Dataset, and there is no conflict with the built-in signatures, so both can be called joinWith. A rough sketch of that enrichment follows below.
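      A minimal sketch of one way to do that, as an implicit-class enrichment reusing the safeEquiJoin defined above; the SafeJoinOps name and exact signature are illustrative assumptions, not part of the original answer:

      import org.apache.spark.sql.{Dataset, Encoder}

      // Hypothetical enrichment: because this joinWith takes different parameter
      // lists than the built-in joinWith(Dataset, Column, String), the two coexist.
      implicit class SafeJoinOps[T](ds: Dataset[T]) {
        def joinWith[U, K](other: Dataset[U])(f: T => K, g: U => K)(
            implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]
        ): Dataset[(T, U)] =
          safeEquiJoin(ds, other)(f, g)
      }

      // Usage, with spark.implicits._ in scope for the encoders:
      // points1.joinWith(categories)(_.label, _.name)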
