How to convert Spark Streaming nested JSON coming on Kafka to a flat dataframe?


Question

I need some help with my first attempt at parsing JSON coming from Kafka into Spark Structured Streaming.

I am struggling to convert the incoming JSON into a flat dataframe for further processing.

My input JSON is:

[
    { "siteId": "30:47:47:BE:16:8F", "siteData": 
        [
            { "dataseries": "trend-255", "values": 
                [
                    {"ts": 1502715600, "value": 35.74 },
                    {"ts": 1502715660, "value": 35.65 },
                    {"ts": 1502715720, "value": 35.58 },
                    {"ts": 1502715780, "value": 35.55 }
                ]
            },
            { "dataseries": "trend-256", "values":
                [
                    {"ts": 1502715840, "value": 18.45 },
                    {"ts": 1502715900, "value": 18.35 },
                    {"ts": 1502715960, "value": 18.32 }
                ]
            }
        ]
    },
    { "siteId": "30:47:47:BE:16:FF", "siteData": 
        [
            { "dataseries": "trend-255", "values": 
                [
                    {"ts": 1502715600, "value": 35.74 },
                    {"ts": 1502715660, "value": 35.65 },
                    {"ts": 1502715720, "value": 35.58 },
                    {"ts": 1502715780, "value": 35.55 }
                ]
            },
            { "dataseries": "trend-256", "values":
                [
                    {"ts": 1502715840, "value": 18.45 },
                    {"ts": 1502715900, "value": 18.35 },
                    {"ts": 1502715960, "value": 18.32 }
                ]
            }
        ]
    }
]

The Spark schema is:

from pyspark.sql.types import (
    ArrayType, IntegerType, StringType, StructField, StructType
)

data1_spark_schema = ArrayType(
    StructType([
        StructField("siteId", StringType(), False),
        StructField("siteData", ArrayType(StructType([
            StructField("dataseries", StringType(), False),
            StructField("values", ArrayType(StructType([
                StructField("ts", IntegerType(), False),
                StructField("value", StringType(), False)
            ]), False), False)
        ]), False), False)
    ]), False
)

My very simple code is:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from config.general import kafka_instance
from config.general import topic
from schemas.schema import data1_spark_schema

spark = SparkSession \
            .builder \
            .appName("Structured_BMS_Feed") \
            .getOrCreate()

stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_instance) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .option("max.poll.records", 100) \
        .option("failOnDataLoss", False) \
        .load()

stream_records = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as bms_data1") \
                       .select(from_json("bms_data1", data1_spark_schema).alias("bms_data1"))

sites = stream_records.select(explode("bms_data1").alias("site")) \
                      .select("site.*")

sites.printSchema()

stream_debug = sites.writeStream \
                             .outputMode("append") \
                             .format("console") \
                             .option("numRows", 20) \
                             .option("truncate", False) \
                             .start()


stream_debug.awaitTermination()
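(config.general is not shown in the question; it only supplies the broker address and topic name. A hypothetical sketch with placeholder values:)

# config/general.py -- placeholder values, replace with your own broker and topic
kafka_instance = "localhost:9092"
topic = "bms_data1"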

When I run this code, the schema prints like this:

root
 |-- siteId: string (nullable = false)
 |-- siteData: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- dataseries: string (nullable = false)
 |    |    |-- values: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- ts: integer (nullable = false)
 |    |    |    |    |-- value: string (nullable = false)

Is it possible to get this schema flattened, so that I have all fields in a flat dataframe instead of nested JSON? For every ts and value it should give me one row with its parent dataseries and siteId.
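For illustration, with the sample payload above the first few rows of the desired flat dataframe would look like this:

siteId             dataseries  ts          value
-----------------  ----------  ----------  -----
30:47:47:BE:16:8F  trend-255   1502715600  35.74
30:47:47:BE:16:8F  trend-255   1502715660  35.65
30:47:47:BE:16:8F  trend-255   1502715720  35.58
...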

Solution

Answering my own question. I managed to flatten it using the following lines:

sites_flat = stream_records.select(explode("bms_data1").alias("site")) \
                           .select("site.siteId", explode("site.siteData").alias("siteData")) \
                           .select("siteId", "siteData.dataseries", explode("siteData.values").alias("values")) \
                           .select("siteId", "dataseries", "values.*")
