How to process a single training file in parallel


Problem description


I have a file train.csv that contains paths to images and their labels, i.e.:

img1.jpg 3
img2.jpg 1
...

After going through the reading data tutorial I came up with some code to go through each image, resize it and apply distortions:

def apply_distortions(resized_image):
    # do a bunch of tf.image distortion...
    return float_image

def processing(filename):
    file_contents = tf.read_file(filename)
    image = tf.image.decode_jpeg(file_contents, channels=3)
    resized_image = tf.image.resize_images(image, 299, 299)
    distorted_image = apply_distortions(resized_image)
    return distorted_image

def parse_csv(filename_queue):
    line_reader = tf.TextLineReader()
    key, line = line_reader.read(filename_queue)
    filename, label = tf.decode_csv(line,     # line_batch or line (depending if you want to batch)
                               record_defaults=[tf.constant([],dtype=tf.string),
                                                tf.constant([],dtype=tf.int32)],
                               field_delim=' ')
    processed_image = processing(filename)
    return processed_image, label

The problem now is that I'm not sure how to run these operations across the file in parallel. The documentation suggests either tf.train.batch_join or tf.train.batch with num_threads=N.

I first tried following the example code using tf.train.batch_join, but this seems to be intended for processing multiple files in parallel. In my case, however, I have just one file.

filename_queue = tf.train.string_input_producer(["train.txt"], num_epochs=1, shuffle=True)    
example_list = [parse_csv(filename_queue) for _ in range(8)]
example_batch, label_batch = tf.train.batch_join(example_list, batch_size)

I also tried setting tf.train.batch([example, label], batch_size, num_threads=8), but it's not clear to me whether this is doing the right thing (although I can see more CPU cores in use):

filename_queue = tf.train.string_input_producer(["train.txt"], num_epochs=1, shuffle=True)
example, label = parse_csv(filename_queue)
example_batch, label_batch = tf.train.batch([example, label], batch_size, num_threads=8)

Here is my code for executing the graph:

sess.run(tf.initialize_all_variables())
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess,coord)
try:
    while not coord.should_stop():
        X, Y = sess.run([example_batch, label_batch])
        # Now run a training step
except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()
coord.join(threads)
sess.close()

What's the best way to process this file in parallel?

Solution

Both seem like viable approaches. Using batch with num_threads=N will create N copies of your reader op connected to your queue so that they can run in parallel, while with batch_join you have to create the copies manually.

In your usage with batch_join you are creating several copies of TextLineReader, which (as you noticed) only parallelizes across files. To have several threads read a single file, you could instead create one TextLineReader and issue several line_reader.read ops against that same reader.
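This single-reader idea can be sketched outside TensorFlow with plain Python threads. The sketch below is an illustrative analogue only, not the TF queue machinery: the name read_file_in_parallel and the lock layout are my own. One shared file handle plays the role of the single TextLineReader, a lock serializes the read itself, and per-line processing runs in parallel.

```python
import threading

def read_file_in_parallel(path, num_workers=2):
    """Read one file with several threads sharing a single reader.

    Analogue of one TextLineReader feeding multiple read ops: the file
    object is the shared reader, the lock serializes the actual read,
    and downstream processing of each line runs in parallel.
    """
    results = []
    results_lock = threading.Lock()
    read_lock = threading.Lock()
    f = open(path)

    def worker():
        while True:
            with read_lock:          # only the read itself is serialized
                line = f.readline()
            if not line:
                return               # end of file: this worker is done
            value = int(line)        # "processing" happens in parallel
            with results_lock:
                results.append(value)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    f.close()
    return results
```

Because all workers share one file position, every line is consumed exactly once, which is what sharing a single reader buys you over one-reader-per-thread.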

Here's an example using some text files containing numbers.

Generate numbers:

import os

def dump_numbers_to_file(fname, start, end):
    # assumed helper: write the integers [start, end) to fname, one per line
    with open(fname, 'w') as f:
        f.write(''.join('%d\n' % i for i in range(start, end)))

num_files = 10
num_entries_per_file = 10
file_root = "/temp/pipeline"
os.system('mkdir -p ' + file_root)
for fi in range(num_files):
  fname = file_root + "/" + str(fi)
  dump_numbers_to_file(fname, fi*num_entries_per_file, (fi+1)*num_entries_per_file)

Read those numbers in batches of size 2, with parallelism of 2

tf.reset_default_graph()
filename_queue = tf.train.string_input_producer(["/temp/pipeline/0",
                                                 "/temp/pipeline/1"],
                                                shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
numeric_val1, = tf.decode_csv(value, record_defaults=[[-1]])
numeric_val2, = tf.decode_csv(value, record_defaults=[[-1]])
numeric_batch = tf.train.batch_join([[numeric_val1], [numeric_val2]], 2)
# the session must exist before the queue runners start, since they use the default session
sess = tf.InteractiveSession()  # stands in for create_session() in the original answer
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)

print('\n'.join([t.name for t in threads]))
for i in range(20):
  print(sess.run([numeric_batch]))

coord.request_stop()
coord.join(threads)

You may see something like this:

QueueRunner(input_producer:input_producer/input_producer_EnqueueMany)
QueueRunner(input_producer:input_producer/input_producer_Close_1)
QueueRunner(batch_join/fifo_queue:batch_join/fifo_queue_enqueue)
QueueRunner(batch_join/fifo_queue:batch_join/fifo_queue_enqueue_1)
QueueRunner(batch_join/fifo_queue:batch_join/fifo_queue_Close_1)
[array([0, 1], dtype=int32)]
[array([2, 3], dtype=int32)]
[array([4, 5], dtype=int32)]
[array([6, 7], dtype=int32)]
[array([8, 9], dtype=int32)]
[array([10, 11], dtype=int32)]
[array([12, 13], dtype=int32)]
[array([14, 15], dtype=int32)]
[array([16, 17], dtype=int32)]
[array([18, 19], dtype=int32)]

From the list of threads you can see that there are two threads corresponding to read operations (fifo_queue_enqueue and fifo_queue_enqueue_1), so two reads can run in parallel.
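Stripped of the TF specifics, the queue-runner pattern above is N enqueue threads feeding one FIFO queue that a consumer drains in fixed-size batches. Here is a minimal stdlib sketch of that shape (batched_parallel_read is a made-up name, and this is not how tf.train.batch_join is implemented; it only mirrors the structure):

```python
import queue
import threading

def batched_parallel_read(lines, batch_size=2, num_threads=2):
    """num_threads enqueue workers feed one FIFO queue; the consumer
    drains it in fixed-size batches, like the queue runners above."""
    source = iter(lines)
    source_lock = threading.Lock()
    fifo = queue.Queue()
    SENTINEL = object()          # each worker signals when the source is exhausted

    def enqueue_worker():
        while True:
            with source_lock:    # workers share one source, like one reader
                line = next(source, None)
            if line is None:
                fifo.put(SENTINEL)
                return
            fifo.put(int(line))  # the "decode" step runs in parallel

    workers = [threading.Thread(target=enqueue_worker) for _ in range(num_threads)]
    for w in workers:
        w.start()

    batches, batch, done = [], [], 0
    while done < num_threads:
        item = fifo.get()
        if item is SENTINEL:
            done += 1
            continue
        batch.append(item)
        if len(batch) == batch_size:
            batches.append(batch)
            batch = []
    if batch:                    # leftover partial batch, if any
        batches.append(batch)
    for w in workers:
        w.join()
    return batches
```

The sentinel-per-worker trick stands in for the Coordinator: the consumer knows the stream is finished once every enqueue thread has reported in, which is roughly what OutOfRangeError signals in the TF version.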
