Can two tf.data.Dataset pipelines coexist and be controlled by tf.cond()?


Question

I put two Dataset pipelines into my graph, for a 9:1 train/test split, and switch between them with tf.cond. The problem is that during training both pipelines are advanced at every step, so the test set, being the smaller one, runs out before the training set:


OutOfRangeError (see above for traceback): End of sequence

First, I wrap the input pipeline in a function:

def input_pipeline(*args):
    ...
    # construct iterator
    it = batch.make_initializable_iterator()
    iter_init_op = it.initializer

    # get next img and label
    X_it, y_it = it.get_next()
    inputs = {'img': X_it, 'label': y_it, 'iterator_init_op': iter_init_op}
    return inputs

Then I instantiate both pipelines:

train_input = input_pipeline(args)
test_input = input_pipeline(args)

In the model, a placeholder filled through feed_dict selects the branch, which should not penalize performance:

...
def f1(): return train_input
def f2(): return test_input
cond_pl = tf.placeholder(tf.string, name='cond_pl')
input = tf.cond(tf.equal(cond_pl, 'train'), lambda: f1(), lambda: f2())
...

In the session:

for ep in range(nb_ep):
   ...
   for step in range(ep_len):
       print('step:{}\r'.format(step))
       try:
           sess.run([train_op], feed_dict={cond_pl: 'train'})
           if step % step_len == (step_len - 1):
               sess.run([test_op], feed_dict={cond_pl: 'test'})
       except tf.errors.OutOfRangeError:
           raise RuntimeError('drop the remainder')
   ...

How can I make the input pipeline's get_next() run only when its condition is selected?
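A rough pure-Python analogy (not TensorFlow code, and not part of the original question) may help picture why both pipelines advance. In the snippets above, get_next() is called inside input_pipeline, outside the functions handed to tf.cond, and ops created outside those functions run regardless of which branch is taken. That is much like function arguments being evaluated before the call. The names select_eager, select_lazy and run_steps, and the batch counts, are hypothetical.

```python
def select_eager(is_train, train_batch, test_batch):
    # Both arguments were already fetched by the caller, so both
    # iterators advanced no matter which value is returned.
    return train_batch if is_train else test_batch


def select_lazy(is_train, fetch_train, fetch_test):
    # Only the chosen callable runs, so only one iterator advances.
    return fetch_train() if is_train else fetch_test()


def run_steps(lazy, n_steps=9):
    """Run n_steps training steps; return how many finished before exhaustion."""
    train = iter(range(9))  # stand-in for 9 training batches
    test = iter(range(1))   # stand-in for 1 test batch (9:1 split)
    done = 0
    try:
        for _ in range(n_steps):
            if lazy:
                select_lazy(True, lambda: next(train), lambda: next(test))
            else:
                select_eager(True, next(train), next(test))
            done += 1
    except StopIteration:
        pass  # plays the role of OutOfRangeError: End of sequence
    return done


eager_steps = run_steps(lazy=False)
lazy_steps = run_steps(lazy=True)
print(eager_steps, lazy_steps)
```

In the eager variant the single test batch is consumed during the first training step, so the second step already fails. The lazy variant never touches the test iterator while training.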

Updated toy snippet, following @sharky's answer:

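import os
import multiprocessing as mp
from itertools import repeat

import h5py
import numpy as np
import tensorflow as tf  # TensorFlow 1.x graph-mode API


def write_h5(args):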
    x, is_training = args
    with h5py.File('./{}_{}.h5'.format('train' if is_training else 'test', x), 'w') as f:
        h = w = np.arange(-1, 1, 0.02)
        hh, _ = np.meshgrid(h, w)
        a = hh ** 2
        b = np.add(a + 1, np.random.randn(100, 100))  #do something and add gaussian noise
        f.create_dataset('X', shape=(100, 100), dtype='float32', data=a)
        f.create_dataset('y', shape=(100, 100), dtype='float32', data=b)


def input_pipeline(window_size, batch_size, is_train=True, ncores=mp.cpu_count()):
    flist = []
    for dirpath, _, fnames in os.walk('./'):
        for fname in fnames:
            if fname.startswith('train' if is_train else 'test') and fname.endswith('.h5'):
                print(fname)
                flist.append((os.path.abspath(os.path.join(dirpath, fname)), str(window_size)))
    f_len = len(flist)
    print(f_len)
    # init list of files
    batch = tf.data.Dataset.from_tensor_slices((tf.constant(flist)))
    batch = batch.map(_pyfn_wrapper, num_parallel_calls=ncores)
    batch = batch.shuffle(batch_size).batch(batch_size, drop_remainder=True).prefetch(ncores).repeat()

    # construct iterator
    it = batch.make_initializable_iterator()
    iter_init_op = it.initializer

    # get next img and label
    X_it, y_it = it.get_next()
    inputs = {'img': X_it, 'label': y_it, 'iterator_init_op': iter_init_op}
    return inputs, f_len


def _pyfn_wrapper(args):
    return tf.py_func(parse_h5,  #wrapped pythonic function
                      [args],
                      [tf.float32, tf.float32]  #[input, output] dtype
                      )

def parse_h5(args):
    name, window_size = args
    window_size = int(window_size.decode('utf-8'))
    with h5py.File(name, 'r') as f:
        X = f['X'][:].reshape(window_size, window_size, 1)
        y = f['y'][:].reshape(window_size, window_size, 1)
        return X, y


# init data
p = mp.Pool(mp.cpu_count())
p.map(write_h5, zip(range(9000), repeat(True)))
p.map(write_h5, zip(range(1000), repeat(False)))

# hparam
ep_len = 90
step_len = 9  # run test_op after 9 steps

# create tf.data.Dataset
train_input, train_len = input_pipeline(100, 5, is_train=True)
test_input, test_len = input_pipeline(100, 5, is_train=False)


# draw graph
def f1(): return train_input
def f2(): return test_input


cond_pl = tf.placeholder(tf.string, shape=None, name='cond_pl')
input = tf.cond(tf.equal(cond_pl, 'train'), lambda: f1(), lambda: f2())  # I thou

with tf.name_scope("Conv1"):
    W = tf.get_variable("W", shape=[3, 3, 1, 1],
                         initializer=tf.contrib.layers.xavier_initializer())
    b = tf.get_variable("b", shape=[1], initializer=tf.contrib.layers.xavier_initializer())
    layer1 = tf.nn.conv2d(input['img'], W, strides=[1, 1, 1, 1], padding='SAME') + b
    logits = tf.nn.relu(layer1)

loss = tf.reduce_mean(tf.losses.mean_squared_error(labels=input['label'], predictions=logits))
train_op = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(loss)
test_op = tf.Print(loss, [loss], message='test loss: ')  # print() returns None, which sess.run() cannot fetch
#

# session
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for ep in range(5):
        print('ep:{}'.format(ep))
        sess.run(input['iterator_init_op'], feed_dict={cond_pl: 'train'})
        sess.run(input['iterator_init_op'], feed_dict={cond_pl: 'test'})
        for step in range(ep_len):
            print('step:{}\r'.format(step))
            try:
                sess.run([train_op], feed_dict={cond_pl: 'train'})
                if step % step_len == (step_len - 1):
                    sess.run([test_op], feed_dict={cond_pl: 'test'})
            except tf.errors.OutOfRangeError:
                raise RuntimeError('drop the remainder')


Recommended answer

Consider this example:

import numpy as np
import tensorflow as tf  # TensorFlow 1.x graph-mode API

train = np.arange(90)
test = np.arange(10)

train_ds = tf.data.Dataset.from_tensor_slices(train).shuffle(10).batch(10).repeat()
test_ds = tf.data.Dataset.from_tensor_slices(test).shuffle(10).batch(10).repeat()

train_iterator = train_ds.make_initializable_iterator()
test_iterator = test_ds.make_initializable_iterator()

# Build each get_next() op once; calling get_next() inside the loop
# would add a new op to the graph on every step.
train_next = train_iterator.get_next()
test_next = test_iterator.get_next()

with tf.Session() as sess:
    sess.run(train_iterator.initializer)
    sess.run(test_iterator.initializer)
    for i in range(len(train) + 1):
        print(sess.run(train_next))
        if i % 9 == 8:
            print(sess.run(test_next))

There are two datasets and two iterators, both initialized at startup. When i exceeds the length of a dataset, that dataset starts over because of repeat(). If repeat() is called with a finite number of epochs, or not called at all, you will eventually get the end-of-sequence error. If for some reason you need or want to use cond, this answer may help:

How to use Tensorflow's tf.cond() with two...
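
If tf.cond is kept, one commonly suggested arrangement is to call get_next() inside the functions passed to tf.cond, so that only the selected iterator is advanced on each run. The following is a minimal sketch under assumptions, not the asker's model: it uses the TensorFlow 1.x graph API through tensorflow.compat.v1 (an assumption about the installed version), and the boolean placeholder is_train and the toy value ranges are hypothetical.

```python
import numpy as np
import tensorflow.compat.v1 as tf

tf.disable_v2_behavior()  # graph mode, as in the TF 1.x snippets above

# Toy data: 9 training batches and a single test batch, neither repeated.
train_ds = tf.data.Dataset.from_tensor_slices(np.arange(90)).batch(10)
test_ds = tf.data.Dataset.from_tensor_slices(np.arange(100, 110)).batch(10)

train_it = tf.data.make_initializable_iterator(train_ds)
test_it = tf.data.make_initializable_iterator(test_ds)

# Hypothetical boolean switch; get_next() is created inside each branch.
is_train = tf.placeholder(tf.bool, shape=[], name="is_train")
batch = tf.cond(is_train,
                lambda: train_it.get_next(),
                lambda: test_it.get_next())

with tf.Session() as sess:
    sess.run([train_it.initializer, test_it.initializer])
    # Nine training steps; the test iterator should stay untouched.
    train_batches = [sess.run(batch, feed_dict={is_train: True})
                     for _ in range(9)]
    # The only test batch is still available afterwards.
    test_batch = sess.run(batch, feed_dict={is_train: False})

print(len(train_batches), test_batch)
```

Because the test dataset here holds exactly one batch and is not repeated, the final fetch would raise OutOfRangeError if the training steps had also advanced the test iterator.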
