如何实现支持名称空间的FIFO队列 [英] How to implement a FIFO queue that supports namespaces

查看:137
本文介绍了如何实现支持名称空间的FIFO队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用以下方法来处理基于Google App Engine db.Model的FIFO队列(

 

> from google.appengine.ext从google.appengine.ext导入db
从google.appengine.ext.webapp导入webapp
导入run_wsgi_app

class QueueItem(db.Model):
created = db.DateTimeProperty(required = True,auto_now_add = True)
data = db.BlobProperty(required = True)

@staticmethod
def push(data):
添加一个新的队列项目。
返回QueueItem(data = data).put()

@staticmethod
def pop():
从队列中弹出最老的项目。
def _tx_pop(candidate_key):
#尝试并为自己获取候选项。如果
#的另一个任务击败了它,这将失败。
task = QueueItem.get(candidate_key)
如果任务:
task.delete()
返回任务
#获取一些任务并尝试获取它们,直到找到($)
而不是
#而$ true
candidate_keys = QueueItem.all(keys_only = True).order('created')。fetch(10)
如果不是candidate_keys:
#队列中没有任务
返回无
for candidate_keys中的candidate_key:
task = db.run_in_transaction(_tx_pop,candidate_key)
如果任务:
返回任务

该队列按预期工作(非常好) p>

现在我的代码有一个方法可以访问由延迟队列调用的FIFO队列:

  def deferred_worker():
data = QueueItem.pop()
do_something_with(data)

我想加强这个方法和队列数据结构添加一个client_ID参数,表示需要访问自己的队列的特定客户端。
类似于:

pre $ def deferred_worker(client_ID):
data = QueueItem_of_this_client_ID.pop()#I需要实现这个
do_something_with(data)

如何将Queue编码为client_ID知道?
$ b

约束:

- 客户端的数量是动态的而不是预定义的

- 不能选择Taskqueue 1.十个最大队列2.我想完全控制我的队列)



你知道我该如何使用新的 Namespaces api (请记住,我没有从db.Model调用一个webapp.RequestHandler)?

另一种选择:我可以添加一个 client_ID db.StringProperty 到QueueItem使用它有一个过滤器拉方法:

  QueueItem.all(keys_only = True).filte r(client_ID = an_ID).order('created')。fetch(10)

假设你的客户端类实际上是客户端调用的请求处理程序,你可以这样做:

  from google.appengine.api从google.appengine.api.namespace_manager导入用户
导入set_namespace

class ClientClass(webapp.RequestHandler):
def get(self):
#对于这个例子,假设user_id是唯一的id。
#您可以轻松使用您传递的参数。
user = users.get_current_user()
如果用户:
#如果有用户,请使用他们的队列。否则全局队列。
set_namespace(user.user_id())

item = QueueItem.pop()
self.response.out.write(str(item))

QueueItem.push('The next task。')

或者,您也可以设置命名空间应用范围



通过设置默认名称空间,除非另有明确规定,否则对该数据存储区的所有调用都将位于该名称空间内。请注意,要获取并运行任务,您必须知道命名空间。因此,您可能希望维护默认名称空间中的名称空间列表以进行清理。


I'm using the following approach to handle a FIFO queue based on Google App Engine db.Model (see this question).

from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import run_wsgi_app

class QueueItem(db.Model):
  created = db.DateTimeProperty(required=True, auto_now_add=True)
  data = db.BlobProperty(required=True)

  @staticmethod
  def push(data):
    """Add a new queue item."""
    return QueueItem(data=data).put()

  @staticmethod
  def pop():
    """Pop the oldest item off the queue."""
    def _tx_pop(candidate_key):
      # Try and grab the candidate key for ourselves. This will fail if
      # another task beat us to it.
      task = QueueItem.get(candidate_key)
      if task:
        task.delete()
      return task
    # Grab some tasks and try getting them until we find one that hasn't been
    # taken by someone else ahead of us
    while True:
      candidate_keys = QueueItem.all(keys_only=True).order('created').fetch(10)
      if not candidate_keys:
        # No tasks in queue
        return None
      for candidate_key in candidate_keys:
        task = db.run_in_transaction(_tx_pop, candidate_key)
        if task:
          return task

This queue works as expected (very good).

Right now my code has a method that access this FIFO queue invoked by a deferred queue:

def deferred_worker():
        data= QueueItem.pop()
        do_something_with(data)

I would like to enhance this method and the queue data structure adding a client_ID parameter representing a specific client that needs to access its own Queue. Something like:

def deferred_worker(client_ID):
        data= QueueItem_of_this_client_ID.pop() # I need to implement this
        do_something_with(data)

How could I code the Queue to be client_ID aware?

Constraints:
- The number of clients is dynamic and not predefined
- Taskqueue is not an option (1. ten max queues 2. I would like to have full control on my queue)

Do you know how could I add this behaviour using the new Namespaces api (Remember that I'm not calling the db.Model from a webapp.RequestHandler)?
Another option: I could add a client_ID db.StringProperty to the QueueItem using it has a filter on pull method:

QueueItem.all(keys_only=True).filter(client_ID=an_ID).order('created').fetch(10)

Any better idea?

解决方案

Assuming your "client class" is really a request handler the client calls, you could do something like this:

from google.appengine.api import users
from google.appengine.api.namespace_manager import set_namespace

class ClientClass(webapp.RequestHandler):
  def get(self):
    # For this example let's assume the user_id is your unique id.
    # You could just as easily use a parameter you are passed.
    user = users.get_current_user()
    if user:
       # If there is a user, use their queue.  Otherwise the global queue.
       set_namespace(user.user_id())

    item = QueueItem.pop()
    self.response.out.write(str(item))

    QueueItem.push('The next task.')

Alternatively, you could also set the namespace app-wide.

By setting the default namespace all calls to the datastore will be "within" that namespace, unless you explicitly specify otherwise. Just note, to fetch and run tasks you'll have to know the namespace. So you probably want to maintain a list of namespaces in the default namespace for cleanup purposes.

这篇关于如何实现支持名称空间的FIFO队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆