In Apache Spark, can I incrementally cache an RDD partition?

Problem description

I was under the impression that both RDD execution and caching are lazy: namely, if an RDD is cached and only part of it is used, then the caching mechanism will only cache that part, and the rest will be computed on demand.

Unfortunately, the following experiment seems to indicate otherwise:

      import org.apache.spark.util.LongAccumulator

      // TestSC is an existing SparkContext provided by the test suite
      val acc = new LongAccumulator()
      TestSC.register(acc)

      // every element that actually gets computed bumps the accumulator
      val rdd = TestSC.parallelize(1 to 100, 16).map { v =>
        acc.add(1)
        v
      }

      rdd.persist()

      // take only the first 2 items of each of the 16 partitions
      val sliced = rdd
        .mapPartitions { itr =>
          itr.slice(0, 2)
        }

      sliced.count()

      assert(acc.value == 32)

Running it yields the following exception:

100 did not equal 32
ScalaTestFailureLocation: 
Expected :32
Actual   :100

It turns out the entire RDD was computed, instead of only the first 2 items in each partition. This is very inefficient in some cases (e.g. when you need to determine quickly whether the RDD is empty). Ideally, the caching manager should allow the caching buffer to be written incrementally and accessed randomly. Does this feature exist? If not, what should I do to make it happen? (Preferably using the existing memory & disk caching mechanism.)
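
For the quick-emptiness case specifically, a minimal sketch of a workaround (assuming the same TestSC context as above): isEmpty() is backed by take(1), which scans only as many partitions as it needs to find one element, so running the check before persist() avoids materializing every partition.

      // Sketch of a possible workaround for the quick-emptiness case, reusing
      // the same TestSC context as above: isEmpty() is backed by take(1), which
      // only scans as many partitions as needed to find one element.
      val candidate = TestSC.parallelize(1 to 100, 16).map(identity)

      if (!candidate.isEmpty()) {   // cheap check, no persist() in the lineage yet
        candidate.persist()         // opt into whole-partition caching only now
        candidate.count()           // this action computes and caches all partitions
      }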

Thank you very much for your input.

UPDATE 1 It appears that Spark already has 2 classes:

  • ExternalAppendOnlyMap
  • ExternalAppendOnlyUnsafeRowArray

that support more granular caching of many values. Even better, they don't rely on StorageLevel, but instead make their own decision about which storage device to use. However, I'm surprised that they are not options for RDD/Dataset caching directly, and are only used for co-group/join/streamOps or accumulators.

Recommended answer

Interesting in hindsight; here is my take:

  • You cannot cache incrementally. So the answer to your question is No.

persist applies to all partitions of that RDD; it is meant for multiple Actions, or for a single Action with several processing paths, starting from the same common RDD stage onwards.
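
A minimal sketch of that intended usage, assuming the same TestSC context as the question (expensiveTransform is just a hypothetical placeholder): the first action materializes and caches every partition, and later actions reuse the cached data.

      // Minimal sketch of persist()'s intended use; expensiveTransform is a
      // hypothetical stand-in for some costly per-element computation.
      def expensiveTransform(v: Int): Int = v * 2

      val base = TestSC.parallelize(1 to 100, 16).map(expensiveTransform)
      base.persist()               // marks every partition of base for caching

      val total  = base.count()    // first action computes and caches all partitions
      val sample = base.take(5)    // later actions read the cached partitions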

If you use persist, the RDD optimizer does not look at whether that call could be optimized away in the way you describe: you issued that call (method, API), so it executes it.

But if you do not use persist, the lazy evaluation and the fusing of code within a Stage seem to tie the slice cardinality and the accumulator together. That much is clear. Is it logical? Yes, because there is no further reference elsewhere as part of another Action. Others may see it as odd or erroneous, but imo it does not imply incremental persistence / caching.
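
A minimal sketch of that reading, reusing the question's setup but without persist(): the sliced iterator pulls only two elements per partition from the upstream map, so the accumulator should stop at 16 * 2 = 32.

      // Same setup as the question but without persist(): nothing forces the
      // full partitions to be materialized, so only 2 elements per partition
      // (16 * 2 = 32) should flow through the upstream map.
      val acc2 = new LongAccumulator()
      TestSC.register(acc2)

      val lazyRdd = TestSC.parallelize(1 to 100, 16).map { v =>
        acc2.add(1)
        v
      }

      lazyRdd
        .mapPartitions(_.slice(0, 2))   // no persist() anywhere in this lineage
        .count()

      assert(acc2.value == 32)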

So, imho, it is an interesting observation that I would not have come up with, but I am not convinced it proves anything about partial caching.
