DataFrame explode list of JSON objects


Question

I have JSON data in the following format:

{
    "date": 100,
    "userId": 1,
    "data": [
        {
            "timeStamp": 101,
            "reading": 1
        },
        {
            "timeStamp": 102,
            "reading": 2
        }
    ]
}
{
    "date": 200,
    "userId": 1,
    "data": [
        {
            "timeStamp": 201,
            "reading": 3
        },
        {
            "timeStamp": 202,
            "reading": 4
        }
    ]
}

I read it into Spark SQL:

val df = sqlContext.read.json(...)
df.printSchema
// root
//  |-- date: double (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- data: array (nullable = true)
//  |     |-- element: struct (containsNull = true)
//  |     |    |-- timeStamp: double (nullable = true)
//  |     |    |-- reading: double (nullable = true)

I would like to transform it in order to have one row per reading. To my understanding, every transformation should produce a new DataFrame, so the following should work:

import org.apache.spark.sql.functions.explode
val exploded = df
    .withColumn("reading", explode(df("data.reading")))
    .withColumn("timeStamp", explode(df("data.timeStamp")))
    .drop("data")
exploded.printSchema
// root
//  |-- date: double (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- timeStamp: double (nullable = true)
//  |-- reading: double (nullable = true)

The resulting schema is correct, but I get every value twice:

exploded.show
// +-----------+-----------+-----------+-----------+
// |       date|     userId|  timeStamp|    reading|
// +-----------+-----------+-----------+-----------+
// |        100|          1|        101|          1|
// |        100|          1|        101|          1|
// |        100|          1|        102|          2|
// |        100|          1|        102|          2|
// |        200|          1|        201|          3|
// |        200|          1|        201|          3|
// |        200|          1|        202|          4|
// |        200|          1|        202|          4|
// +-----------+-----------+-----------+-----------+

My feeling is that there is something about the lazy evaluation of the two explodes that I don't understand.

Is there a way to get the above code to work? Or should I use a different approach all together?

Answer

The resulting schema is correct, but I get every value twice

While the schema is correct, the output you've provided doesn't reflect the actual result. In practice you'll get the Cartesian product of timeStamp and reading for each input row.
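The duplicated rows follow directly from that: each explode multiplies the row count by the length of the array it operates on, so two independent explodes over the same array give n × n combinations per input row. A quick sketch of the arithmetic (plain Scala, no Spark required):

```scala
// Each explode multiplies the row count by the array length n.
// Two independent explodes over the same 2-element array therefore
// yield n * n combinations per input row.
val n = 2          // elements per "data" array
val inputRows = 2  // objects in the sample JSON
val outputRows = inputRows * n * n
// outputRows == 8, matching the eight rows shown above
```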

My feeling is that there is something about the lazy evaluation

No, it has nothing to do with lazy evaluation. The way you use explode is just wrong. To understand what is going on, let's trace the execution for date equal to 100:

val df100 = df.where($"date" === 100)

step by step. The first explode will generate two rows, one for reading equal to 1 and one for reading equal to 2:

val df100WithReading = df100.withColumn("reading", explode(df("data.reading")))

df100WithReading.show
// +------------------+----+------+-------+
// |              data|date|userId|reading|
// +------------------+----+------+-------+
// |[[1,101], [2,102]]| 100|     1|      1|
// |[[1,101], [2,102]]| 100|     1|      2|
// +------------------+----+------+-------+

The second explode generates two rows (timeStamp equal to 101 and 102) for each row from the previous step:

val df100WithReadingAndTs = df100WithReading
  .withColumn("timeStamp", explode(df("data.timeStamp")))

df100WithReadingAndTs.show
// +------------------+----+------+-------+---------+
// |              data|date|userId|reading|timeStamp|
// +------------------+----+------+-------+---------+
// |[[1,101], [2,102]]| 100|     1|      1|      101|
// |[[1,101], [2,102]]| 100|     1|      1|      102|
// |[[1,101], [2,102]]| 100|     1|      2|      101|
// |[[1,101], [2,102]]| 100|     1|      2|      102|
// +------------------+----+------+-------+---------+

If you want correct results, explode data first and select afterwards:

val exploded = df.withColumn("data", explode($"data"))
  .select($"userId", $"date",
    $"data".getItem("reading"),  $"data".getItem("timestamp"))

exploded.show
// +------+----+-------------+---------------+
// |userId|date|data[reading]|data[timestamp]|
// +------+----+-------------+---------------+
// |     1| 100|            1|            101|
// |     1| 100|            2|            102|
// |     1| 200|            3|            201|
// |     1| 200|            4|            202|
// +------+----+-------------+---------------+
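If the generated data[reading] / data[timestamp] column names are inconvenient, the struct fields can be aliased in the same select. A minimal sketch (untested, assuming the df and imports from above):

```scala
val named = df
  .withColumn("data", explode($"data"))
  .select(
    $"userId",
    $"date",
    $"data.reading".alias("reading"),
    $"data.timeStamp".alias("timeStamp"))
// named.printSchema should now list plain reading / timeStamp columns
```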
