使用 pyspark 将地图转换为 mapPartition [英] Transform map to mapPartition using pyspark
问题描述
我正在尝试从磁盘加载张量流模型并预测值.
I am trying to load a tensorflow model from disk and predicting the values.
代码
def get_value(row):
print("**********************************************")
graph = tf.Graph()
rowkey = row[0]
checkpoint_file = "/home/sahil/Desktop/Relation_Extraction/data/1485336002/checkpoints/model-300"
print("Loading model................................")
with graph.as_default():
session_conf = tf.ConfigProto(
allow_soft_placement=allow_soft_placement,
log_device_placement=log_device_placement)
sess = tf.Session(config=session_conf)
with sess.as_default():
# Load the saved meta graph and restore variables
saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
saver.restore(sess, checkpoint_file)
input_x = graph.get_operation_by_name("X_train").outputs[0]
dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
predictions = graph.get_operation_by_name("output/predictions").outputs[0]
batch_predictions = sess.run(predictions, {input_x: [row[1]], dropout_keep_prob: 1.0})
print(batch_predictions)
return (rowkey, batch_predictions)
我有一个由元组(rowkey,input_vector)组成的 RDD.我想使用加载的模型来预测输入的分数/类别.
I have a RDD which consists of a tuple (rowkey, input_vector). I want to use the loaded model to predict the score/class of the input.
调用 get_value() 的代码
Code to call get_value()
result = data_rdd.map(lambda iter: get_value(iter))
result.foreach(print)
问题是每次我调用地图时,每次都会为每个元组加载模型,这需要很多时间.
The problem is every time I call the map, the model is loaded everytime for each tuple and it takes a lot of time.
我正在考虑使用 mapPartitions 加载模型,然后使用 map 调用 get_value 函数.我不知道如何将代码转换为 mapPartition,其中每个分区只加载一次 tensorflow 模型并减少运行时间.
I am thinking of loading the model using mapPartitions and then use map to call get_value function. I have no clue as how to convert the code to a mapPartition where I load the tensorflow model only once per parition and reduce the running time.
提前致谢.
推荐答案
我猜下面的代码是一个巨大的改进,因为它使用了 mapPartitions.
I guess that the below code is a huge improvement as it uses mapPartitions.
代码
def predict(rows):
graph = tf.Graph()
checkpoint_file = "/home/sahil/Desktop/Relation_Extraction/data/1485336002/checkpoints/model-300"
print("Loading model................................")
with graph.as_default():
session_conf = tf.ConfigProto(
allow_soft_placement=allow_soft_placement,
log_device_placement=log_device_placement)
sess = tf.Session(config=session_conf)
with sess.as_default():
# Load the saved meta graph and restore variables
saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
saver.restore(sess, checkpoint_file)
print("**********************************************")
# Get the placeholders from the graph by name
input_x = graph.get_operation_by_name("X_train").outputs[0]
dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
# Tensors we want to evaluate
predictions = graph.get_operation_by_name("output/predictions").outputs[0]
# Generate batches for one epoch
for row in rows:
X_test = [row[1]]
batch_predictions = sess.run(predictions, {input_x: X_test, dropout_keep_prob:
yield (row[0], batch_predictions)
result = data_rdd.mapPartitions(lambda iter: predict(iter))
result.foreach(print)
这篇关于使用 pyspark 将地图转换为 mapPartition的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!