Spark: broadcasting Jackson ObjectMapper
Question
I have a Spark application which reads lines from a file and tries to deserialize them using Jackson. To get this code to work, I needed to define the ObjectMapper inside the map operation (otherwise I got a NullPointerException).
I have the following code which is working:
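import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val alertsData = sc.textFile(rawlines).map(alertStr => {
  // A new mapper is created and configured for every line
  val mapper = new ObjectMapper()
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(alertStr, classOf[Alert])
})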
However, if I define the mapper outside the map and broadcast it, it fails with a NullPointerException.
This code fails:
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
val broadcastVar = sc.broadcast(mapper)
val alertsData = sc.textFile(rawlines).map(alertStr => {
broadcastVar.value.readValue(alertStr, classOf[Alert])
})
What am I missing here?
Thanks, Aliza
Recommended answer
It turns out you can broadcast the mapper. The problematic part was mapper.registerModule(DefaultScalaModule), which needs to be executed on each worker (executor) machine and not only on the driver.
So this code works:
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)
val alertsData = sc.textFile(rawlines).map(alertStr => {
  // Registration now runs on the executor's copy of the mapper (once per line)
  broadcastVar.value.registerModule(DefaultScalaModule)
  broadcastVar.value.readValue(alertStr, classOf[Alert])
})
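According to the answer, the Scala-module registration has to happen on each executor rather than only on the driver. A related, commonly used Scala idiom for "redo this setup wherever the object is deserialized" is a `@transient lazy val`: the field is skipped when the object is serialized and is rebuilt on first access on the receiving JVM. This is a different technique from the answer's, sketched here in plain Scala with no Spark or Jackson dependency. `TransientLazyDemo`, `ParserHolder`, `parsePairs` and `roundTrip` are hypothetical names, the toy `key=value` parser stands in for a configured `ObjectMapper`, and the in-memory Java serialization round trip only approximates how a value is shipped from the driver to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.util.concurrent.atomic.AtomicInteger

object TransientLazyDemo {
  // Counts how often the expensive setup has run in this JVM (illustration only).
  val setupCalls = new AtomicInteger(0)

  // Toy parser for "k=v,k=v" lines; `separator` is treated as a regex by String.split.
  private def parsePairs(line: String, separator: String): Map[String, String] =
    line.split(separator).iterator
      .map(_.split("=", 2))
      .collect { case Array(k, v) => k.trim -> v.trim }
      .toMap

  // Hypothetical stand-in for "ObjectMapper + registerModule(DefaultScalaModule)".
  // `separator` is plain configuration and is serialized; `parser` is @transient,
  // so it is skipped on write and rebuilt lazily on the receiving side.
  class ParserHolder(val separator: String) extends Serializable {
    @transient lazy val parser: String => Map[String, String] = {
      setupCalls.incrementAndGet() // the per-JVM setup step
      line => parsePairs(line, separator)
    }
  }

  // Java serialization round trip, approximating how a value travels driver -> executor.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val loader = obj.getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
      // Resolve classes with the loader that defined `obj` (matters in REPLs and scripts).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Because Scala makes `lazy val` initialization thread-safe, tasks that share one deserialized copy would not run the setup concurrently.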
I further optimised the code by running registerModule only once per partition (and not for each element in the RDD).
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)
val alertsData = alertsRawData.mapPartitions { (iter: Iterator[String]) =>
  // Runs once per partition on the executor, before any line is parsed
  broadcastVar.value.registerModule(DefaultScalaModule)
  for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert])
}
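The gain from moving registerModule into mapPartitions is how often the one-off setup runs. The plain-Scala sketch below simulates that without Spark: `mapPartitionsLike` is a hypothetical helper (not a Spark API) that applies an `Iterator => Iterator` function to each in-memory "partition", mirroring the shape of the function passed to `RDD.mapPartitions`, and `setup` is a hypothetical stand-in for `registerModule(DefaultScalaModule)` that counts its own invocations.

```scala
object PerPartitionDemo {
  // How many times the (hypothetical) one-off setup has run.
  var setupCalls = 0

  // Hypothetical stand-in for configuring a mapper, e.g. registerModule(DefaultScalaModule).
  // Returns a parser that is cheap to reuse once built.
  def setup(): String => Int = {
    setupCalls += 1
    (line: String) => line.trim.toInt
  }

  // Hypothetical helper, NOT a Spark API: applies `f` to each in-memory partition,
  // the same Iterator => Iterator shape that RDD.mapPartitions expects.
  def mapPartitionsLike[A, B](partitions: Seq[Seq[A]])(f: Iterator[A] => Iterator[B]): Seq[List[B]] =
    partitions.map(p => f(p.iterator).toList)

  // Setup inside the per-record function: runs once for every record.
  def perElement(partitions: Seq[Seq[String]]): Seq[List[Int]] =
    mapPartitionsLike(partitions)(iter => iter.map(line => setup()(line)))

  // Setup at the top of the partition function: runs once per partition,
  // then the same parser is reused for every record in that partition.
  def perPartition(partitions: Seq[Seq[String]]): Seq[List[Int]] =
    mapPartitionsLike(partitions) { iter =>
      val parse = setup()
      iter.map(parse)
    }
}
```

With two partitions holding five records in total, the per-element version runs the setup five times and the per-partition version twice.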
Aliza