How to add a new Struct column to a DataFrame
Question
I'm currently trying to extract a database from MongoDB and use Spark to ingest it into ElasticSearch with geo_points.
The Mongo database has latitude and longitude values, but ElasticSearch requires them to be cast into the geo_point type.
Is there a way in Spark to copy the lat and lon columns into a new column that is an array or a struct?
Any help is appreciated!
Recommended answer
I assume you start with some kind of flat schema like this:
root
|-- lat: double (nullable = false)
|-- long: double (nullable = false)
|-- key: string (nullable = false)
First, let's create example data:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._
val rdd = sc.parallelize(
  Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)
val schema = StructType(
  StructField("lat", DoubleType, false) ::
  StructField("long", DoubleType, false) ::
  StructField("key", StringType, false) :: Nil)
val df = sqlContext.createDataFrame(rdd, schema)
An easy way is to use a UDF and a case class:
case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))
val dfRes = df.
withColumn("location", makeLocation(col("lat"), col("long"))).
drop("lat").
drop("long")
dfRes.printSchema
and we get:
root
|-- key: string (nullable = false)
|-- location: struct (nullable = true)
|    |-- lat: double (nullable = false)
|    |-- long: double (nullable = false)
A harder way is to transform your data and apply a schema afterwards:
// Map over the underlying RDD[Row]; on Spark 2.x and later, df.map would need an Encoder
val rddRes = df.rdd.
  map{case Row(lat, long, key) => Row(key, Row(lat, long))}
val schemaRes = StructType(
  StructField("key", StringType, false) ::
  StructField("location", StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) :: Nil
  ), true) :: Nil
)
sqlContext.createDataFrame(rddRes, schemaRes).show
and we get the expected output:
+------+-------------+
| key| location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte| [42.3,9.15]|
+------+-------------+
Creating a nested schema from scratch can be tedious, so if you can, I would recommend the first approach. It can be easily extended if you need a more sophisticated structure:
case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))
df.
withColumn("pin", makePin(col("lat"), col("long"))).
drop("lat").
drop("long").
printSchema
and we get the expected output:
root
|-- key: string (nullable = false)
|-- pin: struct (nullable = true)
|    |-- location: struct (nullable = true)
|    |    |-- lat: double (nullable = false)
|    |    |-- long: double (nullable = false)
Unfortunately, with the UDF approach you have no control over the nullable flag of the resulting fields, so if that is important for your project you'll have to specify the schema explicitly.
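As a possible alternative to both approaches, here is a minimal sketch using the built-in struct and array column functions. It assumes Spark 1.4 or later and a spark-shell session where sqlContext is available; the names points, dfStruct and dfArray are made up for illustration. The lon alias and the [lon, lat] array order reflect what ElasticSearch documents for geo_point values, so check them against the ElasticSearch version and connector you use.

```scala
import org.apache.spark.sql.functions.{array, col, struct}

// Hypothetical sample data with the same flat layout as above (lat, long, key)
val points = sqlContext.createDataFrame(Seq(
  (52.23, 21.01, "Warsaw"),
  (42.30, 9.15, "Corte")
)).toDF("lat", "long", "key")

// struct(...) packs the given columns into one struct column;
// nested field names come from the input columns, so "long" is aliased to "lon"
val dfStruct = points.
  withColumn("location", struct(col("lat"), col("long").alias("lon"))).
  drop("lat").
  drop("long")

// array(...) is the array counterpart; all inputs must share one type.
// Longitude comes first here because geo_point arrays are ordered [lon, lat]
val dfArray = points.
  withColumn("location", array(col("long"), col("lat"))).
  drop("lat").
  drop("long")

dfStruct.printSchema
dfArray.printSchema
```

No UDF or case class is needed here, and the nested fields can still be selected afterwards with a dotted path such as location.lat.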