Tips for properly using large broadcast variables?


Problem description


I'm using a broadcast variable about 100 MB pickled in size, which I'm approximating with:

>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896

Running on a cluster with 3 c3.2xlarge executors, and a m3.large driver, with the following command launching the interactive session:

IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g

In an RDD, if I persist a reference to this broadcast variable, the memory usage explodes. For 100 references to a 100 MB variable, even if it were copied 100 times, I'd expect the data usage to be no more than 10 GB total (let alone 30 GB over 3 nodes). However, I see out of memory errors when I run the following test:

data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))

The stack trace:

TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): 

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
MemoryError


  at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
  at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
      7 joined_rdd.persist()
      8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))

/usr/lib/spark/python/pyspark/rdd.py in count(self)
   1004         3
   1005         """
-> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1007
   1008     def stats(self):

/usr/lib/spark/python/pyspark/rdd.py in sum(self)
    995         6.0
    996         """
--> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    998
    999     def count(self):

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
    869         # zeroValue provided to each partition is unique from the one provided
    870         # to the final reduce call
--> 871         vals = self.mapPartitions(func).collect()
    872         return reduce(op, vals, zeroValue)
    873

/usr/lib/spark/python/pyspark/rdd.py in collect(self)
    771         """
    772         with SCCallSiteSync(self.context) as css:
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)

  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
  at py4j.Gateway.invoke(Gateway.java:259)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:207)
  at java.lang.Thread.run(Thread.java:745)

I've seen previous threads about the memory usage of pickle deserialization being an issue. However, I would expect a broadcast variable to only be deserialized (and loaded into memory on an executor) once, and subsequent references to .value to reference that in-memory address. That doesn't seem to be the case, however. Am I missing something?

The examples I've seen with broadcast variables have them as dictionaries, used one time to transform a set of data (i.e. replace airport acronyms with airport names). The motivation behind persisting them here is to create objects with knowledge of a broadcast variable and how to interact with it, persist those objects, and perform multiple computations using them (with spark taking care of holding them in memory).

What are some tips for using large (100 MB+) broadcast variables? Is persisting a broadcast variable misguided? Is this an issue that is possibly specific to PySpark?

Thank you! Your help is appreciated.

Note, I've also posted this question on the Databricks forums.

Edit - follow-up question:

It was suggested that the default Spark serializer has a batch size of 65536 bytes. Objects serialized in different batches are not identified as the same and are assigned different memory addresses, examined here via the builtin id function. However, even with a larger broadcast variable that would in theory take 256 batches to serialize, I still see only 2 distinct copies. Shouldn't I see many more? Is my understanding of how batch serialization works incorrect?

>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))})) / sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2

Solution

Well, the devil is in the details. To understand why this may happen, we'll have to take a closer look at the PySpark serializers. First, let's create a SparkContext with the default settings:

from pyspark import SparkContext

sc = SparkContext("local", "foo")

and check what is a default serializer:

sc.serializer
## AutoBatchedSerializer(PickleSerializer())

sc.serializer.bestSize
## 65536

This tells us three things:

  • the default is the AutoBatchedSerializer
  • it uses PickleSerializer to do the actual work
  • the bestSize of a serialized batch is 65536 bytes

A quick glance at the source code shows that this serializer adjusts the number of records serialized per batch at runtime and tries to keep the batch size below 10 * bestSize. The important point is that not all of the records in a single partition are serialized at the same time.
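
To make that concrete, here is a deliberately simplified sketch of what such a batching loop does. This is illustrative only, not the actual PySpark source (the real logic lives in pyspark/serializers.py), and the function name and signature are made up for the example:

import pickle

def dump_stream_sketch(iterator, stream, best_size=65536):
    # Start with one record per batch and adapt as serialized sizes are observed.
    batch, buffered = 1, []
    for record in iterator:
        buffered.append(record)
        if len(buffered) < batch:
            continue
        chunk = pickle.dumps(buffered, protocol=2)
        stream.write(chunk)
        # Grow the batch while chunks stay small, shrink it once a chunk exceeds
        # roughly 10 * best_size. Records therefore end up pickled in several
        # independent batches, never all at once.
        if len(chunk) < best_size:
            batch *= 2
        elif len(chunk) > 10 * best_size and batch > 1:
            batch //= 2
        buffered = []
    if buffered:
        stream.write(pickle.dumps(buffered, protocol=2))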

We can check that experimentally as follows:

from operator import add

bd = sc.broadcast({})

rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value)
rdd.map(id).distinct().count()
## 1

rdd.cache().count()
## 10

rdd.map(id).distinct().count()
## 2

As you can see, even in this simple example we get two distinct objects after serialization and deserialization. You can observe similar behavior working directly with pickle:

import pickle  # plain pickle suffices here; the question used cPickle on Python 2

v = {}
vs = [v, v, v, v]

v1, *_, v4 = pickle.loads(pickle.dumps(vs))
v1 is v4
## True

(v1_, v2_), (v3_, v4_) = (
    pickle.loads(pickle.dumps(vs[:2])),
    pickle.loads(pickle.dumps(vs[2:]))
)

v1_ is v4_
## False

v3_ is v4_
## True

Values serialized in the same batch reference the same object after unpickling; values from different batches point to distinct objects.

In practice, Spark provides multiple serializers and different serialization strategies. For example, you can use batches of unlimited size:

from pyspark.serializers import BatchedSerializer, PickleSerializer

rdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value)
    ._reserialize(BatchedSerializer(PickleSerializer())))
rdd_.cache().count()

rdd_.map(id).distinct().count()
## 1

You can change the serializer by passing the serializer and/or batchSize parameters to the SparkContext constructor:

sc = SparkContext(
    "local", "bar",
    serializer=PickleSerializer(),  # Default serializer
    # Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer
    batchSize=-1  
)

sc.serializer
## BatchedSerializer(PickleSerializer(), -1)
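
With this configuration the whole partition is pickled as a single batch, so repeating the earlier broadcast experiment under this context should yield a single deserialized copy even after caching. The output comments below are expectations under the same local setup, not output copied from the original answer:

bd = sc.broadcast({})
rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value)

rdd.cache().count()
## 10

rdd.map(id).distinct().count()
## 1  (expected: one object per partition, since the batch size is unlimited)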

Choosing different serializers and batching strategies results in different trade-offs (speed, ability to serialize arbitrary objects, memory requirements, etc.).
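
As a concrete example of that trade-off, PySpark also ships a MarshalSerializer: it is usually faster than pickle but handles only a limited set of built-in types, so it suits broadcasts of plain numbers, strings, lists, and dicts. A sketch, assuming a fresh context (stop any existing SparkContext first):

from pyspark import SparkContext
from pyspark.serializers import MarshalSerializer

# marshal cannot serialize arbitrary user-defined classes, so only use it when
# your records and broadcast data are simple built-in types.
sc = SparkContext("local", "marshal-example", serializer=MarshalSerializer())

sc.serializer
## AutoBatchedSerializer(MarshalSerializer())  (expected with the default batchSize)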

You should also remember that broadcast variables in Spark are not shared between executor threads, so multiple deserialized copies can exist on the same worker at the same time.

Moreover, you'll see similar behavior if you execute a transformation that requires shuffling.
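
For example (a hypothetical illustration reusing the bd broadcast from above, not code from the original answer), forcing a shuffle with repartition re-serializes the records, so references that shared one deserialized object beforehand can come back as several distinct copies:

rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value)
rdd.map(id).distinct().count()
## 1

rdd.repartition(4).map(id).distinct().count()
## expected to be greater than 1 (roughly one copy per post-shuffle batch)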
