Map function of RDD not being invoked in Scala Spark

Problem Description

When I call the map function of an RDD, it is not being applied. It works as expected for a scala.collection.immutable.List, but not for an RDD. Here is some code to illustrate:

val list = List ("a" , "d" , "c" , "d")
list.map(l => {
  println("mapping list")
})

val tm = sc.parallelize(list)
tm.map(m => {
  println("mapping RDD")
})

The result of the above code is:

mapping list
mapping list
mapping list
mapping list

But notice that "mapping RDD" is not printed to the screen. Why is this occurring?

This is part of a larger issue where I am trying to populate a HashMap from an RDD :

  def getTestMap( dist: RDD[(String)]) = {

    var testMap = new java.util.HashMap[String , String]();

    dist.map(m => {
      println("populating map")
      testMap.put(m , m)

    })
    testMap
  }
val testM = getTestMap(tm)
println(testM.get("a"))

This code prints null.

Is this due to lazy evaluation ?

Recommended Answer

Lazy evaluation might be part of this, if map is the only operation you are executing. Spark will not schedule execution until an action (in Spark terms) is requested on the RDD lineage.
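
For example, the map in the question only runs once an action is called on the RDD. A minimal sketch, reusing tm and sc from the question:

val mapped = tm.map(m => {
  println("mapping RDD") // executed on the executors once an action triggers the job
  m
})
mapped.count() // an action: Spark now schedules the job and the map actually runs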

When you execute an action, the println will happen, but not on the driver where you are expecting it; it happens on the worker executing that closure. Try looking at the logs of the workers.
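
If you just want to see the values on the driver console while debugging, one option is to bring the data back first (a small sketch; only sensible for small datasets):

tm.collect().foreach(m => println("on driver: " + m))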

A similar thing is happening with the HashMap population in the second part of the question. The same piece of code is executed on each partition, on separate workers, and is then serialized back to the driver. Given that closures are 'cleaned' by Spark, testMap is probably being removed from the serialized closure, resulting in a null. Note that if it were only due to the map not being executed, the HashMap should be empty, not null.
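
The same pitfall shows up with a simpler driver-side variable (an illustrative sketch, not from the original question): each executor mutates its own copy of the captured variable, so nothing flows back to the driver.

var counter = 0
tm.map(m => { counter += 1; m }).count()
println(counter) // still 0 on the driver: the executors incremented their own copies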

If you want to transfer the data of the RDD to another structure, you need to do that in the driver. Therefore you need to force Spark to deliver all the data to the driver. That's the function of rdd.collect().

This should work for your case. Be aware that all the RDD data should fit in the memory of your driver:

import scala.collection.JavaConverters._
def getTestMap(dist: RDD[String]) = dist.collect.map(m => (m, m)).toMap.asJava
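
Used with the tm RDD from the question, this builds the java.util.Map on the driver (usage sketch):

val testM = getTestMap(tm)
println(testM.get("a")) // now prints "a" instead of null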
