Pickling Python Function fails with ProcessPoolExecutor in a decorator


Question

So I asked this question ("Running image manipulation in run_in_executor. Adapting to multiprocessing") and tried the ProcessPoolExecutor approach. I used the suggested decorator in the following way:

import asyncio
import functools
from concurrent import futures

from app.exceptions.errors import ManipulationError


_pool = futures.ProcessPoolExecutor()


def executor(function):
    @functools.wraps(function)
    def decorator(*args, **kwargs):
        try:
            partial = functools.partial(function, *args, **kwargs)
            loop = asyncio.get_event_loop()
            return loop.run_in_executor(_pool, partial)
        except Exception as e:
            raise ManipulationError(str(e))

    return decorator

I then used it on a function like this:

@executor
@pil
def blur(image):
    frame = image.convert("RGBA")
    return frame.filter(ImageFilter.BLUR)

Note that @pil is another decorator I made:

def pil(function):
    @functools.wraps(function)
    def wrapper(image, *args, **kwargs) -> BytesIO:
        img = PILManip.pil_image(image)
        if img.format == "GIF":
            frames = []
            for frame in ImageSequence.Iterator(img):
                res_frame = function(frame, *args, **kwargs)
                frames.append(res_frame)
            return PILManip.pil_gif_save(frames), "gif"
        elif img.format in ["PNG", "JPEG"]:
            img = function(img, *args, **kwargs)
            return PILManip.pil_image_save(img), "png"
        else:
            raise BadImage("Bad Format")

    return wrapper

I called it in a FastAPI route like so:


@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
    byt = await Client.image_bytes(url)
    img, image_format = await blur(byt)
    return Response(img.read(), media_type=f"image/{image_format}")

I get an error about pickling:

500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 391, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
    await super().__call__(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
    response = await self.dispatch_func(request, self.call_next)
  File "/home/codespace/workspace/dagpi-image/app/middleware/timer.py", line 8, in add_process_time_header
    response = await call_next(request)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
    task.result()
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
    await self.app(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
    response = await self.dispatch_func(request, self.call_next)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 56, in dispatch
    raise e from None
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 52, in dispatch
    response = await call_next(request)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
    task.result()
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
    await self.app(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
    await self.app(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
    response = await func(request)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
    raw_response = await run_endpoint_function(
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
    return await dependant.call(**values)
  File "/home/codespace/workspace/dagpi-image/app/routes/image_routes.py", line 107, in blur_image
    img, image_format = await blur(byt)
  File "/opt/python/3.8.6/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/python/3.8.6/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function blur at 0x7f87524091f0>: it's not the same object as app.image.pil_manipulation.blur

Does anyone know why this keeps happening? I was told the objects have to be serializable. I believe BytesIO is the only input and output of the image function, and that should be serializable.

Recommended answer

Decorators typically produce wrapped functions that aren't easy to pickle (serialize). Pickle stores a function as a reference to its module and name, and after decoration the name blur in the module refers to the outer wrapper rather than to the function that executor actually hands to the pool, which is what the "it's not the same object" error is reporting. When dealing with multiprocessing, you should avoid decorators and send ordinary module-level functions to run_in_executor. For example, you could rewrite your executor decorator as a utility function:

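import asyncio
import concurrent.futures

_pool = concurrent.futures.ProcessPoolExecutor()

async def exec_async(fn, *args):
    # fn must be a plain module-level function so it can be pickled by name
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_pool, fn, *args)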
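
As background for the advice above, the failure can be reproduced without asyncio or a process pool. The sketch below is a minimal illustration, not the asker's code: passthrough, plain, decorated and try_pickle are hypothetical names introduced here. It relies on the fact that pickle stores a function as a reference to its module and qualified name, then checks that the name still resolves to the same object.

```python
import functools
import pickle


def passthrough(function):
    """Hypothetical decorator standing in for `executor` or `pil`."""
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        return function(*args, **kwargs)
    return wrapper


def plain(x):
    return x + 1


@passthrough
def decorated(x):
    return x + 1


def try_pickle(obj):
    """Return "ok" if obj can be pickled, otherwise the error text."""
    try:
        pickle.dumps(obj)
        return "ok"
    except (pickle.PicklingError, AttributeError) as exc:
        return f"{type(exc).__name__}: {exc}"


# An undecorated module-level function is pickled by name.
plain_result = try_pickle(plain)
# The outer wrapper is what the module-level name `decorated` points to.
wrapper_result = try_pickle(decorated)
# The inner function carries the same name (via functools.wraps), but the
# module-level name no longer refers to it, so the identity check fails.
inner_result = try_pickle(decorated.__wrapped__)

print("plain:  ", plain_result)
print("wrapper:", wrapper_result)
print("inner:  ", inner_result)
```

Run as a script, the undecorated function and the outer wrapper should pickle, while the inner function should fail with a "not the same object" message like the one in the traceback. The inner function is the one that matters here because executor passes function, not the wrapper, to run_in_executor.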

Instead of decorating a function with executor, you can just await it using await exec_async(some_function, arg1, arg2, ...). Likewise, you can rewrite the pil decorator as another utility:

def pil(image, transform):
    img = PILManip.pil_image(image)
    if img.format == "GIF":
        frames = []
        for frame in ImageSequence.Iterator(img):
            res_frame = transform(frame)
            frames.append(res_frame)
        return PILManip.pil_gif_save(frames), "gif"
    elif img.format in ["PNG", "JPEG"]:
        img = transform(img)
        return PILManip.pil_image_save(img), "png"
    else:
        raise BadImage("Bad Format")

The implementation of blur now becomes an ordinary function that calls pil and can be safely passed to exec_async:


def blur(image):
    def transform(frame):
        frame = frame.convert("RGBA")
        return frame.filter(ImageFilter.BLUR)
    return pil(image, transform)
 
@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
    byt = await Client.image_bytes(url)
    img, image_format = await exec_async(blur, byt)
    return Response(img.read(), media_type=f"image/{image_format}")
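
The nested transform in blur may look like it would hit the same pickling problem, but only blur itself is sent to the worker, by name; transform is created inside the worker when blur runs. A minimal sketch of that distinction, with integers standing in for image frames and all names (apply_to_frames, double_all, make_transform, can_pickle) being hypothetical:

```python
import pickle


def apply_to_frames(frames, transform):
    # Stand-in for the `pil` utility: applies transform to every frame.
    return [transform(frame) for frame in frames]


def double_all(frames):
    # Shaped like `blur`: a module-level function with a local helper.
    def transform(frame):
        return frame * 2
    return apply_to_frames(frames, transform)


def make_transform():
    # Returns a local function, which would have to be pickled directly
    # if it were passed to a process pool.
    def transform(frame):
        return frame * 2
    return transform


def can_pickle(obj):
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, pickle.PicklingError):
        return False


# A local function cannot be looked up by module and name.
local_ok = can_pickle(make_transform())
# Calling the module-level function builds the helper wherever it runs.
result = double_all([1, 2, 3])

print("local function picklable:", local_ok)
print("double_all result:", result)
```

The practical rule this suggests is to pass only the outer, module-level function to exec_async and keep helpers like transform as an internal detail of that function.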

Note: the code in this answer is untested.
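
One limitation of exec_async as written is that loop.run_in_executor forwards positional arguments only. If a function needs keyword arguments, functools.partial can bind them first. The following is a sketch under assumptions: exec_with_kwargs and scale are hypothetical names, and the pool is passed in explicitly rather than kept in a module-level _pool.

```python
import asyncio
import functools
from concurrent import futures


async def exec_with_kwargs(pool, fn, *args, **kwargs):
    """Run fn(*args, **kwargs) in `pool` and await its result."""
    loop = asyncio.get_running_loop()
    # run_in_executor only forwards positional arguments, so bind
    # everything into a partial before submitting it.
    call = functools.partial(fn, *args, **kwargs)
    return await loop.run_in_executor(pool, call)


def scale(value, factor=1):
    # Hypothetical stand-in for CPU-bound work. It is defined at module
    # level so that a process pool can pickle it by name.
    return value * factor


async def main():
    with futures.ProcessPoolExecutor(max_workers=1) as pool:
        print(await exec_with_kwargs(pool, scale, 21, factor=2))


if __name__ == "__main__":
    # The guard keeps worker processes from re-running the demo on
    # platforms that start workers by re-importing this module.
    asyncio.run(main())
```

A partial pickles as long as the function it wraps and the bound arguments do, so the same rule applies: fn should be an undecorated module-level function.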
