How to run parallel map_fn when eager execution enabled


Problem description


Consider the following tensorflow code snippet:

import time
import numpy as np
import tensorflow as tf

def fn(i):
    # do some junk work
    for _ in range(100):
        i ** 2
    return i

n = 1000
n_jobs = 8
stuff = np.arange(1, n + 1)
eager = False
t0 = time.time()
if eager:
    tf.enable_eager_execution()
res = tf.map_fn(fn, stuff, parallel_iterations=n_jobs)
if not eager:
    with tf.Session() as sess:
        res = sess.run(res)
        print(sum(res))
else:
    print(sum(res))
dt = time.time() - t0
print("(eager=%s) Took %ims" % (eager, dt * 1000))

If run with eager = True it is 10x slower than when run with eager = False. I did some prints and found out that in eager = True mode the map_fn call runs sequentially, instead of spawning 8 parallel threads.
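
For example, one way to make that check concrete is to print the executing thread and a timestamp inside fn; with eager execution enabled the calls arrive strictly one after another from a single thread (a sketch using the TF 1.x API from the snippet above; sizes and names are illustrative):

import threading
import time
import numpy as np
import tensorflow as tf

tf.enable_eager_execution()  # TF 1.x API, matching the snippet above

def fn(i):
    # log which thread handles each element and when; in eager mode the
    # timestamps increase one after another on a single thread
    print(threading.current_thread().name, "%.4f" % time.time())
    return i

tf.map_fn(fn, np.arange(1, 9), parallel_iterations=8)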

Question

So my question is: how do I use map_fn (with parallel_iterations > 1) in eager execution mode?

Solution

(I was using TF 2.3 for this; don't expect identical results with newer versions.)

More than an answer to the OP's question, this is an extension of it, showing why the other answers do not address the real problem: tf.function is not enough to force parallelism.


First, using tf.function does not force parallelization. It forces tracing and the construction of a graph, which happens just once; so the time.sleep() used in other answers runs only when tracing is necessary, and that is why you see a speed-up with tf.function. But you still don't see any difference when changing parallel_iterations.
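
A minimal sketch of that effect (the function name is illustrative and the timings approximate): a Python-level time.sleep() placed directly in a tf.function body only costs time on the call that traces.

import time
import tensorflow as tf

@tf.function
def slow_identity(x):
    time.sleep(1.0)   # pure Python: runs only while the function is being traced
    return x

t0 = time.time(); slow_identity(tf.constant(1.0)); print("1st call: %.2fs" % (time.time() - t0))  # ~1 s (tracing)
t0 = time.time(); slow_identity(tf.constant(2.0)); print("2nd call: %.2fs" % (time.time() - t0))  # ~0 s (graph replay)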

Let's use a py_function to see the difference:

def op(x):
    time.sleep(1)
    return 2 * x.numpy()

def op_tf(x):
    print('Tracing')
    return tf.py_function(op, [x], Tout=tf.int32)

Without applying the tf.function decorator (or calling tf.function on it directly), any call to op_tf will always print "Tracing" (although in this case it is not actually tracing):

In [57]: op_tf(1)
Tracing
Out[57]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

In [58]: op_tf(1)
Tracing
Out[58]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

With tf.function we see "Tracing" printed just once (if we use the same arguments):

In [67]: @tf.function
    ...: def op_tf(x):
    ...:     print("Tracing")
    ...:     return tf.py_function(op, [x], Tout=tf.int32)
    ...: 

In [68]: op_tf(1)
Tracing
Out[68]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

In [69]: op_tf(2)
Tracing
Out[69]: <tf.Tensor: shape=(), dtype=int32, numpy=4>

In [70]: op_tf(3)
Tracing
Out[70]: <tf.Tensor: shape=(), dtype=int32, numpy=6>

In [71]: op_tf(3)
Out[71]: <tf.Tensor: shape=(), dtype=int32, numpy=6>

This happens because the function has to build a new graph for every new argument; if we pass an input signature directly, we avoid that:

In [73]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])
    ...: def op_tf(x):
    ...:     print("Tracing")
    ...:     return tf.py_function(op, [x], Tout=tf.int32)
    ...: 
    ...: 

In [74]: op_tf(1)
Tracing
Out[74]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

In [75]: op_tf(2)
Out[75]: <tf.Tensor: shape=(), dtype=int32, numpy=4>

In [76]: op_tf(3)
Out[76]: <tf.Tensor: shape=(), dtype=int32, numpy=6>

The same happens if we first call the method get_concrete_function:

In [79]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])
    ...: def op_tf(x):
    ...:     print("Tracing")
    ...:     return tf.py_function(op, [x], Tout=tf.int32)
    ...: 
    ...: 

In [80]: op_tf = op_tf.get_concrete_function()
Tracing

In [81]: op_tf(1)
Out[81]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

In [82]: op_tf(2)
Out[82]: <tf.Tensor: shape=(), dtype=int32, numpy=4>

So, the answers claiming that just adding tf.function is enough to get parallel execution are not fully correct:

In [84]: def op(x):
    ...:     print("sleep")
    ...:     time.sleep(0.1)
    ...:     return 1.
    ...: 

In [85]: x = tf.ones(shape=(10,))

In [86]: _ = tf.map_fn(op, x, parallel_iterations=10)
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep

In [87]: @tf.function
    ...: def my_map(*args, **kwargs):
    ...:     return tf.map_fn(*args, **kwargs)
    ...: 

In [88]: my_map(op, x, parallel_iterations=10)
sleep
Out[88]: <tf.Tensor: shape=(10,), dtype=float32, numpy=array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.], dtype=float32)>

In comparison, if the Python sleep and print instructions are inside a py_function, they will always be called:

In [96]: x = tf.ones(shape=(10,), dtype=tf.int32)

In [97]: def op(x):
    ...:     print("sleep")
    ...:     time.sleep(0.1)
    ...:     return 2 * x.numpy()
    ...: 

In [98]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])
    ...: def op_tf(x):
    ...:     print("Tracing")
    ...:     return tf.py_function(op, [x], Tout=tf.int32)
    ...: 

In [99]: _ = my_map(op_tf, x, parallel_iterations=1)
Tracing
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep

Now that it is clear that the tracing of the function was causing some confusion, let's remove the prints and time the calls:

In [106]: def op(x):
     ...:     time.sleep(0.1)
     ...:     return 2 * x.numpy()
     ...: 

In [107]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])
     ...: def op_tf(x):
     ...:     return tf.py_function(op, [x], Tout=tf.int32)
     ...: 

In [108]: %timeit tf.map_fn(op_tf, x, parallel_iterations=1)
1.02 s ± 554 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [109]: %timeit tf.map_fn(op_tf, x, parallel_iterations=10)
1.03 s ± 509 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

Running the following script and opening the trace in TensorBoard, we can see exactly what's happening:

import tensorflow as tf
import time
from datetime import datetime

stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = 'logs/func/%s' % stamp

# Start tracing.
options = tf.profiler.experimental.ProfilerOptions(
    host_tracer_level=3, python_tracer_level=1, device_tracer_level=1, delay_ms=None
)

tf.profiler.experimental.start(logdir, options = options)

def op(x):
    x = x.numpy()
    start = time.time()

    while time.time() < start + x / 100:
        x = (2 * x) % 123

    return x

@tf.function(input_signature=[tf.TensorSpec([], tf.int32)])
def op_tf(x):
    return tf.py_function(op, [x], Tout=tf.int32, name='op')

@tf.function(input_signature=[tf.TensorSpec([None], tf.int32)])
def my_map(x):
    return tf.map_fn(op_tf, x, parallel_iterations=16)

x = tf.ones(100, tf.int32)
print(my_map(x))

tf.profiler.experimental.stop()

In TensorBoard we get the following: the py_function is effectively dispatched onto several threads, but the calls do not run in parallel. With parallel_iterations=1 we obtain something similar.

If we add the following at the beginning of the script

tf.config.threading.set_inter_op_parallelism_threads(1)
tf.config.threading.set_intra_op_parallelism_threads(1)

we force TF to use a single thread for all graph computations; the profiler trace then shows everything running on a single thread.

So, at this point we can only get some form of parallel execution if we set the inter/intra-op threads appropriately.
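
Note that these threading settings only take effect if they run before TensorFlow initializes its runtime; a sketch of the intended ordering (the value 16 is just an example):

import tensorflow as tf

# must run before the first op creates the runtime thread pools;
# changing them after the runtime has started typically raises a RuntimeError
tf.config.threading.set_inter_op_parallelism_threads(16)
tf.config.threading.set_intra_op_parallelism_threads(16)

print(tf.config.threading.get_inter_op_parallelism_threads())  # 16
print(tf.config.threading.get_intra_op_parallelism_threads())  # 16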

If we disable Eager execution completely:

import time
from datetime import datetime
import numpy as np
import tensorflow as tf

tf.compat.v1.disable_eager_execution()
tf.config.threading.set_inter_op_parallelism_threads(128)
tf.config.threading.set_intra_op_parallelism_threads(128)

stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = f'logs/func/{stamp}'

tf.profiler.experimental.start(logdir)

def op(x):
    x = x.numpy()
    start = time.time()
    while time.time() < start + x / 100:
        x = (2 * x) % 123
    return x

@tf.function(input_signature=[tf.TensorSpec([], tf.int32)])
def op_tf(x):
    return tf.py_function(op, [x], Tout=tf.int32, name='op')

# Create a placeholder.
x = tf.compat.v1.placeholder(tf.int32, shape=[None])

with tf.compat.v1.Session() as sess:

    writer = tf.summary.create_file_writer(logdir)

    #tf.profiler.experimental.start(logdir, options = options)
    tf.summary.trace_on(graph=True, profiler=True)

    print(
        sess.run(
            [tf.map_fn(op_tf, x, parallel_iterations=16)],
            feed_dict={
                x: np.ones(4, dtype=np.int32)
            }
        )
    )

tf.profiler.experimental.stop()

we can now see genuinely parallel executions in the TensorBoard trace.

And if we set the inter/intra threads and parallel_iterations back to 1, we get the previous behaviour back.

I hope this helps to clarify the role of tf.function when checking for full parallelism.
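
As an aside, if the goal is simply to run per-element Python work concurrently while staying in eager mode, tf.data with num_parallel_calls is a commonly used alternative to tf.map_fn. A minimal sketch (the parameter values are illustrative, and pure Python work is still serialized by the GIL, so only GIL-releasing work such as sleeps, NumPy ops or I/O actually overlaps):

import time
import tensorflow as tf

def op(x):
    time.sleep(0.1)               # releases the GIL, so parallel calls can overlap
    return 2 * x.numpy()

ds = tf.data.Dataset.range(10).map(
    lambda x: tf.py_function(op, [tf.cast(x, tf.int32)], Tout=tf.int32),
    num_parallel_calls=8)         # illustrative value, tune for your machine

t0 = time.time()
print(list(ds.as_numpy_iterator()))
print("took %.2fs" % (time.time() - t0))  # well under 10 * 0.1 s if the calls overlap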
