非阻塞线程队列 [英] Nonblocking queue of threads

查看:58
本文介绍了非阻塞线程队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建简单的线程队列.

I want to create simple queue of threads.

线程将从请求后开始

我创建了没有请求的简单示例.我试图加入线程,但是它不能按我的意愿工作.

I created simple example without requests. I tried to join threads, but it doesn't work as i want.

def a():
    print('start a')
    sleep(5)
    print('end a')


def b():
    print('start b')
    sleep(5)
    print('end b')


t = Thread(target=a)
t.start()
t.join()
print('test1')
t = Thread(target=b)
t.start()
t.join()
print('test2')

代码结果:

start a
end a
test1
start b
end b
test2

期望:

start a
test1
end a
start b
test2
end b

我知道我可以在加入之前打印('test1'),但是当我使用发布请求时,线程可以在打印('test1')时启动

I know i can print('test1') before join, but when i use post request thread can start in time of printing ('test1')

t.start()
print('test1') <- here comes post request
               t1.start()
               print('test2')
               t1.join()
t.join()

在这种情况下,线程将并行运行

In this case threads will run parallel

推荐答案

您可以将 timeout 参数传递给 join(),这将阻止调用线程最大超时秒.最大",因为线程可以更早终止.如果不传递超时,它将一直阻塞直到线程完成.因此,在您的示例中,您需要做的是将线程连接两次,第一次连接超时,该超时允许打印 test1 .

You can pass a timeout argument to join(), this will block the calling thread maximal timeout seconds. 'Maximal' because the thread can terminate earlier. If you don't pass a timeout, it will block until the thread finishes. So what you would have to do in your example is join the thread twice, the first time with a timeout which allows printing test1.

from threading import Thread
from time import sleep

def a():
    print('start a')
    sleep(5)
    print('end a')


def b():
    print('start b')
    sleep(5)
    print('end b')


t = Thread(target=a)
t.start()
t.join(0.1)
print('test1')
t.join()

t = Thread(target=b)
t.start()
t.join(0.1)
print('test2')
t.join()

输出:

start a
test1
end a
start b
test2
end b

除此之外,如果您担心的只是限制每秒请求数,那么我怀疑这是否是一种好方法.您可以重用线程,而不必为每个请求重新创建线程,这将节省创建开销.您可以使用ThreadPool并使用指定的心跳向池中发出请求任务.

Apart from that, I'm in doubt this is a good approach if your concern is just limiting requests per second. You could reuse your threads instead of recreating them for every request this would spare you the creation overhead. You could use a ThreadPool and emit a request task into the pool with your specified heartbeat.

import logging
from time import sleep
from multiprocessing.pool import ThreadPool


def make_request(*args):
    logger.debug(f'making request number {args[0]}')
    sleep(5)


def init_logging(log_level=logging.DEBUG):
    fmt = '[%(asctime)s %(levelname)-8s %(threadName)s' \
          ' %(funcName)s()] --- %(message)s'
    logging.basicConfig(format=fmt, level=log_level)


if __name__ == '__main__':

    N_THREADS = 5
    N_REQUESTS = 10

    arguments = [*zip(range(N_REQUESTS))]  # [(0,), (1,), (2,) ...]

    init_logging()
    logger = logging.getLogger()

    with ThreadPool(N_THREADS) as pool:
        for args in arguments:
            pool.apply_async(make_request, args=args)
            sleep(1)  # heartbeat

示例输出:

[2018-09-09 03:17:06,303 DEBUG    Thread-1 make_request()] --- making request number 0
[2018-09-09 03:17:07,304 DEBUG    Thread-2 make_request()] --- making request number 1
[2018-09-09 03:17:08,306 DEBUG    Thread-3 make_request()] --- making request number 2
[2018-09-09 03:17:09,307 DEBUG    Thread-4 make_request()] --- making request number 3
[2018-09-09 03:17:10,308 DEBUG    Thread-5 make_request()] --- making request number 4
[2018-09-09 03:17:11,309 DEBUG    Thread-1 make_request()] --- making request number 5
[2018-09-09 03:17:12,310 DEBUG    Thread-2 make_request()] --- making request number 6
[2018-09-09 03:17:13,311 DEBUG    Thread-3 make_request()] --- making request number 7
[2018-09-09 03:17:14,312 DEBUG    Thread-4 make_request()] --- making request number 8
[2018-09-09 03:17:15,313 DEBUG    Thread-5 make_request()] --- making request number 9

Process finished with exit code 0

这篇关于非阻塞线程队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆