Distributed TensorFlow - Not running some workers


Problem description


I'm trying to get a very simple example of distributed TensorFlow working. However, I'm hitting a bug that appears non-deterministically between runs. On some runs it works perfectly, outputting something along the lines of:

Worker 2 | step 0
Worker 0 | step 0
Worker 1 | step 0
Worker 3 | step 0
Worker 2 | step 1
Worker 0 | step 1
Worker 1 | step 1
Worker 3 | step 1
...

However, every once in a while, one or more of the workers fails to run, resulting in output like this:

Worker 0 | step 0
Worker 3 | step 0
Worker 0 | step 1
Worker 3 | step 1
Worker 0 | step 2
Worker 3 | step 2
...

If I run the loop indefinitely, it seems that the missing workers always start up at some point, but only minutes later, which isn't practical.

I've found that two things make the issue go away (but make the program useless):

1. Not declaring any tf Variables inside the with tf.device(tf.train.replica_device_setter()) scope. If I declare even one variable (e.g. nasty_var below), the issue starts cropping up.

2. Setting the is_chief param in tf.train.MonitoredTrainingSession() to True for all workers. This makes the bug go away even when variables are declared, but it seems wrong to make all of the workers the chief. The way I'm currently setting it below - is_chief=(task_index == 0) - is taken directly from a TensorFlow tutorial.

Here's the simplest code I could come up with to replicate the issue. (You may have to run it multiple times to see the bug, but it almost always shows up within 5 runs.)

from multiprocessing import Process
import tensorflow as tf
from time import sleep
from numpy.random import random_sample

cluster = tf.train.ClusterSpec({'ps': ['localhost:2222'],
                                'worker': ['localhost:2223',
                                           'localhost:2224',
                                           'localhost:2225',
                                           'localhost:2226']})


def create_worker(task_index):
    server = tf.train.Server(cluster, job_name='worker', task_index=task_index)

    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
        nasty_var = tf.Variable(0)  # This line causes the problem. No issue when this is commented out.

    with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(task_index == 0)):
        for step in xrange(10000):
            sleep(random_sample())  # Simulate some work being done.
            print 'Worker %d | step %d' % (task_index, step)


def create_ps(task_index):
    param_server = tf.train.Server(cluster, job_name='ps',
                                   task_index=task_index)
    param_server.join()

# Launch workers and ps in separate processes.
processes = []
for i in xrange(len(cluster.as_dict()['worker'])):
    print 'Forking worker process ', i
    p = Process(target=create_worker, args=[i])
    p.start()
    processes.append(p)

for i in xrange(len(cluster.as_dict()['ps'])):
    print 'Forking ps process ', i
    p = Process(target=create_ps, args=[i])
    p.start()
    processes.append(p)

for p in processes:
    p.join()

Solution

I'm guessing the cause here is the implicit coordination protocol in how a tf.train.MonitoredTrainingSession starts, which works roughly as follows:

  • If this session is the chief:
    • Run the variable initializer op.
  • Else (if this session is not the chief):
    • Run an op to check whether the variables have been initialized.
    • While any of the variables has not yet been initialized:
      • Wait 30 seconds.
      • Try creating a new session, and check whether the variables have been initialized.

(I discuss the rationale behind this protocol in a video about Distributed TensorFlow.)
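
In rough pseudocode, the behaviour described above looks something like the sketch below. This is only a conceptual illustration of the protocol, not the actual TensorFlow source, and the four helper arguments are hypothetical placeholders:

import time

def start_session(is_chief, run_init_op, variables_initialized, new_session):
    # Conceptual sketch of the MonitoredTrainingSession startup protocol.
    if is_chief:
        run_init_op()            # the chief runs the variable initializer op
        return new_session()
    # Non-chief: poll until the chief has initialized the variables.
    sess = new_session()
    while not variables_initialized(sess):
        time.sleep(30)           # hardcoded 30-second wait
        sess = new_session()     # try again with a fresh session
    return sess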

When every session is the chief, or there are no variables to initialize, the tf.train.MonitoredTrainingSession will always start immediately. However, once there is a single variable, and you only have a single chief, you will see that the non-chief workers have to wait for the chief to act.

The reason for using this protocol is that it is robust to various processes failing, and the delay—while very noticeable when running everything on a single process—is short compared to the expected running time of a typical distributed training job.

Looking at the implementation again, it does seem that this 30-second timeout should be configurable (as the recovery_wait_secs argument to tf.train.SessionManager()), but there is currently no way to set this timeout when you create a tf.train.MonitoredTrainingSession, because it uses a hardcoded set of arguments for creating a session manager. This seems like an oversight in the API, so please feel free to open a feature request on the GitHub issues page!
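
Until that is exposed, one possible workaround for a toy setup like this is to drive tf.train.SessionManager yourself, since recovery_wait_secs is a constructor argument there. The sketch below is illustrative and reflects my own assumptions about the wiring: it gives you a plain tf.Session, without the hooks, checkpointing, and automatic recovery that tf.train.MonitoredTrainingSession provides.

import tensorflow as tf

def make_session(server, task_index):
    # Sketch: poll every second instead of the hardcoded 30 seconds.
    sm = tf.train.SessionManager(
        ready_op=tf.report_uninitialized_variables(),
        recovery_wait_secs=1)
    if task_index == 0:
        # The chief initializes the variables.
        return sm.prepare_session(server.target,
                                  init_op=tf.global_variables_initializer())
    else:
        # Non-chiefs block until the variables have been initialized.
        return sm.wait_for_session(server.target)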
