Transform map to mapPartitions using pyspark


Problem description

I am trying to load a TensorFlow model from disk and use it to predict values.

Code

import tensorflow as tf

def get_value(row):
    print("**********************************************")
    graph = tf.Graph()
    rowkey = row[0]
    checkpoint_file = "/home/sahil/Desktop/Relation_Extraction/data/1485336002/checkpoints/model-300"
    print("Loading model................................")
    with graph.as_default():
        session_conf = tf.ConfigProto(
            allow_soft_placement=True,   # these two flags were undefined
            log_device_placement=False)  # variables in the original snippet
        sess = tf.Session(config=session_conf)
        with sess.as_default():
            # Load the saved meta graph and restore variables
            saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
            saver.restore(sess, checkpoint_file)
            # Look up the input placeholders and the prediction tensor by name
            input_x = graph.get_operation_by_name("X_train").outputs[0]
            dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
            predictions = graph.get_operation_by_name("output/predictions").outputs[0]
            batch_predictions = sess.run(predictions, {input_x: [row[1]], dropout_keep_prob: 1.0})
            print(batch_predictions)
            return (rowkey, batch_predictions)

I have an RDD which consists of (rowkey, input_vector) tuples. I want to use the loaded model to predict the score/class of each input.

Code to call get_value()

result = data_rdd.map(lambda iter: get_value(iter))
result.foreach(print)

The problem is that the model is reloaded for every tuple on each call to map, which takes a lot of time.

I am thinking of loading the model using mapPartitions and then using map to call the get_value function. I have no clue how to convert the code to mapPartitions so that the TensorFlow model is loaded only once per partition, reducing the running time.
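Independent of TensorFlow, the refactor being asked for follows a general pattern: mapPartitions hands your function an iterator over a whole partition, so expensive setup can run once before the per-row loop. A minimal sketch with a cheap stand-in for the model load (the names `expensive_setup` and `predict_partition` are made up for illustration; no Spark needed to run it):

```python
# Sketch of the map -> mapPartitions pattern: pay the setup cost once
# per partition instead of once per row.
def expensive_setup():
    # Stand-in for loading a model from disk.
    return {"offset": 100}

def predict_partition(rows):
    model = expensive_setup()        # runs once per partition
    for rowkey, vec in rows:         # runs once per element
        yield (rowkey, vec + model["offset"])

# Spark would call predict_partition once per partition; simulated here:
partition = iter([("a", 1), ("b", 2)])
print(list(predict_partition(partition)))  # [('a', 101), ('b', 102)]
```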

Thanks in advance.

Answer

I guess that the code below is a huge improvement, as it uses mapPartitions.

Code

import tensorflow as tf

def predict(rows):
    graph = tf.Graph()
    checkpoint_file = "/home/sahil/Desktop/Relation_Extraction/data/1485336002/checkpoints/model-300"
    print("Loading model................................")
    with graph.as_default():
        session_conf = tf.ConfigProto(
            allow_soft_placement=True,   # these two flags were undefined
            log_device_placement=False)  # variables in the original snippet
        sess = tf.Session(config=session_conf)
        with sess.as_default():
            # Load the saved meta graph and restore variables
            saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
            saver.restore(sess, checkpoint_file)
        print("**********************************************")
        # Get the placeholders from the graph by name
        input_x = graph.get_operation_by_name("X_train").outputs[0]
        dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
        # Tensors we want to evaluate
        predictions = graph.get_operation_by_name("output/predictions").outputs[0]

        # Run inference row by row; the model is loaded only once per partition
        for row in rows:
            X_test = [row[1]]
            batch_predictions = sess.run(predictions, {input_x: X_test, dropout_keep_prob: 1.0})
            yield (row[0], batch_predictions)


result = data_rdd.mapPartitions(lambda iter: predict(iter))
result.foreach(print)
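A further possible tweak, not taken from the answer: predict still calls sess.run once per row. Within a partition you could group rows into fixed-size batches and feed each batch to the model in a single sess.run. The helper below is a hypothetical batching utility (the name `batched` and the batch size are my own choices):

```python
# Hypothetical helper: split any iterable into fixed-size batches,
# so a model can be fed one batch per sess.run instead of one row.
from itertools import islice

def batched(rows, size):
    rows = iter(rows)
    while True:
        batch = list(islice(rows, size))
        if not batch:
            return
        yield batch

# Inside predict() one could then write, e.g.:
#   for batch in batched(rows, 64):
#       keys, vecs = zip(*batch)
#       preds = sess.run(predictions, {input_x: list(vecs), dropout_keep_prob: 1.0})
#       for key, pred in zip(keys, preds):
#           yield (key, pred)
print(list(batched(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```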
