RDD CountApproximate taking far longer than requested timeout


Problem description


In an attempt to reduce the time spent on counting DataFrame rows, RDD.countApprox() is being invoked. It has the following signature:

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {

I have attempted to limit the calculation to sixty seconds. Notice also the very low confidence requirement of 0.10:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean

However, the actual time taken is .. 17 minutes??

That is almost the same as the time required to generate the data in the first place (19 minutes)!

So then - what is the use of this API: is there any way to get it to actually save some meaningful fraction of the exact-count computation time?

TL;DR (See accepted answer): use initialValue instead of getFinalValue

Solution

Please note the return type in the countApprox definition. It's a partial result.

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {

Now, please pay attention to how it is being used; note the call to getFinalValue:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean

According to the Spark Scala docs, getFinalValue is a blocking method, which means it will wait for the complete operation to finish.

Whereas initialValue can be fetched within the specified timeout. So the following snippet will not block further operations beyond the timeout:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
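
Since the confidence passed here is only 0.10, it can also be worth reading the interval on the returned BoundedDouble rather than just its mean. A minimal sketch, using the fields exposed by Spark's BoundedDouble (the variable name is illustrative):

import org.apache.spark.partial.BoundedDouble

// initialValue yields a BoundedDouble: the point estimate plus its confidence interval.
val estimate: BoundedDouble = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue
println(s"count ~ ${estimate.mean}, range [${estimate.low}, ${estimate.high}], confidence ${estimate.confidence}")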

Please note the downside of using countApprox(timeout, confidence).initialValue: even after returning the value, it will continue counting until it reaches the final count you would have obtained using getFinalValue, and it will still hold resources until the operation is complete.
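
If you do want the exact figure eventually, one option (a sketch, assuming the standard PartialResult.onComplete callback) is to take the early estimate and register a handler that fires when the background counting finishes, so nothing blocks in the meantime:

import org.apache.spark.partial.{BoundedDouble, PartialResult}

val partial: PartialResult[BoundedDouble] = inputDf.rdd.countApprox(waitSecs * 1000, 0.10)

// Use the estimate that is available once the timeout expires...
val quickEstimate = partial.initialValue.mean

// ...and pick up the exact count later, when the still-running job completes.
partial.onComplete(exact => println(s"final count: ${exact.mean}"))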

So the point of this API is to avoid getting blocked on the count operation.

Reference: https://mail-archives.apache.org/mod_mbox/spark-user/201505.mbox/%3C747872034.1520543.1431544429083.JavaMail.yahoo@mail.yahoo.com%3E

Now let's validate our assumption of non-blocking behaviour in spark2-shell. Let's create a random DataFrame and perform count, countApprox with getFinalValue, and countApprox with initialValue:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import scala.util.Random
import scala.util.Random

scala> val schema = StructType((0 to 10).map(n => StructField(s"column_$n", StringType)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(column_0,StringType,true), StructField(column_1,StringType,true), StructField(column_2,StringType,true), StructField(column_3,StringType,true), StructField(column_4,StringType,true), StructField(column_5,StringType,true), StructField(column_6,StringType,true), StructField(column_7,StringType,true), StructField(column_8,StringType,true), StructField(column_9,StringType,true), StructField(column_10,StringType,true))

scala> val rows = spark.sparkContext.parallelize(Seq[Row](), 100).mapPartitions { _ => { Range(0, 100000).map(m => Row(schema.map(_ => Random.alphanumeric.filter(_.isLower).head.toString).toList: _*)).iterator } }
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at mapPartitions at <console>:32

scala> val inputDf = spark.sqlContext.createDataFrame(rows, schema)
inputDf: org.apache.spark.sql.DataFrame = [column_0: string, column_1: string ... 9 more fields]

//Please note that cnt will be displayed only when all tasks are completed
scala> val cnt = inputDf.rdd.count
cnt: Long = 10000000

scala> val waitSecs = 60
waitSecs: Int = 60

//cntApprxFinal will be displayed only when all tasks are completed.
scala> val cntApprxFinal = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean
[Stage 1:======================================================> (98 + 2) / 100]cntApprxFinal: Double = 1.0E7

scala> val waitSecs = 60
waitSecs: Int = 60

//Please note that cntApprxInitial, in this case, will be displayed exactly after the timeout duration. Here 80 tasks were completed within the timeout and the value of the variable was displayed. Even after displaying the value, it continued with all the remaining tasks
scala> val cntApprxInitial = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
[Stage 2:============================================>           (80 + 4) / 100]cntApprxInitial: Double = 1.0E7

[Stage 2:=======================================================>(99 + 1) / 100]

Let's have a look at the Spark UI and spark-shell: all 3 operations took the same total time.

cntApprxInitial is available before completion of all tasks.

Hope this helps!
