Tips for properly using large broadcast variables?

Problem description

I'm using a broadcast variable about 100 MB pickled in size, which I'm approximating with:

>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896

Running on a cluster with 3 c3.2xlarge executors and an m3.large driver, I launch the interactive session with the following command:

IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g

In an RDD, if I persist a reference to this broadcast variable, the memory usage explodes. For 100 references to a 100 MB variable, even if it were copied 100 times, I'd expect the data usage to be no more than 10 GB total (let alone 30 GB over 3 nodes). However, I see out of memory errors when I run the following test:

data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))

Stack trace:

TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): 

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
MemoryError


  at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
  at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
      7 joined_rdd.persist()
      8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))

/usr/lib/spark/python/pyspark/rdd.py in count(self)
   1004         3
   1005         """
-> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1007
   1008     def stats(self):

/usr/lib/spark/python/pyspark/rdd.py in sum(self)
    995         6.0
    996         """
--> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    998
    999     def count(self):

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
    869         # zeroValue provided to each partition is unique from the one provided
    870         # to the final reduce call
--> 871         vals = self.mapPartitions(func).collect()
    872         return reduce(op, vals, zeroValue)
    873

/usr/lib/spark/python/pyspark/rdd.py in collect(self)
    771         """
    772         with SCCallSiteSync(self.context) as css:
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)

  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
  at py4j.Gateway.invoke(Gateway.java:259)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:207)
  at java.lang.Thread.run(Thread.java:745)

I've seen previous threads about the memory usage of pickle deserialization being an issue. However, I would expect a broadcast variable to only be deserialized (and loaded into memory on an executor) once, and subsequent references to .value to reference that in-memory address. That doesn't seem to be the case, however. Am I missing something?
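
As a sanity check (a quick sketch using the names from the test above): within a single task, repeated accesses to .value do return one object, since the Broadcast wrapper caches the deserialized value per process:

>>> sc.parallelize([0]).map(lambda _: metadata.value is metadata.value).collect()
[True]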

The examples I've seen with broadcast variables use them as dictionaries, applied one time to transform a set of data (e.g. replacing airport acronyms with airport names). The motivation for persisting them here is to create objects with knowledge of a broadcast variable and how to interact with it, persist those objects, and perform multiple computations using them (with Spark taking care of holding them in memory).

What are some tips for using large (100 MB+) broadcast variables? Is persisting a broadcast variable misguided? Is this an issue that is possibly specific to PySpark?

Thanks! I appreciate any help.

Note: I've also posted this question on the Databricks forums.

Edit - follow-up question:

It was suggested that the default Spark serializer has a batch size of 65536. Objects serialized in different batches are not identified as the same and are assigned different memory addresses, which I examine here via the builtin id function. However, even with a larger broadcast variable that would in theory take 256 batches to serialize, I still see only 2 distinct copies. Shouldn't I see many more? Is my understanding of how batch serialization works incorrect?

>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))})) / sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2

Answer

Well, the devil is in the detail. To understand why this may happen, we'll have to take a closer look at the PySpark serializers. First, let's create a SparkContext with default settings:

from pyspark import SparkContext

sc = SparkContext("local", "foo")

and check the default serializer:

sc.serializer
## AutoBatchedSerializer(PickleSerializer())

sc.serializer.bestSize
## 65536

This tells us three things:

  • it is the AutoBatchedSerializer serializer
  • it uses PickleSerializer to perform the actual job
  • the bestSize of a serialized batch is 65536 bytes

A quick glance at the source code will show you that this serializer adjusts the number of records serialized at a time at runtime, and tries to keep the batch size below 10 * bestSize. The important point is that not all records in a single partition are serialized at the same time.
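
To make the batching behavior concrete, here is a minimal standalone sketch (no cluster required; note that pyspark.serializers is an internal module, and the exact batch counts below assume the batch-doubling heuristic described above):

import io
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

ser = AutoBatchedSerializer(PickleSerializer())  # bestSize defaults to 65536

shared = {}                            # one object referenced ten times
buf = io.BytesIO()
ser.dump_stream(iter([shared] * 10), buf)
buf.seek(0)

restored = list(ser.load_stream(buf))
len(restored)
## 10
len(set(map(id, restored)))            # distinct copies == number of pickle batches
## 4 (batches of 1, 2, 4 and 3 records under the doubling heuristic)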

We can check that experimentally as follows:

from operator import add

bd = sc.broadcast({})

rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value)
rdd.map(id).distinct().count()
## 1

rdd.cache().count()
## 10

rdd.map(id).distinct().count()
## 2

As you can see, even in this simple example we get two distinct objects after serialization-deserialization. You can observe similar behavior working directly with pickle:

import pickle

v = {}
vs = [v, v, v, v]

v1, *_, v4 = pickle.loads(pickle.dumps(vs))
v1 is v4
## True

(v1_, v2_), (v3_, v4_) = (
    pickle.loads(pickle.dumps(vs[:2])),
    pickle.loads(pickle.dumps(vs[2:]))
)

v1_ is v4_
## False

v3_ is v4_
## True

After unpickling, values serialized in the same batch reference the same object, because pickle memoizes references only within a single dumps call. Values from different batches point to different objects.

In practice, Spark uses multiple serializers and different serialization strategies. You can, for example, use batches of unlimited size:

from pyspark.serializers import BatchedSerializer, PickleSerializer

rdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value)
    ._reserialize(BatchedSerializer(PickleSerializer())))
rdd_.cache().count()

rdd_.map(id).distinct().count()
## 1

You can change the serializer by passing the serializer and/or batchSize parameters to the SparkContext constructor:

sc = SparkContext(
    "local", "bar",
    serializer=PickleSerializer(),  # Default serializer
    # Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer
    batchSize=-1  
)

sc.serializer
## BatchedSerializer(PickleSerializer(), -1)

Choosing different serializers and batching strategies results in different trade-offs (speed, ability to serialize arbitrary objects, memory requirements, etc.).
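
For instance, MarshalSerializer is typically faster than PickleSerializer but handles only built-in types (a small sketch of my own, not from the original answer; it assumes no other SparkContext is active in the process):

from pyspark import SparkContext
from pyspark.serializers import MarshalSerializer

sc = SparkContext("local", "marshal-demo", serializer=MarshalSerializer())

sc.parallelize(range(1000)).map(lambda x: 2 * x).sum()
## 999000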

You should also remember that broadcast variables in Spark are not shared between executor threads, so multiple deserialized copies can exist on the same worker at the same time.
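
Coming back to the question's setup, one common mitigation (a hedged sketch of my own, reusing the question's metadata and ids variables) is not to persist records that embed metadata.value at all: dereference the broadcast lazily inside each task and persist only the derived results:

def lookup(partition):
    meta = metadata.value          # one dereference per task, not per record
    for key, value in partition:
        yield key, meta[value]     # keep only the derived result

derived = ids.mapPartitions(lookup)
derived.persist()                  # caches small derived records, not copies of meta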

Moreover, you'll see similar behavior if you execute a transformation that requires shuffling.
