Dask:如何将我的代码与延迟的dask并行化? [英] Dask: How would I parallelize my code with dask delayed?

查看:296
本文介绍了Dask:如何将我的代码与延迟的dask并行化?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我对并行处理的第一次尝试,我一直在研究Dask,但实际上很难对其进行编码.

This is my first venture into parallel processing and I have been looking into Dask but I am having trouble actually coding it.

我看了看他们的示例和文档,我认为dask.delayed效果最好.我试图用delay(function_name)包装我的函数,或添加一个@delayed装饰器,但是我似乎无法使其正常工作.我首选Dask而不是其他方法,因为它是用python制作的,并且(因为它很简单).我知道dask在for循环中不起作用,但是他们说它可以在循环中工作.

I have had a look at their examples and documentation and I think dask.delayed will work best. I attempted to wrap my functions with the delayed(function_name), or add an @delayed decorator, but I can't seem to get it working properly. I preferred Dask over other methods since it is made in python and for its (supposed) simplicity. I know dask doesn't work on the for loop, but they say it can work inside a loop.

我的代码通过一个函数传递文件,该函数包含其他函数的输入,如下所示:

My code passes files through a function that contains inputs to other functions and looks like this:

from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
    name = name.split('.')[0]
    ....

然后做一些预处理,例如:

then do some pre-processing ex:

    preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)

然后我调用构造函数并将pre_results传递给函数调用:

then I call a constructor and pass the pre_results in to the function calls:

    fc = FunctionCalls()
    Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                             input_data=pre_result1, model1=pre_result2)

我在这里要做的是将文件传递给for循环,进行一些预处理,然后将文件传递给两个模型.

What i do here is I pass the file into the for loop, do some pre-processing and then pass the file into two models.

关于如何并行化的想法或技巧?我开始遇到奇怪的错误,而且我不知道如何修复代码.该代码按原样工作.我使用了一堆熊猫数据框,系列和numpy数组,但我不想回过头来更改所有内容以使其与dask.dataframes等配合使用.

Thoughts or tips on how to do parallelize this? I began getting odd errors and I had no idea how to fix the code. The code does work as is. I use a bunch of pandas dataframes, series, and numpy arrays, and I would prefer not to go back and change everything to work with dask.dataframes etc.

我评论中的代码可能很难阅读.在这里,它以更格式化的方式.

The code in my comment may be difficult to read. Here it is in a more formatted way.

在下面的代码中,当我键入print(mean_squared_error)时,我得到的是:Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')

In the code below, when I type print(mean_squared_error) I just get: Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')

from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']

for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = delayed(mse)(observed, prediction)

推荐答案

您需要调用dask.compute最终计算结果.请参见 dask.delayed文档.

You need to call dask.compute to eventually compute the result. See dask.delayed documentation.

import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

results = []
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)  # isn't this already a dataframe?
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = mse(observed, prediction)  
    results.append(mean_squared_error)

并行代码

import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

delayed_results = []
for count, name in enumerate(filenames):
    df = dask.delayed(pd.read_csv)(name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = dask.delayed(mse)(observed, prediction)
    delayed_results.append(mean_squared_error)

results = dask.compute(*delayed_results)

这篇关于Dask:如何将我的代码与延迟的dask并行化?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆