Spark: dynamically create struct/json per group

Question

I have a Spark DataFrame like:

+-----+---+---+---+------+
|group|  a|  b|  c|config|
+-----+---+---+---+------+
|    a|  1|  2|  3|   [a]|
|    b|  2|  3|  4|[a, b]|
+-----+---+---+---+------+
import spark.implicits._  // needed for toDF
val df = Seq(("a", 1, 2, 3, Seq("a")), ("b", 2, 3, 4, Seq("a", "b"))).toDF("group", "a", "b", "c", "config")

How can I add an additional column, i.e.

df.withColumn("select_by_config", <<>>).show

as a struct or JSON column that combines a number of columns (specified by config), similar to a Hive named struct / Spark struct / JSON column? Note that this struct is specific to each group and not constant for the whole DataFrame; it is specified in the config column.

I can imagine that df.map could do the trick, but the serialization overhead does not seem efficient. How can this be achieved with SQL-only expressions? Maybe as a Map-type column?

A possible but really clumsy solution for Spark 2.2 is:

import spark.implicits._

// defined at top level so that Spark can derive an Encoder for it
final case class Foo(id: Int, c1: Int, specific: Map[String, Int])

val df = Seq((1, "a", 1, 2, 3, Seq("a")), (2, "b", 2, 3, 4, Seq("a", "b"))).toDF("id", "group", "a", "b", "c", "config")
df.show

df.map { r =>
  // names of the columns selected for this row's group
  val config = r.getAs[Seq[String]]("config")
  // look up each configured column by name, giving name -> value
  val others = config.map(elem => (elem, r.getAs[Int](elem))).toMap
  Foo(r.getAs[Int]("id"), r.getAs[Int]("c"), others)
}.show

Are there any better ways to solve this problem in Spark 2.2?
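One SQL-only option that may work on 2.2, because it needs no higher-order functions, is sketched below. It assumes that the candidate columns a, b and c are known up front. Each column that is not listed in config is set to null with when/array_contains, the results are packed into a struct, and to_json, which skips null fields by default, drops them. The object name SelectByConfig and its run helper are hypothetical. Note that the struct itself still has all three fields, so the per-group shape exists only in the JSON string and not in the struct schema.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, struct, to_json, when}

object SelectByConfig {
  // Hypothetical helper: returns one JSON string per row, containing only the
  // columns listed in that row's config array.
  def run(spark: SparkSession): Array[String] = {
    import spark.implicits._

    val df = Seq(("a", 1, 2, 3, Seq("a")), ("b", 2, 3, 4, Seq("a", "b")))
      .toDF("group", "a", "b", "c", "config")

    // Assumption: every column that config can name is known up front.
    val cols = Seq("a", "b", "c")

    // when(...) without otherwise(...) yields null, so columns not listed in
    // config become null fields of the struct.
    val selected = struct(cols.map(c => when(array_contains(col("config"), c), col(c)).as(c)): _*)

    // to_json omits null fields by default, leaving only the configured columns.
    df.withColumn("select_by_config", to_json(selected))
      .select("select_by_config")
      .as[String]
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("select-by-config").getOrCreate()
    try run(spark).foreach(println)
    finally spark.stop()
  }
}
```

In Spark 3.x, the null-skipping behavior of to_json is controlled by spark.sql.jsonGenerator.ignoreNullFields, which defaults to true.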

Answer

If you use a recent build (Spark 2.4.0 RC1 or later), a combination of higher-order functions should do the trick. First, create a map of columns:

import org.apache.spark.sql.functions.{
  array, col, expr, lit, map_from_arrays, map_from_entries
}

val cols = Seq("a", "b", "c")

val dfm = df.withColumn(
  "cmap", 
  map_from_arrays(array(cols map lit: _*), array(cols map col: _*))
)

Then transform config, looking up each listed column name in cmap:

dfm.withColumn(
  "config_mapped",
   map_from_entries(expr("transform(config, k -> struct(k, cmap[k]))"))
).show

// +-----+---+---+---+------+--------------------+----------------+
// |group|  a|  b|  c|config|                cmap|   config_mapped|
// +-----+---+---+---+------+--------------------+----------------+
// |    a|  1|  2|  3|   [a]|[a -> 1, b -> 2, ...|        [a -> 1]|
// |    b|  2|  3|  4|[a, b]|[a -> 2, b -> 3, ...|[a -> 2, b -> 3]|
// +-----+---+---+---+------+--------------------+----------------+
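If a JSON string is wanted instead of a map, to_json accepts map columns as well as structs, so it can be applied to config_mapped directly. The sketch below is self-contained and rebuilds the steps above on the sample data. The object name ConfigMappedJson and its run helper are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, expr, lit, map_from_arrays, map_from_entries, to_json}

object ConfigMappedJson {
  // Rebuilds config_mapped (Spark 2.4+) and serializes it with to_json.
  def run(spark: SparkSession): Array[String] = {
    import spark.implicits._

    val df = Seq(("a", 1, 2, 3, Seq("a")), ("b", 2, 3, 4, Seq("a", "b")))
      .toDF("group", "a", "b", "c", "config")
    val cols = Seq("a", "b", "c")

    val mapped = df
      // column name -> column value for every candidate column
      .withColumn("cmap", map_from_arrays(array(cols.map(lit): _*), array(cols.map(col): _*)))
      // keep only the entries named in config, in config order
      .withColumn("config_mapped", map_from_entries(expr("transform(config, k -> struct(k, cmap[k]))")))

    mapped
      .select(to_json(col("config_mapped")).as("config_json"))
      .as[String]
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("config-mapped-json").getOrCreate()
    try run(spark).foreach(println)
    finally spark.stop()
  }
}
```

For the sample data, this should yield {"a":1} and {"a":2,"b":3}, with keys in the order they appear in config.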