How to improve data input pipeline performance?


Problem description


I try to optimize my data input pipeline. The dataset is a set of 450 TFRecord files of size ~70MB each, hosted on GCS. The job is executed with GCP ML Engine. There is no GPU.

Here is the pipeline:

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        buffer_size=2048
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).map(
        map_func=_parse_example_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).prefetch(
        buffer_size=1
    )

With the mapped function:

def _bit_to_float(string_batch: tf.Tensor):
    return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
        tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
        tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
    ), tf.float32), 2), (tf.shape(string_batch)[0], -1))


def _parse_example_batch(example_batch):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = _bit_to_float(samples["booleanFeatures"])
    return (
        tf.concat([dense_float, bits_to_float], 1),
        tf.reshape(samples["label"], (-1, 1))
    )

I tried to follow the best practices of the data pipeline tutorial, and vectorize my mapped function (as advised by mrry).

With these settings, while data is downloaded at high speed (bandwidth around 200MB/s), the CPU is under-used (14%) and training is very slow (more than 1 hour per epoch).

I tried some parameter configurations, changing the interleave() arguments like num_parallel_calls or cycle_length, or the TFRecordDataset arguments like num_parallel_calls.

The fastest configuration uses this set of parameters:

  • interleave.num_parallel_calls: 1
  • interleave.cycle_length: 8
  • TFRecordDataset.num_parallel_calls: 8

With this one, one epoch only takes ~20 minutes to run. However, CPU usage is only at 50% while bandwidth consumption is around 55MB/s.
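
For clarity, here is a rough sketch (not the exact experiment code) of how these values can be wired into the pipeline. Note that TFRecordDataset itself exposes its parallelism as num_parallel_reads, so the lambda below is an assumption about how the two settings were combined:

def build_dataset_tuned(file_pattern):
    # Hypothetical sketch of the fastest hand-tuned configuration listed above;
    # the rest of the pipeline (shuffle/batch/cache/repeat/map/prefetch) is unchanged.
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        # interleave() passes each file name to this callable.
        lambda filename: tf.data.TFRecordDataset(filename, num_parallel_reads=8),
        cycle_length=8,
        num_parallel_calls=1,
    )  # ...shuffle(...).batch(...).cache().repeat().map(...).prefetch(...)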

Questions:

  1. How to optimize the pipeline to reach 100% CPU usage (and something like 100MB/s of bandwidth consumption)?
  2. Why does tf.data.experimental.AUTOTUNE not find the best value to speed up the training?

Kind regards, Alexis.


Edit

After some more experimentation, I came to the following solution.

  1. Remove the interleave step, which is already handled by TFRecordDataset when num_parallel_reads is greater than 0.
  2. Update the mapped function to only do parse_example and decode_raw, returning a tuple of the form ((features, boolean features), label).
  3. cache after the map.
  4. Move the _bit_to_float function into the model as a component.

Finally, here is the data pipeline code:

import multiprocessing

def build_dataset(file_pattern):
    return tf.data.TFRecordDataset(
        tf.data.Dataset.list_files(file_pattern),
        num_parallel_reads=multiprocessing.cpu_count(),
        buffer_size=70*1000*1000
    ).shuffle(
        buffer_size=2048
    ).map(
        map_func=split,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).prefetch(
        buffer_size=32
    )


def split(example):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (dense_float, bits_to_float),
        tf.reshape(samples["label"], (1,))
    )


def build_model(input_shape):
    feature = keras.Input(shape=(N,))
    bool_feature = keras.Input(shape=(M,), dtype="uint8")
    one_hot = dataset._bit_to_float(bool_feature)
    dense_input = tf.reshape(
        keras.backend.concatenate([feature, one_hot], 1),
        input_shape)
    output = actual_model(dense_input)

    model = keras.Model([feature, bool_feature], output)
    return model

def _bit_to_float(string_batch: tf.Tensor):
    return tf.dtypes.cast(tf.reshape(
        tf.bitwise.bitwise_and(
            tf.bitwise.right_shift(
                tf.expand_dims(string_batch, 2),
                tf.reshape(
                    tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
                    (1, 1, 8)
                ),
            ),
            tf.constant(0x01, dtype=tf.uint8)
        ),
        (tf.shape(string_batch)[0], -1)
    ), tf.float32)
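
As a quick sanity check (illustrative only, not from the original post), each packed uint8 byte is unpacked, most significant bit first, into eight 0.0/1.0 floats:

packed = tf.constant([[0b10110010]], dtype=tf.uint8)  # shape (batch=1, bytes=1)
print(_bit_to_float(packed))  # tf.Tensor([[1. 0. 1. 1. 0. 0. 1. 0.]], shape=(1, 8), dtype=float32)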

Thanks to all these optimizations:

  • Bandwidth consumption is around 90MB/s
  • CPU usage is around 20%
  • The first epoch takes 20 minutes
  • Successive epochs take 5 minutes each

So this seems to be a good first setup. But CPU and bandwidth are still not fully used, so any advice is still welcome!


Edit Bis

So, after some benchmarking I came across what I think is our best input pipeline:

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )

So, what's new:

  • According to this GitHub issue, the TFRecordDataset interleaving is a legacy one, so the interleave function is better.
  • batch before map is a good habit (it vectorizes your function) and reduces the number of times the mapped function is called.
  • No need for repeat anymore. Since TF 2.0, the Keras model API supports the dataset API and can make use of cache (see the SO post); a short usage sketch follows this list.
  • Switch from a VarLenFeature to a FixedLenSequenceFeature, removing a useless call to tf.sparse.to_dense.
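
To illustrate the point about repeat and cache, here is a minimal training-loop sketch; the bucket path, optimizer, loss and epoch count are placeholders, not taken from the original post:

# With TF 2.x, model.fit() iterates the finite dataset once per epoch, so no
# .repeat() is needed; thanks to .cache(), every epoch after the first one
# reads from memory instead of GCS.
dataset = build_dataset("gs://my-bucket/train-*.tfrecord")
model = build_model(input_shape)  # build_model and input_shape as in the previous edit
model.compile(optimizer="adam", loss="binary_crossentropy")
model.fit(dataset, epochs=10)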

Hope this can help. Advice is still welcome.

Solution

Mentioning the solution and the important observations of @AlexisBRENON in the answer section, for the benefit of the community.

Below are the important observations:

  1. According to this GitHub issue, the TFRecordDataset interleaving is a legacy one, so the interleave function is better.
  2. batch before map is a good habit (it vectorizes your function) and reduces the number of times the mapped function is called.
  3. No need for repeat anymore. Since TF 2.0, the Keras model API supports the dataset API and can make use of cache (see the SO post).
  4. Switch from a VarLenFeature to a FixedLenSequenceFeature, removing a useless call to tf.sparse.to_dense.

The pipeline code with improved performance, in line with the above observations, is given below:

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )
