spark higher order function transform output struct

Problem Description


How can I use Spark higher-order functions to transform an array of structs into an array of new structs?

The dataset:

case class Foo(thing1:String, thing2:String, thing3:String)
case class Baz(foo:Foo, other:String)
case class Bar(id:Int, bazes:Seq[Baz])
import spark.implicits._
val df = Seq(Bar(1, Seq(Baz(Foo("first", "second", "third"), "other"), Baz(Foo("1", "2", "3"), "else")))).toDF
df.printSchema
df.show(false)

I want to concatenate all thing1, thing2, thing3 but keep the other property for each bar.

A simple pass-through:

scala> df.withColumn("cleaned", expr("transform(bazes, x -> x)")).printSchema
root
 |-- id: integer (nullable = false)
 |-- bazes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- foo: struct (nullable = true)
 |    |    |    |-- thing1: string (nullable = true)
 |    |    |    |-- thing2: string (nullable = true)
 |    |    |    |-- thing3: string (nullable = true)
 |    |    |-- other: string (nullable = true)
 |-- cleaned: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- foo: struct (nullable = true)
 |    |    |    |-- thing1: string (nullable = true)
 |    |    |    |-- thing2: string (nullable = true)
 |    |    |    |-- thing3: string (nullable = true)
 |    |    |-- other: string (nullable = true)

will only copy the elements over unchanged.

The desired concatenation operation:

df.withColumn("cleaned", expr("transform(bazes, x -> concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3))")).printSchema

will, unfortunately, remove all the values from the other column:

+---+----------------------------------------------------+-------------------------------+
|id |bazes                                               |cleaned                        |
+---+----------------------------------------------------+-------------------------------+
|1  |[[[first, second, third], other], [[1, 2, 3], else]]|[first::second::third, 1::2::3]|
+---+----------------------------------------------------+-------------------------------+

How can these be retained? Trying to keep the tuples:

df.withColumn("cleaned", expr("transform(bazes, x -> (concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3), x.other))")).printSchema

fails with:

org.apache.spark.sql.AnalysisException: cannot resolve 'named_struct('col1', concat(namedlambdavariable().`foo`.`thing1`, '::', namedlambdavariable().`foo`.`thing2`, '::', namedlambdavariable().`foo`.`thing3`), NamePlaceholder(), namedlambdavariable().`other`)' due to data type mismatch: Only foldable string expressions are allowed to appear at odd position, got: NamePlaceholder; line 1 pos 22;
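The message reveals what the tuple syntax desugars to: Spark rewrites (a, b) as a named_struct call, and the second field arrives with a NamePlaceholder instead of a literal name, which the analyzer rejects. As an editorial aside (not part of the original post), calling named_struct directly with explicit literal field names sidesteps this; the names concatenated and other below are illustrative:

// named_struct takes alternating literal field names and values, which
// satisfies the "foldable string expressions at odd positions" rule:
df.withColumn("cleaned", expr(
  "transform(bazes, x -> named_struct(" +
  "'concatenated', concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3), " +
  "'other', x.other))")).printSchema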

Edit

The desired output:

  • a new column with contents:

    [[first::second::third, other], [1::2::3, else]]

which retains the other column.

Solution

You can achieve the desired output in the following way. You cannot keep other implicitly because foo and other sit at the same level of each struct, so you need to access other separately and combine both values with struct():

scala> df.withColumn("cleaned", expr("transform(bazes, x -> struct(concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3), cast(x.other as string)))")).show(false)
+---+----------------------------------------------------+------------------------------------------------+
|id |bazes                                               |cleaned                                         |
+---+----------------------------------------------------+------------------------------------------------+
|1  |[[[first, second, third], other], [[1, 2, 3], else]]|[[first::second::third, other], [1::2::3, else]]|
+---+----------------------------------------------------+------------------------------------------------+

printSchema

scala>  df.withColumn("cleaned", expr("transform(bazes, x -> struct(concat(x.foo.thing1, '::', x.foo.thing2, '::', x.foo.thing3),cast(x.other as string)))")).printSchema
root
 |-- id: integer (nullable = false)
 |-- bazes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- foo: struct (nullable = true)
 |    |    |    |-- thing1: string (nullable = true)
 |    |    |    |-- thing2: string (nullable = true)
 |    |    |    |-- thing3: string (nullable = true)
 |    |    |-- other: string (nullable = true)
 |-- cleaned: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- col1: string (nullable = true)
 |    |    |-- col2: string (nullable = true)
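Note that col1 and col2 are the auto-generated names struct() assigns to unnamed fields. On Spark 3.0+ the same transform is also available on the typed Column API, which makes it easy to give the fields real names via aliases. This is a sketch under that version assumption, and the field names concatenated and other are illustrative:

import org.apache.spark.sql.functions.{col, concat_ws, struct, transform}

// transform() on Columns exists since Spark 3.0; concat_ws avoids
// repeating the '::' separator, and .as(...) names the struct fields:
val cleaned = df.withColumn("cleaned", transform(col("bazes"), x =>
  struct(
    concat_ws("::",
      x.getField("foo").getField("thing1"),
      x.getField("foo").getField("thing2"),
      x.getField("foo").getField("thing3")).as("concatenated"),
    x.getField("other").as("other"))))

cleaned.printSchema then shows concatenated and other instead of col1 and col2.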

Let me know if you have any further questions about this.
