如何使用密钥进行无限阵列加入? [英] How to unnest array with keys to join on afterwards?

查看:143
本文介绍了如何使用密钥进行无限阵列加入?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两张表,分别是 table1 table2 table1 很大,而 table2 很小。另外,我有一个UDF函数,它的接口定义如下:

   -  table1-- 
id
1
2
3

- 表2--
类别
a
b
c
d
e
f
g

UDF:foo(id:Int):List [String]

我打算首先调用UDF以获得相应的类别: foo(table1.id),它将返回一个WrappedArray,然后我想要加入 table2 中的每个类别来做更多的操作。预期结果应该如下所示:

   -  view-- 

id,category
1,a
1,c
1,d
2,b
2,c
3,e
3,f
3,g

我尝试在Hive中找到一个不太合适的方法,但没有运气,帮帮我?谢谢!

解决方案

我相信你要使用 explode function 或Dataset's flatMap operator

explode 函数为每个元素在给定的数组或map列中。

flatMap 运算符返回一个新的数据集,方法是首先对所有



执行完UDF foo(id:Int):List [String] 你最终会得到一个数据集,并且列的类型为 array

  val fooUDF = udf {id:Int => ('a'to('a'.toInt + id).toChar).map(_。toString)} 

// table1 with fooUDF applied
val table1 = spark.range( 3).withColumn(foo,fooUDF('id))

scala> table1.show
+ --- + --------- +
| ID | FOO |
+ --- + --------- +
| 0 | [A] |
| 1 | [a,b] |
| 2 | [a,b,c] |
+ --- + --------- +

scala> table1.printSchema
root $ b $ - | id:long(nullable = false)
| - foo:array(nullable = true)
| | - element:string(containsNull = true)

scala> table1.withColumn(fooExploded,explode($foo))。show
+ --- + --------- + ----------- +
| ID |富| fooExploded |
+ --- + --------- + ----------- +
| 0 | [A] | A |
| 1 | [a,b] | A |
| 1 | [a,b] | C |
| 2 | [a,b,c] | A |
| 2 | [a,b,c] | C |
| 2 | [a,b,c] | ç|
+ --- + --------- + ----------- +

因此,加入应该很容易。


I have two tables, namely table1 and table2. table1 is big, whereas table2 is small. Also, I have a UDF function whose interface is defined as below:

--table1--
id
1
2
3

--table2--
category
a
b
c
d
e
f
g

UDF: foo(id: Int): List[String]

I intend to call UDF firstly to get the corresponding categories: foo(table1.id), which will return a WrappedArray, then I want to join every category in table2 to do some more manipulation. The expected result should look like this:

--view--

id,category
1,a
1,c
1,d
2,b
2,c
3,e
3,f
3,g

I try to find a unnest method in Hive, but with no luck, could anyone help me out? Thanks!

解决方案

I believe that you want to use explode function or Dataset's flatMap operator.

explode function creates a new row for each element in the given array or map column.

flatMap operator returns a new Dataset by first applying a function to all elements of this Dataset, and then flattening the results.

After you execute your UDF foo(id: Int): List[String] you'll end up with a Dataset with the column of type array.

val fooUDF = udf { id: Int => ('a' to ('a'.toInt + id).toChar).map(_.toString) }

// table1 with fooUDF applied
val table1 = spark.range(3).withColumn("foo", fooUDF('id))

scala> table1.show
+---+---------+
| id|      foo|
+---+---------+
|  0|      [a]|
|  1|   [a, b]|
|  2|[a, b, c]|
+---+---------+

scala> table1.printSchema
root
 |-- id: long (nullable = false)
 |-- foo: array (nullable = true)
 |    |-- element: string (containsNull = true)

scala> table1.withColumn("fooExploded", explode($"foo")).show
+---+---------+-----------+
| id|      foo|fooExploded|
+---+---------+-----------+
|  0|      [a]|          a|
|  1|   [a, b]|          a|
|  1|   [a, b]|          b|
|  2|[a, b, c]|          a|
|  2|[a, b, c]|          b|
|  2|[a, b, c]|          c|
+---+---------+-----------+

With that, join should be quite easy.

这篇关于如何使用密钥进行无限阵列加入?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆