Spark: dynamically create a struct/JSON per group
Question
I have a Spark DataFrame like this:
+-----+---+---+---+------+
|group| a| b| c|config|
+-----+---+---+---+------+
| a| 1| 2| 3| [a]|
| b| 2| 3| 4|[a, b]|
+-----+---+---+---+------+
val df = Seq(("a", 1, 2, 3, Seq("a")),("b", 2, 3,4, Seq("a", "b"))).toDF("group", "a", "b","c", "config")
How can I add an additional column, i.e.
df.withColumn("select_by_config", <<>>).show
as a struct or JSON that combines a number of columns (those named in config), similar to a Hive named struct, a Spark struct, or a JSON column? Note that this struct is specific to each group and not constant for the whole DataFrame; it is specified in the config column.
I can imagine that df.map could do the trick, but the serialization overhead makes it seem inefficient. How can this be achieved with SQL-only expressions? Maybe as a Map-type column?
A possible but really clumsy solution for Spark 2.2 is:
import spark.implicits._

val df = Seq(
  (1, "a", 1, 2, 3, Seq("a")),
  (2, "b", 2, 3, 4, Seq("a", "b"))
).toDF("id", "group", "a", "b", "c", "config")
df.show

final case class Foo(id: Int, c1: Int, specific: Map[String, Int])

df.map { r =>
  // Column names to pick for this row
  val config = r.getAs[Seq[String]]("config")
  // Look up each configured column by name and collect the values into a map
  val others = config.map(elem => (elem, r.getAs[Int](elem))).toMap
  Foo(r.getAs[Int]("id"), r.getAs[Int]("c"), others)
}.show
Are there any better ways to solve this problem on Spark 2.2?
Recommended answer
If you use a recent build (Spark 2.4.0 RC 1 or later), a combination of higher-order functions should do the trick. First, create a map of columns:
import org.apache.spark.sql.functions.{
  array, col, expr, lit, map_from_arrays, map_from_entries
}

val cols = Seq("a", "b", "c")

val dfm = df.withColumn(
  "cmap",
  map_from_arrays(array(cols map lit: _*), array(cols map col: _*))
)
and transform config:
dfm.withColumn(
  "config_mapped",
  map_from_entries(expr("transform(config, k -> struct(k, cmap[k]))"))
).show
// +-----+---+---+---+------+--------------------+----------------+
// |group| a| b| c|config| cmap| config_mapped|
// +-----+---+---+---+------+--------------------+----------------+
// | a| 1| 2| 3| [a]|[a -> 1, b -> 2, ...| [a -> 1]|
// | b| 2| 3| 4|[a, b]|[a -> 2, b -> 3, ...|[a -> 2, b -> 3]|
// +-----+---+---+---+------+--------------------+----------------+
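The functions used above need Spark 2.4, while the question asks about 2.2. One possible workaround for older versions, which is not part of the original answer, is to build a struct with one field per candidate column, set each field to null unless its name appears in config, and serialize the struct with to_json. This is only a sketch: it assumes the candidate columns are known up front (the cols list), it creates its own local SparkSession so that it runs as a standalone script, and it relies on to_json omitting null fields, which is the default behaviour.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, struct, to_json, when}

// Assumption: a local session for illustration; in spark-shell `spark` already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("select-by-config-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  ("a", 1, 2, 3, Seq("a")),
  ("b", 2, 3, 4, Seq("a", "b"))
).toDF("group", "a", "b", "c", "config")

// Assumption: the full set of columns that config may refer to
val cols = Seq("a", "b", "c")

// One struct field per candidate column; `when` without `otherwise`
// yields null for columns that are not listed in this row's config
val selected = struct(
  cols.map(c => when(array_contains(col("config"), c), col(c)).as(c)): _*
)

// to_json drops null fields, so each row keeps only its configured columns
val result = df.withColumn("select_by_config", to_json(selected))
result.show(false)
```

The result is a JSON string rather than a map, so it fits the "struct or JSON" variant of the question. Dropping the to_json call keeps the struct itself, with nulls in the fields that are not configured for that row.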