多处理/线程:数据附加和输出回报 [英] multiprocessing/threading: data appending & output return
问题描述
我下面有一个名为 run
的冗长函数,其中包含一些附加数据的实例。
I have a lengthy function called run
below that contains a few instances of appending data.
from multiprocessing import Process
data = []
def run():
global data
...
data.append(trace)
...
if __name__ == '__main__':
jobs = []
gen_count = 0
leaked_count = 0
system_count = 0
N = 100
for i in range(N):
p = Process(target=run)
jobs.append(p)
p.start()
但是,使用多重处理不会添加任何数据。此外,函数 run
返回几个需要添加到 gen_count
, leaked_count的值。
和 system_count
,但我不确定如何检索这些值。我之所以选择多处理,是因为在 for
循环中运行 run
速度很慢,并且每次迭代都独立于其余部分。我想稍后对那些对此有任何想法的人在此代码中加入GPU加速。
However, using multiprocessing no data is appended. In addition, the function run
returns several values that need to be added to gen_count
, leaked_count
, and system_count
and I am not sure how to retrieve those values. I chose multiprocessing because running run
in a for
-loop is slow and each iteration was independent of the rest. I would like to incorporate GPU acceleration in this code later for anyone that has any ideas on that.
所以我的问题是:
- 我应该使用多处理而不是线程处理吗?
- 为什么
trace
不会附加到数据
? - 我如何检索
run $ c的输出$ c>在多处理程序段中?
- Should I even be using multiprocessing as opposed to threading?
- Why is
trace
not being appended todata
? - How can I retrieve the output of
run
within the multiprocessing block?
编辑:
from plotly.offline import init_notebook_mode
import plotly.graph_objs as go
import plotly as py
import time
import Cross_Section_Loading
from multiprocess import Process, Pool, Queue, Manager, cpu_count
from functools import partial
import numpy as np
init_notebook_mode(connected=True)
...
def particle_func(x, y, z):
leaked = 0
nu = 0
# get initial direction
theta = np.random.uniform(0, np.pi, 1)
phi = np.random.uniform(0, 2 * np.pi, 1)
# compute energy via rejection sampling
expfiss = lambda e: 0.453 * np.exp(-1.036 * e / 1.0e6) * np.sinh(np.sqrt(2.29 * e / 1.0e6))
min_eng = np.min(E)
max_eng = np.max(E)
max_prob = np.max(expfiss(E))
rejected = 1
while rejected:
a = np.random.uniform(min_eng, max_eng, 1)
b = np.random.uniform(0, max_prob, 1)
rel_prob = expfiss(a)
if b <= rel_prob:
energy = a
rejected = 0
alive = 1
# vector to keep track of positions
xvec = np.ones(1) * x
yvec = np.ones(1) * y
zvec = np.ones(1) * z
while alive:
# Get real/new cross-sections for corresponding energy
index = energy_lookup(E, energy)
interacted = 0
total_distance = 0
# Interacted may still be alive (scattering)
while interacted == 0:
###################################################
# Determine starting location for sample distance using sigma_total
material_start = material_type(x, y)
if material_start == 1:
sig_tot = sigma_total_fuel(ENRICHMENT_1)[index]
elif material_start == 2:
sig_tot = sigma_total_fuel(ENRICHMENT_2)[index]
elif material_start == 3:
sig_tot = sigma_total_fuel(ENRICHMENT_3)[index]
else:
sig_tot = sigma_total_mod[index]
###################################################
if material_start == 1 or material_start == 2 or material_start == 3: # if in fuel pin
# Get distance to edge of fuel rod (from fuel)
d = distance_to_edge(x, y, phi)
# get sample distance to collision
s = -np.log(1.0 - np.random.random(1)) / sig_tot
# Incidence on interface (denoted by code "no-interface")
if d != 'no-interface':
# Sample distance is greater than interface distance (does not account for material change)
# Must convert between 2D and 3D
if s * np.sin(theta) > d:
total_distance += d / np.sin(theta)
# Sample distance is correct and interaction occurs
else:
total_distance += s
interacted = 1
# Statement may be redundant but idk how to handle return from distance_to_rod
else:
total_distance += s
interacted = 1
else: # if in moderator
# get distance to edge of fuel rod (from moderator)
d = distance_to_edge(x, y, phi)
# get distance to collision
s = -np.log(1.0 - np.random.random(1)) / sig_tot
# Incidence on interface (denoted by code "no-interface")
if d != 'no-interface':
# Sample distance is greater than interface distance (does not account for material change)
# Must convert between 2D and 3D
if s * np.sin(theta) > d:
total_distance += d / np.sin(theta) # <- Right conversion?
# Sample distance is correct and interaction occurs
else:
total_distance += s
interacted = 1
# Statement may be redundant but idk how to handle return from distance_to_rod
else:
total_distance += s
interacted = 1
# move particle
z += total_distance * np.cos(theta)
y += total_distance * np.sin(theta) * np.sin(phi)
x += total_distance * np.sin(theta) * np.cos(phi)
# material_end = material_type(x, y)
#
# if material_start != material_end:
# print("Neutron has crossed material interface(s)")
# Trace/Track particle movement
xvec = np.append(xvec, x)
yvec = np.append(yvec, y)
zvec = np.append(zvec, z)
###################################################
# Leakage
if x > X_BOUNDARY or x < -X_BOUNDARY:
# Still need implementation
leaked = 1
alive = 0
if y > Y_BOUNDARY or y < -Y_BOUNDARY:
# Still need implementation
leaked = 1
alive = 0
if z > HEIGHT or z < 0:
# Still need implementation
leaked = 1
alive = 0
###################################################
# Determine Type of interaction based on energy and corresponding cross-sections
# In fuel
material = material_type(x, y)
if material == 1:
sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_1)[index]
sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_1)[index]
sig_tot_temp = sigma_total_fuel(ENRICHMENT_1)[index]
nu_temp = nu_fuel(ENRICHMENT_1)[index]
# scatter or absorb
if np.random.random(1) < sig_scat_temp / sig_tot_temp:
# scatter, pick new angles & energy
theta = np.random.uniform(0, np.pi, 1)
phi = np.random.uniform(0, 2 * np.pi, 1)
energy = np.random.uniform(alpha_fuel * energy, energy, 1)
elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:
# Determine number of neutrons produced from fission
# round or int or both?
nu = int(round(nu_temp))
alive = 0
else:
# absorbed
alive = 0
#############################
elif material == 2:
sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_2)[index]
sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_2)[index]
sig_tot_temp = sigma_total_fuel(ENRICHMENT_2)[index]
nu_temp = nu_fuel(ENRICHMENT_2)[index]
# scatter or absorb
if np.random.random(1) < sig_scat_temp / sig_tot_temp:
# scatter, pick new angles & energy
theta = np.random.uniform(0, np.pi, 1)
phi = np.random.uniform(0, 2 * np.pi, 1)
energy = np.random.uniform(alpha_fuel * energy, energy, 1)
elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:
# Determine number of neutrons produced from fission
# round or int or both?
nu = int(round(nu_temp))
alive = 0
else:
# absorbed
alive = 0
#############################
if material == 3:
sig_scat_temp = sigma_scatter_fuel(ENRICHMENT_3)[index]
sig_fiss_temp = sigma_fission_fuel(ENRICHMENT_3)[index]
sig_tot_temp = sigma_total_fuel(ENRICHMENT_3)[index]
nu_temp = nu_fuel(ENRICHMENT_3)[index]
# scatter or absorb
if np.random.random(1) < sig_scat_temp / sig_tot_temp:
# scatter, pick new angles & energy
theta = np.random.uniform(0, np.pi, 1)
phi = np.random.uniform(0, 2 * np.pi, 1)
energy = np.random.uniform(alpha_fuel * energy, energy, 1)
elif np.random.random(1) < sig_fiss_temp / sig_tot_temp:
# Determine number of neutrons produced from fission
# round or int or both?
nu = int(round(nu_temp))
alive = 0
else:
# absorbed
alive = 0
#############################
# In water
else:
mod_scat = sigma_scatter_mod[index]
mod_tot = sigma_total_mod[index]
# scatter or absorb
if np.random.random(1) < mod_scat / mod_tot:
# scatter, pick new angles & energy
theta = np.random.uniform(0, np.pi, 1)
phi = np.random.uniform(0, 2 * np.pi, 1)
energy = np.random.uniform(alpha_mod * energy, energy, 1)
else:
# absorbed
alive = 0
###################################################
return xvec, yvec, zvec, nu, leaked
##################################################################
def run(data_test):
###################################################
# Uniformly Dispersed Source (Cylinder)
# x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
# y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
# z = np.random.uniform(-HEIGHT, HEIGHT, 1)
# Uniformly Dispersed FUEL Source (Cylinder)
rejected = 1
while rejected:
x = np.random.uniform(-X_BOUNDARY, X_BOUNDARY, 1)
y = np.random.uniform(-Y_BOUNDARY, Y_BOUNDARY, 1)
z = np.random.uniform(-HEIGHT, HEIGHT, 1)
if material_type(x, y):
rejected = 0
###################################################
# Get normal particle info (trace)
x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
leaked_count = leaked
gen_count = nu
system_count = (1 + nu - leaked)
# particle_trace = go.Scatter3d(
# x=x_vec,
# y=y_vec,
# z=z_vec,
# mode='lines',
# line=dict(color='rgb(173, 255, 47)')
# )
#
# data_test.append(particle_trace)
data_test.append((x_vec, y_vec, z_vec))
###################################################
nu_vec = [nu]
x_vecs = [x_vec]
y_vecs = [y_vec]
z_vecs = [z_vec]
if nu > 0:
print("{} neutrons generated for neutron {}".format(nu, i))
else:
print("No neutrons generated for neutron {}".format(i + 1))
t = 0
recent_nus = nu_vec
while np.any(recent_nus) != 0:
print(nu_vec[-t:])
tracker = 0
nu_vec_temp = []
x_vecs_temp = []
y_vecs_temp = []
z_vecs_temp = []
for a in range(len(nu_vec[-t:])):
x = x_vecs[-(a + 1)][-1]
y = y_vecs[-(a + 1)][-1]
z = z_vecs[-(a + 1)][-1]
for j in range(nu_vec[-(a + 1)]):
x_vec, y_vec, z_vec, nu, leaked = particle_func(x, y, z)
leaked_count += leaked
print("Particle {} starting coords:".format(j + 1), x_vec[0], y_vec[0], z_vec[0])
print("Particle {} ending coords:".format(j + 1), x_vec[-1], y_vec[-1], z_vec[-1])
print("Particle {} nu value".format(j + 1), nu)
nu_vec_temp.append(nu)
tracker += 1
x_vecs_temp.append(x_vec)
y_vecs_temp.append(y_vec)
z_vecs_temp.append(z_vec)
# time.sleep(1)
# particle_trace = go.Scatter3d(
# x=x_vec,
# y=y_vec,
# z=z_vec,
# mode='lines',
# line=dict(color='rgb(255, 0, 0)')
# )
# data_test.append(particle_trace)
data_test.append((x_vec, y_vec, z_vec))
print()
t = tracker
nu_vec.extend(nu_vec_temp)
x_vecs.extend(x_vecs_temp)
y_vecs.extend(y_vecs_temp)
z_vecs.extend(z_vecs_temp)
recent_nus = nu_vec_temp
print("Continuing fission:", (np.any(recent_nus) != 0))
return leaked_count, gen_count, system_count
##################################################################
if __name__ == '__main__':
jobs = []
manager = Manager()
list_ = manager.list()
for _ in range(cpu_count() - 1):
p = Process(target=run, args=(list_,))
jobs.append(p)
p.start()
p.join()
while True: # stops main thread from completing execution
time.sleep(5) # wait 5 second before checking if processes are terminated
if all([not x.is_alive() for x in jobs]): # check if all processes terminated
break # breaks the loop
# print("\nTotal number of neutrons in system:", system_count)
# print("Total number of neutrons generated from {} neutron source: {}".format(N, gen_count))
# print("System Multiplication factor:", gen_count/N)
# print("Total number of leaked neutrons:", leaked_count)
layout = go.Layout(
title='Monte Carlo Assembly',
autosize=True,
showlegend=False,
height=1000,
width=1000,
scene=dict(zaxis=dict(range=[-1, HEIGHT + 1]),
yaxis=dict(range=[-(Y_DIM * PITCH + 5), (Y_DIM * PITCH + 5)]),
xaxis=dict(range=[-(X_DIM * PITCH + 5), (X_DIM * PITCH + 5)])
),
)
fig = go.Figure(data=data, layout=layout)
py.offline.plot(fig, filename='file.html')
输出&错误消息:
Output & Error Message:
/Users/sterlingbutters/anaconda/bin/python "/Users/sterlingbutters/PycharmProjects/Monte Carlo Simulation/MC Plotly (Cylindrical Assembly) Reflector.py"
For 17 x 17 assembly, 9 x 9 is needed. Your shape: (9, 9)
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
No neutrons generated for neutron 9
[(array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294])), (array([ 8.48773757, 9.20971263]), array([-10.08484099, -10.22964405]), array([-6.99776389, -7.45397294]))]
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x114ea8488>
Traceback (most recent call last):
File "/Users/sterlingbutters/anaconda/lib/python3.5/weakref.py", line 117, in remove
TypeError: 'NoneType' object is not callable
Process finished with exit code 0
推荐答案
多重处理从当前环境中生成具有自己的全局变量副本的另一个Process。在该过程中对变量所做的所有更改都不会反映在父过程中。您需要在进程之间共享内存,并且共享内存中的变量可以交换。
Multiprocessing spawns a different Process with it's own global variables copies from current environment. All the changes in variable made in that process does not reflect in parent process. You need to share memory between the process and variables in shared memory can be exchanged.
您可以使用 multiprocessing.Manager
创建一个共享对象,如列表或字典,并操作该对象。
You can use multiprocessing.Manager
to create a shared object like list or dictionary, and manipulate that object.
进程被分配给处理器的不同内核/线程。如果您拥有4核/ 8线程系统,最多可产生7个进程以最大限度地提高性能,但超出此数量的进程将干扰其他进程,并减慢/减少分配给操作系统的cpu时间,这可能会导致系统崩溃。始终使用cpu核心/ cpu线程-1进程来稳定处理,而让至少一个核心可以由os处理其他操作。
Processes are assigned to different cores/thread of your processor. If you have a 4 core/8 thread system, spawn a maximum of 7 processes to maximize performance, any more than that some processes will interfere with other processes and can slow down/reduce the cpu time allotted to your os which can crash your system. It's always the cpu cores/cpu threads -1 processes for stable processing leaving atleast one core to os to handle other operations.
您可以像这样修改代码
from multiprocessing import Process, Manager
import time
def run(list_):
list_.append(trace)
if __name__ == "__main__":
jobs = []
gen_count = 0
leaked_count = 0
system_count = 0
with Manager() as manager:
list_ = manager.list()
for _ in range(multiprocessing.cpu_count()-1):
p = Process(target=run,args=(list_))
jobs.append(p)
p.start()
while True: #stops main thread from completing execution
time.sleep(5) #wait 5 second before checking if processes are terminated
if all([not x.is_alive() for x in jobs]): #check if all processes terminated
break #breaks the loop
这篇关于多处理/线程:数据附加和输出回报的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!