Python & Redis: Manager/Worker application best practices

Question

I have a few general questions about using Python and Redis to create a job queue application for running asynchronous commands. Here is the code I have generated so far:

import redis
from multiprocessing import Pool

def queueCmd(cmd):
    # push a shell command onto the Redis-backed job queue
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0, -1)

def work():
    # worker: pop the next command off the queue (runs in a pool process)
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    print "pop goes the weasel"

if __name__ == '__main__':

    r_server = redis.Redis("localhost")

    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")

    printCmdQueue()

    pool = Pool(processes=2)

    print "cnt:", +r_server.llen("cmds")
    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break

    printCmdQueue()

First, am I correct in believing that if I need to do any communication back to the manager, I should do so with a callback? The quick examples I have seen of this usage store the result of the async call and access it via result.get(timeout=1). By communication, I mean putting stuff back into a Redis list.
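
For reference, here is a minimal sketch of the two patterns in question - firing off work with a callback versus keeping the AsyncResult and blocking on result.get(timeout=...). The work and on_done functions are placeholders for illustration, not part of the code above:

from multiprocessing import Pool

def work(cmd):
    # placeholder worker: the real application would execute cmd here
    return "done: " + cmd

def on_done(result):
    # the callback runs in the manager process once the worker finishes
    print "callback got:", result

if __name__ == '__main__':
    pool = Pool(processes=2)

    # pattern 1: fire-and-forget with a callback
    pool.apply_async(work, ["ls -la"], callback=on_done)

    # pattern 2: keep the AsyncResult and block on it with a timeout;
    # get() raises multiprocessing.TimeoutError if the worker is not done in time
    res = pool.apply_async(work, ["mkdir test"])
    print res.get(timeout=5)

    pool.close()
    pool.join()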

If the command is run asynchronously and I time out on the result inside main, does that time out the worker or just that operation inside the manager? If it is only the manager, couldn't I use this to check for exit codes from the worker?

Next, this code produces the following output:

['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
Terminate pool
command being consumed:  None
pop goes the weasel
pop goes the weasel
pop goes the weasel
[]

Why does the worker consume multiple cmds at a time even though I am popping them off one at a time? On a similar note, this doesn't always end nicely and sometimes requires a Ctrl+C. To deal with this I clear out the queue and go again. I think this relates to apply_async() and when to get out of the loop. I am wondering if more needs to happen on the worker side?

If I change the if to the commented-out one, I get:

ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'

This seems like it would be a better way to check whether I need to break, but it seems that the function sometimes returns a string literal?

Any advice on improving this would be much appreciated. I am simply trying to make a manager which will be like a service/daemon on a Linux machine. It will be used to get jobs (currently commands, but possibly more) from a Redis list and return results back into a Redis list. Then, down the road, a GUI will interact with this manager to get the status of queues and return results.

Thanks.

I realized I was being a bit of a goof. I do not need to access the redis server from a worker and that was leading to some errors (specifically the ValueError).

To fix this, the loop is now:

while not r_server.llen("cmds") == 0:
    cmd = r_server.lpop("cmds")
    pool.apply_async(work, [cmd])

After these lines I call pool.close(). I used os.getpid() and os.getppid() to check that I did in fact have multiple children running around.
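
Putting the update together, here is a rough sketch of how the revised manager loop might look end to end. The run_cmd worker, the on_done callback and the use of subprocess are assumptions for illustration, not part of the original code:

import subprocess
from multiprocessing import Pool

import redis

def run_cmd(cmd):
    # worker: execute the shell command; note that no Redis access happens here
    return subprocess.call(cmd, shell=True)

def on_done(exit_code):
    print "worker finished with exit code:", exit_code

if __name__ == '__main__':
    r_server = redis.Redis("localhost")

    pool = Pool(processes=2)
    while not r_server.llen("cmds") == 0:
        cmd = r_server.lpop("cmds")
        pool.apply_async(run_cmd, [cmd], callback=on_done)

    pool.close()   # no more tasks will be submitted
    pool.join()    # wait for the outstanding commands to finish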

I would still be glad to hear whether this sounds like a good way to create a manager/worker application that uses Redis.

Answer

Your problem is that you are trying to run multiple commands concurrently with a single redis connection.

You are expecting something like this:

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
command      
             LLEN test
             0

But what you actually get is:

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
             LLEN test
             command
0

The results come back in the same order, but there is nothing linking a thread or command to a specific result. Individual redis connections are not thread safe - you will need one for each worker thread.
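
A minimal sketch of one way to give every worker its own connection, using a Pool initializer so that each pool process opens a private connection when it starts (the initializer approach and the "results" list are assumptions for illustration):

from multiprocessing import Pool

import redis

r_worker = None  # per-process connection, created once in each pool process

def init_worker():
    global r_worker
    # every worker process opens its own connection to Redis
    r_worker = redis.Redis("localhost")

def work(cmd):
    # the worker can now talk to Redis safely through its private connection,
    # for example to push a result back onto a list
    r_worker.rpush("results", "finished: " + cmd)

if __name__ == '__main__':
    pool = Pool(processes=2, initializer=init_worker)
    pool.apply_async(work, ["ls -la"])
    pool.close()
    pool.join()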

You can also see similar problems if you use pipelining inappropriately - it is designed for write-only scenarios, like adding lots of items to a list, where you can improve performance by assuming LPUSH succeeded rather than waiting for the server to confirm success after each item. Redis will still return the results, but they will not necessarily be the results of the last command sent.
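
As an illustration of the write-only case described above, a rough sketch of pipelining a batch of queue pushes in a single round trip (the queue name and commands are just placeholders):

import redis

r = redis.Redis("localhost")

# buffer several writes and send them to the server in one round trip;
# execute() returns all the replies at once rather than one per command
pipe = r.pipeline()
for cmd in ["ls -la;sleep 10;ls", "mkdir test; sleep 20"]:
    pipe.rpush("cmds", cmd)
pipe.execute()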

Other than that, the basic approach is reasonable. There are a couple of enhancements you could make, though (a rough sketch that combines them follows the list):

  • Rather than checking the length, just use non-blocking LPOP - if it returns null, the list is empty
  • Add a timer so that if the list is empty it will wait rather than just issuing another command.
  • Include a cancellation check in the while loop condition
  • Handle connection errors - I use an outer loop set up so that if the connection fails the worker will attempt to reconnect (basically restart main) for a reasonable number of attempts before terminating the worker process altogether.
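
Here is a rough sketch of a worker loop that combines these suggestions - a non-blocking LPOP, a sleep when the queue is empty, a cancellation check, and an outer reconnect loop. The names worker_main, should_stop, MAX_ATTEMPTS and IDLE_SECONDS are hypothetical, and the retry policy is only an example:

import time

import redis

MAX_ATTEMPTS = 5    # hypothetical limit on reconnection attempts
IDLE_SECONDS = 1    # how long to wait when the queue is empty

def worker_main(should_stop):
    # should_stop is assumed to be a callable cancellation check,
    # for example backed by a multiprocessing.Event
    attempts = 0
    while attempts < MAX_ATTEMPTS:
        try:
            r = redis.Redis("localhost")
            while not should_stop():
                cmd = r.lpop("cmds")          # non-blocking pop: None means the list is empty
                if cmd is None:
                    time.sleep(IDLE_SECONDS)  # wait instead of hammering the server
                    continue
                print "consuming:", cmd
            return  # clean cancellation
        except redis.ConnectionError:
            # connection failed: back off and try to reconnect (basically restart main)
            attempts += 1
            time.sleep(IDLE_SECONDS)
    # too many failed attempts: give up and let the worker process exit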
