How to create an RDD from within a Task?


Problem description

Normally, when creating an RDD from a List you can just use the SparkContext.parallelize method, but you cannot use the Spark context from within a task, as it is not serializable. I need to create an RDD from a list of Strings from within a task. Is there a way to do this?

I've tried creating a new SparkContext in the task, but it gives me an error about not supporting multiple Spark contexts in the same JVM and says that I need to set spark.driver.allowMultipleContexts = true. According to the Apache User Group, however, that setting does not yet seem to be supported.

Recommended answer

As far as I can tell, it is not possible, and it is hardly a matter of serialization or of support for multiple Spark contexts. The fundamental limitation is the core Spark architecture. Since the Spark context is maintained by the driver and tasks are executed on the workers, creating an RDD from inside a task would require pushing changes from the workers back to the driver. I am not saying it is technically impossible, but the whole idea seems rather cumbersome.
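For illustration, a minimal sketch of the failure the question describes (the variable names here are hypothetical, not from the original post): a closure shipped to executors cannot capture the SparkContext, so Spark rejects the job with a "Task not serializable" error.

// Hypothetical example: trying to build an RDD from inside a task.
val data = sc.parallelize(Seq(Seq("a", "b"), Seq("c", "d")))

// The closure below captures `sc`; SparkContext is not serializable,
// so Spark fails with "Task not serializable" before any task runs.
val nested = data.map(strings => sc.parallelize(strings))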

Creating a Spark context from inside tasks looks even worse. First of all, it would mean that contexts are created on the workers, which for all practical purposes don't communicate with each other. Each worker would get its own context, which could operate only on data that is accessible on the given worker. Finally, preserving worker state is definitely not part of the contract, so any context created inside a task should simply be garbage collected after the task finishes.

If handling the problem with multiple jobs is not an option, you can try to use mapPartitions like this:

val rdd = sc.parallelize(1 to 100)

// Split each partition into "odd" and "even" buckets in a single pass,
// emitting one Map per partition.
val tmp = rdd.mapPartitions(iter => {
  val results = Map(
    "odd" -> scala.collection.mutable.ArrayBuffer.empty[Int],
    "even" -> scala.collection.mutable.ArrayBuffer.empty[Int]
  )

  for (i <- iter) {
    if (i % 2 != 0) results("odd") += i
    else results("even") += i
  }

  Iterator(results)
})

// Flatten the per-partition buckets back into plain RDDs.
val odd = tmp.flatMap(_("odd"))
val even = tmp.flatMap(_("even"))
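A brief usage sketch (the cache call is my addition, not part of the original answer; without it, each downstream action recomputes the mapPartitions pass):

tmp.cache()     // avoid recomputing the partition pass for each action
odd.take(3)     // e.g. Array(1, 3, 5)
even.take(3)    // e.g. Array(2, 4, 6)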
