并行迭代pandas df [英] iteration over a pandas df in parallel
问题描述
所以,我想并行迭代一个 Pandas df,所以假设我有 15 行,那么我想并行迭代它而不是一一迭代.
So, I want to iterate over a pandas df in parallel so suppose i am having 15 rows then i want to iterate over it parallel and not one by one.
df:-
df = pd.DataFrame.from_records([
{'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
{'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
{'domain':'blhp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
{'domain':'rbswp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
{'domain':'foxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
{'domain':'rbsxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
{'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
{'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }
])
所以,我正在迭代 df 并制作命令行,然后将输出存储在 df 中并进行数据过滤,最后将其存储到 influxdb 中.问题是我正在一个一个地做,因为我正在迭代它.我想并行迭代所有行.
So, I am iterating over the df and making command line and then storing the output in a df and doing data filtering and then finally storing it into influxdb. The problem is i am doing it one by one as i am iterating over it. what i want to iterate over all the rows in parallel.
截至目前,我已经制作了 20 个脚本并使用多处理并行处理所有脚本.当我必须进行更改时很痛苦,因为我必须在所有 20 个脚本中进行更改.我的脚本如下所示:-
As of now i have made 20 scripts and using multiprocessing to go over all the scripts in parallel. Its a pain when i have to do a change as i have to do it in all 20 scripts. My script looks like below :-
for index, row in dff.iterrows():
domain = row['domain']
duration = str(row['duration'])
media_file = row['media_file']
user = row['user']
channel = row['channel']
cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' +
duration + ' -f ' + media_file + ' -u .' + user + '. -c
sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps-
test'
rows = [shlex.split(line) for line in os.popen(
cmda).read().splitlines() if line.strip()]
df = pd.DataFrame(rows)
"""
Bunch of data filteration and pushing it into influx
"""
截至目前,如果我在 df 中处理 15 行并执行如下并行处理,我将有 15 个脚本:-
As of now i am having 15 script if i am hvaing 15 rows in df and doing parallel processing like below :-
import os
import time
from multiprocessing import Process
os.chdir('/Users/akumar/vivox-sdk-4.9.0002.30719.ebb523a9')
def run_program(cmd):
# Function that processes will run
os.system(cmd)
# Creating command to run
commands = ['python testv.py']
commands.extend(['python testv{}.py'.format(i) for i in range(1, 15)])
# Amount of times your programs will run
runs = 1
for run in range(runs):
# Initiating Processes with desired arguments
running_programs = []
for command in commands:
running_programs.append(Process(target=run_program, args=(command,)))
running_programs[-1].daemon = True
# Start our processes simultaneously
for program in running_programs:
program.start()
# Wait untill all programs are done
while any(program.is_alive() for program in running_programs):
time.sleep(1)
问题:- 我如何遍历 df 并使所有 15 行并行运行并在 for 循环中执行所有操作.
推荐答案
我将在 Reddit 上复制并粘贴我的回答(以防有人遇到类似情况):
I'm gonna copy and paste my answer from Reddit on here (in case anyone stumbles upon it with a similar situation):
import dask.dataframe as ddf
def your_function(row):
domain = row['domain']
duration = str(row['duration'])
media_file = row['media_file']
user = row['user']
channel = row['channel']
cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' +
duration + ' -f ' + media_file + ' -u .' + user + '. -c
sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- test'
rows = [shlex.split(line) for line in os.popen(
cmda).read().splitlines() if line.strip()]
df_dask = ddf.from_pandas(df, npartitions=4) # where the number of partitions is the number of cores you want to use
df_dask['output'] = df_dask.apply(lambda x: your_function(x), meta=('str')).compute(scheduler='multiprocessing')
您可能需要在 apply
方法中使用轴参数.
You might have to play around with the axis parameter in the apply
method.
这篇关于并行迭代pandas df的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!