在python看门狗中并行处理多个onCreated事件 [英] Process Multiple onCreated events parallelly in python watchdog

查看:387
本文介绍了在python看门狗中并行处理多个onCreated事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试检测某个目录中是否创建了新文件;如果创建了新文件,我想对其进行处理(需要10分钟才能得到输出),而在此期间,该文件夹中还会不断创建其他新文件。

I am trying to detect whether any new files are created in a directory; if one is created, I want to process it (which takes 10 minutes to produce output). In the meantime, other new files may also be created in the folder.

如何将看门狗(watchdog)的 on_created 回调与多进程结合起来,使其不必等待上一个文件处理完成,而是在每次创建文件时都启动一个新进程?

How do I register watchdog's on_created handler with multiprocessing so that, instead of waiting for one file to finish, it spawns a new process every time a file is created?

import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
def doProcessing(filename):
    """Stand-in for the real, slow processing of *filename*.

    Defined before the ``__main__`` block on purpose: that block does not
    return while the observer is running, so a definition placed after it
    would not exist yet when ``on_created`` calls it (NameError).
    """
    print("Processing")


def on_created(event):
    """Handle a watchdog "created" event synchronously.

    The work (sleep + doProcessing) happens inside the callback itself, so
    further events wait until it returns -- the serial behaviour the
    question wants to avoid.
    """
    print(f"hey, {event.src_path} has been created!")
    time.sleep(10)
    doProcessing(event.src_path)
    print(f"hey for {event.src_path}")


if __name__ == "__main__":
    patterns = ["*"]
    ignore_patterns = None
    ignore_directories = False
    case_sensitive = True
    my_event_handler = PatternMatchingEventHandler(
        patterns=patterns,
        ignore_patterns=ignore_patterns,
        ignore_directories=ignore_directories,
        case_sensitive=case_sensitive,
    )
    # Install the callback before the observer starts so that events
    # arriving right after start() are not missed.
    my_event_handler.on_created = on_created
    # Raw string: "\w" is not a valid escape sequence in a normal literal.
    path = r"D:\watcher"
    go_recursively = True
    my_observer = Observer()
    my_observer.schedule(my_event_handler, path, recursive=go_recursively)
    my_observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        my_observer.stop()
    my_observer.join()

推荐答案

抱歉代码中有这么多被注释掉的部分。本质上,解决问题的关键是 pool.apply_async(print_func, (event,)):事件被推入队列后,process_load_queue 函数遍历队列,并异步运行 print_func。

Sorry for so many commented-out portions of the code. In essence, pool.apply_async(print_func, (event,)) is what solved the problem: once events are pushed onto the queue, the process_load_queue function iterates through the queue and runs print_func asynchronously.

# -*- coding: utf-8 -*-
"""
Created on Mon Oct 21 22:02:55 2019

@author: 1009758
"""
import os
import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
from multiprocessing import Process
from multiprocessing import Queue
import threading

from multiprocessing import Pool
# Worker-pool size: leave one core for the observer/main process, but never
# drop below 1 (cpu_count() - 1 is 0 on a single-core machine, and a pool
# of size 0 is invalid).
PROCESSES = max(1, mp.cpu_count() - 1)
NUMBER_OF_TASKS = 10
class FileLoaderWatchdog(PatternMatchingEventHandler):
    '''Forward file-system "created" events to a queue for later processing.

    Events whose paths match ``patterns`` are only put on ``queue`` here.
    The slow per-file work is done by whoever consumes the queue. This
    handler therefore returns quickly and does not hold up the watchdog
    observer.
    '''

    def __init__(self, queue, patterns):
        '''
        Args:
            queue: object with a ``put`` method (e.g. multiprocessing.Queue)
                that receives the events.
            patterns: glob patterns forwarded to PatternMatchingEventHandler.
        '''
        super().__init__(patterns=patterns)
        self.queue = queue

    def process(self, event):
        '''Put *event* on the queue.

        event.event_type
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        '''
        self.queue.put(event)

    def on_created(self, event):
        '''Enqueue a creation event and log its path with a UTC timestamp.'''
        self.process(event)
        # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
        # datetime formats identically with the strftime pattern below.
        now = datetime.datetime.now(datetime.timezone.utc)
        print ("{0} -- event {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))


def print_func(event, delay=5):
    '''Simulate slow work on one event, then log which file was handled.

    This is the function submitted to the process pool via
    ``Pool.apply_async``.

    Args:
        event: watchdog event; only its ``src_path`` attribute is read.
        delay: seconds to sleep before logging, standing in for the real
            processing time. Defaults to 5, the original hard-coded value.
    '''
    time.sleep(delay)
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # datetime formats identically with the strftime pattern below.
    now = datetime.datetime.now(datetime.timezone.utc)
    print ("{0} -- Pulling {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))

def info(title):
    """Print *title*, then this module's name and the parent/current process ids."""
    print(title)
    details = (
        ('module name:', __name__),
        ('parent process:', os.getppid()),
        ('process id:', os.getpid()),
    )
    for label, value in details:
        print(label, value)

def process_load_queue(q, processes=None):
    '''Consume watchdog events from *q* and run print_func on each in a process pool.

    Intended as the target of a daemon thread. One pool is created up front
    and reused for every event. The old version built a fresh, never-closed
    Pool per event, which leaked worker processes. Each event is submitted
    with ``apply_async``, so slow jobs run concurrently instead of one after
    another.

    Args:
        q: Queue() object (anything with a blocking ``get``) that yields
            watchdog events.
        processes: size of the shared worker pool. ``None`` lets
            multiprocessing choose (os.cpu_count()).

    A ``None`` item on the queue is treated as a stop request. No further
    work is accepted, and the function returns once already-submitted jobs
    have finished.
    '''
    def _report_error(exc):
        # Without an error_callback, apply_async silently drops worker exceptions.
        print("worker failed: {0!r}".format(exc))

    with Pool(processes=processes) as pool:
        while True:
            # Block until an event arrives instead of polling q.empty(),
            # which multiprocessing documents as unreliable.
            event = q.get()
            if event is None:
                break
            pool.apply_async(print_func, (event,), error_callback=_report_error)
        pool.close()
        pool.join()



if __name__ == '__main__':

    # Queue carrying events from the watchdog handler to the dispatcher thread.
    watchdog_queue = Queue()

    # Set up watchdog to monitor the directory; every created file matching
    # the patterns is pushed onto the queue.
    patt = ["*"]
    # Raw string: "\w" is not a valid escape sequence in a normal literal.
    path_watch = r"D:\watcher"
    event_handler = FileLoaderWatchdog(watchdog_queue, patterns=patt)
    observer = Observer()
    observer.schedule(event_handler, path=path_watch)
    observer.start()

    # Set up a dispatcher thread that pulls events off the queue and hands
    # them to worker processes. daemon=True (Thread.setDaemon() is
    # deprecated) so this thread does not keep the interpreter alive on exit.
    worker = threading.Thread(
        target=process_load_queue, args=(watchdog_queue,), daemon=True
    )
    worker.start()

    try:
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

这篇关于在python看门狗中并行处理多个onCreated事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆