Parallelize tf.from_generator using tf.contrib.data.parallel_interleave

Problem description

I have a bunch of JSON array files (AVRO, to be accurate), and each of them yields multiple samples for training a Keras model. Using ideas from @GPhilo and @jsimsa, I was able to come up with the following to parallelize my input pipeline. I am unable to figure out how to design generator(n) to divide the work of processing files. The code fails inside parse_file(f) because that function expects a string file path and not a Tensor.

import math

import tensorflow as tf

N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512

def generator(n):
    size = math.ceil(len(files_to_process) / N)
    start_index = n * size
    end_index = start_index + size

    def gen():
        # for f in files_to_process[start_index:end_index]:
        for f in tf.slice(files_to_process, start_index, size):
            yield f

    return gen

def dataset(n):
    return tf.data.Dataset.from_generator(generator(n), (tf.string,))

def process_file(f):
    examples_x, examples_y = parse_file(f)
    return examples_x, examples_y

ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)

...
myTfKerasModel.fit(ds.make_one_shot_iterator(), steps_per_epoch=NUM_TRAIN_SAMPLES // batch_size)

  • What is the correct way to design generator(n) here?
  • Is this an optimized way to design my input pipeline using parallel_interleave and flat_map?

Answer

      It seems to me you're complicating your life unnecessarily with the generator. This is how I'd implement your input pipeline:

      def parse_file_tf(filename):
          # tf.py_func lets the plain-Python parse_file receive the filename as a string at runtime
          return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])

      # version with map
      files = tf.data.Dataset.from_tensor_slices(files_to_process)
      dataset = files.map(parse_file_tf, num_parallel_calls=N)
      # unpack each per-file array of examples into individual (x, y) elements
      dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
      dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(2)
      it = dataset.make_one_shot_iterator()
      

      To test it, I define a dummy parse_file as:

      i = 0
      def parse_file(f):
          global i
          i += 1
          # mimics variable-length examples_x, examples_y
          return np.asarray([i] * i, dtype=np.float32), np.asarray([i] * i, dtype=np.float32)
      

      which I feed into a basic loop that shows what the iterator returns:

      sess = tf.Session()
      x, y = it.get_next()  # build the get_next op once, outside the loop
      try:
          while True:
              vx, vy = sess.run([x, y])
              print(vx)
              print(vy)
      except tf.errors.OutOfRangeError:
          pass
      sess.close()
      

      Running the above code prints:

      [2. 3. 2. 1. 3. 3.]
      [2. 3. 2. 1. 3. 3.]
      

      Explanation of the pipeline

      Essentially, I leave the parallelization issue to map, where I can pass the number of threads it should run. No need for generators iterating over ranges and those extra complications.

      I chose map over parallel_interleave because the latter requires you to generate a Dataset instance for each item it returns, which in your case doesn't really make sense because all the values are already in memory once parse_file has run. parallel_interleave makes sense if you generate the values slowly (e.g., by applying tf.data.TFRecordDataset to a list of filenames), but if your dataset fits in memory, go for map.
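
      As a hedged illustration of that second case (not from the original answer): parallel_interleave pays off when each file is wrapped in a lazily-read Dataset, for example TFRecord files. The file names below are placeholders.

      # Sketch only: each TFRecord file becomes its own streaming Dataset,
      # and parallel_interleave reads from N of them concurrently.
      tfrecord_files = tf.data.Dataset.from_tensor_slices(["f1.tfrecord", "f2.tfrecord", "f3.tfrecord"])
      records = tfrecord_files.apply(
          tf.contrib.data.parallel_interleave(
              lambda f: tf.data.TFRecordDataset(f),  # one lazily-evaluated Dataset per file
              cycle_length=N))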

      About the tf.py_func limitations, they do not affect your trained network, only the input pipeline. Ideally, you'll have a different pipeline for your training and for your final use of the network. You only need to take care of the limitations during the latter, while for training (unless you do something very specific with distributed training and/or moving the training across machines) you're reasonably safe.
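
      As a hypothetical illustration of that split (not part of the original answer): at serving time you can bypass tf.py_func entirely by calling parse_file as plain Python and feeding the resulting arrays to the model, assuming one file's examples fit in memory.

      # Sketch only: no tf.py_func in the serving path; parse_file runs as ordinary Python code.
      examples_x, examples_y = parse_file("f1.avro")
      predictions = myTfKerasModel.predict(examples_x, batch_size=batch_size)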

      If your JSON files are very big and their content won't fit in memory, you can use a generator, but slightly different from the approach you began with. The idea is, the generator goes through the JSON file and yields one record at a time. Then, the generator has to be your parse_file function. As an example, let's assume you have the following parse_file generator:

      i = 3
      def parse_file(filename):
          global i
          i += 1
          ctr = 0
          # yield a variable number of records per file (i records of value i), one at a time
          while ctr < i:
              yield np.asarray([i], dtype=np.float32), np.asarray([i], dtype=np.float32)
              ctr += 1
      

      In this case, the pipeline would look as follows:

      def wrap_generator(filename):
          # from_generator expects a callable, so defer the parse_file(filename) call with a lambda
          return tf.data.Dataset.from_generator(lambda: parse_file(filename),
                                                (tf.float32, tf.float32))

      files = tf.data.Dataset.from_tensor_slices(files_to_process)
      dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
      dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
      dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
      it = dataset.make_one_shot_iterator()
      

      Note that here we need to use parallel_interleave because we turn the generators into Dataset instances from which we extract values. The rest stays the same.

      Feeding this to the same sample loop as above prints:

      [6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
      [6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
      
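
      For completeness, a hedged sketch of how the resulting iterator could feed the Keras model from the question (assuming tf.keras on a TF 1.x release that accepts dataset iterators in fit; NUM_TRAIN_SAMPLES and myTfKerasModel come from the question's snippet):

      # Sketch only: pass the one-shot iterator to fit() and tell Keras how many
      # batches make up one epoch via steps_per_epoch.
      myTfKerasModel.fit(it, steps_per_epoch=NUM_TRAIN_SAMPLES // batch_size)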
