How to transform RDD, Dataframe or Dataset straight to a Broadcast variable without collect?

Question

Is there any way (or any plans) to turn Spark distributed collections (RDDs, DataFrames or Datasets) directly into Broadcast variables without the need for a collect? The public API doesn't seem to have anything "out of the box", but can something be done at a lower level?

I can imagine there is some 2x speedup potential (or more?) for these kinds of operations. To explain what I mean in detail, let's work through an example:

// Collect all pairs to the driver, build the map there, then broadcast it back out:
val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

This causes all the data to be collected to the driver, then the data is broadcast. This means the data is sent over the network essentially twice.

What would be nice is something like this:

// Hypothetical toBroadcast: builds the broadcast value without a driver-side collect.
val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

Here Spark could bypass collecting the data altogether and just move the data between the nodes.

Bonus

Furthermore, there could be a Monoid-like API (a bit like combineByKey) for situations where the .toMap, or whatever operation on Array[T], is expensive but can be done in parallel. E.g. constructing certain Trie structures can be expensive; this kind of functionality could open up great scope for algorithm design. The CPU work could also run while the IO is still in flight, whereas the current broadcast mechanism is blocking (i.e. all IO, then all CPU, then all IO again).
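
A rough sketch of what such an API could look like (entirely hypothetical; no toBroadcast exists in Spark's public API):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Hypothetical extension: build the broadcast value incrementally and in
// parallel, merging per-partition results monoid-style (cf. combineByKey).
implicit class BroadcastOps[T](rdd: RDD[T]) {
  def toBroadcast[B](zero: B)(seqOp: (B, T) => B, combOp: (B, B) => B): Broadcast[B] =
    ??? // could overlap chunk IO with CPU, instead of all IO, then all CPU, then all IO
}

// Usage sketch: build the map (or an expensive Trie) without ever collecting:
// val myUberMap = myStringPairRdd.toBroadcast(Map.empty[String, String])(_ + _, _ ++ _)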

Clarification

Joining is not the (main) use case here; it can be assumed that I use the broadcast data structure sparsely. For example, the keys in someOtherRdd by no means cover the keys in myUberMap, but I don't know which keys I need until I traverse someOtherRdd, and suppose I use myUberMap multiple times.
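
A minimal sketch of the access pattern I mean (assuming someOtherRdd is an RDD[String] of candidate keys):

// Only a small, data-dependent subset of myUberMap's keys is ever touched,
// and that subset is unknown until someOtherRdd is traversed.
def someCodeUsingTheUberMap(key: String): Option[String] =
  myUberMap.value.get(key)  // .value is resolved on the executor, not the driver

val enriched = someOtherRdd.map(someCodeUsingTheUberMap)              // first use
val hits     = someOtherRdd.filter(k => myUberMap.value.contains(k))  // reused later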

I know that all sounds a bit vague, but the point is for more general machine learning algorithm design.

Answer

While theoretically this is an interesting idea, I will argue that although technically possible it has very limited practical applications. Obviously I cannot speak for the PMC, so I cannot say if there are any plans to implement this type of broadcasting mechanism at all.

Possible implementation:

Since Spark already provides a torrent broadcasting mechanism, whose behavior is described as follows:

The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.

On each executor, the executor first attempts to fetch the object from its BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available.

Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.

Given that, it should be possible to reuse the same mechanism for direct node-to-node broadcasting.

It is worth noting that this approach cannot completely eliminate driver communication. Even though blocks could be created locally, you would still need a single source of truth to advertise the set of blocks to fetch.

Limited applications

One problem with broadcast variables is that they are quite expensive. Even if you could eliminate the driver bottleneck, two problems remain:

  • The memory required to store the deserialized object on each executor.
  • The cost of transferring the broadcast data to every executor.

The first problem should be relatively obvious. It is not only about direct memory usage but also about GC cost and its effect on overall latency. The second one is rather subtle. I partially covered this in my answer to Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark, but let's discuss this further.

From a network traffic perspective, broadcasting a whole dataset is pretty much equivalent to creating a Cartesian product. So if a dataset is large enough for the driver to become a bottleneck, it is unlikely to be a good candidate for broadcasting, and a targeted approach like a hash join may be preferred in practice.
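
A back-of-the-envelope comparison makes the traffic argument concrete (all figures below are made-up assumptions):

// Hypothetical sizes, just to illustrate the scaling.
val datasetGb    = 10.0   // size of the would-be broadcast dataset
val otherSideGb  = 50.0   // size of the dataset it is matched against
val numExecutors = 100

// Broadcast: every executor ends up holding a full copy.
val broadcastTrafficGb = datasetGb * numExecutors   // ~1000 GB, Cartesian-like

// Shuffled hash join: each record crosses the network roughly once.
val shuffleTrafficGb = datasetGb + otherSideGb      // ~60 GB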

Alternatives:

There are some methods which can be used to achieve results similar to a direct broadcast and to address the issues enumerated above, including:

  • Passing data via a distributed file system (see the sketch after this list).
  • Using a replicated database co-located with the worker nodes.
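
For the file-system route, here is a minimal sketch (the HDFS path, the tab-separated layout, and the UberMap holder object are assumptions for illustration):

// Written in parallel by the executors; no collect to the driver.
myStringPairRdd
  .map { case (k, v) => s"$k\t$v" }
  .saveAsTextFile("hdfs:///tmp/uber-map")

// Each executor JVM rebuilds the map lazily, on first use, straight from HDFS.
object UberMap {
  lazy val value: Map[String, String] = {
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.io.Source
    val fs = FileSystem.get(new Configuration())
    fs.globStatus(new Path("hdfs:///tmp/uber-map/part-*")).flatMap { status =>
      Source.fromInputStream(fs.open(status.getPath)).getLines().map { line =>
        val Array(k, v) = line.split('\t')
        k -> v
      }
    }.toMap
  }
}

someOtherRdd.map(key => UberMap.value.get(key))  // loaded once per executor, not per record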
