Array of JSON to Dataframe in Spark received by Kafka
Question
I'm writing a Spark application in Scala using Spark Structured Streaming that receives JSON-formatted data from Kafka. The application can receive either a single JSON object or multiple JSON objects formatted in this way:
[{"key1":"value1","key2":"value2"},{"key1":"value1","key2":"value2"},...,{"key1":"value1","key2":"value2"}]
I tried to define a StructType like:
var schema = StructType(
  Array(
    StructField("key1", DataTypes.StringType),
    StructField("key2", DataTypes.StringType)
  ))
But it doesn't work. My actual code for parsing the JSON:
var data = (this.stream).getStreamer().load()
  .selectExpr("CAST (value AS STRING) as json")
  .select(from_json($"json", schema = schema).as("data"))
I would like to get these JSON objects in a dataframe like:
+----------+---------+
|      key1|     key2|
+----------+---------+
|    value1|   value2|
|    value1|   value2|
|       ...|      ...|
|    value1|   value2|
+----------+---------+
Can anyone help me, please? Thank you!
Answer
As your incoming string is an Array of JSON, one way is to write a UDF to parse the Array, then explode the parsed Array. Below is the complete code with each step explained. I have written it for batch, but the same can be used for streaming with minimal changes.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.SparkSession

object JsonParser {
  // case class describing one incoming JSON object
  case class JSON(key1: String, key2: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.
      builder().
      appName("JSON").
      master("local").
      getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // sample JSON array String as it would arrive from Kafka
    val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")

    // UDF to parse the JSON array String into an Array of case class instances
    val jsonConverter = udf { jsonString: String =>
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue(jsonString, classOf[Array[JSON]])
    }

    val df = str.toDF("json")                        // json String column
      .withColumn("array", jsonConverter($"json"))   // parse the JSON Array
      .withColumn("json", explode($"array"))         // explode the Array: one row per object
      .drop("array")                                 // drop the unwanted intermediate column
      .select("json.*")                              // flatten the JSON struct into separate columns

    // display the DF
    df.show()
    //+------+------+
    //|  key1|  key2|
    //+------+------+
    //|value1|value2|
    //|value3|value4|
    //+------+------+
  }
}
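For reference, below is a minimal sketch of the "minimal changes" mentioned above for the streaming case, reusing the same jsonConverter UDF and Spark session from the batch example. The broker address ("localhost:9092") and topic name ("my-topic") are hypothetical placeholders, not values from the question.

// Minimal streaming sketch; broker and topic names are hypothetical placeholders
val streamDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .selectExpr("CAST(value AS STRING) as json")       // Kafka value bytes -> JSON String
  .withColumn("array", jsonConverter($"json"))       // parse the JSON Array with the same UDF
  .withColumn("json", explode($"array"))             // one row per JSON object
  .drop("array")
  .select("json.*")

// write the flattened rows to the console sink for testing
val query = streamDf.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()

On recent Spark versions (roughly 2.4 and later), an alternative that avoids the UDF is from_json($"json", ArrayType(schema)) followed by explode, which is closer to the schema-based approach attempted in the question.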