Convert Streaming XML into JSON in Spark
Question
I am new to Spark and working on a simple application to convert XML streams received from Kafka into JSON format.
Using:
- Spark 2.4.5
- Scala 2.11.12
In my use case, the Kafka stream is in XML format. The following is the code that I tried.
val spark: SparkSession = SparkSession.builder()
.master("local")
.appName("Spark Demo")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val inputStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "demo_topic_xml")
.option("startingOffsets", "earliest") // From starting
.load()
inputStream.printSchema()
val records = inputStream.selectExpr("CAST(value AS STRING)")
//How to remove value column here while converting xml in to json?
val jsons = records.toJSON
jsons.writeStream
.format("console")
.option("truncate", false)
.outputMode("append")
.start()
.awaitTermination()
However, the above code gives the "value" column header as the field name in the JSON output, as shown below:
{"value":"<?xml version=\"1.0\" encoding=\"utf-16\"?><employees><employee id=\"be129\"><firstname>Jane</firstname><lastname>Doe</lastname><title>Engineer</title><division>Materials</division><building>327</building><room>19</room><supervisor>be131</supervisor></employee></employees>"}
What I really need is only the XML payload converted to JSON, without the "value" column part. It looks like I am missing something obvious here. Can someone please help? Thanks for your time.
Use the org.json.XML library to convert XML data to JSON.
Check the code below.
Creating a UDF
scala> import org.json.XML
import org.json.XML
scala> val parse = udf((value: String) => XML.toJSONObject(value).toString) // Defined UDF to parse xml to json
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
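To see what the UDF actually produces, here is a minimal standalone sketch (assuming only that org.json is on the classpath) converting a record like the one in the question. XML.toJSONObject maps child elements to nested object keys and element attributes (such as id) to ordinary fields alongside them:

```scala
import org.json.XML

object XmlToJsonDemo extends App {
  val xml =
    """<employees><employee id="be129"><firstname>Jane</firstname>
      |<lastname>Doe</lastname><room>19</room></employee></employees>""".stripMargin

  // Produces a nested JSON object such as
  // {"employees":{"employee":{"id":"be129","firstname":"Jane",...}}}
  // (key order is not guaranteed; numeric-looking text like "19" is
  // converted to a JSON number).
  println(XML.toJSONObject(xml).toString)
}
```

This is why the schema below can declare room and building as long rather than string.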
Defining a schema based on the XML data.
scala> import org.apache.spark.sql.types.{DataType, StructType} // needed for DataType.fromJson below
import org.apache.spark.sql.types.{DataType, StructType}
scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Define Schema of your xml data in json.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}
scala> val schema = DataType.fromJson(schema_json).asInstanceOf[StructType] // Convert Json schema data to schema.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))
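If maintaining the schema as a raw JSON string feels brittle, the same StructType can be built programmatically with the Spark SQL types API. This is a sketch equivalent to the schema above, not a different schema:

```scala
import org.apache.spark.sql.types._

// Struct for a single <employee> element (StructField defaults to nullable = true).
val employeeStruct = StructType(Seq(
  StructField("building", LongType),
  StructField("division", StringType),
  StructField("firstname", StringType),
  StructField("id", StringType),
  StructField("lastname", StringType),
  StructField("room", LongType),
  StructField("supervisor", StringType),
  StructField("title", StringType)
))

// Wrap it in the <employees><employee>...</employee></employees> nesting.
val schema = StructType(Seq(
  StructField("employees", StructType(Seq(
    StructField("employee", employeeStruct)
  )))
))
```

Either form can be passed to from_json; the programmatic version is easier to evolve as the XML changes.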
Final Schema
scala>
inputStream
.selectExpr("CAST(value AS STRING)")
.select(from_json(parse($"value"), schema).as("emp_data")) // the cast column is named "value"
.select($"emp_data.employees.employee.*")
.printSchema
root
|-- building: long (nullable = true)
|-- division: string (nullable = true)
|-- firstname: string (nullable = true)
|-- id: string (nullable = true)
|-- lastname: string (nullable = true)
|-- room: long (nullable = true)
|-- supervisor: string (nullable = true)
|-- title: string (nullable = true)
Writing the JSON-converted data to the console.
scala>
inputStream
.selectExpr("CAST(value AS STRING)")
.select(from_json(parse($"value"), schema).as("emp_data"))
.select($"emp_data.employees.employee.*")
.writeStream
.format("console")
.option("truncate", false)
.outputMode("append")
.start()
.awaitTermination()
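One caveat, based on how org.json handles repeated elements: if a single Kafka message contains several <employee> elements, XML.toJSONObject emits a JSON array for that key, so the schema field would need to be an ArrayType. A sketch of that variant (a hypothetical schema, not the one used above):

```scala
import org.apache.spark.sql.types._

// Same per-employee struct as before.
val employeeStruct = StructType(Seq(
  StructField("building", LongType),
  StructField("division", StringType),
  StructField("firstname", StringType),
  StructField("id", StringType),
  StructField("lastname", StringType),
  StructField("room", LongType),
  StructField("supervisor", StringType),
  StructField("title", StringType)
))

// "employee" is now an array of structs instead of a single struct.
val multiEmployeeSchema = StructType(Seq(
  StructField("employees", StructType(Seq(
    StructField("employee", ArrayType(employeeStruct))
  )))
))
```

With this schema you would typically add an explode step (e.g. explode($"emp_data.employees.employee")) before selecting the individual fields.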