Pool only executes a single thread instead of 4, and how do I make it infinite?

Problem description

I am working on a small Python tool to stress test an application's API.

I've got a pretty nice script using Threading, but then I read that it requires manual coding to maintain n concurrent threads (meaning, starting new ones as soon as old ones finish). The suggestion in "How to start a new thread when old one finishes?" is to use ThreadPool, so I tried the following:

def test_post():
    print "Executing in " + threading.currentThread().getName() + "\n"
    time.sleep(randint(1, 3))
    return randint(1, 5), "Message"


if args.send:
    code, content = post()
    print (code, "\n")
    print (content)
elif args.test:
    # Create new threads
    print threads
    results_list = []
    pool = ThreadPool(processes=threads)
    results = pool.apply_async(test_post())
    pool.close()  # Done adding tasks.
    pool.join()  # Wait for all tasks to complete.
    # results = list(pool.imap_unordered(
    #     test_post(), ()
    # ))
    # thread_list = []
    # while threading.activeCount() <= threads:
    #     thread = LoadTesting(threadID=free_threads, name="Thread-" + str(threading.activeCount()), counter=1)
    #     thread.start()
    #     thread_list.append(thread)
    print "Exiting Main Thread" + "\n"
else:
    print ("cant get here!")

When I invoke the script, I get consistent output such as:

4

Executing in MainThread

Exiting Main Thread

I am not sure why. As you can see in the commented-out block, I tried different approaches, and it still executes only once.

My goal is to make the script run in a loop, always keeping n threads running at any time. The test_post (and, respectively, post) functions return the HTTP response code and the content. I would like to use these later to print the result and stop when the response code is NOT 200 OK.

Recommended answer

Your first problem is that you already called your function in the MainThread by writing:

pool.apply_async(test_post())

...instead of passing test_post as an argument for a call to be executed in a worker-thread with:

pool.apply_async(test_post)
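
To make the difference visible, here is a minimal sketch, assuming Python 3 and a hypothetical helper `where_am_i` (not part of the original script) that only reports the thread it runs in. With the parentheses, the function runs in the calling thread and its return value is what gets handed to the pool; a worker then fails when it tries to call that value, and the error stays hidden until `.get()` is called.

```python
from multiprocessing.pool import ThreadPool
from threading import current_thread


def where_am_i():
    # Hypothetical helper: report the name of the executing thread.
    return current_thread().name


pool = ThreadPool(processes=4)

# With parentheses: the function runs right here, in the calling thread.
called_here = where_am_i()

# Handing that return value (a str) to the pool makes a worker try to
# call a string. The TypeError only surfaces when .get() is called.
try:
    pool.apply_async(where_am_i()).get(timeout=10)
    hidden_error = None
except TypeError as exc:
    hidden_error = exc

# Without parentheses: the function object is passed, a worker calls it.
called_in_worker = pool.apply_async(where_am_i).get(timeout=10)

pool.close()
pool.join()

print("called directly:", called_here)
print("called by pool: ", called_in_worker)
print("hidden error:   ", hidden_error)
```

This is also why the original script printed nothing suspicious: it never called `.get()` on the returned result, so the worker's error was silently discarded.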


OP: I've got a pretty nice script using Threading, but then I read that it will require manual coding to maintain n number of concurrent threads (meaning, starting new ones as soon as old ones finish) ...

You need to distinguish between a unit of work (job, task) and a thread. The whole point of using a pool is to re-use the executors, be they threads or processes. The workers are created when the Pool is instantiated, and as long as you don't close the Pool, all initial threads stay alive. So you don't need to care about recreating threads; you just call pool methods on the existing pool whenever you have work to distribute. The Pool takes each job (a pool-method call) and creates tasks out of it. These tasks are put on an unbounded queue. Whenever a worker finishes a task, it makes a blocking get() call on this inqueue to fetch the next one.
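
The re-use of workers can be observed with a small sketch, assuming Python 3. The function `task` and the numbers (4 workers, 20 tasks, a 0.05 s sleep) are made up for illustration. Twenty tasks are submitted, yet the set of thread names that executed them never grows beyond the pool size.

```python
import time
from multiprocessing.pool import ThreadPool
from threading import current_thread


def task(i):
    time.sleep(0.05)  # stand-in for real work
    return current_thread().name


pool = ThreadPool(4)

# 20 tasks go onto the pool's queue; the existing workers pick them up.
results = [pool.apply_async(task, args=(i,)) for i in range(20)]
names = [r.get(timeout=10) for r in results]

pool.close()
pool.join()

# The number of distinct threads is at most the pool size of 4.
print(len(names), "tasks ran on", len(set(names)), "threads")
```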

OP: Pool only executes a single thread instead of 4...I tried different ways and it still does it only once.

pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

...is a single-call job that produces a single task. If you want more than one execution of func, you either have to call pool.apply_async() multiple times, or use a mapping pool method like

pool.map(func, iterable, chunksize=None)

..., which maps one function over an iterable. pool.apply_async is non-blocking, which is why it is "async". It immediately returns an AsyncResult object on which you can make the blocking calls .wait() or .get().
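
As a rough side-by-side of the two options, assuming Python 3 and a hypothetical function `square`, the sketch below submits the same five inputs once through repeated apply_async calls and once through pool.map:

```python
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


pool = ThreadPool(4)

# Option 1: one apply_async call per task. Each call returns an
# AsyncResult immediately; .get() blocks until that task is done.
async_results = [pool.apply_async(square, args=(n,)) for n in range(5)]
from_apply = [r.get(timeout=10) for r in async_results]

# Option 2: pool.map applies the function to every item of the iterable
# and blocks until all results are available, in input order.
from_map = pool.map(square, range(5))

pool.close()
pool.join()

print(from_apply)  # [0, 1, 4, 9, 16]
print(from_map)    # [0, 1, 4, 9, 16]
```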

Through the comments it became clear that you want endless and immediate replacements for finished tasks (a self-produced input stream), and that the program should stop on KeyboardInterrupt or when a result does not have a certain value.

You can use the callback parameter of apply_async to schedule a new task as soon as any of the old ones finishes. The difficulty lies in what to do with the MainThread in the meantime: it must keep the script from ending prematurely while staying responsive to KeyboardInterrupt. Letting the MainThread sleep in a loop achieves both, since it can still react to KeyboardInterrupt immediately. If a result should stop the program, you can let the callback terminate the pool. The MainThread then just has to include a check of the pool status in its sleep loop.

import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool


def test_post(post_id):
    time.sleep(randint(1, 3))
    status_code = choice([200] * 9 + [404])
    return "{} {} Message no.{}: {}".format(
        datetime.now(), current_thread().name, post_id, status_code
    ), status_code


def handle_result(result):
    msg, code = result
    print(msg)
    if code != 200:
        print("terminating")
        pool.terminate()
    else:
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )


if __name__ == '__main__':

    N_WORKERS = 4

    post_cnt = count()

    pool = ThreadPool(N_WORKERS)

    # initial distribution
    for _ in range(N_WORKERS):
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )

    try:
        while pool._state in (0, "RUN"):  # pool still alive (RUN is 0 before Python 3.8, "RUN" from 3.8 on)
            time.sleep(1)
    except KeyboardInterrupt:
        print(" got interrupt")

Example Output with KeyboardInterrupt:

$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt

Example Output with termination due to unwanted return value:

$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating

Note that in your scenario you can also call apply_async more than N_WORKERS times for the initial distribution, so that a few tasks are always buffered in the queue and latency is reduced.
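
A possible variant of the script with such a buffer is sketched below, assuming Python 3. It is not the answer's original code: the `BUFFER` value, the `fake_post` stand-in (which pretends that post number 10 returns 404), and the use of a `threading.Event` in place of the private `pool._state` attribute are all assumptions made for illustration.

```python
import threading
import time
from itertools import count
from multiprocessing.pool import ThreadPool

N_WORKERS = 4
BUFFER = 2  # assumed number of extra queued tasks

stop = threading.Event()  # set when a result should stop the program
post_cnt = count()
seen = []


def fake_post(post_id):
    # Hypothetical stand-in for the real request: post 10 "fails".
    time.sleep(0.01)
    return post_id, (404 if post_id == 10 else 200)


def submit():
    pool.apply_async(fake_post, args=(next(post_cnt),), callback=handle_result)


def handle_result(result):
    # Callbacks run in the pool's result-handler thread, one at a time.
    post_id, code = result
    seen.append(result)
    if code != 200:
        stop.set()
    elif not stop.is_set():
        submit()  # replace the finished task with a new one


pool = ThreadPool(N_WORKERS)

# Initial distribution: more tasks than workers, so that a worker always
# finds a queued task without waiting for the callback to run.
for _ in range(N_WORKERS + BUFFER):
    submit()

try:
    while not stop.wait(timeout=1):  # wakes up as soon as stop is set
        pass
except KeyboardInterrupt:
    print(" got interrupt")
finally:
    pool.terminate()  # discards tasks that are still queued
    pool.join()

print("stopped after", len(seen), "results")
```

Waiting on the event lets the main thread react as soon as the callback signals a stop instead of polling once per second, and it avoids depending on a private attribute whose value differs between Python versions.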