Task assignment in tensorflow distributed process


Question

I'm confused about the distributed training process in TensorFlow.

I think TensorFlow feeds a batch of size batch_size to a worker, and the worker then updates the ps server. Is this right?

But while training, I noticed that the step numbers in the log look strange.

If I have only 2 workers, I think the correct sequence should be something like:

```
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 200 xxxxxxx
[worker2] step 300 xxxxxxx
```

... every worker should print a different step to the log.

In fact, the log looks like this:

```
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 100 xxxxxxx
[worker2] step 200 xxxxxxx
[worker1] step 300 xxxxxxx
```

... Why doesn't worker1 print step 200?

I am confused about the job assignment.

How does TensorFlow do distributed training? Does the chief worker split the data into batches of batch_size and hand a batch to a worker, which then updates the ps server? Or does every worker run over the whole data and update the ps server?

```

with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
    # Read TFRecords files for training
    filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(FLAGS.train),
        num_epochs=epoch_number)
    serialized_example = read_and_decode(filename_queue)
    batch_serialized_example = tf.train.shuffle_batch(
        [serialized_example],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    features = tf.parse_example(
        batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
    batch_labels = features["label"]
    batch_ids = features["ids"]
    batch_values = features["values"]

    # Read TFRecords file for validation
    validate_filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(FLAGS.eval),
        num_epochs=epoch_number)
    validate_serialized_example = read_and_decode(validate_filename_queue)
    validate_batch_serialized_example = tf.train.shuffle_batch(
        [validate_serialized_example],
        batch_size=validate_batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    validate_features = tf.parse_example(
        validate_batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
    validate_batch_labels = validate_features["label"]
    validate_batch_ids = validate_features["ids"]
    validate_batch_values = validate_features["values"]
    logits = inference(batch_ids, batch_values)
    batch_labels = tf.to_int64(batch_labels)
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits,
                                                                   batch_labels)
    loss = tf.reduce_mean(cross_entropy, name='loss')

    print("Use the optimizer: {}".format(FLAGS.optimizer))

    optimizer = tf.train.FtrlOptimizer(learning_rate)

    global_step = tf.Variable(0, name='global_step', trainable=False)
    train_op = optimizer.minimize(loss, global_step=global_step)




    # Initialize saver and summary
    steps_to_validate = FLAGS.steps_to_validate
    init_op = tf.initialize_all_variables()

    saver = tf.train.Saver(max_to_keep = 2)
    keys_placeholder = tf.placeholder("float")
    keys = tf.identity(keys_placeholder)
    tf.add_to_collection("inputs", json.dumps({'key': keys_placeholder.name}))
    tf.add_to_collection("outputs", json.dumps({'key': keys.name,
                                                'softmax': inference_softmax.name,
                                                'prediction': inference_op.name}))

    summary_op = tf.merge_all_summaries()


sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir="./train_process/",
                         init_op=init_op,
                         summary_op=summary_op,
                         saver=saver,
                         global_step=global_step,
                         save_model_secs=60)

# Create session to run graph
with sv.managed_session(server.target) as sess:

    while not sv.should_stop():
        # Get coordinator and run queues to read data
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord, sess=sess)

        start_time = datetime.datetime.now()

        try:
            while not coord.should_stop():
                _, loss_value, step = sess.run([train_op, loss, global_step])
                if step % steps_to_validate == 0:
                    accuracy_value, auc_value, summary_value = sess.run(
                        [accuracy, auc_op, summary_op])
                    end_time = datetime.datetime.now()
                    print("[{}] Task: {}, Step: {}, loss: {}, accuracy: {}, auc: {}".format(
                        end_time - start_time,
                        FLAGS.task_index,
                        step, loss_value, accuracy_value,
                        auc_value))

                    start_time = end_time
        except tf.errors.OutOfRangeError:
            print("Done training after reading all data")
        finally:
            coord.request_stop()
            print("coord stopped")

        # Wait for threads to exit
        coord.join(threads)

```

Part of the training log:

```

[0:00:17.115814] Task: 0, Step: 74600, loss: 0.303285002708, accuracy: 0.910000026226, auc: 0.946377456188
[0:00:03.804889] Task: 1, Step: 74700, loss: 0.287385582924, accuracy: 0.879999995232, auc: 0.946395516396
[0:00:03.778589] Task: 0, Step: 74800, loss: 0.247096762061, accuracy: 0.860000014305, auc: 0.946370542049
[0:00:03.772320] Task: 1, Step: 74900, loss: 0.264987647533, accuracy: 0.899999976158, auc: 0.946406364441
[0:00:03.795459] Task: 0, Step: 75000, loss: 0.228719010949, accuracy: 0.899999976158, auc: 0.946437120438
[0:00:01.902293] Task: 1, Step: 75000, loss: 0.217391207814, accuracy: 0.910000026226, auc: 0.946473121643
[0:00:01.942055] Task: 1, Step: 75100, loss: 0.284583866596, accuracy: 0.889999985695, auc: 0.946496844292
[0:00:03.860608] Task: 0, Step: 75200, loss: 0.273199081421, accuracy: 0.850000023842, auc: 0.946503221989
[0:00:03.800881] Task: 1, Step: 75300, loss: 0.189931258559, accuracy: 0.930000007153, auc: 0.946559965611

```

Answer

There aren't really official docs besides the HowTo, so a good way to figure out how things work is by studying examples.

The basic concept to understand is that there are 3 kinds of TensorFlow processes.

  1. The client -- the Python process which builds the graph, connects to a local master (Session()) or a remote master (Session("grpc://...")), and issues session.run calls.

  2. The master -- the process that the client connects to, which figures out how to distribute the work among workers.

  3. The worker -- the process that does the actual work. If your graph has a with tf.device("/job:worker/task:0"): block, then computation in that block should be executed on task:0.

When you create a new server with server = tf.train.Server, the process that's started is both a worker and a master, but it's useful to understand the difference for debugging.
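
As a rough sketch of how those processes are started (hypothetical addresses; each process runs the same script with its own job_name and task_index):

```
import tensorflow as tf

# Hypothetical cluster with one ps task and two worker tasks; every process
# is given the same ClusterSpec plus its own job_name/task_index.
cluster = tf.train.ClusterSpec({
    "ps":     ["localhost:2221"],
    "worker": ["localhost:2222", "localhost:2223"],
})

# Starting a Server makes this process both a worker and a master: it can
# execute ops placed on its devices and accept Session connections.
server = tf.train.Server(cluster, job_name="worker", task_index=0)

# A ps task usually just serves variables and blocks forever:
# server = tf.train.Server(cluster, job_name="ps", task_index=0)
# server.join()
```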

The easiest example of distributed TF is when you have a single client which starts an in-process master, plus multiple workers. Here's one such example. In this usage, the main difference from the non-distributed version is that you do with tf.device("worker1") instead of tf.device("gpu1") to tell it to execute that part of the graph on worker1.
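
A minimal sketch of that single-client setup, assuming worker servers like the ones above are already listening on those (hypothetical) addresses:

```
import tensorflow as tf

# Pin part of the graph to worker 1 instead of a local GPU.
with tf.device("/job:worker/task:1"):
    a = tf.constant([1.0, 2.0, 3.0])
    b = a * 2.0

# The single client connects to worker 0's in-process master, which then
# dispatches the ops annotated above to /job:worker/task:1.
with tf.Session("grpc://localhost:2222") as sess:
    print(sess.run(b))
```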

It gets more complicated when you have multiple clients, as in the case of "between-graph replication." In the parameter server example, you have multiple parallel training loops, where each loop corresponds to a separate client: a Python process issuing run calls. To see which worker an op is actually placed on, you can look at the with tf.device annotations.

In your example you don't have explicit with tf.device("/job:worker/task:N") blocks in your snippet; that part is handled by tf.device(tf.train.replica_device_setter(...)). Essentially, instead of a fixed device for all ops in the block, the code runs replica_device_setter for each op to generate the device to place it on. It places all variables onto the /job:ps tasks and the rest of the ops on the current worker. The code for replica_device_setter got a bit complicated over time, but you can use a simpler implementation of it for the same effect, as below:

```
def simple_setter(ps_device="/job:ps/task:0"):
    def _assign(op):
        node_def = op if isinstance(op, tf.NodeDef) else op.node_def
        if node_def.op == "Variable":
            return ps_device
        else:
            return "/job:worker/task:%d" % (FLAGS.task)
    return _assign
...
with tf.device(simple_setter):
    ...
```

When you run this, each Python process will create a slightly different version of the graph, except for the Variable nodes, which will look identical in every process (check with tf.get_default_graph().as_graph_def()).

When you have multiple clients running training loops, one issue is: who executes the tasks that need to be done once for all clients? For instance, someone needs to run the initializers for all variables. You could put sess.run(tf.initialize_all_variables(), ...) in the client body, but with multiple clients running in parallel, that means the initialization ops are run more than once. So the solution is to designate one worker as the "chief" worker, and only have that worker run the operation.
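
In your snippet that is exactly the role of tf.train.Supervisor(is_chief=...); roughly (reusing names from your code, trimmed down):

```
# Only task 0 is the chief; the Supervisor runs init_op (and saves
# checkpoints/summaries) on the chief, while non-chief workers wait for
# the variables to be initialized instead of initializing them again.
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir="./train_process/",
                         init_op=init_op,
                         global_step=global_step)

with sv.managed_session(server.target) as sess:
    pass  # every worker trains here; the one-time setup ran only on the chief
```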

Also, there's no built-in distinction between worker and ps devices; it's just a convention that variables get assigned to ps devices and ops are assigned to worker devices. You could alternatively have only worker devices, and have a version of replica_device_setter put the variables on the 0th worker.
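
As a rough, untested sketch of that alternative (the names are hypothetical):

```
# Hypothetical: no ps job at all. ps_tasks=1 with ps_device pointing at
# worker 0 makes replica_device_setter place every variable on
# /job:worker/task:0, while all other ops stay on the current worker.
setter = tf.train.replica_device_setter(
    ps_tasks=1,
    ps_device="/job:worker/task:0",
    worker_device="/job:worker/task:%d" % FLAGS.task_index)

with tf.device(setter):
    pass  # build the model here
```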

Here's a barebones example with m workers updating variables sharded over n ps tasks, which uses explicit device assignment instead of replica_device_setter.

To summarize: in your case, replica_device_setter makes sure that your global_step is a variable stored on the ps task, so that variable is shared across all of your training loops. As for why you see the same global_step on both workers: there's nothing in your graph forcing global_step to be read after it's incremented. So if you run sess.run([increment_global_step, fetch_global_step]) in parallel on two different workers, you could potentially see:

```
worker 0: 0
worker 1: 0
worker 0: 2
worker 1: 2
etc
```
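
If you wanted each worker's printed step to always reflect its own just-applied increment, you would have to force that ordering yourself, for example with a control dependency (a sketch, not part of your code):

```
global_step = tf.Variable(0, name="global_step", trainable=False)
increment_global_step = tf.assign_add(global_step, 1)

# Force the read to happen after this worker's increment; other workers'
# increments can still interleave, but the fetched value can no longer be
# stale with respect to our own update.
with tf.control_dependencies([increment_global_step]):
    step_after_increment = tf.identity(global_step)
```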
