How to add a new Struct column to a DataFrame


Question

I'm currently trying to extract a database from MongoDB and use Spark to ingest it into ElasticSearch with geo_points.

The Mongo database has latitude and longitude values, but ElasticSearch requires them to be cast into the geo_point type.

Is there a way in Spark to copy the lat and lon columns to a new column that is an array or struct?

Any help is appreciated!

Answer

I assume you start with some kind of flat schema like this:

root
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- key: string (nullable = false)

First let's create example data:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) ::
    StructField("key", StringType, false) :: Nil)

val df = sqlContext.createDataFrame(rdd, schema)

An easy way is to use a udf and a case class:

case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.
   withColumn("location", makeLocation(col("lat"), col("long"))).
   drop("lat").
   drop("long")

dfRes.printSchema

and we get:

root
 |-- key: string (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- long: double (nullable = false)

A harder way is to transform your data and create the schema afterwards:

val rddRes = df.
    map{case Row(lat, long, key) => Row(key, Row(lat, long))}

val schemaRes = StructType(
    StructField("key", StringType, false) ::
    StructField("location", StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) :: Nil
    ), true) :: Nil 
)

sqlContext.createDataFrame(rddRes, schemaRes).show

and we get the expected output:

+------+-------------+
|   key|     location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte|  [42.3,9.15]|
+------+-------------+

Creating a nested schema from scratch can be tedious, so if you can, I would recommend the first approach. It can easily be extended if you need a more sophisticated structure:

case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))

df.
    withColumn("pin", makePin(col("lat"), col("long"))).
    drop("lat").
    drop("long").
    printSchema

and we get the expected output:

root
 |-- key: string (nullable = false)
 |-- pin: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- long: double (nullable = false)

Unfortunately, you have no control over the nullable fields this way, so if that is important for your project you'll have to specify the schema yourself.
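
For completeness, a minimal sketch of what that could look like, assuming it is acceptable to simply re-assert a stricter schema on top of the udf result (Spark does not re-validate the data against it); explicitSchema and dfStrict are names introduced here just for illustration:

// Hypothetical explicit schema forcing the nullability we want
val explicitSchema = StructType(
    StructField("key", StringType, false) ::
    StructField("location", StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) :: Nil
    ), false) :: Nil
)

// Rebuild the DataFrame from the udf result's RDD[Row] with the explicit schema
val dfStrict = sqlContext.createDataFrame(dfRes.rdd, explicitSchema)
dfStrict.printSchema  // location is now reported as nullable = false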

Finally, you can use the struct function introduced in Spark 1.4:

import org.apache.spark.sql.functions.struct
import sqlContext.implicits._  // provides the $"..." column syntax

df.select($"key", struct($"lat", $"long").alias("location"))
