Re-using A Schema from JSON within a Spark DataFrame using Scala

Question

I have some JSON data like this:

{"gid":"111","createHour":"2014-10-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:40:37.0"},{"revId":"4","modDate":"2014-11-20 01:40:40.0"}],"comments":[],"replies":[]}
{"gid":"222","createHour":"2014-12-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:39:31.0"},{"revId":"4","modDate":"2014-11-20 01:39:34.0"}],"comments":[],"replies":[]}
{"gid":"333","createHour":"2015-01-21 00:00:00.0","revisions":[{"revId":"25","modDate":"2014-11-21 00:34:53.0"},{"revId":"110","modDate":"2014-11-21 00:47:10.0"}],"comments":[{"comId":"4432","content":"How are you?"}],"replies":[{"repId":"4441","content":"I am good."}]}
{"gid":"444","createHour":"2015-09-20 23:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 23:23:47.0"}],"comments":[],"replies":[]}
{"gid":"555","createHour":"2016-01-21 01:00:00.0","revisions":[{"revId":"135","modDate":"2014-11-21 01:01:58.0"}],"comments":[],"replies":[]}
{"gid":"666","createHour":"2016-04-23 19:00:00.0","revisions":[{"revId":"136","modDate":"2014-11-23 19:50:51.0"}],"comments":[],"replies":[]}

I can read this in with:

val df = sqlContext.read.json("./data/full.json")

I can view the schema with df.printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I can show the data with df.show(10,false):

+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|comments             |createHour           |gid|replies            |revisions                                                |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|[]                   |2014-10-20 01:00:00.0|111|[]                 |[[2014-11-20 01:40:37.0,2], [2014-11-20 01:40:40.0,4]]   |
|[]                   |2014-12-20 01:00:00.0|222|[]                 |[[2014-11-20 01:39:31.0,2], [2014-11-20 01:39:34.0,4]]   |
|[[4432,How are you?]]|2015-01-21 00:00:00.0|333|[[I am good.,4441]]|[[2014-11-21 00:34:53.0,25], [2014-11-21 00:47:10.0,110]]|
|[]                   |2015-09-20 23:00:00.0|444|[]                 |[[2014-11-20 23:23:47.0,2]]                              |
|[]                   |2016-01-21 01:00:00.0|555|[]                 |[[2014-11-21 01:01:58.0,135]]                            |
|[]                   |2016-04-23 19:00:00.0|666|[]                 |[[2014-11-23 19:50:51.0,136]]                            |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+

I can print / read the schema val dfSc = df.schema as:

StructType(StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true), StructField(createHour,StringType,true), StructField(gid,StringType,true), StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true), StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true))

I can print this out nicer:

println(df.schema.fields.mkString(",\n"))
StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true),
StructField(createHour,StringType,true),
StructField(gid,StringType,true),
StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true),
StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)

Now, if I read in the same file without the rows that contain comments and replies, with val df2 = sqlContext.read.json("./data/partialRevOnly.json") (simply deleting those rows), I get something like this with printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I don't like that, so I use:

val df3 = sqlContext.read.
  schema(dfSc).
  json("./data/partialRevOnly.json")

where the original schema was dfSc. Now I get exactly the schema I had before, even with the data removed:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

This is perfect ... well, almost. I would like to assign this schema to a variable, similar to this:

val textSc =  StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true),
    StructField(createHour,StringType,true),
    StructField(gid,StringType,true),
    StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true),
    StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)

OK - This won't work due to the double quotes, and some other structural stuff, so try this (with error):

import org.apache.spark.sql.types._

val textSc = StructType(Array(
    StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true),
    StructField("createHour",StringType,true),
    StructField("gid",StringType,true),
    StructField("replies",ArrayType(StructType(StructField("content",StringType,true), StructField("repId",StringType,true)),true),true),
    StructField("revisions",ArrayType(StructType(StructField("modDate",StringType,true), StructField("revId",StringType,true)),true),true)
))

Name: Compile Error
Message: <console>:78: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
           StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true),

... Without this error (that I cannot figure a quick way around), I would like to then use textSc in place of dfSc to read in the JSON data with an imposed schema.

I cannot find a '1-to-1 match' way of getting (via println or ...) the schema with acceptable syntax (sort of like above). I suppose some coding can be done with case matching to iron out the double quotes. However, I'm still unclear what rules are required to get the exact schema out of the test fixture that I can simply re-use in my recurring production (versus test fixture) code. Is there a way to get this schema to print exactly as I would code it?

Note: This includes double quotes and all the proper StructField/Types and so forth, so that it is a code-compatible drop-in.
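
For illustration, the kind of case matching I have in mind might be a rough helper like the one below. It is hypothetical, not a Spark API, and only covers the types that appear in this schema (struct, array, string); the two defs are mutually recursive, so they need to be pasted together as one block:

import org.apache.spark.sql.types._

// Hypothetical pretty-printer: renders a schema as paste-able Scala source.
def typeToCode(dt: DataType): String = dt match {
  case st: StructType =>
    st.fields.map(fieldToCode).mkString("StructType(Seq(", ", ", "))")
  case at: ArrayType =>
    s"ArrayType(${typeToCode(at.elementType)}, ${at.containsNull})"
  case other => other.toString   // e.g. StringType
}

def fieldToCode(f: StructField): String =
  s"""StructField("${f.name}", ${typeToCode(f.dataType)}, ${f.nullable})"""

println(typeToCode(df.schema))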

As a sidebar, I thought about saving a fully-formed golden JSON file to use at the start of the Spark job, but I would like to eventually use date fields and other more concise types instead of strings at the applicable structural locations.
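
A minimal sketch of that tightening, assuming Spark's string-to-timestamp cast accepts the "yyyy-MM-dd HH:mm:ss.S" values shown here (worth verifying on your version), would be to keep the imposed string schema and cast afterwards:

import org.apache.spark.sql.types.TimestampType

// Sketch only: values that fail to parse come back as null.
val dfTyped = df3.withColumn("createHour", df3("createHour").cast(TimestampType))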

How can I get the DataFrame information coming out of my test harness (using a fully-formed JSON input row with comments and replies) to the point where I can drop the schema as source code into the production Scala Spark job?

Note: The best answer is some coding means, but an explanation so I can trudge, plod, toil, wade, plow and slog thru the coding is helpful too. :)

Answer

Well, the error message should tell you everything you have to know here - StructType expects a sequence of fields as an argument. So in your case schema should look like this:

StructType(Seq(
  StructField("comments", ArrayType(StructType(Seq(       // <- Seq[StructField]
    StructField("comId", StringType, true),
    StructField("content", StringType, true))), true), true), 
  StructField("createHour", StringType, true),
  StructField("gid", StringType, true),
  StructField("replies", ArrayType(StructType(Seq(        // <- Seq[StructField]
    StructField("content", StringType, true),
    StructField("repId", StringType, true))), true), true),
  StructField("revisions", ArrayType(StructType(Seq(      // <- Seq[StructField]
    StructField("modDate", StringType, true),
    StructField("revId", StringType, true))),true), true)))
