在 pandas df上并行迭代 [英] iteration over a pandas df in parallel

查看:107
本文介绍了在 pandas df上并行迭代的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以,我想并行地遍历pandas df,所以假设我有15行,那么我想并行而不是一个接一个地遍历它。

So, I want to iterate over a pandas df in parallel so suppose i am having 15 rows then i want to iterate over it parallel and not one by one.

df:-

df = pd.DataFrame.from_records([
    {'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'blhp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'rbswp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'foxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'rbsxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }
   
])

因此,我在df上进行迭代并制作命令行,然后将输出存储在df中并进行数据过滤,最后存储它进入influxdb。问题是我在迭代过程中一个接一个地做。我想要并行遍历所有行。

So, I am iterating over the df and making command line and then storing the output in a df and doing data filtering and then finally storing it into influxdb. The problem is i am doing it one by one as i am iterating over it. what i want to iterate over all the rows in parallel.

截至目前,我已经制作了20个脚本,并使用多处理并行处理所有脚本。当我必须在所有20个脚本中进行更改时,这是一种痛苦。我的脚本如下所示:-

As of now i have made 20 scripts and using multiprocessing to go over all the scripts in parallel. Its a pain when i have to do a change as i have to do it in all 20 scripts. My script looks like below :-

for index, row in dff.iterrows():
    domain = row['domain']
    duration = str(row['duration'])
    media_file = row['media_file']
    user = row['user']
    channel = row['channel']
    cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' + 
    duration + ' -f ' + media_file + ' -u .' + user + '. -c 
    sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- 
    test'

    rows = [shlex.split(line) for line in os.popen(
    cmda).read().splitlines() if line.strip()]

    df = pd.DataFrame(rows)
    """
    Bunch of data filteration and pushing it into influx 
    """

到目前为止,如果我在df中拥有15行并进行如下并行处理,则我将拥有15个脚本:-

As of now i am having 15 script if i am hvaing 15 rows in df and doing parallel processing like below :-

import os
import time
from multiprocessing import Process
os.chdir('/Users/akumar/vivox-sdk-4.9.0002.30719.ebb523a9')
def run_program(cmd):
    # Function that processes will run
    os.system(cmd)

# Creating command to run
commands = ['python testv.py']
commands.extend(['python testv{}.py'.format(i) for i in range(1, 15)])

# Amount of times your programs will run
runs = 1

for run in range(runs):
    # Initiating Processes with desired arguments
    running_programs = []
    for command in commands:
        running_programs.append(Process(target=run_program, args=(command,)))
        running_programs[-1].daemon = True

    # Start our processes simultaneously
    for program in running_programs:
        program.start()

    # Wait untill all programs are done
    while any(program.is_alive() for program in running_programs):
        time.sleep(1)

问题:-我如何迭代df并使所有15行并行运行并执行其中的所有操作

推荐答案

我要在此处复制并粘贴Reddit的答案(以防有人偶然遇到类似情况):

I'm gonna copy and paste my answer from Reddit on here (in case anyone stumbles upon it with a similar situation):

import dask.dataframe as ddf

def your_function(row):
    domain = row['domain']
    duration = str(row['duration'])
    media_file = row['media_file']
    user = row['user']
    channel = row['channel']
    cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' + 
    duration + ' -f ' + media_file + ' -u .' + user + '. -c 
        sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- test'

    rows = [shlex.split(line) for line in os.popen(
            cmda).read().splitlines() if line.strip()]

df_dask = ddf.from_pandas(df, npartitions=4)   # where the number of partitions is the number of cores you want to use
df_dask['output'] = df_dask.apply(lambda x: your_function(x), meta=('str')).compute(scheduler='multiprocessing')

您可能必须在应用中使用axis参数方法。

这篇关于在 pandas df上并行迭代的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆