from_json in Spark SQL returns null values
Problem description
I loaded a Parquet file into a Spark DataFrame as follows:
val message= spark.read.parquet("gs://defenault-zdtt-devde/pubsub/part-00001-e9f8c58f-7de0-4537-a7be-a9a8556sede04a-c000.snappy.parquet")
When I call collect on the DataFrame, I get the following result:
message.collect()
Array[org.apache.spark.sql.Row] = Array([118738748835150,2018-08-20T17:44:38.742Z,{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,"group":"box-2478-2555","profile_id":3,"profile":"eolane-movee","type":"uplink","timestamp":"2018-08-20T17:44:37.048Z","count":3130,"payload":[{"timestamp":"2018-08-20T17:44:37.048Z","data":{"battery":3.5975599999999996,"temperature":27}}],"payload_encrypted":"9da25e36","payload_cleartext":"fe1b01aa","device_properties":{"appeui":"7ca97df000001190","deveui":"7ca97d0000001bb0","external_id":"Product: 3.7 / HW: 3.1 / SW: 1.8.8","no_de_serie_eolane":"4904","no_emballage":"S02066","product_version":"1.3.1"},"protocol_data":{"AppNonce":"e820ef","DevAddr":"0e6c5fda","DevNonce":"85bc","NetID":"000007","best_gateway_id":"M40246","gateway.
The schema of this DataFrame is:
message.printSchema()
root
|-- Id: string (nullable = true)
|-- publishTime: string (nullable = true)
|-- data: string (nullable = true)
My aim is to work on the data column, which holds JSON data, and to flatten it. I wrote the following code:
val schemaTotal = new StructType (
Array (StructField("id",StringType,false),StructField("device_id",StringType),StructField("group_id",LongType), StructField("group",StringType),StructField("profile_id",IntegerType),StructField("profile",StringType),StructField("type",StringType),StructField("timestamp",StringType),
StructField("count",StringType),
StructField("payload",new StructType ()
.add("timestamp",StringType)
.add("data",new ArrayType (new StructType().add("battery",LongType).add("temperature",LongType),false))),
StructField("payload_encrypted",StringType),
StructField("payload_cleartext",StringType),
StructField("device_properties", new ArrayType (new StructType().add("appeui",StringType).add("deveui",StringType).add("external_id",StringType).add("no_de_serie_eolane",LongType).add("no_emballage",StringType).add("product_version",StringType),false)),
StructField("protocol_data", new ArrayType (new StructType().add("AppNonce",StringType).add("DevAddr",StringType).add("DevNonce",StringType).add("NetID",LongType).add("best_gateway_id",StringType).add("gateways",IntegerType).add("lora_version",IntegerType).add("noise",LongType).add("port",IntegerType).add("rssi",DoubleType).add("sf",IntegerType).add("signal",DoubleType).add("snr",DoubleType),false)),
StructField("lat",StringType),
StructField("lng",StringType),
StructField("geolocation_type",StringType),
StructField("geolocation_precision",StringType),
StructField("delivered_at",StringType)))
val dataframe_extract=message.select($"Id",
$"publishTime",
from_json($"data",schemaTotal).as("content"))
val table = dataframe_extract.select(
$"Id",
$"publishTime",
$"content.id" as "id",
$"content.device_id" as "device_id",
$"content.group_id" as "group_id",
$"content.group" as "group",
$"content.profile_id" as "profile_id",
$"content.profile" as "profile",
$"content.type" as "type",
$"content.timestamp" as "timestamp",
$"content.count" as "count",
$"content.payload.timestamp" as "timestamp2",
$"content.payload.data.battery" as "battery",
$"content.payload.data.temperature" as "temperature",
$"content.payload_encrypted" as "payload_encrypted",
$"content.payload_cleartext" as "payload_cleartext",
$"content.device_properties.appeui" as "appeui"
)
table.show()
gives me null values for all of the columns parsed from the JSON:
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
| Id| publishTime| id|device_id|group_id|group|profile_id|profile|type|timestamp|count|timestamp2|battery|temperature|payload_encrypted|payload_cleartext|appeui|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
|118738748835150|2018-08-20T17:44:...|null| null| null| null| null| null|null| null| null| null| null| null| null| null| null|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
whereas table.printSchema() gives me the expected result. Any idea how to solve this, please?
I am working with Zeppelin as a first prototyping step. Thanks a lot in advance for your help.
Best regards
Recommended answer
The from_json() SQL function has the following constraint, which must be respected for a column value to be converted into structured columns:
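- The data types defined in the schema must match the values present in the JSON. If the value of any one field does not match its declared type, all of the column values come back as null.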
For example, given this column value:
'{"name": "raj", "age": 12}'
StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))
The above schema will return a null value in both columns.
StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true)))
The above schema will return the expected DataFrame.
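The effect of a mismatched schema can be tried locally with a sketch like the one below. It is a minimal illustration, not the answerer's code: the object name, the local SparkSession and the `mismatched` schema are assumptions made for this example. The mismatch declares `name` as IntegerType, an illustrative choice since the string "raj" can never be read as an integer. On the Spark 2.x releases current when this was asked, the mismatched parse is expected to null the whole struct; newer releases may null only the offending field, so no output is shown here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

object FromJsonMismatchDemo {
  // Declared types agree with the JSON values: string name, integer age.
  val matching: StructType = new StructType()
    .add("name", StringType)
    .add("age", IntegerType)

  // Hypothetical mismatch for illustration: "raj" is not an integer.
  val mismatched: StructType = new StructType()
    .add("name", IntegerType)
    .add("age", IntegerType)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("from_json-mismatch-demo")
      .getOrCreate()
    import spark.implicits._

    // Single-row DataFrame holding the JSON string from the answer.
    val df = Seq("""{"name": "raj", "age": 12}""").toDF("data")

    // Parse with the matching schema, then expand the struct into columns.
    df.select(from_json($"data", matching).as("content"))
      .select("content.*")
      .show()

    // Parse the same string with the mismatched schema and compare.
    df.select(from_json($"data", mismatched).as("content"))
      .select("content.*")
      .show()

    spark.stop()
  }
}
```

In Zeppelin, where `spark` already exists, only the body of `main` (without the builder and `spark.stop()`) would be needed.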
In this thread the likely cause is the same: if the value of any field does not match the schema, from_json returns null for all of the column values.
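Comparing the schema in the question with the sample record shows several such mismatches: "payload" is a JSON array but is declared as a struct, "payload.data" is an object but is declared as an array, "battery" is fractional but is declared LongType, "device_properties" and "protocol_data" are objects but are declared as arrays of structs, and "no_de_serie_eolane" and "NetID" are quoted strings but are declared LongType. The sketch below shows one possible schema that follows the shape of the sample. It is an assumption-laden illustration: the object name, the trimmed sample string and the local SparkSession are invented for this example, the numeric types are guessed from a single record, and "protocol_data" is left out because the sample in the question is truncated.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.types._

object FlattenMessage {
  // One element of the "payload" JSON array.
  val payloadItem: StructType = new StructType()
    .add("timestamp", StringType)
    .add("data", new StructType()
      .add("battery", DoubleType)     // 3.5975599999999996 in the sample
      .add("temperature", LongType))  // 27 in the sample; assumed integral

  // "device_properties" is a JSON object whose sample values are all strings.
  val deviceProperties: StructType = new StructType()
    .add("appeui", StringType)
    .add("deveui", StringType)
    .add("external_id", StringType)
    .add("no_de_serie_eolane", StringType)
    .add("no_emballage", StringType)
    .add("product_version", StringType)

  // Only fields visible in the sample are declared; from_json skips the others.
  val schema: StructType = new StructType()
    .add("id", StringType)
    .add("device_id", LongType)
    .add("group_id", LongType)
    .add("group", StringType)
    .add("profile_id", IntegerType)
    .add("profile", StringType)
    .add("type", StringType)
    .add("timestamp", StringType)
    .add("count", LongType)
    .add("payload", ArrayType(payloadItem))
    .add("payload_encrypted", StringType)
    .add("payload_cleartext", StringType)
    .add("device_properties", deviceProperties)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("flatten-message")
      .getOrCreate()
    import spark.implicits._

    // Trimmed copy of the record from the question, standing in for the Parquet file.
    val sample = """{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,"count":3130,"payload":[{"timestamp":"2018-08-20T17:44:37.048Z","data":{"battery":3.5975599999999996,"temperature":27}}],"device_properties":{"appeui":"7ca97df000001190","no_de_serie_eolane":"4904"}}"""
    val message = Seq(("118738748835150", "2018-08-20T17:44:38.742Z", sample))
      .toDF("Id", "publishTime", "data")

    // Let Spark infer a schema from the strings, to compare with the hand-written one.
    spark.read.json(message.select($"data").as[String]).printSchema()

    val table = message
      .select($"Id", $"publishTime", from_json($"data", schema).as("content"))
      .withColumn("reading", explode($"content.payload")) // one row per payload element
      .select(
        $"Id",
        $"publishTime",
        $"content.id".as("id"),
        $"content.device_id".as("device_id"),
        $"reading.timestamp".as("timestamp2"),
        $"reading.data.battery".as("battery"),
        $"reading.data.temperature".as("temperature"),
        $"content.device_properties.appeui".as("appeui"))

    table.show(false)
    spark.stop()
  }
}
```

Printing the inferred schema first is a quick way to spot where a hand-written schema disagrees with the data. Note that explode drops rows whose "payload" is null or empty, so a record that still fails to parse would disappear from the output instead of showing up as nulls.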