Keras predict not returning inside celery task


Problem Description

The following Keras function (predict) works when called synchronously:

pred = model.predict(x)

But it does not work when called from an asynchronous task queue (Celery). The Keras predict function never returns any output when called asynchronously.

The stack is: Django, Celery, Redis, Keras, TensorFlow.
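
A hypothetical minimal reproduction of the setup described above (the task name, module layout and model path are placeholders, not taken from the question):

from celery import shared_task
from keras.models import load_model

model = load_model('path/to/model/file')  # loaded when the worker imports this module

@shared_task
def predict_async(x):
    # Works when called directly, but never returns when executed by a
    # default (prefork) Celery worker
    return model.predict(x)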

Solution


I ran into this exact same issue, and man was it a rabbit hole. Wanted to post my solution here since it might save somebody a day of work:

TensorFlow Thread-Specific Data Structures

In TensorFlow, there are two key data structures that are working behind the scenes when you call model.predict (or keras.models.load_model, or keras.backend.clear_session, or pretty much any other function interacting with the TensorFlow backend):

  • The TensorFlow graph, which represents the structure of your Keras model
  • The TensorFlow session, which is the connection between the client and the TensorFlow runtime

Something that is not explicitly clear in the docs without some digging is that both the session and the graph are properties of the current thread. See API docs here and here.
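
A small sketch (TensorFlow 1.x API) of what "property of the current thread" means in practice: a graph entered with as_default() is only the default for the thread that entered it.

import threading
import tensorflow as tf

custom_graph = tf.Graph()

def show_default(label):
    # get_default_graph() consults a thread-local stack of "default" graphs
    print(label, tf.get_default_graph() is custom_graph)

with custom_graph.as_default():
    show_default('main thread')    # True: this thread pushed custom_graph
    t = threading.Thread(target=show_default, args=('worker thread',))
    t.start()
    t.join()                       # False: the worker thread never entered as_default()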

Using TensorFlow Models in Different Threads

It's natural to want to load your model once and then call .predict() on it multiple times later:

from keras.models import load_model

MY_MODEL = load_model('path/to/model/file')

def some_worker_function(inputs):
    return MY_MODEL.predict(inputs)

In a webserver or worker pool context like Celery, what this means is that you will load the model when you import the module containing the load_model line, and then a different thread will execute some_worker_function, running predict on the global variable containing the Keras model. However, trying to run predict on a model loaded in a different thread produces "tensor is not an element of this graph" errors. Thanks go to the several SO posts that touched on this topic, such as "ValueError: Tensor Tensor(...) is not an element of this graph. When using global variable keras model". In order to get this to work, you need to hang on to the TensorFlow graph that was used; as we saw earlier, the graph is a property of the current thread. The updated code looks like this:

from keras.models import load_model
import tensorflow as tf

MY_MODEL = load_model('path/to/model/file')
MY_GRAPH = tf.get_default_graph()

def some_worker_function(inputs):
    with MY_GRAPH.as_default():
        return MY_MODEL.predict(inputs)

The somewhat surprising twist here is: the above code is sufficient if you are using Threads, but hangs indefinitely if you are using Processes. And by default, Celery uses processes to manage all its worker pools. So at this point, things are still not working on Celery.

Why does this only work on Threads?

In Python, Threads share the same global execution context as the parent process. From the Python _thread docs:

This module provides low-level primitives for working with multiple threads (also called light-weight processes or tasks) — multiple threads of control sharing their global data space.

Because threads are not actual separate processes, they use the same Python interpreter and thus are subject to the infamous Global Interpreter Lock (GIL). Perhaps more importantly for this investigation, they share global data space with the parent.

In contrast to this, Processes are actual new processes spawned by the program. This means:

  • New Python interpreter instance (and no GIL)
  • Global address space is duplicated

Note the difference here. While Threads have access to a shared single global Session variable (stored internally in the tensorflow_backend module of Keras), Processes have duplicates of the Session variable.
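
A pure-Python sketch of that difference, independent of TensorFlow (the names here are made up for illustration):

import threading
import multiprocessing

STATE = {'loaded': False}

def mark_loaded():
    STATE['loaded'] = True

if __name__ == '__main__':
    # Thread: shares the parent's global data space
    t = threading.Thread(target=mark_loaded)
    t.start(); t.join()
    print(STATE['loaded'])    # True: the thread mutated the very same dict

    STATE['loaded'] = False

    # Process: works on a duplicated (forked) or freshly imported (spawned) copy
    p = multiprocessing.Process(target=mark_loaded)
    p.start(); p.join()
    print(STATE['loaded'])    # False: only the child's copy was changed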

My best understanding of this issue is that the Session variable is supposed to represent a unique connection between a client (process) and the TensorFlow runtime, but by being duplicated in the forking process, this connection information is not properly adjusted. This causes TensorFlow to hang when trying to use a Session created in a different process. If anybody has more insight into how this is working under the hood in TensorFlow, I would love to hear it!

The Solution / Workaround

I went with adjusting Celery so that it uses Threads instead of Processes for pooling. There are some disadvantages to this approach (see GIL comment above), but this allows us to load the model only once. We aren't really CPU bound anyways since the TensorFlow runtime maxes out all the CPU cores (it can sidestep the GIL since it is not written in Python). You have to supply Celery with a separate library to do thread-based pooling; the docs suggest two options: gevent or eventlet. You then pass the library you choose into the worker via the --pool command line argument.
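
For concreteness, a sketch of what that can look like; the app name 'proj', the Redis URL and the task body are placeholders, not the poster's actual code. The worker would then be started with something like: celery -A proj worker --pool=gevent --concurrency=4

from celery import Celery
from keras.models import load_model
import tensorflow as tf

app = Celery('proj', broker='redis://localhost:6379/0')

MY_MODEL = load_model('path/to/model/file')   # loaded once, at import time
MY_GRAPH = tf.get_default_graph()

@app.task
def predict_task(inputs):
    # Safe under a thread-based pool (gevent/eventlet): every green thread
    # sees the same module-level model and graph
    with MY_GRAPH.as_default():
        return MY_MODEL.predict(inputs)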

Alternatively, it seems (as you already found out @pX0r) that other Keras backends such as Theano do not have this issue. That makes sense, since these issues are tightly related to TensorFlow implementation details. I personally have not yet tried Theano, so your mileage may vary.

I know this question was posted a while ago, but the issue is still out there, so hopefully this will help somebody!
