DataFrame explode list of JSON objects


Problem Description

I have JSON data in the following format:

{
    "date": 100,
    "userId": 1,
    "data": [
        {
            "timeStamp": 101,
            "reading": 1
        },
        {
            "timeStamp": 102,
            "reading": 2
        }
    ]
}
{
    "date": 200,
    "userId": 1,
    "data": [
        {
            "timeStamp": 201,
            "reading": 3
        },
        {
            "timeStamp": 202,
            "reading": 4
        }
    ]
}

I read it into Spark SQL:

val df = sqlContext.read.json(...)
df.printSchema
// root
//  |-- date: double (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- data: array (nullable = true)
//  |     |-- element: struct (containsNull = true)
//  |     |    |-- timeStamp: double (nullable = true)
//  |     |    |-- reading: double (nullable = true)
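
Note that the JSON sample is pretty-printed across several lines, while Spark's JSON reader by default expects one JSON object per line. If the files really look like the sample, Spark 2.2+ can parse them with the multiLine option; a minimal sketch, assuming a SparkSession named spark and a hypothetical file path:

// Only needed if each record spans multiple lines in the file.
// "readings.json" is a placeholder path, not from the original question.
val multiLineDf = spark.read
  .option("multiLine", true)
  .json("readings.json")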

I would like to transform it in order to have one row per reading. To my understanding, every transformation should produce a new DataFrame, so the following should work:

import org.apache.spark.sql.functions.explode
val exploded = df
    .withColumn("reading", explode(df("data.reading")))
    .withColumn("timeStamp", explode(df("data.timeStamp")))
    .drop("data")
exploded.printSchema
// root
//  |-- date: double (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- timeStamp: double (nullable = true)
//  |-- reading: double (nullable = true)

The resulting schema is correct, but I get every value twice:

exploded.show
// +-----------+-----------+-----------+-----------+
// |       date|     userId|  timeStamp|    reading|
// +-----------+-----------+-----------+-----------+
// |        100|          1|        101|          1|
// |        100|          1|        101|          1|
// |        100|          1|        102|          2|
// |        100|          1|        102|          2|
// |        200|          1|        201|          3|
// |        200|          1|        201|          3|
// |        200|          1|        202|          4|
// |        200|          1|        202|          4|
// +-----------+-----------+-----------+-----------+

My feeling is that there is something about the lazy evaluation of the two explodes that I don't understand.

Is there a way to get the above code to work? Or should I use a different approach altogether?

Recommended Answer

The resulting schema is correct, but I get every value twice

While the schema is correct, the output you've provided doesn't reflect the actual result. In practice you'll get the Cartesian product of timeStamp and reading for each input row.

My feeling is that there is something about the lazy evaluation

No, it has nothing to do with lazy evaluation. The way you use explode is just wrong. To understand what is going on, let's trace the execution for date equal to 100, step by step:

val df100 = df.where($"date" === 100)
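
For reference, this filtered DataFrame holds a single record, with both readings still packed inside the data array (a sketch of what df100.show would print, based on the sample data):

df100.show
// +------------------+----+------+
// |              data|date|userId|
// +------------------+----+------+
// |[[1,101], [2,102]]| 100|     1|
// +------------------+----+------+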

The first explode generates two rows, one for reading 1 and one for reading 2:

val df100WithReading = df100.withColumn("reading", explode(df("data.reading")))

df100WithReading.show
// +------------------+----+------+-------+
// |              data|date|userId|reading|
// +------------------+----+------+-------+
// |[[1,101], [2,102]]| 100|     1|      1|
// |[[1,101], [2,102]]| 100|     1|      2|
// +------------------+----+------+-------+

The second explode generates two rows (timeStamp equal to 101 and 102) for each row from the previous step:

val df100WithReadingAndTs = df100WithReading
  .withColumn("timeStamp", explode(df("data.timeStamp")))

df100WithReadingAndTs.show
// +------------------+----+------+-------+---------+
// |              data|date|userId|reading|timeStamp|
// +------------------+----+------+-------+---------+
// |[[1,101], [2,102]]| 100|     1|      1|      101|
// |[[1,101], [2,102]]| 100|     1|      1|      102|
// |[[1,101], [2,102]]| 100|     1|      2|      101|
// |[[1,101], [2,102]]| 100|     1|      2|      102|
// +------------------+----+------+-------+---------+
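
In general, if data holds n elements, two independent explodes produce n × n output rows per input record rather than n, which is exactly the blow-up shown above. A quick sanity check on the intermediate DataFrame:

// 2 readings × 2 timestamps = 4 rows from a single input record
df100WithReadingAndTs.count()
// Long = 4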

If you want correct results, explode data and select afterwards:

val exploded = df.withColumn("data", explode($"data"))
  .select($"userId", $"date",
    $"data".getItem("reading"),  $"data".getItem("timestamp"))

exploded.show
// +------+----+-------------+---------------+
// |userId|date|data[reading]|data[timeStamp]|
// +------+----+-------------+---------------+
// |     1| 100|            1|            101|
// |     1| 100|            2|            102|
// |     1| 200|            3|            201|
// |     1| 200|            4|            202|
// +------+----+-------------+---------------+
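
If you prefer plain column names instead of the generated data[...] names, you can alias the struct fields in the same select. A minimal sketch of this variant (the tidy name is only illustrative):

val tidy = df
  .withColumn("data", explode($"data"))
  .select(
    $"userId",
    $"date",
    $"data.reading".alias("reading"),     // struct field access, renamed
    $"data.timeStamp".alias("timeStamp"))

tidy.show
// +------+----+-------+---------+
// |userId|date|reading|timeStamp|
// +------+----+-------+---------+
// |     1| 100|      1|      101|
// |     1| 100|      2|      102|
// |     1| 200|      3|      201|
// |     1| 200|      4|      202|
// +------+----+-------+---------+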

