Object cache on Spark executors


Problem description

A good question for Spark experts.

I am processing data in a map operation (RDD). Within the mapper function, I need to look up objects of class A to be used in processing the elements of the RDD.

Since this will be performed on executors AND creation of elements of type A (that will be looked up) happens to be an expensive operation, I want to pre-load and cache these objects on each executor. What is the best way of doing it?

  • One idea is to broadcast a lookup table, but class A is not serializable (no control over its implementation).

  • Another idea is to load them up in a singleton object. However, I want to control what gets loaded into that lookup table (e.g. possibly different data on different Spark jobs).

Ideally, I want to specify what will be loaded on executors once (including the case of Streaming, so that the lookup table stays in memory between batches), through a parameter that will be available on the driver during its start-up, before any data gets processed.

Is there a clean and elegant way of doing it, or is it impossible to achieve?

Recommended answer

This is exactly the targeted use case for broadcast. Broadcast variables are transmitted once, distributed efficiently to all executors via a torrent-like mechanism, and stay in memory / on local disk until you no longer need them.
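
If the lookup values were serializable, the broadcast approach would be a short sketch along these lines (here sc, rdd, buildLookupTable and fn are hypothetical placeholders, not part of the original question):

// Minimal sketch, assuming the lookup values are (or can be made) serializable.
// sc, rdd, buildLookupTable and fn are hypothetical placeholders.
val lookupTable: Map[String, A] = buildLookupTable()  // built once, on the driver
val lookupBroadcast = sc.broadcast(lookupTable)       // shipped once to each executor

val result = rdd.map { elem =>
  // lookupBroadcast.value resolves locally on the executor, not per record
  fn(lookupBroadcast.value, elem)
}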

Serialization often pops up as an issue when using other people's interfaces. If you can enforce that the objects you consume are serializable, that is the best solution. If that is impossible, things get a little more complicated: if you can't serialize the A objects, you have to create them on the executors for each task. If they're stored in a file somewhere, it would look something like this:

rdd.mapPartitions { it =>
  // Load the lookup table once per partition (i.e. once per task)
  val lookupTable = loadLookupTable(path)
  it.map(elem => fn(lookupTable, elem))
}

Note that if you're using this model, then you have to load the lookup table once per task -- you can't benefit from the cross-task persistence of broadcast variables.

Here's another model, which I believe lets you share the lookup table across tasks per JVM.

class BroadcastableLookupTable extends Serializable {
  // @transient: not shipped with the broadcast; starts out null in each executor JVM
  @transient var lookupTable: LookupTable[A] = null

  def get: LookupTable[A] = {
    if (lookupTable == null)
      lookupTable = loadLookupTable(path) // placeholder: load the lookup table from disk
    lookupTable
  }
}

This class can be broadcast (nothing substantial is transmitted); the first time get is called in each JVM, the lookup table is loaded and returned.
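
A hypothetical usage sketch, reusing the placeholder names from the snippets above (sc, rdd, fn), might then be:

// Broadcast the wrapper itself (cheap, nothing substantial inside it yet);
// the table is loaded lazily once per executor JVM.
val tableBroadcast = sc.broadcast(new BroadcastableLookupTable)

val result = rdd.map { elem =>
  // The first call to get in a given JVM loads the table; later calls reuse it
  fn(tableBroadcast.value.get, elem)
}

If several tasks may call get concurrently in the same JVM, you may also want to synchronize the lazy initialization so the table is not loaded more than once.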
