Combining Python watchdog with multiprocessing or threading


Problem Description

I'm using Python's Watchdog to monitor a given directory for new files being created. When a file is created, some code runs that spawns a subprocess shell command to run different code to process this file. This should run for every new file that is created. I've tested this out when one file is created, and things work great, but am having trouble getting it working when multiple files are created, either at the same time, or one after another.

My current problem is this... the processing code run in the shell takes a while to run and will not finish before a new file is created in the directory. There's nothing I can do about that. While this code is running, watchdog will not recognize that a new file has been created, and will not proceed with the code.

So I think I need to spawn a new process for each new file, or do something to get things to run concurrently, and not wait until one file is done before processing the next one.
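To illustrate the kind of thing I mean, here's a rough sketch (not my actual code) where each new file is handed off to a multiprocessing pool so that on_created() returns immediately; process_file is just a hypothetical stand-in for my dosWatch.go call:

import multiprocessing as mp

from watchdog.events import FileSystemEventHandler


def process_file(path):
    # hypothetical stand-in for the slow shell-based processing
    pass


class NonBlockingHandler(FileSystemEventHandler):
    '''Sketch: hand each created file to a worker pool instead of blocking the handler'''

    def __init__(self, pool):
        self.pool = pool

    def on_created(self, event):
        # submit the work and return right away, so watchdog keeps dispatching events
        self.pool.apply_async(process_file, (event.src_path,))

# usage sketch: pool = mp.Pool(4); observer.schedule(NonBlockingHandler(pool), some_dir)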

So my questions are:

1.) In reality I will have 4 files, in different series, created at the same time, in one directory. What's the best way to get watchdog to run the code on file creation for all 4 files at once?

2.) When the code is running for one file, how do I get watchdog to begin processing the next file in the same series without waiting until processing for the previous file has completed? This is necessary because the files are particular and I need to pause the processing of one file until another file is finished, but the order in which they are created may vary.

Do I need to combine my watchdog with multiprocessing or threading somehow? Or do I need to implement multiple observers? I'm kind of at a loss. Thanks for any help.

import os

from watchdog.events import FileSystemEventHandler

import dosWatch   # my own module; go() spawns the shell command that processes the file


class MonitorFiles(FileSystemEventHandler):
    '''Sub-class of watchdog event handler'''

    def __init__(self, config=None, log=None):
        self.log = log
        self.config = config

    def on_created(self, event):
        # a new file showed up; hand it off to the processing code
        self.log.info('Created file {0}'.format(event.src_path))
        dosWatch.go(event.src_path, self.config, self.log)

    def on_modified(self, event):
        # FITS files must not be modified once written
        file = os.path.basename(event.src_path)
        ext = os.path.splitext(file)[1]
        if ext == '.fits':
            self.log.warning('Modifying a FITS file is not allowed')
            return

    def on_deleted(self, event):
        self.log.critical('Nothing should ever be deleted from here!')
        return

Main monitoring

import time

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

import dosclass   # my module containing the MonitorFiles handler above


def monitor(config, log):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code'''

    event_handler = dosclass.MonitorFiles(config, log)

    # add logging to the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.start()
    log.info('Begin MaNGA DOS!')
    log.info('Start watching directory {0} for new files ...'.format(config.fitsDir))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        log.info('Stop watching directory ...')
        log.info('End MaNGA DOS!')
        log.info('--------------------------')
        log.info('')
    observer.join() 

In the above, my monitor method sets up watchdog to monitor the main directory. The MonitorFiles class defines what happens when a file is created. It basically calls this dosWatch.go method which eventually calls a subprocess.Popen to run a shell command.
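dosWatch.go itself isn't shown here, but roughly it boils down to something like this (a sketch; the shell command name is made up), which is why the handler is tied up until the command finishes:

import subprocess


def go(filepath, config, log):
    # build and run the shell command that processes the new file
    cmd = 'process_fits {0}'.format(filepath)   # hypothetical command name
    log.info('Running: {0}'.format(cmd))
    proc = subprocess.Popen(cmd, shell=True)
    proc.wait()   # waits for the shell command, so on_created() blocks meanwhile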

Recommended Answer

Here's what I ended up doing, and it solved my problem. I used multiprocessing to start a separate watchdog monitoring process to watch each file separately. Watchdog already queues up the new files for me, which works fine.

As for point 2 above, I needed, for example, file2 to be processed before file1, even though file1 was created first. So during file1's processing I check for the output of the file2 processing; if it's found, processing of file1 goes ahead, and if not, it exits. During file2's processing, I check whether file1 has already been created, and if so, process file1. (Code for this not shown)
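Before the main code below, here's a rough sketch of what that check amounts to (the output filename convention and helper name are made up):

import os


def file2_output_exists(config):
    # file1 may only proceed once file2's processed output is on disk
    expected = os.path.join(config.fitsDir, 'file2_processed.fits')   # hypothetical filename
    return os.path.exists(expected)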

import time
import multiprocessing as mp

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

import dosclass   # module with my MonitorFiles handler
# initLogger (used below) is defined elsewhere in my code


def monitorCam(camera, config, mainlog):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code.  Monitors each camera.'''

    mainlog.info('Process Name, PID: {0},{1}'.format(mp.current_process().name,mp.current_process().pid))

    #init cam log
    camlog = initLogger(config, filename='manga_dos_{0}'.format(camera))
    camlog.info('Camera {0}, PID {1} '.format(camera,mp.current_process().pid))
    config.camera = camera

    event_handler = dosclass.MonitorFiles(config, camlog, mainlog)

    # add logging to the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.daemon = True
    observer.start()
    camlog.info('Begin MaNGA DOS!')
    camlog.info('Start watching directory {0} for new files ...'.format(config.fitsDir))
    camlog.info('Watching directory {0} for new files from camera {1}'.format(config.fitsDir,camera))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        camlog.info('Stop watching directory ...')
        camlog.info('End MaNGA DOS!')
        camlog.info('--------------------------')
        camlog.info('')
    #observer.join()

    if observer.is_alive():
        camlog.info('still alive')
    else:
        camlog.info('thread ending')    

Starting multiple camera processes

def startProcess(camera,config,log):
    ''' Uses multiprocessing module to start 4 different camera monitoring processes'''

    jobs=[]

    #pdb.set_trace()

    #log.info(mp.log_to_stderr(logging.DEBUG))
    for i in range(len(camera)):
        log.info('Starting to monitor camera {0}'.format(camera[i]))
        print('Starting to monitor camera {0}'.format(camera[i]))
        try:
            p = mp.Process(target=monitorCam, args=(camera[i],config, log), name=camera[i])
            p.daemon = True
            jobs.append(p)
            p.start()
        except KeyboardInterrupt:
            log.info('Ending process: {0} for camera {1}'.format(mp.current_process().pid, camera[i]))
            p.terminate()
            log.info('Terminated: {0}, {1}'.format(p,p.is_alive()))

    for i in range(len(jobs)):
        jobs[i].join()  

    return      
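For completeness, a sketch of how startProcess gets kicked off (the camera names here are placeholders, and config and log are built elsewhere in my setup):

if __name__ == '__main__':
    cameras = ['cam1', 'cam2', 'cam3', 'cam4']   # placeholder camera names
    startProcess(cameras, config, log)           # config and log come from the rest of my code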
