多处理/线程:数据附加和输出回报 [英] multiprocessing/threading: data appending & output return

查看:95
本文介绍了多处理/线程:数据附加和输出回报的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我下面有一个名为 run 的冗长函数,其中包含一些附加数据的实例。

I have a lengthy function called run below that contains a few instances of appending data.

from multiprocessing import Process

data = []

def run():
    global data
    ...
    data.append(trace)
    ...

if __name__ == '__main__':
    jobs = []

    gen_count = 0
    leaked_count = 0
    system_count = 0

    N = 100

    for i in range(N):
        p = Process(target=run)
        jobs.append(p)
        p.start()

但是,使用多重处理不会添加任何数据。此外,函数 run 返回几个需要添加到 gen_count leaked_count的值。 system_count ,但我不确定如何检索这些值。我之所以选择多处理,是因为在 for 循环中运行 run 速度很慢,并且每次迭代都独立于其余部分。我想稍后对那些对此有任何想法的人在此代码中加入GPU加速。

However, using multiprocessing no data is appended. In addition, the function run returns several values that need to be added to gen_count, leaked_count, and system_count and I am not sure how to retrieve those values. I chose multiprocessing because running run in a for-loop is slow and each iteration was independent of the rest. I would like to incorporate GPU acceleration in this code later for anyone that has any ideas on that.

所以我的问题是:


  1. 我应该使用多处理而不是线程处理吗?

  2. 为什么 trace 不会附加到数据

  3. 我如何检索 run 在多处理程序段中?

  1. Should I even be using multiprocessing as opposed to threading?
  2. Why is trace not being appended to data?
  3. How can I retrieve the output of run within the multiprocessing block?

编辑:

from plotly.offline import init_notebook_mode
import plotly.graph_objs as go
import plotly as py
import time
import Cross_Section_Loading
from multiprocess import Process, Pool, Queue, Manager, cpu_count
from functools import partial
import numpy as np
init_notebook_mode(connected=True)

...

def particle_func(x, y, z):

    leaked = 0
    nu = 0

    # get initial direction
    theta = np.random.uniform(0, np.pi, 1)
    phi = np.random.uniform(0, 2 * np.pi, 1)

    # compute energy via rejection sampling
    expfiss = lambda e: 0.453 * np.exp(-1.036 * e / 1.0e6) * np.sinh(np.sqrt(2.29 * e / 1.0e6))

    min_eng = np.min(E)
    max_eng = np.max(E)
    max_prob = np.max(expfiss(E))

    rejected = 1
    while rejected:
        a = np.random.uniform(min_eng, max_eng, 1)
        b = np.random.uniform(0, max_prob, 1)
        rel_prob = expfiss(a)
        if b <= rel_prob:
            energy = a
            rejected = 0

    alive = 1

    # vector to keep track of positions
    xvec = np.ones(1) * x
    yvec = np.ones(1) * y
    zvec = np.ones(1) * z

    while alive:
        # Get real/new cross-sections for corresponding energy
        index = energy_lookup(E, energy)

        interacted = 0
        total_distance = 0
        # Interacted may still be alive (scattering)
        while interacted == 0:

            ###################################################
            # Determine starting location for sample distance using sigma_total
            material_start = material_type(x, y)

            if material_start == 1:
                sig_tot = sigma_total_fuel(ENRICHMENT_1)[index]
            elif material_start == 2:
                sig_tot = sigma_total_fuel(ENRICHMENT_2)[index]
            elif material_start == 3:
                sig_tot = sigma_total_fuel(ENRICHMENT_3)[index]
            else:
                sig_tot = sigma_total_mod[index]

            ###################################################

            if material_start == 1 or material_start == 2 or material_start == 3:  # if in fuel pin

                # Get distance to edge of fuel rod (from fuel)
                d = distance_to_edge(x, y, phi)

                # get sample distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':

                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)

                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1

                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            else:               # if in moderator
                # get distance to edge of fuel rod (from moderator)
                d = distance_to_edge(x, y, phi)

                # get distance to collision
                s = -np.log(1.0 - np.random.random(1)) / sig_tot

                # Incidence on interface (denoted by code "no-interface")
                if d != 'no-interface':

                    # Sample distance is greater than interface distance (does not account for material change)
                    # Must convert between 2D and 3D
                    if s * np.sin(theta) > d:
                        total_distance += d / np.sin(theta)  # <- Right conversion?

                    # Sample distance is correct and interaction occurs
                    else:
                        total_distance += s
                        interacted = 1

                # Statement may be redundant but idk how to handle return from distance_to_rod
                else:
                    total_distance += s
                    interacted = 1

            # move particle
            z += total_distance * np.cos(theta)
            y += total_distance * np.sin(theta) * np.sin(phi)
            x += total_distance * np.sin(theta) * np.cos(phi)

        # material_end = material_type(x, y)
        #
        # if material_start != material_end:
        #     print("Neutron has crossed material interface(s)")

        # Trace/Track particle movement
        xvec = np.append(xvec, x)
        yvec = np.append(yvec, y)
        zvec = np.append(zvec, z)

        ###################################################

        # Leakage
        if x > X_BOUNDARY or x < -X_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0

        if y > Y_BOUNDARY or y < -Y_BOUNDARY:
            # Still need implementation
            leaked = 1
            alive = 0

        if z > HEIGHT or z < 0:
            # Still need implementation
            leaked = 1
            alive = 0

        ###################################################

        # Determine Type of interaction based on energy and corresponding cross-sections
        # In fuel
        material = material_type(x, y)
        if material == 1:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_1)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_1)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_1)[index]
            nu_temp = nu_fuel(ENRICHMENT_1)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        elif material == 2:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_2)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_2)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_2)[index]
            nu_temp = nu_fuel(ENRICHMENT_2)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        if material == 3:
            sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_3)[index]
            sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_3)[index]
            sig_tot_temp = sigma_total_fuel(ENRICHMENT_3)[index]
            nu_temp = nu_fuel(ENRICHMENT_3)[index]

            # scatter or absorb
            if np.random.random(1) < sig_scat_temp / sig_tot_temp:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_fuel * energy, energy, 1)

            elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:

                # Determine number of neutrons produced from fission
                # round or int or both?
                nu = int(round(nu_temp))
                alive = 0

            else:
                # absorbed
                alive = 0

        #############################

        # In water
        else:
            mod_scat = sigma_scatter_mod[index]
            mod_tot = sigma_total_mod[index]

            # scatter or absorb
            if np.random.random(1) < mod_scat / mod_tot:

                # scatter, pick new angles & energy
                theta = np.random.uniform(0, np.pi, 1)
                phi = np.random.uniform(0, 2 * np.pi, 1)
                energy = np.random.uniform(alpha_mod * energy, energy, 1)

            else:
                # absorbed
                alive = 0

        ###################################################

    return xvec, yvec, zvec, nu, leaked


##################################################################


def run(data_test):

    ###################################################

    # Uniformly Dispersed Source (Cylinder)
    # x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
    # y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
    # z = np.random.uniform(-HEIGHT, HEIGHT, 1)

    # Uniformly Dispersed FUEL Source (Cylinder)
    rejected = 1
    while rejected:
        x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
        y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
        z = np.random.uniform(-HEIGHT, HEIGHT, 1)
        if material_type(x, y):
            rejected = 0

    ###################################################

    # Get normal particle info (trace)
    x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
    leaked_count = leaked
    gen_count = nu
    system_count = (1 + nu - leaked)

    # particle_trace = go.Scatter3d(
    #     x=x_vec,
    #     y=y_vec,
    #     z=z_vec,
    #     mode='lines',
    #     line=dict(color='rgb(173, 255, 47)')
    # )
    #
    # data_test.append(particle_trace)

    data_test.append((x_vec, y_vec, z_vec))

    ###################################################

    nu_vec = [nu]
    x_vecs = [x_vec]
    y_vecs = [y_vec]
    z_vecs = [z_vec]

    if nu > 0:
        print("{} neutrons generated for neutron {}".format(nu, i))

    else:
        print("No neutrons generated for neutron {}".format(i + 1))

    t = 0
    recent_nus = nu_vec
    while np.any(recent_nus) != 0:

        print(nu_vec[-t:])

        tracker = 0

        nu_vec_temp = []

        x_vecs_temp = []
        y_vecs_temp = []
        z_vecs_temp = []

        for a in range(len(nu_vec[-t:])):

            x = x_vecs[-(a + 1)][-1]
            y = y_vecs[-(a + 1)][-1]
            z = z_vecs[-(a + 1)][-1]

            for j in range(nu_vec[-(a + 1)]):
                x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
                leaked_count += leaked

                print("Particle {} starting coords:".format(j + 1), x_vec[0], y_vec[0], z_vec[0])
                print("Particle {} ending coords:".format(j + 1), x_vec[-1], y_vec[-1], z_vec[-1])
                print("Particle {} nu value".format(j + 1), nu)

                nu_vec_temp.append(nu)
                tracker += 1

                x_vecs_temp.append(x_vec)
                y_vecs_temp.append(y_vec)
                z_vecs_temp.append(z_vec)

                # time.sleep(1)

                # particle_trace = go.Scatter3d(
                #     x=x_vec,
                #     y=y_vec,
                #     z=z_vec,
                #     mode='lines',
                #     line=dict(color='rgb(255, 0, 0)')
                # )

                # data_test.append(particle_trace)
                data_test.append((x_vec, y_vec, z_vec))

            print()
            t = tracker

        nu_vec.extend(nu_vec_temp)
        x_vecs.extend(x_vecs_temp)
        y_vecs.extend(y_vecs_temp)
        z_vecs.extend(z_vecs_temp)

        recent_nus = nu_vec_temp

        print("Continuing fission:", (np.any(recent_nus) != 0))

    return leaked_count, gen_count, system_count

##################################################################

if __name__ == '__main__':
    jobs = []

    manager = Manager()
    list_ = manager.list()
    for _ in range(cpu_count() - 1):
        p = Process(target=run, args=(list_,))
        jobs.append(p)
        p.start()
        p.join()
    while True:  # stops main thread from completing execution
        time.sleep(5)  # wait 5 second before checking if processes are terminated
        if all([not x.is_alive() for x in jobs]):  # check if all processes terminated
            break  # breaks the loop

# print("\nTotal number of neutrons in system:", system_count)
# print("Total number of neutrons generated from {} neutron source: {}".format(N, gen_count))
# print("System Multiplication factor:", gen_count/N)
# print("Total number of leaked neutrons:", leaked_count)


layout = go.Layout(
    title='Monte Carlo Assembly',
    autosize=True,
    showlegend=False,
    height=1000,
    width=1000,
    scene=dict(zaxis=dict(range=[-1, HEIGHT + 1]),
               yaxis=dict(range=[-(Y_DIM * PITCH + 5), (Y_DIM * PITCH + 5)]),
               xaxis=dict(range=[-(X_DIM * PITCH + 5), (X_DIM * PITCH + 5)])
               ),
)

fig = go.Figure(data=data, layout=layout)
py.offline.plot(fig, filename='file.html')

输出&错误消息:

Output & Error Message:

/Users/sterlingbutters/anaconda/bin/python "/Users/sterlingbutters/PycharmProjects/Monte Carlo Simulation/MC Plotly (Cylindrical Assembly) Reflector.py"
For 17 x 17 assembly, 9 x 9 is needed. Your shape: (9, 9)
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
[(array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757,  9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294]))]
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
  File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable

Process finished with exit code 0


推荐答案

多重处理从当前环境中生成具有自己的全局变量副本的另一个Process。在该过程中对变量所做的所有更改都不会反映在父过程中。您需要在进程之间共享内存,并且共享内存中的变量可以交换。

Multiprocessing spawns a different Process with it's own global variables copies from current environment. All the changes in variable made in that process does not reflect in parent process. You need to share memory between the process and variables in shared memory can be exchanged.

您可以使用 multiprocessing.Manager 创建一个共享对象,如列表或字典,并操作该对象。

You can use multiprocessing.Manager to create a shared object like list or dictionary, and manipulate that object.

进程被分配给处理器的不同内核/线程。如果您拥有4核/ 8线程系统,最多可产生7个进程以最大限度地提高性能,但超出此数量的进程将干扰其他进程,并减慢/减少分配给操作系统的cpu时间,这可能会导致系统崩溃。始终使用cpu核心/ cpu线程-1进程来稳定处理,而让至少一个核心可以由os处理其他操作。

Processes are assigned to different cores/thread of your processor. If you have a 4 core/8 thread system, spawn a maximum of 7 processes to maximize performance, any more than that some processes will interfere with other processes and can slow down/reduce the cpu time allotted to your os which can crash your system. It's always the cpu cores/cpu threads -1 processes for stable processing leaving atleast one core to os to handle other operations.

您可以像这样修改代码

from multiprocessing import Process, Manager
import time

def run(list_):
    list_.append(trace)

if __name__ == "__main__":
    jobs = []
    gen_count = 0
    leaked_count = 0
    system_count = 0

    with Manager() as manager:
        list_ = manager.list()
        for _ in range(multiprocessing.cpu_count()-1):
            p = Process(target=run,args=(list_))
            jobs.append(p)
            p.start()
        while True: #stops main thread from completing execution
            time.sleep(5) #wait 5 second before checking if processes are terminated
            if all([not x.is_alive() for x in jobs]): #check if all processes terminated
                break #breaks the loop 

这篇关于多处理/线程:数据附加和输出回报的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆