GRPC causes training to pause in individual worker (distributed TensorFlow, synchronised)


Problem description

I am trying to train a model in a synchronous distributed fashion for data parallelism. There are 4 GPUs in my machine. Each GPU should run a worker to train on a separate, non-overlapping subset of the data (between-graph replication). The main data file is split into 16 smaller TFRecord files. Each worker is supposed to process 4 different files. The problem is that training freezes independently, and at different times, in each worker process.

(Screenshot of the frozen training progress in each worker: https://i.stack.imgur.com/KwilQ.png)

One of the 'ps' tasks reports the following gRPC-related error:

2017-09-21 16:45:55.606842: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2000, 1 -> localhost:2001, 2 -> localhost:2002}
2017-09-21 16:45:55.606877: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2003, 1 -> localhost:2004, 2 -> localhost:2005, 3 -> localhost:2006}
2017-09-21 16:45:55.608066: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2002
E0921 16:48:52.596846076    3037 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=12325, new grpc_chttp2_stream id=12317
2017-09-21 16:48:57.497244: W tensorflow/core/framework/op_kernel.cc:1158] Out of range: End of sequence
     [[Node: data_source_task_index_0/IteratorGetNext = IteratorGetNext[output_shapes=[[-1,-1], [-1,-1], [-1,-1], [-1,-1], [-1,-1]], output_types=[DT_INT64, DT_INT64, DT_INT64, DT_INT64, DT_INT64], _device="/job:ps/replica:0/task:0/cpu:0"](data_source_task_index_0/Iterator)]]
     [[Node: data_source_task_index_0/cond/Merge_2_S341 = _Recv[client_terminated=false, recv_device="/job:ps/replica:0/task:2/cpu:0", send_device="/job:ps/replica:0/task:0/cpu:0", send_device_incarnation=-6450759800525444137, tensor_name="edge_359_data_source_task_index_0/cond/Merge_2", tensor_type=DT_INT64, _device="/job:ps/replica:0/task:2/cpu:0"]()]]
E0921 16:49:58.462749643    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24775, new grpc_chttp2_stream id=24769
E0921 16:49:58.462780714    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24775, new grpc_chttp2_stream id=24773
E0921 16:49:58.463260203    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24777
E0921 16:49:58.463277333    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24779
E0921 16:49:58.463283953    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24781
E0921 16:49:58.463289625    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24783
E0921 16:49:58.463295275    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24785

Input pipeline

I am using the TensorFlow Dataset API as the input pipeline. A sketch of the dataset code looks as follows:

def _dataset(filenames):
    # Build a dataset over this worker's shard of TFRecord files.
    input_files = tf.constant(filenames, dtype=tf.string)
    dataset = tf.contrib.data.TFRecordDataset(input_files)
    dataset = dataset.map(_parse_single_example)
    # Pad variable-length word/label sequences within each batch.
    dataset = dataset.padded_batch(batch_size, padded_shapes=([-1], [-1]))
    iterator = dataset.make_initializable_iterator()
    words, labels = iterator.get_next()
    init_op = iterator.initializer
    return init_op, words, labels

Attempt at data separation

First, we get the list of files for the current worker/task.

data_files = get_file_name_for_this_worker(task_index)

Then, the data_files are fed into the dataset. The effect we want is that no two workers process the same subset of the data.
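The file-sharding helper is not shown in the question; below is a minimal sketch, assuming the 16 TFRecord files are assigned to the 4 workers by a simple modulo on `task_index` (the file names and the helper's implementation are assumptions, not the asker's real code):

```python
def get_file_name_for_this_worker(task_index, num_workers=4, num_files=16):
    """Hypothetical sharding: give each worker a disjoint subset of files."""
    all_files = ["train-%02d.tfrecord" % i for i in range(num_files)]
    # Worker k takes every num_workers-th file, so shards never overlap.
    return [f for i, f in enumerate(all_files) if i % num_workers == task_index]
```

With 4 workers and 16 files, each worker receives 4 files and no file is assigned to more than one worker.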

Allocating a data range to each worker

    with tf.device(
        tf.train.replica_device_setter(
            worker_device = worker_device,
            ps_device     = ps_device,
            cluster       = cluster,
            ps_strategy = load_balancer)):

            global DATA_SOURCES

            # Setup dataset for each worker (in each process)
            for worker_id in range(num_workers):
                with tf.variable_scope('data_source_task_index_%d' % worker_id):
                    DATA_SOURCES[worker_id] = _dataset(data_files)

            # Select the relevant data source for the current task
            init_op, words, labels = DATA_SOURCES[task_index]

            model = build_model(words, labels)
            ...
            sess, sv, train_op = synchronise(model, p_config, server)
            train(model, sess, train_op, init_op, sv)
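The `load_balancer` passed as `ps_strategy` above is not shown in the question either. `tf.train.replica_device_setter` accepts a callable that is invoked for each ps-placed op and returns the ps task index to use; a minimal round-robin sketch (the class name and its exact policy are assumptions standing in for the asker's real balancer):

```python
class RoundRobinStrategy:
    """Hypothetical ps_strategy: cycle ops across ps tasks so that no
    single parameter server ends up hosting all the variables."""

    def __init__(self, num_ps_tasks):
        self.num_ps_tasks = num_ps_tasks
        self._next_task = 0

    def __call__(self, op):
        # replica_device_setter calls this with each ps-placed op and
        # interprets the return value as the /job:ps task number.
        task = self._next_task
        self._next_task = (self._next_task + 1) % self.num_ps_tasks
        return task
```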

Training loop

The training iteration code is written so that the data source is re-initialized after a complete pass through the local data (i.e. every local epoch). The OutOfRangeError is the indication that the epoch is complete.

def train(model, sess, train_op, init_op, sv):
    for epoch in range(FLAGS.num_epochs):
        print("Initialising the data source")
        sess.run(init_op)
        batch = 0
        while True:
            batch += 1
            try:
                if (batch % FLAGS.report_every_batch == 0):
                    on_report_batch(model, train_op, sess)
                else:
                    sess.run(train_op)
            except tf.errors.OutOfRangeError:
                on_epoch_complete(model, sess)
                break
    print("Out of epoch loop")
    if writer:
        writer.close()
        print('Done training, total elapsed time:  %f' % (time.time()-begin_time))

def on_report_batch(model, train_op, sess):
    ...
    _, batch_loss, step = sess.run([train_op, model.batch_loss, model.global_step])
    print("Step: %d," % step, 
          " Epoch: %2d," % (epoch+1), 
          " Batch: %3d," % batch, 
          " Batch Cost: %.4f," % batch_loss,
          " Elapsed Time: %f, " % (elapsed/60.0),
          " Time per batch: %f" % time_per_batch)


Accepted answer

The problem went away with the following nightly build of TensorFlow:

http://ci.tensorflow.org/view/Nightly/job/nightly-matrix-linux-gpu/TF_BUILD_IS_OPT=OPT,TF_BUILD_IS_PIP=PIP,TF_BUILD_PYTHON_VERSION=PYTHON3.5,label=gpu-linux/

There are some related comments here:

https://github.com/tensorflow/tensorflow/issues/13213
