Spark Dataset API - join


Problem description

I am trying to use the Spark Dataset API but I am having some issues doing a simple join.

Let's say I have two datasets with the fields date | value; in the DataFrame case my join would look like:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

For Dataset, however, the equivalent method is .joinWith, but the same approach does not work:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

What arguments does .joinWith require?

Recommended answer

To use joinWith you first have to create a Dataset, and most likely two of them. To create a Dataset, you need to create a case class that matches your schema and call DataFrame.as[T], where T is your case class. So:

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

You could also skip the case class and use a tuple:

val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Then if you had another case class / DF, say like this:

case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Then, while the syntax of join and joinWith is similar, the results are different:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

As you can see, joinWith leaves the objects intact as parts of a tuple, while join flattens the columns out into a single namespace. (Which will cause problems in the above case, because the column name "key" is repeated.)
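
Because joinWith returns a Dataset of typed pairs, you can keep working with the objects directly instead of wrestling with the duplicated "key" column. A minimal sketch, assuming the ds and ds2 defined above are in scope together with spark.implicits._ (as in spark-shell); the Result case class here is purely illustrative and not part of the original answer:

// Illustrative only: Result is not part of the original answer
case class Result(key: Int, value: String, num1: Double, num2: Long)

// joinWith yields a Dataset[(KeyValue, Nums)], so both sides stay typed
val joined = ds.joinWith(ds2, df.col("key") === df2.col("key"))

// Map over the typed pairs to build a flat result with unambiguous column names
// (the encoder for Result comes from spark.implicits._)
val results = joined.map { case (kv, n) => Result(kv.key, kv.value, n.num1, n.num2) }
results.show()  // one row per matching key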

Curiously enough, I have to use df.col("key") and df2.col("key") to create the conditions for joining ds and ds2. If you use just col("key") on either side it does not work, and ds.col(...) doesn't exist; using the original df.col("key") does the trick, however.
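
For what it's worth, that quirk belongs to the early (Spark 1.6-era) Dataset API. In Spark 2.0 and later, where DataFrame is just an alias for Dataset[Row], a Dataset does expose col(...) and apply(...), so the join condition can be built from the typed Datasets themselves. A sketch under that assumption:

// Assumes Spark 2.0+, where Dataset has col(...) / apply(...)
val joinedTyped = ds.joinWith(ds2, ds("key") === ds2("key"))
joinedTyped.show()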
