Fixing the issue where calculating how frequently a function is called during multiprocessing returns a negative value


Question

I have a function foo() which might be accessed by multiple worker processes concurrently. This function blocks until an output is ready, and then returns it. A sample foo is below:

import random
from time import sleep

def foo():
    
    # Mimic blocking of function
    sleep(random.randint(1, 3))
    
    output = 'result of some logic'
    return output

I need to calculate how frequently (the rate at which) this function is called (for example, once every 15 seconds). However, I do not want this calculated rate to include the time spent in the actual function (since foo may block for a long time). To do this with only 1 worker, I ran this:

import random
import time
from time import sleep

call_rate = {'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0}

def foo():
    global call_rate
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'
    time_waited = time.time() - enter_time

    # Add the time since last function call, and remove time spent inside the function
    call_rate['total_time'] += time.time() - call_rate['last_call'] - time_waited
    call_rate['last_call'] = time.time()
    call_rate['total_calls'] += 1

    # calculate rate
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    return output

def worker(num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo()

worker(3)

# Output: 1.005s . As expected since worker waits 1s before each call
print('foo called once every {}s'.format(call_rate['rate']))  

Basically, I calculated the total time-difference between consecutive calls and, after subtracting the time spent within the function, divided that by the total number of calls (rate = total_time / total_calls).

But when I run this with multiple workers the output is negative:

import random
import time
from time import sleep
from multiprocessing import Manager, Process


def foo(call_rate):

    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'
    time_waited = time.time() - enter_time

    # Add the time since last function call, and remove time spent inside the function
    call_rate['total_time'] += time.time() - call_rate['last_call'] - time_waited
    call_rate['last_call'] = time.time()
    call_rate['total_calls'] += 1

    # calculate rate
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    return output

def worker(num, call_rate):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate)

if __name__ == '__main__':
    # Create a shared dictionary accessible by all processes
    m = Manager()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})
    
    w = []
    
    # Create 3 worker processes that run foo() thrice 
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate,)))
        w[i].start()
    for i in range(3):
        w[i].join()
        
    # Output: -0.97s 
    print('foo called once every {}s'.format(call_rate['rate'])) 

I can kind of understand why the output is negative. Because there are now multiple processes, the time difference between consecutive function calls becomes smaller and smaller, and subtracting the time spent within the function of one process no longer makes much sense, because consecutive calls can now come from different processes. So my question is: how can I get the output in the second case to be approximately 0.3s (since there are 3 workers calling the method concurrently with a 1s delay), without knowing the number of workers running?
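To make the failure mode concrete, here is a small standalone replay of the bookkeeping above for two overlapping calls from different workers. The timestamps are made up purely for illustration; the point is that subtracting each call's own blocking time drives the total negative as soon as calls interleave:

# Hypothetical interleaving of two workers, each blocking about 2s inside foo():
#   worker A enters foo() at t=1.0 and exits at t=3.0
#   worker B enters foo() at t=1.2 and exits at t=3.2
# Replaying the original bookkeeping with these made-up numbers:

events = [
    # (exit_time, enter_time) for each call, in the order the calls finish
    (3.0, 1.0),   # worker A's call
    (3.2, 1.2),   # worker B's call
]

last_call = 0.0
total_time = 0.0
total_calls = 0

for exit_time, enter_time in events:
    time_waited = exit_time - enter_time               # time blocked inside foo()
    # original formula: gap since the last call minus this call's blocking time
    total_time += exit_time - last_call - time_waited
    last_call = exit_time
    total_calls += 1

# -0.4: worker B's 2s of blocking is subtracted from a gap of only 0.2s
print(total_time / total_calls)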

Disclaimer: I have already asked (a quite crude variant of) this question before here. However, before posting this question, I read the meta discussions here and here. The reason I believe this question is not a duplicate of my previous one is that it focuses on a much smaller, better-explained issue rather than my original question, which was much broader and failed to explain itself clearly. My aim at that time was not only to seek an answer to this query, but also alternatives to my broader approach itself, which led it to become vague and arcane. Unlike previously, I have given reproducible code focused on a single, explicit issue, and this question as a whole has more useful applications.

Answer

I found a way to do it without asking for the number of workers running:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock

def foo(call_rate, lock):
    # Shift this to the start of the function
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'


    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

if __name__ == '__main__':
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice 
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    # Output: 0.354s 
    print('foo called once every {}s'.format(call_rate['rate']))

I will explain why this works. In the original code, the last call time was recorded AFTER the function had blocked. This meant that the time spent in the function needed to be subtracted. But, as @Booboo had already pointed out in the comment to their answer, this was problematic because there may be multiple workers running, and we can't just subtract the waiting time EACH worker spends in the function.

A simple workaround is to record the last call time at the start of the function, where the time spent within the function has not yet been added. But that still doesn't solve the broader problem, because the next time foo() is called from the worker, it will include the time spent within the function during the previous call, leaving us back at square one. But this (and I don't know why I didn't see it before) can be fixed very simply, by adding this line just before the function exits:

call_rate['last_call'] = time.time()

This makes sure that when the function exits, the last call time is refreshed, so that it looks as if the worker did not spend any time in the function at all. This approach does not require subtracting anything, and that's why it works.
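If the same entry/exit bookkeeping is needed around more than one blocking function, the two locked updates can also be factored out into a small context manager. The sketch below is only an illustration of the same idea (the name track_call_rate is made up here), and it assumes the same Manager dict and Manager Lock as the code above:

import time
from contextlib import contextmanager

@contextmanager
def track_call_rate(call_rate, lock):
    # Entry: accumulate only the idle gap since the previous call
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    try:
        yield
    finally:
        # Exit: refresh last_call so the time blocked inside the wrapped
        # section is never counted towards the rate
        with lock:
            call_rate['last_call'] = time.time()

With this, foo() would simply wrap its blocking section in with track_call_rate(call_rate, lock): and the measured rate stays the same.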

I did a test where I ran this 10 times and calculated some statistics using the code below:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics


def foo(call_rate, lock):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    # Mimic blocking of function
    sleep(2)

    output = 'result of some logic'


    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(10):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Avergae is : {}".format(statistics.mean(avgs)))

Output:

Highest is : 0.35980285538567436
Lowest is : 0.3536567423078749
Average is : 0.356808172331916

As 'proof' that the above code really does ignore the time spent within the function, you can make the function block for a longer time, say 15s, and the output will still be approximately the same.
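A quick way to convince yourself of this without actually waiting on long sleeps is to replay the bookkeeping with synthetic timestamps. The sketch below is not from my original test; it simulates a single worker with made-up numbers and shows that the computed rate depends only on the idle gap between calls, never on how long each call blocks:

def replay(block_time, idle_gap, calls):
    # Replay the entry/exit bookkeeping for one worker with fake timestamps
    last_call = 0.0
    total_time = 0.0
    total_calls = 0
    now = 0.0
    for _ in range(calls):
        now += idle_gap                  # worker idles before calling foo()
        total_time += now - last_call    # entry: accumulate only the idle gap
        last_call = now
        total_calls += 1
        now += block_time                # foo() blocks...
        last_call = now                  # ...and refreshes last_call on exit
    return total_time / total_calls

print(replay(block_time=2, idle_gap=1, calls=3))    # 1.0
print(replay(block_time=15, idle_gap=1, calls=3))   # still 1.0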

Update

The reason the rate is not 0.3s when the function blocks for a varying time has to do with when the workers enter and exit foo(). Consider the code below, where two workers each execute foo() twice and print call_rate on every entry to and exit from foo(), along with a unique ID identifying the worker:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics
import string

def foo(call_rate, lock, id):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
        print("{} entered, call rate {}".format(id, call_rate))
    # Mimic blocking of function
    sleep(1)

    output = 'result of some logic'

    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
        print("{} exited, call rate {}".format(id, call_rate))
    return output


def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def worker(num, call_rate, lock):
    id = id_generator()
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock, id)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 2 worker processes that run foo() twice
    for i in range(2):
        w.append(Process(target=worker, args=(2, call_rate, lock, )))
        w[i].start()
    for i in range(2):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(1):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Avergae is : {}".format(statistics.mean(avgs)))

Note that in this code, foo() always blocks for 1s. The rate should be close to 0.5s since there are two workers present. Running this code:

Output #1:

XEC6AU entered, call rate {'rate': 1.1851444244384766, 'total_time': 1.1851444244384766, 'last_call': 1624950732.381014, 'total_calls': 1}
O43FUI entered, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950732.4325447, 'total_calls': 2}
XEC6AU exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4327667, 'total_calls': 2}
O43FUI exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4484024, 'total_calls': 2}
XEC6AU entered, call rate {'rate': 0.7401185035705566, 'total_time': 2.22035551071167, 'last_call': 1624950734.433083, 'total_calls': 3}
O43FUI entered, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950734.4487064, 'total_calls': 4}
XEC6AU exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4333804, 'total_calls': 4}
O43FUI exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4958992, 'total_calls': 4}
Highest is : 0.558994710445404
Lowest is : 0.558994710445404
Average is : 0.558994710445404

The rate is about 0.5s, which is as expected. Notice how both workers enter and exit the function simultaneously. Now, after changing the function's blocking time from 1s to random.randint(1, 10), this is what I get:

Output #2:

NHXAKF entered, call rate {'rate': 1.1722326278686523, 'total_time': 1.1722326278686523, 'last_call': 1624886294.4630196, 'total_calls': 1}
R2DD8H entered, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886294.478649, 'total_calls': 2}
NHXAKF exited, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886300.4648588, 'total_calls': 2}
NHXAKF entered, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886301.465171, 'total_calls': 3}
R2DD8H exited, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886302.4811018, 'total_calls': 3}
R2DD8H entered, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886303.4813821, 'total_calls': 4}
NHXAKF exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886304.4660738, 'total_calls': 4}
R2DD8H exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886307.4826, 'total_calls': 4}
Highest is : 0.7971136569976807
Lowest is : 0.7971136569976807
Average is : 0.7971136569976807

The rate, unlike before, is almost 0.8. Moreover, the two workers no longer enter and exit the function together, either. This is of course because one blocks for longer than the other. But because they are no longer in sync, they spend their 1s of idle time inside the worker() function at separate times instead of together. You can even see that in call_rate['total_time']: for Output #1, where the workers are in sync, it is ~2s, while for Output #2 it is ~3s, and hence the difference in rates. So 0.8s is the true rate at which the workers call foo() in this scenario, not the assumed 0.5s. Simply scaling the rate by the number of processes would miss this nuance.
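As a quick sanity check on that last point, the two rates can be recomputed directly from the total_time and total_calls values printed on the final line of each output:

# Numbers taken verbatim from the last line of Output #1 and Output #2
print(2.235978841781616 / 4)    # ~0.559 -- workers in sync
print(3.1884546279907227 / 4)   # ~0.797 -- workers out of sync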
