Perform a typed join in Scala with Spark Datasets
Question
I like Spark Datasets as they give me analysis errors and syntax errors at compile time and also allow me to work with getters instead of hard-coded names/numbers. Most computations can be accomplished with Dataset's high-level APIs. For example, it's much simpler to perform agg, select, sum, avg, map, filter, or groupBy operations by accessing a Dataset's typed objects than by using an RDD row's data fields.
However, the join operation is missing from this. I read that I can do a join like this:
ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
But that is not what I want, as I would prefer to do it via the case-class interface, so something more like this:
ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
The best alternative for now seems to be creating an object next to the case class with functions that provide the right column name as a String. So I would use the first line of code but put a function instead of a hard-coded column name. But that doesn't feel elegant enough.
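For reference, the workaround described above can be sketched roughly like this; the `Record` class, its companion, and the constant names are illustrative examples, not code from the question:

```scala
// Illustrative sketch of the workaround: keep each column name in a companion
// object next to the case class, so join sites reference a constant instead of
// repeating a string literal. (Record and its fields are hypothetical.)
case class Record(key: String, value: Int)

object Record {
  val Key = "key"     // the one place the column-name string lives
  val Value = "value"
}

// With Spark on the classpath, the join would then read something like:
//   ds1.joinWith(ds2, ds1.toDF().col(Record.Key) === ds2.toDF().col(Record.Key), "inner")
println(Record.Key)
```

This removes the hard-coded strings from call sites, but as the question notes, it still only centralizes the names rather than making the join itself type-safe.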
Can someone advise me on other options here? The goal is to abstract away from the actual column names and preferably work via the getters of the case class.
I'm using Spark 1.6.1 and Scala 2.10
Answer
Observation
Spark SQL can optimize a join only if the join condition is based on the equality operator. This means we can consider equijoins and non-equijoins separately.
An equijoin can be implemented in a type-safe manner by mapping both Datasets to (key, value) tuples, performing the join based on the keys, and reshaping the result:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}
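To see the shape of this function without a Spark session, here is a plain-Scala-collections analogue; the function and variable names are mine, not part of the answer:

```scala
// Plain-collections analogue of the safeEquiJoin idea: map both sides to a key,
// join on key equality, and return the matched pairs without the key.
def equiJoinSeq[T, U, K](left: Seq[T], right: Seq[U])(f: T => K, g: U => K): Seq[(T, U)] = {
  val byKey = right.groupBy(g)              // index the right side by its key
  for {
    x <- left
    y <- byKey.getOrElse(f(x), Seq.empty)   // only matching keys survive
  } yield (x, y)
}

val people = Seq(("alice", 1), ("bob", 2))
val depts  = Seq((1, "eng"), (3, "ops"))
val joined = equiJoinSeq(people, depts)(_._2, _._1)
// joined == Seq((("alice", 1), (1, "eng")))
```

The Spark version has the same structure; the encoders and the `_1` column reference are only needed because the intermediate (key, value) pairs live in a Dataset rather than a local collection.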
Non-equijoin
It can be expressed using relational algebra operators as R ⋈θ S = σθ(R × S) and converted directly to code.
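The identity can be demonstrated on ordinary Scala collections, assuming a cross product stands in for × and a filter for σθ (names are illustrative):

```scala
// R ⋈θ S = σθ(R × S): a theta-join is a cross product followed by a selection.
def thetaJoin[T, U](r: Seq[T], s: Seq[U])(theta: (T, U) => Boolean): Seq[(T, U)] =
  for { x <- r; y <- s; if theta(x, y) } yield (x, y)   // σθ applied to R × S

val r  = Seq(1, 3, 5)
val s  = Seq(2, 4)
val lt = thetaJoin(r, s)(_ < _)
// lt == Seq((1, 2), (1, 4), (3, 4))
```

The Spark code that follows does exactly this, with the cross product produced by joining on a trivially true predicate.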
Enable cross joins and use joinWith with a trivially true predicate:
import org.apache.spark.sql.functions.lit

spark.conf.set("spark.sql.crossJoin.enabled", true)

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean) = {
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
Spark 2.1
Use the crossJoin method:
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
Examples
import spark.implicits._

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
Notes

It should be noted that these methods are qualitatively different from a direct joinWith application and require expensive DeserializeToObject / SerializeFromObject transformations (compared to a direct joinWith, which can use logical operations on the data).

This is similar to the behavior described in Spark 2.0 Dataset vs DataFrame.
If you're not limited to the Spark SQL API, frameless provides interesting type-safe extensions for Datasets (as of today it supports only Spark 2.0):

import frameless.TypedDataset

val typedPoints1 = TypedDataset.create(points1)
val typedPoints2 = TypedDataset.create(points2)

typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
The Dataset API is not stable in 1.6, so I don't think it makes sense to use it there.

Of course, this design and these descriptive names are not necessary. You can easily use a type class to add these methods implicitly to Dataset, and there is no conflict with the built-in signatures, so both can be called joinWith.
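A minimal sketch of that implicit-extension idea, shown on plain collections since it does not depend on Spark (the wrapper and method names are mine, not from the answer):

```scala
// Hedged sketch: an implicit (extension) class adds a typed join to any Seq[T];
// with Spark on the classpath the same pattern would wrap Dataset[T] instead,
// delegating to the safeEquiJoin logic rather than a nested loop.
implicit class EquiJoinOps[T](left: Seq[T]) {
  def joinWithTyped[U, K](right: Seq[U])(f: T => K, g: U => K): Seq[(T, U)] =
    for { x <- left; y <- right; if f(x) == g(y) } yield (x, y)
}

val res = Seq(1, 2, 3).joinWithTyped(Seq("a", "bb"))(x => x, _.length)
// res == Seq((1, "a"), (2, "bb"))
```

Because the added method has its own name and signature, it coexists with the built-in joinWith without ambiguity, which is the point made above.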