Python multiprocessing Pool.apply_async with shared variables (Value)


Problem description


For my college project I am trying to develop a Python-based traffic generator. I have created two CentOS machines on VMware and I am using one as my client and one as my server machine. I have used the IP aliasing technique to increase the number of clients and servers using just a single client/server machine. Up to now I have created 50 IP aliases on my client machine and 10 IP aliases on my server machine. I am also using the multiprocessing module to generate traffic concurrently from all 50 clients to all 10 servers.

I have also created a few profiles (1kb, 10kb, 50kb, 100kb, 500kb, 1mb) on my server (in the /var/www/html directory, since I am using Apache Server), and I am using urllib2 to send requests for these profiles from my client machine. I use httplib + urllib2 to first bind to one of the source alias IPs and then send requests from that IP using urllib2. To increase my number of TCP connections, I am trying to use multiprocessing.Pool.apply_async. But I am getting the error 'RuntimeError: Synchronized objects should only be shared between processes through inheritance' while running my scripts. After a bit of debugging I found that this error is caused by the use of multiprocessing.Value. But I want to share some variables between my processes, and I also want to increase my number of TCP connections. What other module (other than multiprocessing.Value) can be used here to share some common variables? Or is there any other solution for this query?
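The socbindtry module itself is not shown in the post; a handler that binds urllib2's outgoing connections to a specific source/alias IP usually looks roughly like the minimal sketch below (the class name, handler name, and alias IP are illustrative assumptions, not the actual socbindtry code):

import socket
import httplib
import urllib2

class BoundHTTPConnection(httplib.HTTPConnection):
    source_ip = "192.168.1.101"    #hypothetical alias IP configured on the client machine
    def connect(self):
        #bind the outgoing TCP socket to the alias IP (port 0 = any free local port)
        self.sock = socket.create_connection((self.host, self.port),
                                             self.timeout, (self.source_ip, 0))

class BindableHTTPHandler3(urllib2.HTTPHandler):
    def http_open(self, req):
        return self.do_open(BoundHTTPConnection, req)

An opener built with urllib2.build_opener(BindableHTTPHandler3) then sends every request from that alias IP.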

'''
Traffic Generator Script:

 Here I have used IP aliasing to create multiple clients on a single vm machine.
 I have done the same on the server side to create multiple servers. I have around 50 clients and 10 servers.
'''
import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script
m=multiprocessing.Manager()
response_time=m.list()    #some shared variables
error_count=multiprocessing.Value('i',0)
def send_request3():    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(len(myurllist.url)):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
def send_request4():    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(len(myurllist.url)):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
#50 such functions are defined here for 50 clients
def func():
    pool=multiprocessing.Pool(processes=750)
    for i in range(5):
        pool.apply_async(send_request3)
        pool.apply_async(send_request4)
        pool.apply_async(send_request5)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return
start=float(time.time())
func()
end=float(time.time())-start
print end

Answer


As the error message states, you can't pass a multiprocessing.Value via pickle; synchronized objects are meant to be shared between processes only through inheritance. However, you can use a multiprocessing.Manager().Value, whose proxy can be pickled and passed to the pool workers as an ordinary argument.
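To see the core issue in isolation, here is a minimal sketch, independent of the traffic generator (the commented-out lines show the call that raises the RuntimeError from the question):

import multiprocessing

def worker(counter):
    counter.value += 1    #not atomic; real code should guard this with a lock

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)

    #Passing a plain multiprocessing.Value as an argument fails, because the
    #synchronized wrapper cannot be pickled onto the pool's task queue:
    #  RuntimeError: Synchronized objects should only be shared between
    #  processes through inheritance
    #bad = multiprocessing.Value('i', 0)
    #pool.apply_async(worker, args=(bad,))

    #A Manager().Value proxy can be pickled, so it can be passed to the workers.
    m = multiprocessing.Manager()
    good = m.Value('i', 0)
    for _ in range(5):
        pool.apply_async(worker, args=(good,))
    pool.close()
    pool.join()
    print good.value

Applied to your script, the rewritten version looks like this (a shared Manager lock is also passed in, since the Value proxy has no built-in lock for the increment):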

import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script

def send_request3(response_time, error_count, error_lock):    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(len(myurllist.url)):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        with error_lock:    #a Manager Value proxy has no get_lock(); guard the increment with a shared lock
            error_count.value += 1

def send_request4(response_time, error_count, error_lock):    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(len(myurllist.url)):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        with error_lock:
            error_count.value += 1

#50 such functions are defined here for 50 clients

def func(response_time, error_count, error_lock):
    pool=multiprocessing.Pool(processes=2*multiprocessing.cpu_count())
    args = (response_time, error_count, error_lock)
    for i in range(5):
        pool.apply_async(send_request3, args=args)
        pool.apply_async(send_request4, args=args)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return

if __name__ == "__main__":
    m=multiprocessing.Manager()
    response_time=m.list()    #some shared variables
    error_count=m.Value('i',0)
    error_lock=m.Lock()    #explicit shared lock to protect error_count increments

    start=float(time.time())
    func(response_time, error_count, error_lock)
    end=float(time.time())-start
    print end


A few other notes here:

  1. Using a Pool with 750 processes is not a good idea. Unless you're using a server with hundreds of CPU cores, that's going to overwhelm your machine. It would be faster, and put less strain on your machine, to use significantly fewer processes, something more like 2 * multiprocessing.cpu_count().
  2. As a best practice, you should explicitly pass all the shared arguments you need to the child processes, rather than using global variables. This also increases the chances that the code will work on Windows.
  3. It looks like all your send_request* functions do almost exactly the same thing. Why not make just one function and use a variable to decide which socbindtry.BindableHTTPHandler to use? You would avoid a ton of code duplication that way; a sketch of this refactor follows the list.
  4. The way you're incrementing error_count is not process/thread-safe, and is susceptible to race conditions. You need to protect the increment with a lock (as I did in the example code above).
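
A minimal sketch of the refactor from note 3, assuming the socbindtry handlers can be collected into a list (only two handlers are listed here; the full script would list all 50, and the shared Manager objects are created as in the example above):

import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script

def send_request(handler, response_time, error_count, error_lock):
    #one parameterized worker instead of 50 near-identical functions
    opener = urllib2.build_opener(handler)    #bind to the alias ip behind this handler
    try:
        tstart = time.time()
        for i in range(len(myurllist.url)):
            x = random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:", x
            response_time.append(time.time() - tstart)
    except urllib2.URLError, e:
        with error_lock:
            error_count.value += 1

def func(response_time, error_count, error_lock):
    pool = multiprocessing.Pool(processes=2*multiprocessing.cpu_count())
    #collect the handlers in a list instead of writing one function per alias ip;
    #only two are shown here, the real script would list all 50
    handlers = [socbindtry.BindableHTTPHandler3, socbindtry.BindableHTTPHandler4]
    for i in range(5):
        for handler in handlers:
            pool.apply_async(send_request,
                             args=(handler, response_time, error_count, error_lock))
    pool.close()
    pool.join()
    print "All work Done..!!"

if __name__ == "__main__":
    m = multiprocessing.Manager()
    response_time = m.list()
    error_count = m.Value('i', 0)
    error_lock = m.Lock()
    func(response_time, error_count, error_lock)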
