How can I create a Spark DataFrame from a nested array of struct element?


Question

I have read a JSON file into Spark. The file has the following structure:

scala> tweetBlob.printSchema
root
 |-- related: struct (nullable = true)
 |    |-- next: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |-- search: struct (nullable = true)
 |    |-- current: long (nullable = true)
 |    |-- results: long (nullable = true)
 |-- tweets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cde: struct (nullable = true)
...
...
 |    |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |    |-- message: struct (nullable = true)
...
...

Ideally, I would like a DataFrame with the columns "cde", "cdeInternal", "message", ... as shown below:

root
|-- cde: struct (nullable = true)
...
...
|-- cdeInternal: struct (nullable = true)
...
...
|-- message: struct (nullable = true)
...
...

I have managed to use "explode" to extract the elements of the "tweets" array into a column called "tweets":

scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime...
scala> tweets.printSchema
root
 |-- tweets: struct (nullable = true)
 |    |-- cde: struct (nullable = true)
...
...
 |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |-- message: struct (nullable = true)
...
...

How can I select all the columns inside the struct and create a DataFrame from them? If my understanding is correct, explode does not work on a struct.

Thanks for your help.

Recommended answer

One possible way to handle this is to extract the required information from the schema. Let's start with some dummy data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col  // needed by the helper outside spark-shell
import org.apache.spark.sql.types._


case class Bar(x: Int, y: String)
case class Foo(bar: Bar)

val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF

df.printSchema

// root
//  |-- bar: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |    |-- y: string (nullable = true)

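and a helper function that looks up a column in the schema and, if it is a struct, returns one column reference per nested field: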

def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}"))
}

and finally the result:

df.select(children("bar", df): _*).printSchema

// root
// |-- x: integer (nullable = true)
// |-- y: string (nullable = true)
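
To connect this back to the question, here is a minimal sketch that applies the same idea to an exploded array of structs. The case classes (`Cde`, `Message`, `Tweet`, `Blob`) are hypothetical stand-ins for the real tweet schema, and the code assumes Spark 2.0 or later, where `SparkSession` is available (the answer above uses the older shell-provided `sc`). It also shows star expansion (`select("tweets.*")`), which is an alternative to the helper rather than part of the original answer. The `children` variant here differs slightly from the one above: it returns no columns instead of throwing when the column name is not found.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.StructType

// Hypothetical stand-ins for the question's nested tweet records.
case class Cde(author: String)
case class Message(body: String)
case class Tweet(cde: Cde, message: Message)
case class Blob(tweets: Seq[Tweet])

// In spark-shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("flatten-tweets")
  .getOrCreate()
import spark.implicits._

// Variant of the helper above: yields an empty array when the column
// is missing or is not a struct, instead of failing on .head.
def children(colname: String, df: DataFrame): Array[Column] =
  df.schema.fields.find(_.name == colname).map(_.dataType) match {
    case Some(s: StructType) => s.fields.map(f => col(s"$colname.${f.name}"))
    case _                   => Array.empty[Column]
  }

// One input row holding an array of two tweet structs.
val blob = Seq(
  Blob(Seq(Tweet(Cde("a"), Message("hi")), Tweet(Cde("b"), Message("yo"))))
).toDF()

// Step 1: one row per array element, still wrapped in a single struct column.
val tweets = blob.select(explode($"tweets").as("tweets"))

// Step 2a: promote the struct's fields using the schema-driven helper.
val viaHelper = tweets.select(children("tweets", tweets): _*)

// Step 2b: the same result using star expansion on the struct column.
val viaStar = tweets.select("tweets.*")

viaHelper.printSchema()
viaStar.printSchema()
```

Both variants should produce a DataFrame whose top-level columns are the struct's fields. The helper is the more flexible of the two when only some fields are wanted, since the returned array can be filtered before it is passed to `select`. Field names that themselves contain dots would need backtick quoting, which this sketch does not handle.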
