Spark Dataset API-加入 [英] Spark Dataset API - join
问题描述
我正在尝试使用Spark 数据集 API,但在进行简单连接时遇到一些问题.
I am trying to use the Spark Dataset API but I am having some issues doing a simple join.
假设我有两个带有字段的数据集:date | value
,那么在DataFrame
的情况下,我的联接如下所示:
Let's say I have two dataset with fields: date | value
, then in the case of DataFrame
my join would look like:
val dfA : DataFrame
val dfB : DataFrame
dfA.join(dfB, dfB("date") === dfA("date") )
但是对于Dataset
,有.joinWith
方法,但是相同的方法不起作用:
However for Dataset
there is the .joinWith
method, but the same approach does not work:
val dfA : Dataset
val dfB : Dataset
dfA.joinWith(dfB, ? )
.joinWith
要求的参数是什么?
推荐答案
要使用joinWith
,您首先必须创建一个DataSet
,并且很可能要创建两个.要创建DataSet
,您需要创建一个与您的模式匹配的案例类,并调用DataFrame.as[T]
,其中T
是您的案例类.所以:
To use joinWith
you first have to create a DataSet
, and most likely two of them. To create a DataSet
, you need to create a case class that matches your schema and call DataFrame.as[T]
where T
is your case class. So:
case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
您还可以跳过案例类并使用元组:
You could also skip the case class and use a tuple:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
然后,如果您还有另一个案例类/DF,例如:
Then if you had another case class / DF, like this say:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
然后,尽管join
和joinWith
的语法相似,但结果却不同:
Then, while the syntax of join
and joinWith
are similar, the results are different:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// | 1| asdf| 1| 7.7| 101|
// | 2|34234| 2| 1.2| 10|
// +---+-----+---+----+----+
ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// | _1| _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+
如您所见,joinWith
将对象完整保留为元组的一部分,而join
将列扁平化为单个命名空间. (在上述情况下,这会引起问题,因为重复了列名"key".)
As you can see, joinWith
leaves the objects intact as parts of a tuple, while join
flattens out the columns into a single namespace. (Which will cause problems in the above case because the column name "key" is repeated.)
很奇怪,我必须使用df.col("key")
和df2.col("key")
来创建连接ds
和ds2
的条件-如果您在任一侧仅使用col("key")
,则将不起作用,而df.col("key")
可以达到目的.
Curiously enough, I have to use df.col("key")
and df2.col("key")
to create the conditions for joining ds
and ds2
-- if you use just col("key")
on either side it does not work, and ds.col(...)
doesn't exist. Using the original df.col("key")
does the trick, however.
这篇关于Spark Dataset API-加入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!