How to pass deep learning model data to map function in Spark
Problem description
I have a very simple use case: I read a large number of images from S3 into an RDD using the sc.binaryFiles method. Once this RDD is created, I pass its contents to a VGG16 feature extractor function. Feature extraction needs the model data, so I put the model data into a broadcast variable and then access its value in each map function. Below is the code:-
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()
model_data = initVGG16()
broadcast_model = sc.broadcast(model_data)
features_rdd = s3_files_rdd.mapPartitions(extract_features_)
response_rdd = features_rdd.map(lambda x: (x[0], write_to_s3(x, OUTPUT, FORMAT_NAME)))
extract_features_ method:-
def extract_features_(xs):
    model_data = initVGG16()
    for k, v in xs:
        yield k, extract_features2(model_data,v)
extract_features method:-
import numpy as np
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
def extract_features(model,obj):
    try:
        print('executing vgg16 feature extractor...')
        img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
        img_data = image.img_to_array(img)
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        print('++++++++++++++++++++++++++++',vgg16_feature.shape)
        return vgg16_feature
    except Exception as e:
        print('Error......{}'.format(e.args))
        return []
write_to_s3 method:-
def write_to_s3(rdd, output_path, format_name):
    file_path = rdd[0]
    file_name_without_ext = get_file_name_without_ext(file_path)
    bucket_name = output_path.split('/', 1)[0]
    final_path = 'deepak' + '/' + file_name_without_ext + '.' + format_name
    LOGGER.info("Saving to S3....")
    cci = cc.get_interface(bucket_name, ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
                           SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
    response = cci.upload_npy_array(final_path, rdd[1])
    return response
Inside the write_to_s3 method I receive the RDD record, extract the key name to save under and the bucket, and then use a library called cottoncandy to save the content (a numpy array in my case) directly, without writing any intermediate file.
I am getting the following error:-
127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "one_file5.py", line 98, in <module>
run()
File "one_file5.py", line 89, in run
LOGGER.info('features_rdd rdd created,...... %s',features_rdd.count())
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
When I comment out the features_rdd part of the code, the program runs fine, which means something is wrong in the features_rdd part. I am not sure what I am doing wrong here.
I am running the program on AWS EMR with 4 executors, 7 cores per executor, 8 GB of executor RAM, and Spark version 2.2.1.
Recommended answer
Replace the current code with mapPartitions:
def extract_features_(xs):
    model_data = initVGG16()
    for k, v in xs:
        yield k, extract_features(model_data, v)
features_rdd = s3_files_rdd.mapPartitions(extract_features_)
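A likely reading of the traceback is that the model object built on the driver holds a thread lock, so Spark cannot pickle it when it serializes the broadcast value or the function closure. Building the model inside the mapPartitions function sidesteps this, because the model is then created on the worker and never pickled. The following is a minimal sketch of that idea that runs without Spark or Keras. FakeModel, init_model, can_pickle and extract_partition are hypothetical stand-ins introduced for illustration (init_model plays the role of initVGG16), and the lock inside FakeModel is an assumption about what makes the real model unpicklable.

```python
import pickle
import threading


class FakeModel(object):
    """Hypothetical stand-in for the Keras model. It holds a lock, which is
    assumed here to be what makes the real model unpicklable."""

    def __init__(self):
        self._lock = threading.Lock()

    def predict(self, value):
        # Toy "feature": the length of the input bytes.
        with self._lock:
            return len(value)


def init_model():
    # Hypothetical stand-in for initVGG16() from the question.
    return FakeModel()


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


def extract_partition(records):
    # The model is built once per partition, inside the function that runs
    # on the worker, so it is never sent through pickle.
    model = init_model()
    for key, content in records:
        yield key, model.predict(content)


# A model created up front (as on the driver) cannot be pickled.
driver_model = init_model()
model_is_picklable = can_pickle(driver_model)

# Simulate one partition of (path, bytes) pairs, the shape sc.binaryFiles yields.
partition = iter([("a.jpg", b"1234"), ("b.jpg", b"12")])
features = list(extract_partition(partition))
```

With a real RDD the same function would be passed to mapPartitions, as in the answer above. The trade-off is that the model is rebuilt for every partition, so fewer, larger partitions keep that start-up cost down.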