Spark: broadcasting Jackson ObjectMapper


Question

I have a Spark application which reads lines from a file and tries to deserialize them using Jackson. To get this code to work, I needed to define the ObjectMapper inside the map operation (otherwise I got a NullPointerException).

I have the following code, which works:

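import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val alertsData = sc.textFile(rawlines).map(alertStr => {
  // The mapper is created and configured on the executor, once per line.
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(alertStr, classOf[Alert])
})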

However, if I define the mapper outside the map and broadcast it, the job fails with a NullPointerException.

This code fails:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})

What am I missing here?

Thanks, Aliza

Answer

It turns out you can broadcast the mapper. The problematic part was mapper.registerModule(DefaultScalaModule), which needs to be executed on each slave (executor) machine and not only on the driver.

So this code works:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
  // Register the Scala module on the executor, not on the driver.
  broadcastVar.value.registerModule(DefaultScalaModule)
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})

I further optimised the code by running registerModule only once per partition (and not for each element in the RDD).

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)

val alertsData = alertsRawData.mapPartitions { iter =>
  // Runs once per partition, on the executor.
  broadcastVar.value.registerModule(DefaultScalaModule)
  for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert])
}
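
As a possible alternative to broadcasting, here is a minimal sketch that keeps the mapper in a Scala object with a lazy val, so each executor JVM builds and configures its own mapper on first use and registerModule runs once per JVM instead of once per partition. This is not the approach from the answer above. The names AlertJson and AlertJob, the fields of the Alert case class, and the local master setting are assumptions made for illustration only.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record type standing in for the Alert class from the question.
case class Alert(id: String, message: String)

// Hypothetical holder object. A Scala object is initialised once per JVM,
// so the mapper is never serialised and shipped from the driver.
object AlertJson {
  lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    m.registerModule(DefaultScalaModule)
    m
  }

  def parse(line: String): Alert = mapper.readValue(line, classOf[Alert])
}

object AlertJob {
  def main(args: Array[String]): Unit = {
    // Assumption: a local master, only for trying the sketch out.
    val conf = new SparkConf().setAppName("alerts").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rawlines = args(0) // path to the input file, one JSON document per line

    // The closure only refers to AlertJson statically, so no mapper is captured.
    val alertsData = sc.textFile(rawlines).map(AlertJson.parse)
    println(alertsData.count())
    sc.stop()
  }
}
```

Whether this is preferable to the broadcast version depends on the job. It avoids shipping a partially configured mapper, at the cost of fixing the configuration in code rather than passing it from the driver.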

Aliza
