Parallelize tf.from_generator using tf.contrib.data.parallel_interleave


Problem Description

I have a bunch of JSON array files (AVRO, to be precise), and each of them yields multiple samples for training a Keras model. Using ideas from @GPhilo and @jsimsa, I came up with the following to parallelize my input pipeline. However, I can't figure out how to design generator(n) to divide the work of processing files. The code fails inside parse_file(f) because that function expects a string file path, not a Tensor:

N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512

def generator(n):
    size = math.ceil(len(files_to_process) / N)
    start_index = n * size
    end_index = start_index + size

    def gen():
        # for f in files_to_process[start_index:end_index]:
        for f in tf.slice(files_to_process, start_index, size):
            yield f

    return gen

def dataset(n):
    return tf.data.Dataset.from_generator(generator(n), (tf.string,))

def process_file(f):
    examples_x, examples_y = parse_file(f)
    return examples_x, examples_y

ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)

...
myTfKerasModel.fit( ds.make_one_iterator(), NUM_TRAIN_SAMPLES // batch_size )

  • What is the correct way to design generator(n) here?
  • Is this an optimized way to design my input pipeline using parallel_interleave and flat_map?
Recommended Answer

      It seems to me that you're complicating your life unnecessarily with the generator. This is how I would implement your input pipeline:

      import numpy as np
      import tensorflow as tf

      def parse_file_tf(filename):
          # Wrap the plain-Python parser so it can run on string tensors inside the graph
          return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])

      # version with map
      files = tf.data.Dataset.from_tensor_slices(files_to_process)
      dataset = files.map(parse_file_tf, num_parallel_calls=N)
      # each parsed file yields many examples; flatten them into individual elements
      dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
      dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(2)
      it = dataset.make_one_shot_iterator()
      

      To test it, I define a dummy parse_file as:

      i = 0
      def parse_file(f):
          global i
          i += 1
          # mimics variable-length examples_x, examples_y
          return np.asarray([i] * i, dtype=np.float32), np.asarray([i] * i, dtype=np.float32)
      

      which I feed into a basic loop that shows what the iterator returns:

      # Build the get_next op once; calling it.get_next() inside the loop
      # would keep adding new ops to the graph.
      x, y = it.get_next()
      sess = tf.Session()
      try:
          while True:
              vx, vy = sess.run([x, y])
              print(vx)
              print(vy)
      except tf.errors.OutOfRangeError:
          pass
      sess.close()
      

      Running the code above prints:

      [2. 3. 2. 1. 3. 3.]
      [2. 3. 2. 1. 3. 3.]
      

      Explanation of the pipeline

      Essentially, I leave the parallelization to map, where I can pass the number of threads it should use. There is no need for generators iterating over ranges and the extra complications that come with them.

      I chose map over parallel_interleave because the latter requires you to generate a Dataset instance for each item it returns, which in your case doesn't really make sense because you have already loaded all values into memory by the time parse_file runs. parallel_interleave makes sense if you generate the values lazily (e.g., by applying tf.data.TFRecordDataset to a list of filenames), but if your dataset fits in memory, go for map.
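
      For reference, the lazily-generated case would look roughly like this (a minimal sketch; it assumes a list of TFRecord filenames, and parse_record is a hypothetical per-record parser):

      # Sketch: each filename becomes its own TFRecordDataset, and
      # parallel_interleave pulls records from several files concurrently.
      tfrecord_files = tf.data.Dataset.from_tensor_slices(["a.tfrecord", "b.tfrecord"])
      records = tfrecord_files.apply(tf.contrib.data.parallel_interleave(
          lambda f: tf.data.TFRecordDataset(f), cycle_length=N))
      records = records.map(parse_record, num_parallel_calls=N)  # parse_record: hypothetical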

      As for the tf.py_func limitations: they do not affect your trained network, only the input pipeline. Ideally, you'll have one pipeline for training and a different one for the final use of the network. You only need to watch out for the limitations in the latter, while for training (unless you do something very specific with distributed training and/or moving the training across machines) you're reasonably safe.
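
      One limitation that can also show up during training is that tf.py_func returns tensors with unknown static shape. A minimal sketch of restoring it, assuming both outputs are 1-D float vectors as in the dummy parse_file above:

      def parse_file_tf(filename):
          # tf.py_func drops static shape information; downstream ops (and
          # Keras layers) often want at least the rank, so set what we know.
          examples_x, examples_y = tf.py_func(
              parse_file, [filename], [tf.float32, tf.float32])
          examples_x.set_shape([None])
          examples_y.set_shape([None])
          return examples_x, examples_y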

      If your JSON files are very big and their content won't fit in memory, you can still use a generator, but in a slightly different way from the approach you began with. The idea is that the generator goes through the JSON file and yields one record at a time; the generator then becomes your parse_file function. As an example, let's assume you have the following parse_file generator:

      i = 3
      def parse_file(filename):
          # Dummy generator: pretends to read the file and yields one (x, y)
          # record at a time, as length-1 float vectors so the flat_map below
          # can slice them into individual examples.
          global i
          i += 1
          ctr = 0
          while ctr < i:
              yield np.asarray([i], dtype=np.float32), np.asarray([i], dtype=np.float32)
              ctr += 1
      

      In this case, the pipeline would look as follows:

      def wrap_generator(filename):
          # Turn each filename into its own Dataset backed by the generator.
          # Note: `filename` is a tensor here; the dummy parse_file above ignores it.
          return tf.data.Dataset.from_generator(
              lambda: parse_file(filename), [tf.float32, tf.float32])

      files = tf.data.Dataset.from_tensor_slices(files_to_process)
      dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
      dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
      dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
      it = dataset.make_one_shot_iterator()
      

      Note that here we need to use parallel_interleave because we turn the generators into Dataset instances from which we extract values. The rest stays the same.
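
      As a side note (not part of the answer above): if deterministic ordering across files does not matter to you, parallel_interleave also accepts a sloppy argument, which lets it pull from whichever generator has data ready and can improve throughput:

      dataset = files.apply(tf.contrib.data.parallel_interleave(
          wrap_generator, cycle_length=N, sloppy=True))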

      Feeding this to the same sample loop as above prints:

      [6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
      [6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
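
      Finally, regarding the fit call in the question: in TF 1.x, tf.keras models can usually consume the batched dataset directly, as long as steps_per_epoch is given. A hedged sketch, assuming the pipeline above (num_epochs is a placeholder):

      # repeat() keeps the finite, file-based pipeline from running out
      # between epochs; steps_per_epoch bounds each epoch.
      dataset = dataset.repeat()
      myTfKerasModel.fit(dataset,
                         steps_per_epoch=NUM_TRAIN_SAMPLES // batch_size,
                         epochs=num_epochs)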
      
