Using Future inside of a Spark job


Problem description

I want to perform two operations on a single RDD concurrently. I have written code like this:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

val conf = new SparkConf().setAppName("Foo")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
val inputPath = path
val rdd = sc.textFile(inputPath).cache()

// First job: build df1 from the shared RDD and save it as Avro.
val f1 = Future {
  val schema1 = StructType(List(StructField("a", StringType, true), StructField("b", StringType, true), StructField("c", LongType, true)))
  val rdd1 = rdd.map(func1).filter(_.isDefined).flatMap(x => x)
  val df1 = sqlSc.createDataFrame(rdd1, schema1)
  df1.save("/foo/", "com.databricks.spark.avro")
  0
}

// Second job: build df2 from the same RDD and save it as Avro.
val f2 = Future {
  val schema2 = StructType(List(StructField("d", StringType, true), StructField("e", StringType, true)))
  val rdd2 = rdd.map(func2).filter(_.isDefined).flatMap(x => x)
  val df2 = sqlSc.createDataFrame(rdd2, schema2)
  df2.save("/bar/", "com.databricks.spark.avro")
  0
}

// Combine the two futures and block until both jobs finish.
val result = for {
  r1 <- f1
  r2 <- f2
} yield (r1 + r2)

result onSuccess {
  case r => println("done")
}

Await.result(result, Duration.Inf)

When I run this code, I don't see the desired effect. The directory /bar/ has lots of temporary files and so on, but /foo/ has nothing, so it seems the two datasets are not being created in parallel.

Is it a good idea to use a Future inside the Spark driver? Am I doing it correctly? Should I do anything differently?

Recommended answer

To execute two or more Spark jobs (actions) in parallel, the SparkContext needs to be running in FAIR scheduler mode.

In the driver program, transformations only build the dependency graph for execution; the actual execution happens only when an action is called. Typically the driver waits while the execution happens across the nodes managed by the Spark workers. In your case, the Spark master does not start executing the second job until the first one is over, because by default Spark scheduling is FIFO.
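As a minimal sketch of this lazy behaviour (reusing the sc and inputPath from the question; purely illustrative):

// No job is submitted here: map only records a step in the lineage graph.
val lengths = sc.textFile(inputPath).map(_.length)
// A job is handed to the scheduler only when an action such as count() runs.
val total = lengths.count()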

You can set the configuration as follows to enable parallel execution:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
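Optionally, each Future can also submit its job into its own scheduler pool so the two jobs are weighted fairly against each other. A sketch under the assumption that the pool names ("pool1", "pool2") are arbitrary and that rdd, func1, and func2 come from the question; sc.setLocalProperty is thread-local, so it must be called inside the Future that submits the job:

val f1 = Future {
  // Jobs submitted from this thread go into "pool1" (name is arbitrary).
  sc.setLocalProperty("spark.scheduler.pool", "pool1")
  rdd.map(func1).filter(_.isDefined).flatMap(x => x).count()
}

val f2 = Future {
  sc.setLocalProperty("spark.scheduler.pool", "pool2")
  rdd.map(func2).filter(_.isDefined).flatMap(x => x).count()
}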

For details, see the Spark documentation on Scheduling Within an Application.
