Django celery worker将实时状态和结果消息发送到前端 [英] Django celery worker to send real-time status and result messages to front end

查看:232
本文介绍了Django celery worker将实时状态和结果消息发送到前端的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Django应用中,我正在运行异步任务,并希望向用户显示进度,错误等.如果存在错误,则应将用户重定向到需要其他输入或采取某些措施来解决此问题的页面.从芹菜工作回到前端的最佳沟通方式是什么?

In a django app I'm running async tasks and would like to show progress, errors etc to the user. If there are errors, the user should be redirect to a page where additional input or some action is required to fix the problem. What is the best way to communicate from the celery work back to the front end?

这是伪代码的基本结构:

Here's a basic structure in pseudo code:

# views.py
from tasks import run_task

def view_task():
    run_task.delay()
    return render(request, 'template.html')

# tasks.py
from compute_module import compute_fct

@shared_task
def run_task():
    result = compute_fct()

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
        handle_error()
    else:
        handle_succes()     

# compute_module
import pandas as pd

def compute_fct():
    # send message: status = loading file
    df = pd.read_csv('test.csv')
    # send message: status = computing
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}

我理想地想要的东西:

  • compute_module.py模块使用python native logger.通过职责分离,我想使日志记录尽可能通用,并使用标准的python/django记录器.但是它们似乎并非旨在向前端发送消息.
  • celery任务以某种方式处理日志,而不是将其显示在stdout上,而是将其重定向到pusher
  • 前端js显示并处理消息
  • compute_module.py module uses python native logger. By separation of duties I want to keep the logging as generic as possible and use the standard python/django loggers. But they don't seem to be designed to send messages to front end.
  • celery task somehow handles the logs and instead of displaying them on stdout redirects them to pusher
  • front-end js shows and handles the messages

芹菜工作者与前端之间可能存在我不知道的标准通信方式.这种情况经常发生,令我惊讶的是它是如此难以实现.在某种程度上,应该为此设计rabbitmq消息队列或aws sns.下面是我看过的资源,但都不觉得它们都能很好地工作,但是也许我很困惑.

There might be standard ways of communicating between celery worker and front end that I'm not aware off. this scenario must happen often and I am surprised it's so difficult to implement. in a way the rabbitmq message queue or aws sns should be designed for this. below are resources that I looked at but don't feel either of them work very well but maybe I am just confused.

日志记录:这似乎更多是关于在服务器端进行日志记录,而不是向用户发送消息

logging: this seems to be more about logging on the server side, not sending messages to user

  • http://docs.celeryproject.org/en/latest/userguide/tasks.html#logging
  • https://docs.djangoproject.com/en/2.0/topics/logging/
  • http://oddbird.net/2017/04/17/async-notifications/
  • https://www.google.com/search?q=celery+worker+send+message+to+front+end

Celery cam似乎与管理员监视任务有关,而不是向用户发送消息

Celery cam seems to be about admin monitoring tasks, not sending messages to user

我喜欢的按钮,但我不想让compute_module.py处理.举例来说,我不希望在compute_module.py内部不进行任何pusher.com集成.猜猜我可以传递一个已经实例化的pusher对象,以便该模块可以推送消息,但是我还是希望它是通用的

pusher I like but I don't want to have compute_module.py deal with it. That is For example I would prefer not to do any pusher.com integration inside compute_module.py. Guess I could pass a pusher object that has already been instantiated so the module can just push messages but again I would prefer it to be generic

  • https://blog.pusher.com/improve-user-experience-app-real-time-progress-bar-tutorial/
  • https://blog.pusher.com/django-pusherable/

推荐答案

现在移至django-channels,效果很好,但比下面的解决方案复杂.

Moved to django-channels now, works well but more complex than solution below.

上一个:

好吧,下面是我现在如何解决的伪代码.基本上,我使用 https://pusher.com/docs/javascript_quick_start 并在服务器端传递实例化的对象进入compute_module.缺点是推送消息是泛滥的消息,因此我将不得不在LogPusher中做一些额外的工作才能将它们存储在db中,这又需要一天的时间...

Ok so below is pseudo code for how I've solved it for now. Basically I use https://pusher.com/docs/javascript_quick_start and server-side pass the instantiated object into the compute_module. One downside is that the pusher messages are ephermeral so I'm going to have to do some extra work in LogPusher to store them in a db, something for another day...

在我的实际实现中,我还通过$(document).ready()中的$.post() ajax调用触发了任务,因为小任务完成得如此之快,用户将永远不会看到推送消息,因为未建立连接(返回到该历史消息)问题).

Also in my real implementation I trigger the task via a $.post() ajax call in $(document).ready() because small tasks completed so fast the user would never see the pusher messages because the connection wasn't established (back to that historic message problem).

我上面没有提到的另一种替代方法是 https://channels.readthedocs.io /en/latest/

Another alternative route which I hadn't mentioned above is https://channels.readthedocs.io/en/latest/

# views.py
from tasks import run_task

def view_task():
    run_task.delay('event')
    return render(request, 'template.html', 'pusher_event':'event')


# tasks.py
import pusher
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                        key=settings.PUSHER_KEY,
                        secret=settings.PUSHER_SECRET,
                        cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event

    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):

    log_pusher = LogPusher(pusher_event)
    result = compute_fct(log_pusher)

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
            log_pusher.send('status':'error')
    else:
            log_pusher.send('status':'success')


# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send('status':'loading file')
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send('status':'computing')
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}


# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings 

def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL }


# template.html
<script>

var pusher = new Pusher("{{PUSHER_KEY}}", {
  cluster: "{{PUSHER_CLUSTER}}",
  encrypted: true    
});

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
    // process data
});

</script>

这篇关于Django celery worker将实时状态和结果消息发送到前端的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆