Task assignment in TensorFlow distributed training
Question
I'm confused about the distributed training process in tensorflow.
I think TensorFlow feeds one batch of batch_size examples to a worker, and then the worker updates the ps server. Is this right?
But during training, I noticed that the step numbers in the log look strange.
If I have only 2 workers, I think the correct sequence should be something like:
```
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 200 xxxxxxx
[worker2] step 300 xxxxxxx
...
```
Every worker should print a different step to the log.
In reality, the log looks like this:
```
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 100 xxxxxxx
[worker2] step 200 xxxxxxx
[worker1] step 300 xxxxxxx
...
```
Why doesn't worker1 print step 200?
I am confused about how the work is assigned.
How does TensorFlow do distributed training? Does the chief worker split the data into batches of batch_size, give each batch to a worker, and then that worker updates the ps server? Or does every worker run over the whole dataset and update the ps server?
```
with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
    # Read TFRecords files for training
    filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(FLAGS.train),
        num_epochs=epoch_number)
    serialized_example = read_and_decode(filename_queue)
    batch_serialized_example = tf.train.shuffle_batch(
        [serialized_example],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    features = tf.parse_example(
        batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
    batch_labels = features["label"]
    batch_ids = features["ids"]
    batch_values = features["values"]

    # Read TFRecords file for validation
    validate_filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(FLAGS.eval),
        num_epochs=epoch_number)
    validate_serialized_example = read_and_decode(validate_filename_queue)
    validate_batch_serialized_example = tf.train.shuffle_batch(
        [validate_serialized_example],
        batch_size=validate_batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    validate_features = tf.parse_example(
        validate_batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
    # Read from validate_features, not from the training `features`
    validate_batch_labels = validate_features["label"]
    validate_batch_ids = validate_features["ids"]
    validate_batch_values = validate_features["values"]

    logits = inference(batch_ids, batch_values)
    batch_labels = tf.to_int64(batch_labels)
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits,
                                                                   batch_labels)
    loss = tf.reduce_mean(cross_entropy, name='loss')
    print("Use the optimizer: {}".format(FLAGS.optimizer))
    optimizer = tf.train.FtrlOptimizer(learning_rate)
    global_step = tf.Variable(0, name='global_step', trainable=False)
    train_op = optimizer.minimize(loss, global_step=global_step)

    # Initialize saver and summary
    steps_to_validate = FLAGS.steps_to_validate
    init_op = tf.initialize_all_variables()
    saver = tf.train.Saver(max_to_keep=2)
    keys_placeholder = tf.placeholder("float")
    keys = tf.identity(keys_placeholder)
    tf.add_to_collection("inputs", json.dumps({'key': keys_placeholder.name}))
    tf.add_to_collection("outputs", json.dumps({'key': keys.name,
                                                'softmax': inference_softmax.name,
                                                'prediction': inference_op.name}))
    summary_op = tf.merge_all_summaries()

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir="./train_process/",
                         init_op=init_op,
                         summary_op=summary_op,
                         saver=saver,
                         global_step=global_step,
                         save_model_secs=60)

# Create session to run graph
with sv.managed_session(server.target) as sess:
    while not sv.should_stop():
        # Get coordinator and run queues to read data
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord, sess=sess)
        start_time = datetime.datetime.now()
        try:
            while not coord.should_stop():
                _, loss_value, step = sess.run([train_op, loss, global_step])
                if step % steps_to_validate == 0:
                    accuracy_value, auc_value, summary_value = sess.run(
                        [accuracy, auc_op, summary_op])
                    end_time = datetime.datetime.now()
                    print("[{}] Task: {}, Step: {}, loss: {}, accuracy: {}, auc: {}".format(
                        end_time - start_time,
                        FLAGS.task_index,
                        step, loss_value, accuracy_value,
                        auc_value))
                    start_time = end_time
        except tf.errors.OutOfRangeError:
            print("Done training after reading all data")
        finally:
            coord.request_stop()
            print("coord stopped")
        # Wait for threads to exit
        coord.join(threads)
```
Log output:
```
[0:00:17.115814] Task: 0, Step: 74600, loss: 0.303285002708, accuracy: 0.910000026226, auc: 0.946377456188
[0:00:03.804889] Task: 1, Step: 74700, loss: 0.287385582924, accuracy: 0.879999995232, auc: 0.946395516396
[0:00:03.778589] Task: 0, Step: 74800, loss: 0.247096762061, accuracy: 0.860000014305, auc: 0.946370542049
[0:00:03.772320] Task: 1, Step: 74900, loss: 0.264987647533, accuracy: 0.899999976158, auc: 0.946406364441
[0:00:03.795459] Task: 0, Step: 75000, loss: 0.228719010949, accuracy: 0.899999976158, auc: 0.946437120438
[0:00:01.902293] Task: 1, Step: 75000, loss: 0.217391207814, accuracy: 0.910000026226, auc: 0.946473121643
[0:00:01.942055] Task: 1, Step: 75100, loss: 0.284583866596, accuracy: 0.889999985695, auc: 0.946496844292
[0:00:03.860608] Task: 0, Step: 75200, loss: 0.273199081421, accuracy: 0.850000023842, auc: 0.946503221989
[0:00:03.800881] Task: 1, Step: 75300, loss: 0.189931258559, accuracy: 0.930000007153, auc: 0.946559965611
```
Recommended answer
There aren't really official docs besides the HowTo, so a good way to figure out how things work is by studying examples.
The basic concept to understand is that there are 3 kinds of tensorflow processes.
The client -- this is the Python process which builds the graph, connects to a local master (`Session()`) or a remote master (`Session("grpc://...")`), and issues `session.run` calls.
There's the master, which is the process that the client connects to, and which figures out how to distribute the work among workers.
There's the worker, which does the actual work. If your graph has a `with tf.device("/job:worker/task:0"):` block, then computation in that block should be executed on task:0.
When you create a new server with `server = tf.train.Server(...)`, the process that's started is both a worker and a master, but it's useful to understand the difference for debugging.
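Every process in such a cluster is identified by a job name and a task index. The following pure-Python sketch shows how those job/task pairs turn into the device strings used in `tf.device` and into the `grpc://` target a client passes to `Session`. It assumes a hypothetical cluster layout with made-up host names, in the same dictionary shape that `tf.train.ClusterSpec` accepts.

```python
# Hypothetical cluster layout: job name -> list of "host:port" addresses.
# This is the dictionary shape that tf.train.ClusterSpec accepts.
CLUSTER = {
    "ps": ["ps0.example.com:2222"],
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
}


def device_names(cluster):
    """Return the device prefix of every task, e.g. '/job:worker/task:1'."""
    names = []
    for job in sorted(cluster):
        for task_index in range(len(cluster[job])):
            names.append("/job:%s/task:%d" % (job, task_index))
    return names


def task_address(cluster, job, task_index):
    """Return the session target a client would use to reach one task."""
    return "grpc://" + cluster[job][task_index]


print(device_names(CLUSTER))
# ['/job:ps/task:0', '/job:worker/task:0', '/job:worker/task:1']
print(task_address(CLUSTER, "worker", 1))
# grpc://worker1.example.com:2222
```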
The easiest example of distributed TF is when you have a single client, which starts an in-process master, and multiple workers. Here's one such example. In this usage, the main difference from the non-distributed version is that you write `with tf.device("worker1")` instead of `tf.device("gpu1")` to tell it to execute that part of the graph on worker1.
It gets more complicated when you have multiple clients, as in the case of "between-graph replication". In the parameter server example, you have multiple parallel training loops, where each loop corresponds to a separate client, which is a Python process issuing run calls. To see which worker the ops are actually located on, you can look at the `with tf.device` annotations.
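Here is a rough pure-Python analogue of between-graph replication. A hypothetical `ParameterServer` object stands in for the variables on the ps task, and each thread plays one client running its own training loop. All clients update the same shared variables without coordinating with each other, as in asynchronous training. Each loop sees only the `global_step` values produced by its own updates, which is why the workers' logs interleave.

```python
import threading


class ParameterServer:
    """Hypothetical stand-in for variables hosted on /job:ps/task:0."""

    def __init__(self):
        self._lock = threading.Lock()
        self.weight = 0.0
        self.global_step = 0

    def apply_update(self, delta):
        # Each update is applied atomically, like one train_op on the ps task.
        with self._lock:
            self.weight += delta
            self.global_step += 1
            return self.global_step


def training_loop(task_index, ps, num_steps, log):
    """One client: its own loop, sharing only the variables on the ps."""
    for _ in range(num_steps):
        step = ps.apply_update(delta=0.1)
        log.append((task_index, step))


ps = ParameterServer()
log = []
clients = [threading.Thread(target=training_loop, args=(i, ps, 50, log))
           for i in range(2)]
for c in clients:
    c.start()
for c in clients:
    c.join()
print(ps.global_step)  # 100: both loops advanced the same counter
```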
In your example you don't have explicit `with tf.device("/job:worker/task:%d")` blocks in your snippet, but this part is done by `tf.device(tf.train.replica_device_setter(...))`. Essentially, instead of having a fixed device for all ops in the block, the code runs `replica_device_setter` for each op to generate the device to place it on. It places all variables onto the `/job:ps` tasks, and the rest of the ops on the current worker. The code for `replica_device_setter` got a bit complicated over time, but you could use a simpler implementation of it for the same effect, as below:
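```
import tensorflow as tf

def simple_setter(ps_device="/job:ps/task:0"):
    def _assign(op):
        node_def = op if isinstance(op, tf.NodeDef) else op.node_def
        # Variable ops ("VariableV2" in TF >= 1.0) go to the ps task
        if node_def.op in ("Variable", "VariableV2"):
            return ps_device
        else:
            # Everything else stays on this worker
            return "/job:worker/task:%d" % FLAGS.task_index
    return _assign

...
# simple_setter() returns the device function that tf.device expects
with tf.device(simple_setter()):
    ...
```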
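You can watch a device function work without a cluster. The pure-Python sketch below uses a hypothetical `FakeOp` tuple in place of real `tf.Operation` objects. It spreads variables round-robin over several ps tasks, which is what `tf.train.replica_device_setter` does by default when there is more than one ps task. The op names and the two-ps layout are illustrative assumptions.

```python
from collections import namedtuple

# Hypothetical stand-in for a graph op: just a name and an op type.
FakeOp = namedtuple("FakeOp", ["name", "type"])

VARIABLE_OP_TYPES = ("Variable", "VariableV2")


def round_robin_setter(num_ps_tasks, worker_task):
    """Return a device function: variables round-robin over ps tasks,
    every other op on this worker."""
    next_ps = 0

    def _assign(op):
        nonlocal next_ps
        if op.type in VARIABLE_OP_TYPES:
            device = "/job:ps/task:%d" % next_ps
            next_ps = (next_ps + 1) % num_ps_tasks
            return device
        return "/job:worker/task:%d" % worker_task

    return _assign


ops = [
    FakeOp("weights", "VariableV2"),
    FakeOp("bias", "VariableV2"),
    FakeOp("global_step", "VariableV2"),
    FakeOp("matmul", "MatMul"),
    FakeOp("loss", "Mean"),
]
setter = round_robin_setter(num_ps_tasks=2, worker_task=1)
placements = [(op.name, setter(op)) for op in ops]
for name, device in placements:
    print(name, "->", device)
```

The counter lives inside the closure, so the setter has to be created once per graph. This matches how the setter is called once per op while the graph is built.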
When you run this, each Python process will create a slightly different version of the graph, except for the Variable nodes, which will look identical in each process (check with `tf.get_default_graph().as_graph_def()`).
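A small simulation shows why only the variable nodes come out identical. Each worker process runs its own device function over the same sequence of ops, and only the placement of non-variable ops depends on the task index. This sketch uses hypothetical op tuples rather than a real `GraphDef`.

```python
from collections import namedtuple

# Hypothetical stand-in for graph ops.
FakeOp = namedtuple("FakeOp", ["name", "type"])


def simple_setter(worker_task, ps_device="/job:ps/task:0"):
    """Pure-Python analogue of the setter above: variables on ps, rest on worker."""
    def _assign(op):
        if op.type in ("Variable", "VariableV2"):
            return ps_device
        return "/job:worker/task:%d" % worker_task
    return _assign


def build_placements(worker_task, ops):
    """Mimic one client process building its graph: op name -> device."""
    setter = simple_setter(worker_task)
    return {op.name: setter(op) for op in ops}


OPS = [FakeOp("global_step", "VariableV2"), FakeOp("weights", "VariableV2"),
       FakeOp("logits", "MatMul"), FakeOp("loss", "Mean")]

graph_0 = build_placements(0, OPS)
graph_1 = build_placements(1, OPS)
same = [name for name in graph_0 if graph_0[name] == graph_1[name]]
print(same)  # ['global_step', 'weights']
```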
When you have multiple clients running training loops, one issue is: who executes tasks that need to be done once for all clients? For instance, someone needs to run initializers for all variables. You could put `sess.run(tf.initialize_all_variables...)` in the client body, but with multiple clients running in parallel, this means the initialization ops are run more than once. So the solution is to designate one worker as the "chief" worker, and only have that worker run the operation.
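Here is a pure-Python sketch of the chief pattern. A hypothetical in-memory `SharedStore` stands in for the ps-hosted variables. Every client runs the same code, but only the task with index 0 runs the initializer, and the others block until it has finished. In the question's code this role is handled by `tf.train.Supervisor(is_chief=...)`. The threads here only imitate that behaviour.

```python
import threading


class SharedStore:
    """Hypothetical stand-in for variables living on a ps task."""

    def __init__(self):
        self.values = {}
        self.init_runs = 0
        self.ready = threading.Event()

    def initialize(self):
        self.values = {"global_step": 0, "weights": 0.0}
        self.init_runs += 1
        self.ready.set()


def client(task_index, store, results):
    is_chief = (task_index == 0)
    if is_chief:
        store.initialize()           # only the chief runs the init op
    else:
        store.ready.wait(timeout=5)  # non-chief clients wait for the chief
    results[task_index] = dict(store.values)


store = SharedStore()
results = {}
threads = [threading.Thread(target=client, args=(i, store, results))
           for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(store.init_runs)  # 1
```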
Also, there's no built-in distinction between `worker` and `ps` devices -- it's just a convention that variables get assigned to `ps` devices, and ops are assigned to `worker` devices. You could alternatively only have `worker` devices, and have a version of `replica_device_setter` put variables on the 0th worker.
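Here is a sketch of that alternative for a cluster with no ps job at all. The device function sends variables to worker 0 instead of a ps task. The op type strings and task numbers are illustrative assumptions.

```python
def worker_only_setter(worker_task, variable_task=0):
    """Variables live on /job:worker/task:<variable_task>; other ops stay local."""
    def _assign(op_type):
        if op_type in ("Variable", "VariableV2"):
            return "/job:worker/task:%d" % variable_task
        return "/job:worker/task:%d" % worker_task
    return _assign


setter = worker_only_setter(worker_task=2)
print(setter("VariableV2"))  # /job:worker/task:0
print(setter("MatMul"))      # /job:worker/task:2
```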
Here's a barebones example with m workers updating variables sharded over n PS tasks, which uses explicit device assignment instead of `replica_device_setter`.
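When a single large variable is split across the n PS tasks, every lookup has to be routed to the right shard. The sketch below shows "mod" sharding, the same idea as the `partition_strategy="mod"` option of `tf.nn.embedding_lookup`. The table size and shard count are hypothetical.

```python
def mod_shard(row_id, num_shards):
    """Return (ps task index, row index inside that shard) for a global row id."""
    return row_id % num_shards, row_id // num_shards


def shard_rows(num_rows, num_shards):
    """List the global row ids that each ps task stores."""
    shards = [[] for _ in range(num_shards)]
    for row_id in range(num_rows):
        task, _ = mod_shard(row_id, num_shards)
        shards[task].append(row_id)
    return shards


print(shard_rows(num_rows=7, num_shards=3))
# [[0, 3, 6], [1, 4], [2, 5]]
print(mod_shard(5, 3))  # (2, 1): row 5 is the second row on /job:ps/task:2
```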
To summarize, in your case `replica_device_setter` makes sure that your `global_step` is a variable that's stored on the `ps` task, and as such this variable is shared across all of your training loops. As to why you get the same value of `global_step` in both workers -- there's nothing in your graph forcing `global_step` to be read after it's incremented. So if you run `sess.run([increment_global_step, fetch_global_step])` in parallel on two different workers, you could potentially see:
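```
worker 0: 0
worker 1: 0
worker 0: 2
worker 1: 2
etc
```
The same shared counter explains the gap in your log. Each worker only prints when the value it fetches is a multiple of `steps_to_validate`. Because both workers advance the same `global_step`, a given multiple such as 200 is logged by whichever worker happens to fetch it: sometimes both workers, sometimes neither.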
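The interleaving behind those duplicate values can be replayed deterministically in plain Python. This sketch models the ps-hosted `global_step` as a single integer and feeds in hand-written event schedules. The first schedule is one possible ordering in which both workers fetch `global_step` before either increment lands. Real timing varies from run to run.

```python
def replay(schedule):
    """Replay (worker, action) events against one shared global_step.

    'inc' mimics train_op incrementing global_step on the ps task;
    'read' mimics fetching global_step in the same sess.run call,
    which has no ordering guarantee relative to the increment.
    """
    global_step = 0
    seen = []
    for worker, action in schedule:
        if action == "inc":
            global_step += 1
        else:
            seen.append((worker, global_step))
    return seen


# Both workers read before either increment is applied, twice in a row.
overlapping = [(0, "read"), (1, "read"), (0, "inc"), (1, "inc")] * 2
for worker, step in replay(overlapping):
    print("worker %d: %d" % (worker, step))

# A fully serialized schedule, for contrast: every read sees a new value.
serialized = [(0, "inc"), (0, "read"), (1, "inc"), (1, "read")] * 2
print(replay(serialized))  # [(0, 1), (1, 2), (0, 3), (1, 4)]
```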