Array of JSON to Dataframe in Spark received by Kafka


Question

I'm writing a Spark application in Scala using Spark Structured Streaming that receives some data formatted as JSON from Kafka. This application can receive either a single JSON object or multiple ones, formatted in this way:

[{"key1":"value1","key2":"value2"},{"key1":"value1","key2":"value2"},...,{"key1":"value1","key2":"value2"}]

I tried to define a StructType like:

var schema = StructType(Array(
  StructField("key1", DataTypes.StringType),
  StructField("key2", DataTypes.StringType)
))

But it doesn't work. My actual code for parsing the JSON:

var data = (this.stream).getStreamer().load()
  .selectExpr("CAST (value AS STRING) as json")
  .select(from_json($"json",schema=schema).as("data"))

I would like to get these JSON objects in a dataframe like:

+----------+---------+
|      key1|     key2|
+----------+---------+
|    value1|   value2|
|    value1|   value2|
|       ...|      ...|
|    value1|   value2|
+----------+---------+

Can anyone help me please? Thank you!

Answer

As your incoming string is an array of JSON objects, one way is to write a UDF to parse the array, then explode the parsed array. Below is the complete code with each step explained. I have written it for batch, but the same can be used for streaming with minimal changes.

import org.apache.spark.sql.SparkSession
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JsonParser{

  //case class to parse the incoming JSON String
  case class JSON(key1: String, key2: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.
      builder().
      appName("JSON").
      master("local").
      getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    //sample JSON array String coming from kafka
    val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")

    //UDF to parse JSON array String
    val jsonConverter = udf { jsonString: String =>
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue(jsonString, classOf[Array[JSON]])
    }

    val df = str.toDF("json") //json String column
      .withColumn("array", jsonConverter($"json")) //parse the JSON Array
      .withColumn("json", explode($"array")) //explode the Array
      .drop("array") //drop unwanted columns
      .select("json.*") //explode the JSON to separate columns

    //display the DF
    df.show()
    //+------+------+
    //|  key1|  key2|
    //+------+------+
    //|value1|value2|
    //|value3|value4|
    //+------+------+

  }
}
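As an alternative to the UDF (not part of the original answer): since Spark 2.4, from_json accepts an ArrayType schema directly, so the questioner's original approach works once the StructType is wrapped in an ArrayType. A minimal sketch, assuming Spark 2.4+ and the same sample input:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object JsonParserNoUdf {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JSON")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    // Wrap the original StructType in an ArrayType so from_json
    // parses the whole JSON array in one pass
    val schema = ArrayType(StructType(Array(
      StructField("key1", StringType),
      StructField("key2", StringType)
    )))

    // sample JSON array String coming from Kafka
    val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")

    val df = str.toDF("json")
      .select(from_json($"json", schema).as("array")) // parse into array<struct>
      .select(explode($"array").as("data"))           // one row per array element
      .select("data.*")                               // flatten the struct to columns

    df.show()
    // +------+------+
    // |  key1|  key2|
    // +------+------+
    // |value1|value2|
    // |value3|value4|
    // +------+------+
  }
}
```

For a streaming job, the same from_json/explode chain applies directly after `CAST(value AS STRING)`, with no UDF or Jackson dependency needed.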
