Tips for properly using large broadcast variables?
Question
I'm using a broadcast variable about 100 MB pickled in size, which I'm approximating with:
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
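The pickled size is only a rough proxy for how much memory the data takes once it is unpickled into Python objects. The sketch below uses plain CPython with no Spark, and the helper name `sizes` is hypothetical. It compares the two numbers for a smaller list. Python 3's default pickle protocol is binary, so its byte counts will not match the protocol-0 `cPickle` figure above. The in-memory total is only an approximation built from `sys.getsizeof`.

```python
import pickle
import sys


def sizes(n):
    """Return (pickled_bytes, approx_in_memory_bytes) for list(range(n))."""
    data = list(range(n))
    pickled = len(pickle.dumps(data))
    # List header and pointer array, plus one int object per element.
    # CPython caches small ints, so this slightly overcounts.
    in_memory = sys.getsizeof(data) + sum(sys.getsizeof(x) for x in data)
    return pickled, in_memory


pickled, in_memory = sizes(10 ** 5)
print("pickled: {} bytes, in memory: ~{} bytes".format(pickled, in_memory))
```

On a 64-bit CPython build, the in-memory estimate comes out several times larger than the pickled size. That gap is worth keeping in mind when budgeting executor memory for a large broadcast.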
I'm running on a cluster with 3 c3.2xlarge executors and an m3.large driver, using the following command to launch the interactive session:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
In an RDD, if I persist a reference to this broadcast variable, memory usage explodes. For 100 references to a 100 MB variable, even if it were copied 100 times, I'd expect total data usage of no more than 10 GB (and certainly not 30 GB across 3 nodes). However, I see out-of-memory errors when I run the following test:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
Stack trace:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
MemoryError
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
7 joined_rdd.persist()
8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))
/usr/lib/spark/python/pyspark/rdd.py in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/usr/lib/spark/python/pyspark/rdd.py in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/usr/lib/spark/python/pyspark/rdd.py in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
I've seen previous threads about the memory usage of pickle deserialization being an issue. However, I would expect a broadcast variable to be deserialized (and loaded into memory on an executor) only once, and subsequent references to .value to point to that in-memory address. That doesn't seem to be the case, however. Am I missing something?
The examples I've seen with broadcast variables use them as dictionaries, applied once to transform a set of data (e.g. replacing airport acronyms with airport names). My motivation for persisting them here is to create objects that know about a broadcast variable and how to interact with it, persist those objects, and perform multiple computations with them (with Spark taking care of holding them in memory).
What are some tips for using large (100 MB+) broadcast variables? Is persisting a broadcast variable misguided? Is this an issue that is possibly specific to PySpark?
Thanks! I appreciate any help.
Note: I've also posted this question on the Databricks forums.
Edit: follow-up question
It was suggested that the default Spark serializer has a batch size of 65536. Objects serialized in different batches are not identified as the same and are assigned different memory addresses, examined here via the builtin id function. However, even with a larger broadcast variable that would in theory take 256 batches to serialize, I still see only 2 distinct copies. Shouldn't I see many more? Is my understanding of how batch serialization works incorrect?
>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))})) / sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2
Answer
Well, the devil is in the details. To understand why this may happen, we'll have to take a closer look at the PySpark serializers. First, let's create a SparkContext with default settings:
from pyspark import SparkContext
sc = SparkContext("local", "foo")
and check what the default serializer is:
sc.serializer
## AutoBatchedSerializer(PickleSerializer())
sc.serializer.bestSize
## 65536
It tells us three things:
- it is an AutoBatchedSerializer
- it uses PickleSerializer to perform the actual work
- the bestSize of a serialized batch is 65536 bytes
A quick glance at the source code will show you that this serializer adjusts the number of records serialized at a time during runtime and tries to keep the batch size below 10 * bestSize. The important point is that not all records in a single partition are serialized at the same time.
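The batching behavior can be made concrete with a simplified pure-Python sketch of the adaptive rule described above. It starts with one record per batch, doubles the batch while its pickled size is below `best_size`, and halves it when a batch exceeds `10 * best_size`. The sketch only mimics the idea with the standard `pickle` module. `auto_batched_dumps` and `load_batches` are hypothetical helpers, not PySpark APIs. Ten references to one shared dict end up split across four batches, and each batch unpickles into its own copy.

```python
import itertools
import pickle


def auto_batched_dumps(items, best_size=1 << 16):
    """Simplified adaptive batching (not Spark's code): pickle items in
    batches that start at one record, double while small, halve when huge."""
    batches = []
    batch = 1
    it = iter(items)
    while True:
        chunk = list(itertools.islice(it, batch))
        if not chunk:
            break
        data = pickle.dumps(chunk)
        batches.append(data)
        if len(data) < best_size:
            batch *= 2
        elif len(data) > best_size * 10 and batch > 1:
            batch //= 2
    return batches


def load_batches(batches):
    """Unpickle every batch and concatenate the records."""
    out = []
    for data in batches:
        out.extend(pickle.loads(data))
    return out


shared = {}
batches = auto_batched_dumps([shared] * 10)  # batch sizes 1, 2, 4, 3
restored = load_batches(batches)
distinct = len({id(obj) for obj in restored})
print(len(batches), distinct)  # 4 4
```

The sketch keeps every restored object alive in `restored` so that `id` is unique per object. In CPython, once an object is freed, its `id` (memory address) can be reused by a later object. Counting ids in a pipeline where objects are released along the way can therefore undercount copies.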
We can check that experimentally as follows:
from operator import add
bd = sc.broadcast({})
rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value)
rdd.map(id).distinct().count()
## 1
rdd.cache().count()
## 10
rdd.map(id).distinct().count()
## 2
As you can see, even in this simple example we get two distinct objects after serialization and deserialization. You can observe similar behavior working directly with pickle:
import pickle
v = {}
vs = [v, v, v, v]
v1, *_, v4 = pickle.loads(pickle.dumps(vs))
v1 is v4
## True
(v1_, v2_), (v3_, v4_) = (
pickle.loads(pickle.dumps(vs[:2])),
pickle.loads(pickle.dumps(vs[2:]))
)
v1_ is v4_
## False
v3_ is v4_
## True
Values serialized in the same batch reference, after unpickling, the same object. Values from different batches point to different objects.
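The same rule can be generalized with a small sketch that uses plain `pickle` and a hypothetical helper, `distinct_after_roundtrip`. It pickles `n` references to one object in fixed-size batches, unpickles everything, and counts the distinct objects. With this setup the count equals the number of batches, so a single unlimited batch gives back one shared object.

```python
import pickle


def distinct_after_roundtrip(obj, n, batch_size=None):
    """Pickle n references to obj in fixed-size batches (None means one
    unlimited batch), unpickle them, and count the distinct objects."""
    refs = [obj] * n
    size = batch_size if batch_size is not None else max(n, 1)
    restored = []
    for start in range(0, n, size):
        # Each dumps() call has its own memo, so obj is stored once per batch.
        restored.extend(pickle.loads(pickle.dumps(refs[start:start + size])))
    # restored keeps every object alive, so id() is unique per object here.
    return len({id(o) for o in restored})


print(distinct_after_roundtrip({}, 10, 3))  # 4 (batches of 3, 3, 3, 1)
print(distinct_after_roundtrip({}, 10))     # 1 (single unlimited batch)
```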
In practice, Spark uses multiple serializers and different serialization strategies. You can, for example, use batches of unlimited size:
from pyspark.serializers import BatchedSerializer, PickleSerializer
rdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value)
._reserialize(BatchedSerializer(PickleSerializer())))
rdd_.cache().count()
rdd_.map(id).distinct().count()
## 1
You can change the serializer by passing the serializer and/or batchSize parameters to the SparkContext constructor:
sc.stop()  # Only one SparkContext can be active at a time, so stop the current one first
sc = SparkContext(
"local", "bar",
serializer=PickleSerializer(), # Default serializer
# Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer
batchSize=-1
)
sc.serializer
## BatchedSerializer(PickleSerializer(), -1)
Choosing different serializers and batching strategies results in different trade-offs (speed, ability to serialize arbitrary objects, memory requirements, etc.).
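One of these trade-offs can be sketched with plain `pickle`. The helper `batch_stats` and the sample records are hypothetical, and this is not a measurement of Spark itself. Small batches repeat any object shared between records once per batch, which inflates the total size. One huge batch avoids that, but the whole partition must then be pickled and later unpickled as a single blob, which raises the peak.

```python
import pickle


def batch_stats(records, batch_size):
    """Pickle records in fixed-size batches and return
    (number_of_batches, largest_batch_bytes, total_bytes)."""
    sizes = [len(pickle.dumps(records[i:i + batch_size]))
             for i in range(0, len(records), batch_size)]
    return len(sizes), max(sizes), sum(sizes)


# Records that all share one larger object: small batches duplicate it.
shared = list(range(1000))
shared_records = [(i, shared) for i in range(100)]
print(batch_stats(shared_records, 1))
print(batch_stats(shared_records, len(shared_records)))

# Records with no sharing: one big batch mostly raises the peak batch size.
distinct_records = [list(range(i * 100, i * 100 + 100)) for i in range(100)]
print(batch_stats(distinct_records, 1))
print(batch_stats(distinct_records, len(distinct_records)))
```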
You should also remember that broadcast variables in Spark are not shared between executor threads, so multiple deserialized copies can exist on the same worker at the same time.
Moreover, you'll see similar behavior if you execute a transformation that requires shuffling.