Python观察者模式:示例,提示? [英] Python Observer Pattern: Examples, Tips?

查看:148
本文介绍了Python观察者模式:示例,提示?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Python中实现了GoF Observer的示例吗?我有一个代码,目前有一些调试代码通过关键类(当前生成消息到stderr如果一个魔术env设置)。此外,该类还具有用于递增返回结果的接口,并将其存储(在内存中)进行后处理。 (该类本身是通过ssh在远程机器上同时执行命令的作业管理器。)



目前,该类的用法类似于:

  job = SSHJobMan(hostlist,cmd)
job.start()
而不是job.done():
for each in job.poll():
incrementally_process(job.results [each])
time.sleep(0.2)#或其他更有用的工作
post_process(job.results)

一个同义使用模式是:

  job = SSHJobMan(hostlist,cmd)
job.wait()#隐式地执行一个start()
进程(job.results)

这一切都适用于当前的实用程序。但它确实缺乏灵活性。例如,我目前支持简单的输出格式或进度条作为增量结果,我还支持
简要,完整和合并的消息输出为 post_process()功能



但是,我想支持多个结果/输出流(进度条到终端,调试和警告到日志文件,从成功的作业输出到一个文件/目录,错误消息和其他结果从不成功的作业到另一个等)。



这听起来像一个情况,要求观察者...有我的类的实例接受来自其他对象的注册,并在发生特定类型的事件时将其调回回来。



我正在查看 PyPubSub ,因为我在SO相关问题中看到几个引用。我不知道我准备好将外部依赖关系添加到我的实用程序中,但是我可以看到使用他们的界面作为我的模型的价值,如果这将使其他人更容易使用。 (该项目既是独立的命令行实用程序,也是用于编写其他脚本/实用程序的类)。



简而言之,我知道如何做我想要的但是有很多方法可以实现。我希望长期以来可能为其他代码使用者提供的建议。



代码本身在: classh

解决方案


但是它确实没有灵活性。


嗯...实际上,如果异步API是你要什么。通常是也许你需要的是从stderr切换到Python的 logging 模块,它有一种自己的发布/订阅模型,与 Logger.addHandler()等等。



如果你想支持观察员,我的建议是保持简单。你真的只需要几行代码。

  class Event(object):
pass

class Observable(object):
def __init __(self):
self.callbacks = []
def subscribe(self,callback):
self.callbacks.append (回调)
def fire(self,** attrs):
e = Event()
e.source = self
for k,v in attrs.iteritems():
setattr(e,k,v)
在self.callbacks中的fn
fn(e)

您的Job类可以子类 Observable 。当有兴趣发生时,请致电 self.fire(type =progress,percent = 50)等。


Are there any exemplary examples of the GoF Observer implemented in Python? I have a bit code which currently has bits of debugging code laced through the key class (currently generating messages to stderr if a magic env is set). Additionally, the class has an interface for incrementally return results as well as storing them (in memory) for post processing. (The class itself is a job manager for concurrently executing commands on remote machines over ssh).

Currently the usage of the class looks something like:

job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
    for each in job.poll():
        incrementally_process(job.results[each])
        time.sleep(0.2) # or other more useful work
post_process(job.results)

An alernative usage model is:

job = SSHJobMan(hostlist, cmd)
job.wait()  # implicitly performs a start()
process(job.results)

This all works fine for the current utility. However it does lack flexibility. For example I currently support a brief output format or a progress bar as incremental results, I also support brief, complete and "merged message" outputs for the post_process() function.

However, I'd like to support multiple results/output streams (progress bar to the terminal, debugging and warnings to a log file, outputs from successful jobs to one file/directory, error messages and other results from non-successful jobs to another, etc).

This sounds like a situation that calls for Observer ... have instances of my class accept registration from other objects and call them back with specific types of events as they occur.

I'm looking at PyPubSub since I saw several references to that in SO related questions. I'm not sure I'm ready to add the external dependency to my utility but I could see value in using their interface as a model for mine if that's going to make it easier for others to use. (The project is intended as both a standalone command line utility and a class for writing other scripts/utilities).

In short I know how to do what I want ... but there are numerous ways to accomplish it. I want suggestions on what's most likely to work for other users of the code in the long run.

The code itself is at: classh.

解决方案

However it does lack flexibility.

Well... actually, this looks like a good design to me if an asynchronous API is what you want. It usually is. Maybe all you need is to switch from stderr to Python's logging module, which has a sort of publish/subscribe model of its own, what with Logger.addHandler() and so on.

If you do want to support observers, my advice is to keep it simple. You really only need a few lines of code.

class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.iteritems():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

Your Job class can subclass Observable. When something of interest happens, call self.fire(type="progress", percent=50) or the like.

这篇关于Python观察者模式:示例,提示?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆