在python看门狗中并行处理多个onCreated事件 [英] Process Multiple onCreated events parallelly in python watchdog
问题描述
我正在尝试检测是否在目录上创建了任何新文件;如果创建了该文件,我想对其进行处理(需要10分钟才能输出),与此同时,还会在该文件夹中创建其他新文件.
I am trying to detect if any new files are created on a directory ; if created I want to process it (takes 10 minutes to give output), in the mean time other new files would also be created in the folder.
我该如何把看门狗(watchdog)的 on_created 回调与多进程结合起来,使它不必等待一个文件处理完成,而是在每次有文件被创建时都启动一个新进程?
How do I register the watchdog's on_created callback with multiprocessing so that, instead of waiting for one file to be completed, it spawns a new process every time a file is created?
import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
def on_created(event):
    """Handle a "created" event by processing the new file inline.

    The sleep and the processing call run synchronously inside the callback,
    so the next event is not handled until this one returns. That serial
    behaviour is what the question asks how to avoid.

    Args:
        event: Event object exposing the new file's path as ``src_path``.
    """
    print(f"hey, {event.src_path} has been created!")
    time.sleep(10)
    doProcessing(event.src_path)
    print(f"hey for {event.src_path}")
def doProcessing(filename):
    """Placeholder for the long-running per-file work; it only prints a marker.

    Defined ahead of the ``__main__`` guard: the guard blocks in an endless
    loop, so a definition placed after it would not exist yet when the
    ``on_created`` callback tries to call it.

    Args:
        filename: Path of the file to process (currently unused).
    """
    print("Processing")


if __name__ == "__main__":
    # Keyword arguments with list-valued patterns: recent watchdog releases
    # no longer accept these positionally.
    my_event_handler = PatternMatchingEventHandler(
        patterns=["*"],
        ignore_patterns=[],
        ignore_directories=False,
        case_sensitive=True,
    )
    # Attach the callback before the observer starts so that no early event
    # is delivered to the default no-op handler.
    my_event_handler.on_created = on_created
    # on_deleted / on_modified can be attached the same way if needed.

    # Raw string: "\w" is not a valid escape sequence in a normal literal.
    path = r"D:\watcher"
    go_recursively = True

    my_observer = Observer()
    my_observer.schedule(my_event_handler, path, recursive=go_recursively)
    my_observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        my_observer.stop()
    my_observer.join()
推荐答案
对不起,代码中有这么多被注释掉的部分;本质上,pool.apply_async(print_func, (event,))
才是解决该问题的关键:事件被推入队列之后,process_load_queue 函数会遍历队列并异步运行 print_func.
Sorry for so many commented-out portions of the code; in essence, pool.apply_async(print_func, (event,))
is what solved the problem: once events are pushed onto the queue, the process_load_queue function iterates through the queue and runs print_func asynchronously.
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 21 22:02:55 2019
@author: 1009758
"""
import os
import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
from multiprocessing import Process
from multiprocessing import Queue
import threading
from multiprocessing import Pool
# Number of worker processes: leave one core free, but never drop below one
# (cpu_count() - 1 is 0 on a single-core machine, which is not a valid pool size).
PROCESSES = max(1, mp.cpu_count() - 1)
NUMBER_OF_TASKS = 10
class FileLoaderWatchdog(PatternMatchingEventHandler):
    """Event handler that forwards "created" events to a queue.

    Only paths matching ``patterns`` reach the handler; each created event is
    put on ``queue`` for a separate consumer to pick up, so the callback
    itself returns quickly.
    """

    def __init__(self, queue, patterns):
        """Store the target queue and register the filename patterns.

        Args:
            queue: Queue-like object with a ``put`` method.
            patterns: List of glob patterns the handler should react to.
        """
        super().__init__(patterns=patterns)
        self.queue = queue

    def process(self, event):
        """Put *event* on the queue.

        The event is expected to carry:
            event.event_type   'modified' | 'created' | 'moved' | 'deleted'
            event.is_directory True | False
            event.src_path     path/to/observed/file
        """
        self.queue.put(event)

    def on_created(self, event):
        """Enqueue a newly created path and log the hand-off with a UTC timestamp."""
        self.process(event)
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted text is the same.
        now = datetime.datetime.now(datetime.timezone.utc)
        print("{0} -- event {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
def print_func(event, delay=5):
    """Simulate slow per-file work, then report which event was handled.

    Args:
        event: Object exposing the file path as ``src_path``.
        delay: Seconds to sleep before reporting; defaults to the original
            hard-coded 5 seconds.
    """
    time.sleep(delay)
    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted text is the same.
    now = datetime.datetime.now(datetime.timezone.utc)
    print("{0} -- Pulling {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
def info(title):
    """Print *title* followed by the module name and the parent/current process ids.

    Debugging aid for checking which process a piece of code is running in.

    Args:
        title: Label printed on the first line.
    """
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
def _report_worker_error(exc):
    """Print an exception raised by a pool task instead of dropping it silently."""
    print("worker failed: {0!r}".format(exc))


def process_load_queue(q):
    """Dispatch queued events to a pool of worker processes.

    Intended to run in a daemon thread. Blocks on the queue and hands every
    event to ``print_func`` through ``apply_async``, so a slow event does not
    hold up the ones behind it. Putting ``None`` on the queue makes the loop
    wait for outstanding tasks and return.

    Args:
        q: Queue of event objects; ``None`` acts as a stop sentinel.
    """
    # One long-lived pool instead of a new Pool per event: per-event pools
    # were never closed, and the multiprocessing docs warn against relying on
    # garbage collection to clean a pool up. Parallelism is now bounded by the
    # pool size (clamped so that a PROCESSES value of 0 cannot break Pool).
    pool = Pool(processes=max(1, PROCESSES))
    try:
        while True:
            # Blocking get() replaces the empty()/sleep(1) polling loop.
            event = q.get()
            if event is None:
                break
            pool.apply_async(
                print_func,
                (event,),
                error_callback=_report_worker_error,
            )
    finally:
        # Let already-submitted tasks finish before the workers go away.
        pool.close()
        pool.join()
if __name__ == '__main__':
    # Queue shared by the watchdog handler (producer) and the dispatcher
    # thread (consumer).
    watchdog_queue = Queue()

    # Set up watchdog to monitor the directory for new files.
    patt = ["*"]
    # Raw string: "\w" is not a valid escape sequence in a normal literal.
    path_watch = r"D:\watcher"
    event_handler = FileLoaderWatchdog(watchdog_queue, patterns=patt)
    observer = Observer()
    observer.schedule(event_handler, path=path_watch)
    observer.start()

    # Daemon thread that drains the queue; daemon=True replaces the
    # deprecated setDaemon(True) call.
    worker = threading.Thread(
        target=process_load_queue,
        args=(watchdog_queue,),
        daemon=True,
    )
    worker.start()

    try:
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
这篇关于在python看门狗中并行处理多个onCreated事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!