Convert JSON objects to RDD


Question

I don't know if this question is a duplicate, but somehow none of the answers I came across seem to work for me (maybe I'm doing something wrong).

I have a class defined thus:

case class myRec(
                 time: String,
                 client_title: String,
                 made_on_behalf: Double,
                 country: String,
                 email_address: String,
                 phone: String)

and a sample JSON file that contains records or objects in the form

[{...}{...}{...}...] 

[{"time": "2015-05-01 02:25:47",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Brussel",
"email_address": "15e29034@gmail.com"},
{"time": "2015-05-01 04:15:03",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Bundesliga",
"email_address": "aae665d95c5d630@aol.com"},
{"time": "2015-05-01 06:29:18",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Japan",
"email_address": "fef412c714ff@yahoo.com"}...]

My build.sbt:

libraryDependencies += "com.owlike" % "genson-scala_2.11" % "1.3"

scalaVersion := "2.11.7"

I have a Scala function defined thus:

//PS: Other imports already made
import com.owlike.genson.defaultGenson._

//PS: Spark context already defined
def prepData(infile: String): RDD[myRec] = {
  val input = sc.textFile(infile)
  // Read the JSON data into my record case class
  input.mapPartitions(records =>
    records.map(record => fromJson[myRec](record))
  )
}

and I call the function:

prepData("file://path/to/abc.json")

Is there any way of doing this, or is there another JSON library I can use to convert the file to an RDD?

I also tried the example below, and neither approach seems to work:

Using ScalaObjectMapper: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/BasicParseJsonWithJackson.scala

PS: I don't want to go through Spark SQL to process the JSON file.

Thanks!

Answer

Jyd, not using Spark SQL for JSON is an interesting choice, but it's very much doable. There is an example of how to do this in the Learning Spark book's examples (disclaimer: I am one of the co-authors, so a little biased). The examples are on GitHub at https://github.com/databricks/learning-spark, but here is the relevant code snippet:

import org.apache.spark.SparkContext
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

case class Person(name: String, lovesPandas: Boolean) // Note: must be a top-level class

object BasicParseJsonWithJackson {

  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      System.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseJsonWithJackson", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we would either have to create a singleton object
    //     encapsulating ObjectMapper on the driver and ship data back through it, or let each
    //     node create its own ObjectMapper, which is expensive inside a plain map.
    // (b) To create an ObjectMapper on each node without it being too expensive, we create one
    //     per partition with mapPartitions. This solves the serialization and object-creation cost.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor node
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors: we return an empty collection (None)
        // if we encounter an issue, and a one-element collection (Some(_)) if everything is OK.
        records.flatMap(record => {
          try {
            Some(mapper.readValue(record, classOf[Person]))
          } catch {
            case e: Exception => None
          }
        })
    }, true)
    result.filter(_.lovesPandas).mapPartitions(records => {
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      records.map(mapper.writeValueAsString(_))
    })
      .saveAsTextFile(outputFile)
  }
}
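For a quick local test, one could invoke it along these lines (a sketch; people.json is a hypothetical input file with one JSON object per line, since this version parses line by line):

BasicParseJsonWithJackson.main(Array("local[*]", "people.json", "output-dir"))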

Note this uses Jackson (specifically the "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" and "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3" dependencies).
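For reference, a build.sbt fragment declaring those dependencies might look like the sketch below (assuming Scala 2.10, to match the jackson-module-scala_2.10 artifact):

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3",
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3"
)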

I just noticed that your question had some sample input, and as @zero323 pointed out, line-by-line parsing isn't going to work since each JSON record spans multiple lines. Instead you would do:

    val input = sc.wholeTextFiles(inputFile).map(_._2)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we would either have to create a singleton object
    //     encapsulating ObjectMapper on the driver and ship data back through it, or let each
    //     node create its own ObjectMapper, which is expensive inside a plain map.
    // (b) To create an ObjectMapper on each node without it being too expensive, we create one
    //     per partition with mapPartitions. This solves the serialization and object-creation cost.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor node
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors: an empty list (Nil) if we encounter an issue,
        // and the whole parsed list of records if everything is OK.
        records.flatMap(record => {
          try {
            // ScalaObjectMapper's typed readValue[T] preserves the element type,
            // which classOf[List[Person]] would lose to erasure.
            mapper.readValue[List[Person]](record)
          } catch {
            case e: Exception => Nil
          }
        })
    })
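Adapting that to your myRec case class, a prepData along these lines might work (a rough sketch, untested, assuming the same Jackson imports and dependencies as above; note that your sample records have no "phone" field, so Jackson will leave phone null unless you declare it as Option[String] or similar):

def prepData(infile: String): RDD[myRec] = {
  // Each element is the full text of one file, since the JSON array spans lines
  val input = sc.wholeTextFiles(infile).map(_._2)
  input.mapPartitions(records => {
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    records.flatMap(record => {
      try {
        mapper.readValue[List[myRec]](record) // parse the whole JSON array at once
      } catch {
        case e: Exception => Nil // skip files that fail to parse
      }
    })
  })
}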
