Python multiprocessing, functions with arguments

Question

I have a program that simulates an entire baseball season, but does a lot of calculations per game, so each game takes around 30 seconds to run. With 2430 games in a season, the program takes about 20 hours to run, per season. Obviously I'd like to speed this up, so the most immediate solution seems like multiprocessing. I could manually split it up into groups of ~600 and run four processes, but I'd like to figure out how the multiprocessing module works.

Here's what I've tried so far, but obviously it doesn't work.

def test_func():
    algorithm_selection = 1

    # Create sqlite database connection
    conn = sqlite3.connect('C:/F5 Prediction Engine/sqlite3/Version 2/statcast_db.db')
    c = conn.cursor()

    season = input('Year to simulate: ')
    c.execute('SELECT * FROM gamelogs_' + season)
    season_games = c.fetchall()

    game_num = 0
    for game in season_games:

        game_num = game_num + 1
        #Get away lineup in terms of MLB IDs
        away_lineup = ConvertLineup(game[105], game[108], game[111], game[114], game[117], game[120], game[123], game[126], game[129])
        #Get home lineup in terms of MLB IDs
        home_lineup = ConvertLineup(game[132], game[135], game[138], game[141], game[144], game[147], game[150], game[153], game[156])
        #Get away starting pitcher and hand in terms of MLB ID
        away_pitcher_results = GetPitcherIDandHand(game[101])
        away_pitcher_id = away_pitcher_results[0][0]
        away_pitcher_hand = away_pitcher_results[0][1]
        #Get home starting pitcher and hand in terms of MLB ID
        home_pitcher_results = GetPitcherIDandHand(game[103])
        home_pitcher_id = home_pitcher_results[0][0]
        home_pitcher_hand = home_pitcher_results[0][1]
        #Get the date of the game
        today_date = game[0]

        if algorithm_selection == 1:
            #Check if the current game has already been evaluated and entered into the database
            c.execute('SELECT * FROM pemstein_results_' + season + ' WHERE date = "' + game[0] + '" AND away_team = "' + game[3] + '" AND home_team = "' + game[6] + \
                  '" AND away_team_score = "' + game[9] + '" AND home_team_score = "' + game[10] + '"')
            check_results = c.fetchall()
            if len(check_results) == 0:
                exp_slgs = PemsteinSimulation(home_pitcher_id, away_pitcher_id, season, home_pitcher_hand, away_pitcher_hand, home_lineup, away_lineup, game[0])
                if exp_slgs[2] == 0: #if both pitches had at least 300 PAs to use for simulation
                    c.execute([long string to insert results into database])
                conn.commit()
                print('Game ' + str(game_num) + ' finished.')
                if exp_slgs[2] == 1: #if one of the pitches did not have enough PAs to qualify
                    c.execute([long string to insert results into database])
                    conn.commit()
                    print('Game ' + str(game_num) + ' finished.')
            if len(check_results) > 0:
                print('Game ' + str(game_num) + ' has already been evaluated.')

from multiprocessing import Process
import os

processes = []

for i in range(0, os.cpu_count()):
    print('Registering process %d' % i)
    processes.append(Process(target=test_func))

for process in processes:
    process.start()

for process in processes:
    process.join()

==================

#Child Process
def simulate_games(games_list, counter, lock):
    while(1):
        # Create sqlite database connection
        conn = sqlite3.connect('C:/F5 Prediction Engine/sqlite3/Version 2/statcast_db.db')
        c = conn.cursor()

        #acquire the lock which grants access to the shared variable
        with lock:

            #check the termination condition
            if counter >= len(games_list):
                break

            #get the game_num and game to simulate
            game_num = counter.value
            game_to_simulate = game_list[counter.value]

            #update the counter for the next process
            counter.value += 1

        #Do simulation
        game_num = 0


        game_num = game_num + 1
        #Get away lineup in terms of MLB IDs
        away_lineup = ConvertLineup(game_to_simulate[105], game_to_simulate[108], game_to_simulate[111], game_to_simulate[114], game_to_simulate[117], game_to_simulate[120], game_to_simulate[123], game_to_simulate[126], game_to_simulate[129])
        #Get home lineup in terms of MLB IDs
        home_lineup = ConvertLineup(game_to_simulate[132], game_to_simulate[135], game_to_simulate[138], game_to_simulate[141], game_to_simulate[144], game_to_simulate[147], game_to_simulate[150], game_to_simulate[153], game_to_simulate[156])
        #Get away starting pitcher and hand in terms of MLB ID
        away_pitcher_results = GetPitcherIDandHand(game[101])
        away_pitcher_id = away_pitcher_results[0][0]
        away_pitcher_hand = away_pitcher_results[0][1]
        #Get home starting pitcher and hand in terms of MLB ID
        home_pitcher_results = GetPitcherIDandHand(game[103])
        home_pitcher_id = home_pitcher_results[0][0]
        home_pitcher_hand = home_pitcher_results[0][1]
        #Get the date of the game
        today_date = game_to_simulate[0]
        if algorithm_selection == 1:
            #Check if the current game has already been evaluated and entered into the database
            c.execute('SELECT * FROM pemstein_results_' + season + ' WHERE date = "' + game_to_simulate[0] + '" AND away_team = "' + game_to_simulate[3] + '" AND home_team = "' + game_to_simulate[6] + \
                      '" AND away_team_score = "' + game_to_simulate[9] + '" AND home_team_score = "' + game_to_simulate[10] + '"')
            check_results = c.fetchall()
            if len(check_results) == 0:
                exp_slgs = PemsteinSimulation(home_pitcher_id, away_pitcher_id, season, home_pitcher_hand, away_pitcher_hand, home_lineup, away_lineup, game_to_simulate[0])
                if exp_slgs[2] == 0: #if both pitches had at least 300 PAs to use for simulation
                    c.execute('long sql')
                    conn.commit()
                    print('Game ' + str(game_num) + ' finished.')
                if exp_slgs[2] == 1: #if one of the pitches did not have enough PAs to qualify
                    c.execute('long sql')
                    conn.commit()
                    print('Game ' + str(game_num) + ' finished.')
            if len(check_results) > 0:
                print('Game ' + str(game_num) + ' has already been evaluated.')


if __name__ == "__main__":
    # Create sqlite database connection
    conn = sqlite3.connect('C:/F5 Prediction Engine/sqlite3/Version 2/statcast_db.db')
    c = conn.cursor()

    #Query all games for season to be simulated
    season = int(input('Year to simulate: '))
    c.execute('SELECT * FROM gamelogs_' + str(season))
    season_games = c.fetchall()

    algorithmSelection = 1 

    if algorithmSelection == 1:
        PemsteinSQLresults(str(season))

    counter = mp.Value('i', 0)
    lock = mp.Lock()
    children = []
    for i in range(os.cpu_count()):
        children.append(mp.Process(target=simulate_games, args=(season_games, counter, lock)))

    for child in children:
        child.start()

    for child in children:
        child.join()

Error:

Traceback (most recent call last):
  File "C:\F5 Prediction Engine\Version 2\SimulateSeason v2.py", line 126, in <module>
    child.start()
  File "C:\Python\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe

=============


So I went to this website to review some things, and tried a new script with the following code that I copied from the site:

import multiprocessing as mp

def worker(num):
    """thread worker function"""
    print('Worker:' + num)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = mp.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

But it likewise doesn't do anything. The site says it should print Worker:0 Worker:1 etc, but I'm getting no prints. Is it possible there's something wrong locally on my machine?

Recommended answer

It seems to me that you have simply instantiated a new process for each CPU and had each of them run the same function you wrote at first. If you want to work with processes, however, you have to adapt the function and handle process synchronization.

As an example you could have a master process which prompts the user for the season year, fetches all the games for that year and then the child processes would read from the resulting array. See the following example:

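# Parent Process
import multiprocessing as mp
import os  # needed for os.cpu_count() below

# establish db connection [ ... ]

season = int(input("Year to simulate: "))
# season is an int, so convert it back to a string before concatenating
c.execute('SELECT * FROM gamelogs_' + str(season))
season_games = c.fetchall()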

counter = mp.Value("i", 0)
lock = mp.Lock()
children = []
for i in range(os.cpu_count()):
    children.append(mp.Process(target=simulate_games, args=(season_games, counter, lock,)))

for child in children:
    child.start()

for child in children:
    child.join()

# Child Process
def simulate_games(games_list, counter, lock):
    while(1):
        # acquire the lock which grants the access to the shared variable
        with lock:

            # check the termination condition
            if counter.value >= len(games_list):
                break

            # get the game_num and the game to simulate
            game_num = counter.value
            game_to_simulate = games_list[counter.value]

            # update counter for the next process
            counter.value += 1

        #  Do simulation here

What we have above is a parent process which is basically preparing some data and creating new child processes.

The counter is implemented with a special class, Value, which is used for sharing scalar values among processes. Lock is basically a mutex, which we use to synchronize access to the counter variable and avoid concurrent access. Note that you could have used the Lock that is automatically created inside the counter shared variable, but I thought it would be easier to understand with the two kept separate.
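For reference, here is a minimal sketch of the variant that relies on the lock built into the shared counter instead of a separate Lock. The helper name next_game_index and its total_games parameter are made up for illustration; get_lock() is the method that a default multiprocessing.Value exposes for its internal lock.

```python
import multiprocessing as mp


def next_game_index(counter, total_games):
    """Return the next unclaimed game index, or None once all are claimed."""
    # get_lock() returns the lock that mp.Value created for this counter.
    with counter.get_lock():
        if counter.value >= total_games:
            return None
        index = counter.value
        counter.value += 1
        return index


if __name__ == "__main__":
    counter = mp.Value("i", 0)  # lock=True is the default
    print([next_game_index(counter, 3) for _ in range(4)])  # [0, 1, 2, None]
```

With this variant only the counter has to be passed to each child, at the cost of making the locking a little less visible.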

The child processes first acquire the lock, read the counter value and increment it, then release the lock and proceed with their normal behavior, simulating the games.
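Putting the parent and child pieces into one file, a self-contained sketch might look like the following. The function simulate_one_game and the placeholder season_games rows are assumptions standing in for the real simulation and for the rows returned by c.fetchall(). The launch code sits under if __name__ == "__main__": because on platforms that start children with the spawn method, such as Windows, each child re-imports the main module, and unguarded launch code would be executed again in every child.

```python
import multiprocessing as mp
import os


def simulate_one_game(game):
    """Hypothetical stand-in for the real, expensive per-game simulation."""
    return sum(game)


def simulate_games(games_list, counter, lock):
    """Child process: claim games one at a time until none are left."""
    while True:
        # Hold the lock only while reading and advancing the shared counter.
        with lock:
            if counter.value >= len(games_list):
                break
            game_num = counter.value
            counter.value += 1

        # The slow work runs outside the lock, so children proceed in parallel.
        result = simulate_one_game(games_list[game_num])
        print(f"Game {game_num + 1} finished with result {result}.")


if __name__ == "__main__":
    # Placeholder rows standing in for the result of c.fetchall().
    season_games = [(1, 2), (3, 4), (5, 6), (7, 8)]

    counter = mp.Value("i", 0)
    lock = mp.Lock()
    children = [
        mp.Process(target=simulate_games, args=(season_games, counter, lock))
        for _ in range(os.cpu_count() or 1)
    ]
    for child in children:
        child.start()
    for child in children:
        child.join()
```

The order of the printed lines can differ from run to run, since the children finish their games independently. Each child would also need to open its own database connection inside simulate_games rather than receive one from the parent.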
