Convert Streaming XML into JSON in Spark


Problem Description


I am new to Spark and working on a simple application to convert XML streams received from Kafka into JSON format.

Using:

  • Spark 2.4.5
  • Scala 2.11.12

In my use case, the Kafka stream is in XML format. The following is the code that I tried.


    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("Spark Demo")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val inputStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo_topic_xml")
      .option("startingOffsets", "earliest") // From starting
      .load()

    inputStream.printSchema()


    val records = inputStream.selectExpr("CAST(value AS STRING)")
    //How to remove value column here while converting xml in to json?
    val jsons = records.toJSON

    jsons.writeStream
      .format("console")
      .option("truncate", false)
      .outputMode("append")
      .start()
      .awaitTermination()

However, the above code gives the "value" column header as the field name in the JSON output, as shown below: {"value":"<?xml version=\"1.0\" encoding=\"utf-16\"?><employees><employee id=\"be129\"><firstname>Jane</firstname><lastname>Doe</lastname><title>Engineer</title><division>Materials</division><building>327</building><room>19</room><supervisor>be131</supervisor></employee></employees>"}

What I really need is only the XML payload converted to JSON, without the "value" column part. It looks like I am missing something obvious here. Can someone please help me? Thanks for your time.

Solution

Use the org.json.XML library to convert the XML data to JSON.

Check the code below.

Creating a UDF

scala> import org.json.XML
import org.json.XML

scala> val parse = udf((value: String) => XML.toJSONObject(value).toString) // Define a UDF that parses XML into a JSON string
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
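The attribute handling of org.json is worth seeing once in isolation: XML attributes (like id) become plain JSON keys alongside the element's children, which is why id appears as an ordinary field in the schema below. A quick REPL check (a sketch, assuming the org.json jar is on the spark-shell classpath; sampleXml is a made-up one-employee payload):

scala> val sampleXml = """<employees><employee id="be129"><firstname>Jane</firstname><room>19</room></employee></employees>"""

scala> println(XML.toJSONObject(sampleXml).toString) // id, firstname and room all appear as keys under employee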

Defining schema based on XML data.

scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Define Schema of your xml data in json.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}

scala> import org.apache.spark.sql.types.{DataType, StructType} // required for DataType.fromJson
import org.apache.spark.sql.types.{DataType, StructType}

scala> val schema = DataType.fromJson(schema_json).asInstanceOf[StructType] // Convert the JSON schema string into a StructType.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))
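If maintaining the long JSON string becomes unwieldy, an equivalent StructType can also be built programmatically. This is a sketch of the same schema (StructField defaults to nullable = true, matching the JSON version):

scala> import org.apache.spark.sql.types._

scala> val schema = StructType(Seq(
         StructField("employees", StructType(Seq(
           StructField("employee", StructType(Seq(
             StructField("building", LongType),
             StructField("division", StringType),
             StructField("firstname", StringType),
             StructField("id", StringType),
             StructField("lastname", StringType),
             StructField("room", LongType),
             StructField("supervisor", StringType),
             StructField("title", StringType)
           )))
         )))
       ))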

Final Schema

scala>
    inputStream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(parse($"value"), schema).as("emp_data")) // the cast column is named "value"
    .select($"emp_data.employees.employee.*")
    .printSchema

root
 |-- building: long (nullable = true)
 |-- division: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- room: long (nullable = true)
 |-- supervisor: string (nullable = true)
 |-- title: string (nullable = true)

Writing the JSON-converted data to the console.

scala> 
    inputStream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(parse($"value"), schema).as("emp_data")) // the cast column is named "value"
    .select($"emp_data.employees.employee.*")
    .writeStream
    .format("console")
    .option("truncate", false)
    .outputMode("append")
    .start()
    .awaitTermination()
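To verify the parse-and-flatten logic without a running Kafka broker, the same expressions can be applied to a static one-row DataFrame first. A sketch, where sampleXml is an assumed copy of a single Kafka message payload:

scala> val sampleXml = """<employees><employee id="be129"><firstname>Jane</firstname><lastname>Doe</lastname><title>Engineer</title><division>Materials</division><building>327</building><room>19</room><supervisor>be131</supervisor></employee></employees>"""

scala> Seq(sampleXml).toDF("value")
    .select(from_json(parse($"value"), schema).as("emp_data"))
    .select($"emp_data.employees.employee.*")
    .show(false)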

