Slow len function on dask distributed dataframe


Question


I have been testing how to use dask (a cluster with 20 cores) and I am surprised by the speed I get when calling the len function versus slicing through loc.

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')

log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log, npartitions=20)

#This is the code that runs slowly
#(2.9 seconds, whilst I would expect no more than a few hundred milliseconds)

print(len(logd))

#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()

Any ideas why this could be happening? It isn't important for me that len runs fast, but I feel that by not understanding this behaviour there is something I am not grasping about the library.

All of the green boxes correspond to "from_pandas", whilst in this article by Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes the call graph looks better (len_chunk is called, which is significantly faster, and the calls don't seem to be blocked waiting for one worker to finish its task before the next one starts).

Solution

Good question, this gets at a few points about when data is moving up to the cluster and back down to the client (your python session). Let's look at a few stages of your computation.

Load data with Pandas

This is a Pandas dataframe in your python session, so it's obviously still in your local process.

log = pd.read_csv('800000test', sep='\t')  # on client

Convert to a lazy Dask.dataframe

This breaks up your Pandas dataframe into twenty Pandas dataframes, however these are still on the client. Dask dataframes don't eagerly send data up to the cluster.

logd = dd.from_pandas(log, npartitions=20)  # still on client

Compute len

Calling len actually causes computation here (normally you would use df.some_aggregation().compute()). So now Dask kicks in. First it moves your data out to the cluster (slow), then it calls len on all of the 20 partitions (fast), it aggregates those (fast), and then moves the result down to your client so that it can print.

print(len(logd))  # costly roundtrip client -> cluster -> client
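
For comparison, here is a small sketch (not from the answer, just an illustration) of the explicit aggregation-plus-compute style mentioned above; with the data still sitting on the client it pays the same roundtrip, it just makes the trigger visible:

n_rows = logd.map_partitions(len).sum().compute()  # one len per partition, summed, then pulled back
print(n_rows)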

Analysis

So the problem here is that our dask.dataframe still had all of its data in the local python session.

It would have been much faster to use, say, the local threaded scheduler rather than the distributed scheduler. This should compute in milliseconds

import dask.threaded  # provides the local threaded scheduler's get function

with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads
    print(len(logd))  # stays on client

But presumably you want to know how to scale out to larger datasets, so let's do this the right way.

Load your data on the workers

Instead of loading with Pandas on your client/local session, let the Dask workers load bits of the csv file. This way no client-worker communication is necessary.

# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t')    # on cluster workers

However, unlike pd.read_csv, dd.read_csv is lazy, so this should return almost immediately. We can force Dask to actually do the computation with the persist method

log = client.persist(log)  # triggers computation asynchronously

Now the cluster kicks into action and loads your data directly in the workers. This is relatively fast. Note that this method returns immediately while work happens in the background. If you want to wait until it finishes, call wait.

from dask.distributed import wait
wait(log)  # blocks until read is done

If you're testing with a small dataset and want to get more partitions, try changing the blocksize.

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks

Regardless, operations on log should now be fast

len(log)  # fast

Edit

In response to a question on this blogpost, here are the assumptions that we're making about where the file lives.

Generally when you provide a filename to dd.read_csv it assumes that that file is visible from all of the workers. This is true if you are using a network file system, or a global store like S3 or HDFS. If you are using a network file system then you will want to either use absolute paths (like /path/to/myfile.*.csv) or else ensure that your workers and client have the same working directory.
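
For instance, a rough sketch of paths that satisfy this assumption (the shared mount, glob pattern, and S3 bucket below are made-up examples, not paths from the question):

import dask.dataframe as dd

# Absolute path on a network file system that every worker can see
log = dd.read_csv('/shared/data/800000test', sep='\t')

# A glob over several files on the same shared mount
logs = dd.read_csv('/shared/data/myfile.*.csv')

# A global store such as S3 (requires the s3fs package on the workers)
# log = dd.read_csv('s3://my-bucket/800000test', sep='\t')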

If this is not the case, and your data is only on your client machine, then you will have to load and scatter it out.

Simple but sub-optimal

The simple way is just to do what you did originally, but persist your dask.dataframe

log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log, npartitions=20)  # still on client
logd = client.persist(logd)  # moves to workers

This is fine, but results in slightly less-than-ideal communication.

Complex but optimal

Instead, you might scatter your data out to the cluster explicitly

[future] = client.scatter([log])

This gets into more complex API though, so I'll just point you to docs

http://distributed.readthedocs.io/en/latest/manage-computation.html
http://distributed.readthedocs.io/en/latest/memory.html
http://dask.pydata.org/en/latest/delayed-collections.html
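
If you do go the scatter route, one common pattern, sketched here under the assumption that log is the pandas dataframe already on your client (the piece count and meta argument are illustrative; see the memory docs above for details), is to scatter pieces and rebuild a dask dataframe from the resulting futures:

import numpy as np
import dask.dataframe as dd

pieces = np.array_split(log, 20)                     # split the local pandas dataframe into 20 pieces
futures = client.scatter(pieces)                     # ship each piece out to the workers
logd = dd.from_delayed(futures, meta=log.iloc[:0])   # empty frame supplies column names/dtypes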
