Parallelism: rdd.parallelize(....) vs dataSet.map(...)?


Problem description

I have implemented a Spark application using both DataFrame/DataSet and RDD. I submitted the application to my local development environment of Spark 2.1.1. My PC has eight CPU cores.

DataFrame/DataSet

val date : LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._
val processed = itemListJob.run(date).map(d => { // run() already returns a Dataset[Int] of ids
  val (a, b) = runJob.run(d, date) // returns a tuple of (int, java.sql.Date), which are the passed parameters
  s"$a, $b"
})

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ 
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") 
    .as[Int] 
  }
}
processed.write.text("c:\\temp\\mpa")

RDD

val itemList = itemListJob.run(date).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect()

val processed = sc.parallelize(itemList).map(d => {
  runJob.run(d, date) // returns a tuple of (int, java.sql.Date), which are the passed parameters
})
processed.saveAsTextFile("c:\\temp\\mpa")

The RDD application split and generated eight text files, while the DataFrame/DataSet one generated only one file. Does this mean the RDD version ran eight runJob.run() calls in parallel, while the DataFrame/DataSet approach ran only one at a time, without concurrency?

I want runJob.run(), which does the main workload and also makes a jdbc call, to run distributed and in parallel.

Recommended answer

Yes, the number of files produced is a good indicator of the parallelism in the last step. (I can think of a few corner cases where this might not be the case, but that's irrelevant here)
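
As a quick check (a sketch, not part of the original answer), the partition count of the data being written tells you the write-stage parallelism directly, since each partition becomes one output file:

// Dataset case: processed is a Dataset[String], so go through .rdd
println(processed.rdd.getNumPartitions) // 1 here, because the jdbc read is single-threaded
// RDD case: processed.getNumPartitions would report 8 with local[*] on 8 cores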

sc.parallelize when running locally should split according to the number of cores.
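
For example (a sketch; the exact count depends on spark.default.parallelism):

val n = sc.defaultParallelism               // 8 with local[*] on an eight-core PC
val rdd = sc.parallelize(1 to 100)          // uses defaultParallelism slices by default
println(rdd.getNumPartitions)               // 8, unless numSlices is passed explicitly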

However, in both cases you are using only one core to read over the jdbc connection, and in the RDD case you additionally collect() the data back to the driver and then parallelize it back out to tasks.

The preferred approach is to use repartition rather than collect followed by parallelize. Even better is to do things in parallel from the very start: when loading the data frame over jdbc, check whether the parameters partitionColumn, lowerBound, upperBound, and numPartitions (see the Spark JDBC data source documentation) are applicable, so that the read itself runs in parallel.
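
A minimal sketch of both suggestions, reusing the names from the question (itemListJob, runJob, jdbcSqlConn, date); the partition column "id" and the bounds/partition counts are assumed placeholders that must match the real data, and import sqlContext.implicits._ is assumed to be in scope:

// (a) repartition instead of collect()/parallelize(): the ids stay distributed
val processed = itemListJob.run(date)
  .repartition(8) // spread the ids across the eight local cores
  .map(d => { val (a, b) = runJob.run(d, date); s"$a, $b" })

// (b) a partitioned jdbc read, parallel from the very start
// (dbtable accepts a subquery with an alias, so the function call can be wrapped):
val ids = sqlContext.read.format("jdbc").options(Map(
  "driver"          -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
  "url"             -> jdbcSqlConn,
  "dbtable"         -> s"(SELECT id FROM dbo.GetList('$date')) AS t",
  "partitionColumn" -> "id",
  "lowerBound"      -> "1",     // assumed min id
  "upperBound"      -> "10000", // assumed max id
  "numPartitions"   -> "8"
)).load().as[Int]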

