from_json of Spark SQL returns null values


Problem description

I loaded a parquet file into a Spark dataframe as follows:

val message= spark.read.parquet("gs://defenault-zdtt-devde/pubsub/part-00001-e9f8c58f-7de0-4537-a7be-a9a8556sede04a-c000.snappy.parquet")

When I perform a collect on my dataframe I get the following result:

message.collect()

Array[org.apache.spark.sql.Row] = Array([118738748835150,2018-08-20T17:44:38.742Z,{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,"group":"box-2478-2555","profile_id":3,"profile":"eolane-movee","type":"uplink","timestamp":"2018-08-20T17:44:37.048Z","count":3130,"payload":[{"timestamp":"2018-08-20T17:44:37.048Z","data":{"battery":3.5975599999999996,"temperature":27}}],"payload_encrypted":"9da25e36","payload_cleartext":"fe1b01aa","device_properties":{"appeui":"7ca97df000001190","deveui":"7ca97d0000001bb0","external_id":"Product: 3.7 / HW: 3.1 / SW: 1.8.8","no_de_serie_eolane":"4904","no_emballage":"S02066","product_version":"1.3.1"},"protocol_data":{"AppNonce":"e820ef","DevAddr":"0e6c5fda","DevNonce":"85bc","NetID":"000007","best_gateway_id":"M40246","gateway.

The schema of this dataframe is:

message.printSchema()
root
 |-- Id: string (nullable = true)
 |-- publishTime: string (nullable = true)
 |-- data: string (nullable = true)

My aim is to work on the data column, which holds JSON data, and to flatten it. I wrote the following code:

import org.apache.spark.sql.types._

val schemaTotal = new StructType(Array(
  StructField("id", StringType, false),
  StructField("device_id", StringType),
  StructField("group_id", LongType),
  StructField("group", StringType),
  StructField("profile_id", IntegerType),
  StructField("profile", StringType),
  StructField("type", StringType),
  StructField("timestamp", StringType),
  StructField("count", StringType),
  StructField("payload", new StructType()
    .add("timestamp", StringType)
    .add("data", new ArrayType(new StructType().add("battery", LongType).add("temperature", LongType), false))),
  StructField("payload_encrypted", StringType),
  StructField("payload_cleartext", StringType),
  StructField("device_properties", new ArrayType(new StructType().add("appeui", StringType).add("deveui", StringType).add("external_id", StringType).add("no_de_serie_eolane", LongType).add("no_emballage", StringType).add("product_version", StringType), false)),
  StructField("protocol_data", new ArrayType(new StructType().add("AppNonce", StringType).add("DevAddr", StringType).add("DevNonce", StringType).add("NetID", LongType).add("best_gateway_id", StringType).add("gateways", IntegerType).add("lora_version", IntegerType).add("noise", LongType).add("port", IntegerType).add("rssi", DoubleType).add("sf", IntegerType).add("signal", DoubleType).add("snr", DoubleType), false)),
  StructField("lat", StringType),
  StructField("lng", StringType),
  StructField("geolocation_type", StringType),
  StructField("geolocation_precision", StringType),
  StructField("delivered_at", StringType)))


val dataframe_extract=message.select($"Id",
$"publishTime",
from_json($"data",schemaTotal).as("content"))

val table = dataframe_extract.select(
$"Id",
$"publishTime",
$"content.id" as "id",
$"content.device_id" as "device_id",
$"content.group_id" as "group_id",
$"content.group" as "group",
$"content.profile_id" as "profile_id",
$"content.profile" as "profile",
$"content.type" as "type",
$"content.timestamp" as "timestamp",
$"content.count" as "count",
$"content.payload.timestamp" as "timestamp2",
$"content.payload.data.battery" as "battery",
$"content.payload.data.temperature" as "temperature",
$"content.payload_encrypted" as "payload_encrypted",
$"content.payload_cleartext" as "payload_cleartext",
$"content.device_properties.appeui" as "appeui"
)

table.show() gives me null values for all the columns:

+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
|             Id|         publishTime|  id|device_id|group_id|group|profile_id|profile|type|timestamp|count|timestamp2|battery|temperature|payload_encrypted|payload_cleartext|appeui|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
|118738748835150|2018-08-20T17:44:...|null|     null|    null| null|      null|   null|null|     null| null|      null|   null|       null|             null|             null|  null|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+

Meanwhile, table.printSchema() gives me the expected result. Any idea how to solve this, please? I am working with Zeppelin as a first prototyping step. Thanks a lot in advance for your help. Best regards

Recommended answer

The from_json() SQL function has the following constraint that must be satisfied to convert a column value into a dataframe:

  1. The data types defined in the schema must match the values present in the JSON; a mismatched value in any column causes all column values to be returned as null.

For example:

For the column value '{"name": "raj", "age": 12}':

StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))

The above schema will return a null value for both columns.

StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true)))

The above schema will return the expected dataframe.

The likely cause in this thread is the same: if any mismatched column value is present, from_json returns null for every column.
