如何在ipyparallel客户端和远程引擎之间最好地共享静态数据? [英] How to best share static data between ipyparallel client and remote engines?

查看:106
本文介绍了如何在ipyparallel客户端和远程引擎之间最好地共享静态数据?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在具有不同参数的循环中运行相同的模拟。每个模拟使用一个pandas DataFrame( data ),它只能被读取,永远不会被修改。使用 ipyparallel (IPython并行),我可以在模拟开始之前将此DataFrame放入我视图中每个引擎的全局变量空间:

I am running the same simulation in a loop with different parameters. Each simulation makes use a pandas DataFrame (data) which is only read, never modified. Using ipyparallel (IPython parallel), I can put this DataFrames into the global variable space of each engine in my view before simulations start:

view['data'] = data

然后,引擎可以访问DataFrame,以便在其上运行所有模拟。复制数据的过程(如果是pickled,数据是40MB)只需几秒钟。但是,似乎如果模拟的数量增加,则内存使用量会变得非常大。我想这个共享数据正在为每个任务复制,而不是仅为每个引擎复制。从具有引擎的客户端共享静态只读数据的最佳实践是什么?每个引擎复制一次是可以接受的,但理想情况下每个主机只需要复制一次(我在host1上有4个引擎,在host2上有8个引擎)。

The engines then have access to the DataFrame for all the simulations which get run on them. The process of copying the data (if pickled, data is 40MB) is only a few seconds. However, It appears that if the number of simulations grows, memory usage grows very large. I imagine this shared data is getting copied for each task rather than just for each engine. What's the best practice for sharing static read-only data from a client with engines? Copying it once per engine is acceptable, but ideally it would only have to be copied once per host (I have 4 engines on host1 and 8 engines on host2).

这里是我的代码:

from ipyparallel import Client
import pandas as pd

rc = Client()
view = rc[:]  # use all engines
view.scatter('id', rc.ids, flatten=True)  # So we can track which engine performed what task

def do_simulation(tweaks):
    """ Run simulation with specified tweaks """
    #  Do sim stuff using the global data DataFrame
    return results, id, tweaks

if __name__ == '__main__':
    data = pd.read_sql("SELECT * FROM my_table", engine)
    threads = []  # store list of tweaks dicts
    for i in range(4):
        for j in range(5):
            for k in range(6):
                threads.append(dict(i=i, j=j, k=k)

    # Set up globals for each engine.  This is the read-only DataFrame
    view['data'] = data
    ar = view.map_async(do_simulation, threads)

    # Our async results should pop up over time.  Let's measure our progress:
    for idx, (results, id, tweaks) in enumerate(ar):
        print 'Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress / len(ar), idx, id)
        # Store results as a pickle for the future
        pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['j'])
        # Save our results to a pickle file
        pd.to_pickle(results, out_file_path + pfile)

    print 'Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time)

如果模拟计数很小(~50),那么开始需要一段时间,但我开始看到进度打印语句。奇怪的是,多个任务将被分配到同一个引擎,并且在为该引擎完成所有分配的任务之前我看不到响应。每次单个模拟任务完成时,我都希望看到来自枚举(ar)的响应。

If simulation counts are small (~50), then it takes a while to get started, but i start to see progress print statements. Strangely, multiple tasks will get assigned to the same engine and I don't see a response until all of those assigned tasks are completed for that engine. I would expect to see a response from enumerate(ar) every time a single simulation task completes.

如果模拟计数很大(~1000),开始需要很长时间,我看到CPU在所有引擎上都加油,但没有进度打印语句看到很长一段时间(约40分钟),当我看到进展时,似乎一个大块(> 100)的任务进入同一个引擎,等待从那个引擎完成之前提供一些进展。当那个引擎完成后,我看到 ar 对象提供了4秒的新响应 - 这可能是编写输出pickle文件的时间延迟。

If simulation counts are large (~1000), it takes a long time to get started, i see the CPUs throttle up on all engines, but no progress print statements are seen until a long time (~40mins), and when I do see progress, it appears a large block (>100) of tasks went to same engine, and awaited completion from that one engine before providing some progress. When that one engine did complete, i saw the ar object provided new responses ever 4 secs - this may have been the time delay to write the output pickle files.

最后,host1还运行ipycontroller任务,它的内存使用率就像疯了一样(Python任务显示使用> 6GB RAM,内核任务显示使用3GB)。 host2引擎根本没有真正显示大量内存使用情况。什么会导致内存中的这个峰值?

Lastly, host1 also runs the ipycontroller task, and it's memory usage goes up like crazy (a Python task shows using >6GB RAM, a kernel task shows using 3GB). The host2 engine doesn't really show much memory usage at all. What would cause this spike in memory?

推荐答案

几年前我在代码中使用了这个逻辑,我使用了< a href =http://ipyparallel.readthedocs.io/en/latest/multiengine.html?highlight=sync_imports#moving-python-objects-around =nofollow noreferrer>这个。我的代码类似于:

I have used this logic in a code couple years ago, and I got using this. My code was something like:

shared_dict = {
    # big dict with ~10k keys, each with a list of dicts
}

balancer = engines.load_balanced_view()

with engines[:].sync_imports(): # your 'view' variable 
    import pandas as pd
    import ujson as json

engines[:].push(shared_dict)

results = balancer.map(lambda i: (i, my_func(i)), id)
results_data = results.get()




如果模拟计数很小(~50),那么启动
需要一段时间,但我开始看到进度打印语句。奇怪的是,
多个任务将被分配到同一个引擎,并且在为
引擎完成所有分配的任务之前,我没有看到
响应。每次
a单个模拟任务完成时,我希望看到来自枚举(ar)的响应。

If simulation counts are small (~50), then it takes a while to get started, but i start to see progress print statements. Strangely, multiple tasks will get assigned to the same engine and I don't see a response until all of those assigned tasks are completed for that engine. I would expect to see a response from enumerate(ar) every time a single simulation task completes.

在我的情况下, my_func()是一个复杂的方法,我将大量的日志消息写入文件,所以我有我的打印语句。

In my case, my_func() was a complex method where I put lots of logging messages written into a file, so I had my print statements.

关于任务分配,因为我使用 load_balanced_view(),我离开了库找到它的方式,它做得很好。

About the task assignment, as I used load_balanced_view(), I left to the library find its way, and it did great.


如果模拟计数很大(~1000),启动
需要很长时间,我看到CPU在所有引擎上加油,但没有进展
打印报表可以看到很长时间(约40分钟),当我看
看到进度时,看起来大块(> 100)的任务转到同一
引擎,并且在提供
一些进展之前,等待从那个引擎完成。当那个引擎完成时,我看到ar对象
提供了4秒的新响应 - 这可能是写入输出pickle文件的时间延迟

If simulation counts are large (~1000), it takes a long time to get started, i see the CPUs throttle up on all engines, but no progress print statements are seen until a long time (~40mins), and when I do see progress, it appears a large block (>100) of tasks went to same engine, and awaited completion from that one engine before providing some progress. When that one engine did complete, i saw the ar object provided new responses ever 4 secs - this may have been the time delay to write the output pickle files.

关于很长一段时间,我没有经历过,所以我不能说什么。

About the long time, I haven't experienced that, so I can't say nothing.

I希望这可能会对你的问题有所启发。

I hope this might cast some light in your problem.

PS:正如我在评论中所说,你可以试试 multiprocessing.Pool 。我想我没有尝试使用它来共享一个大的只读数据作为全局变量。我会尝试一下,因为它似乎有用

PS: as I said in the comment, you could try multiprocessing.Pool. I guess I haven't tried to share a big, read-only data as a global variable using it. I would give a try, because it seems to work.

这篇关于如何在ipyparallel客户端和远程引擎之间最好地共享静态数据?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆