How to unnest array with keys to join on afterwards?
Problem description
I have two tables, namely table1
and table2
. table1
is big, whereas table2
is small. Also, I have a UDF function whose interface is defined as below:
--table1--
id
1
2
3
--table2--
category
a
b
c
d
e
f
g
UDF: foo(id: Int): List[String]
I intend to call the UDF first to get the corresponding categories: foo(table1.id)
, which will return a WrappedArray. Then I want to join on each category
in table2
to do some more manipulation. The expected result should look like this:
--view--
id,category
1,a
1,c
1,d
2,b
2,c
3,e
3,f
3,g
I tried to find an unnest method in Hive, but with no luck. Could anyone help me out? Thanks!
I believe that you want to use explode
function or Dataset's flatMap
operator.
explode
function creates a new row for each element in the given array or map column.
flatMap
operator returns a new Dataset by first applying a function to all elements of this Dataset, and then flattening the results.
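As a plain-Scala analogy (not Spark itself, just local collections to illustrate the flattening; the sample rows are taken from the question's expected result), both approaches turn one (id, categories) row into one (id, category) row per array element:

```scala
// Plain-Scala analogy of explode/flatMap: each (id, categories) pair
// expands into one (id, category) row per array element.
val rows = List(
  (1L, List("a", "c", "d")),
  (2L, List("b", "c"))
)

val exploded = rows.flatMap { case (id, categories) =>
  categories.map(category => (id, category))
}
// exploded == List((1L,"a"), (1L,"c"), (1L,"d"), (2L,"b"), (2L,"c"))
```

In Spark, explode does exactly this expansion on an array column, while Dataset.flatMap lets you write the same per-row function yourself.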
After you execute your UDF foo(id: Int): List[String]
, you'll end up with a Dataset
with a column of type array
.
val fooUDF = udf { id: Int => ('a' to ('a'.toInt + id).toChar).map(_.toString) }
// table1 with fooUDF applied
val table1 = spark.range(3).withColumn("foo", fooUDF('id))
scala> table1.show
+---+---------+
| id| foo|
+---+---------+
| 0| [a]|
| 1| [a, b]|
| 2|[a, b, c]|
+---+---------+
scala> table1.printSchema
root
|-- id: long (nullable = false)
|-- foo: array (nullable = true)
| |-- element: string (containsNull = true)
scala> table1.withColumn("fooExploded", explode($"foo")).show
+---+---------+-----------+
| id| foo|fooExploded|
+---+---------+-----------+
| 0| [a]| a|
| 1| [a, b]| a|
| 1| [a, b]| b|
| 2|[a, b, c]| a|
| 2|[a, b, c]| b|
| 2|[a, b, c]| c|
+---+---------+-----------+
With that, join
should be quite easy.
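For example (a sketch, assuming table2 is a DataFrame with a single category column), the final step could be table1.withColumn("category", explode($"foo")).join(table2, "category"). In plain Scala, the same inner-join semantics against a small lookup table look like this; the (2L, "h") row is made up here just to show a category that gets filtered out:

```scala
// Plain-Scala sketch of the join against a small table2:
// keep only (id, category) rows whose category appears in table2.
val table2Categories = Set("a", "b", "c", "d", "e", "f", "g")

val exploded = List((1L, "a"), (1L, "c"), (1L, "d"), (2L, "b"), (2L, "h"))
val view = exploded.filter { case (_, category) => table2Categories(category) }
// view == List((1L,"a"), (1L,"c"), (1L,"d"), (2L,"b"))
```

Since table2 is small, Spark would typically broadcast it, so the join behaves much like this in-memory set lookup.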