How to fill a variable inside a map - Scala Spark


Problem Description

I have to read a text file and save its values in a variable of type Map[Int, collection.mutable.Map[Int, Double]].

I have done it with a foreach and a broadcast variable, and it works properly on my local machine, but not on a yarn-cluster. There the foreach stage takes far too long, while the same task takes only 1 minute on my local computer.

val data = sc.textFile(fileOriginal)

// split each line on ';' and parse every field as a Double
val dataRDD = data.map(s => s.split(';').map(_.toDouble)).cache()

val datos = collection.mutable.Map[Int, collection.mutable.Map[Int, Double]]()
val bcDatos = sc.broadcast(datos)

dataRDD.foreach { x =>
  // x(0): outer key, x(1): inner key, x(2) / x(3) * 100: the value to store
  if (bcDatos.value.contains(x(0).toInt)) {
    bcDatos.value(x(0).toInt).put(x(1).toInt, x(2) / x(3) * 100)
  } else {
    bcDatos.value.put(x(0).toInt, collection.mutable.Map(x(1).toInt -> x(2) / x(3) * 100))
  }
}

My question is: How can I do the same, but using map? Can I "fill" a variable with that structure inside a map?

Thanks

Recommended Answer

When using Spark, you should never try to use mutable structures in a distributed manner; that's simply not supported. If you mutate a variable created in driver code (whether it is broadcast or not), a copy of that variable will be mutated on each executor separately, and you'll never be able to "merge" these mutated partial results and send them back to the driver.
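
To make this concrete, here is a minimal sketch (the names and numbers are illustrative, not from the question) of what happens when executors mutate a broadcast mutable map, assuming an existing SparkContext sc:

// Hypothetical example: mutating a broadcast mutable map from executors.
val localMap = collection.mutable.Map[Int, Int]()
val bc = sc.broadcast(localMap)

sc.parallelize(1 to 100).foreach { n =>
  // Each executor updates its own copy of the broadcast value.
  bc.value(n % 10) = bc.value.getOrElse(n % 10, 0) + 1
}

// In local mode everything shares one JVM, so localMap may happen to be
// updated; on a yarn-cluster it stays empty, because the executors' updates
// never travel back to the driver. That is the local-vs-cluster difference
// described in the question.
println(localMap.size)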

Instead - you should transform your RDD into a new (immutable!) RDD with the data you need.

If I managed to follow your logic correctly - this would give you the map you need:

// assuming dataRDD has type RDD[Array[Double]] and each Array has at least 4 items:
// (collectAsMap returns a collection.Map, hence the type annotation)
val result: collection.Map[Int, Map[Int, Double]] = dataRDD
  .keyBy(_(0).toInt)
  .mapValues(arr => Map(arr(1).toInt -> arr(2) / arr(3) * 100))
  .reduceByKey(_ ++ _) // merge the inner maps of rows sharing an outer key
  .collectAsMap()
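
If you specifically need the mutable nested type from the question, Map[Int, collection.mutable.Map[Int, Double]], one possible sketch (under the same assumption about dataRDD) is to merge immutable inner maps per key on the cluster and only build mutable maps on the driver, after collecting:

// Build one immutable inner map per outer key on the executors...
val merged = dataRDD
  .map(arr => (arr(0).toInt, (arr(1).toInt, arr(2) / arr(3) * 100)))
  .aggregateByKey(Map.empty[Int, Double])(
    (acc, kv) => acc + kv,  // fold one (innerKey -> value) pair into the map
    (m1, m2) => m1 ++ m2)   // merge partial maps from different partitions
  .collectAsMap()

// ...and only now, on the driver, wrap each inner map in a mutable one:
val datos: Map[Int, collection.mutable.Map[Int, Double]] =
  merged.map { case (k, m) => k -> collection.mutable.Map(m.toSeq: _*) }.toMap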
