dask相关内容
我正在尝试从HDFS读取200个镶木地板文件,然后尝试使用4个GPU训练一个模型。我的机器上也有48个vcore可用。如果我只使用GPU工作器启动集群,那么读取部分将非常慢(因为它只使用分配给GPU工作器的4个CPU工作器,除非您在单独的外壳上运行它们,否则您不可能真正运行比您拥有的GPU数量更多的工作器,然后情况变得很糟糕,因为内存管理问题是您自己的。)我想使用CPU工作器读取文件,与CPU工作
..
根据标题,如果我通过Helm或Kubernetes创建Worker,是否可以在创建Worker后分配“Worker Resources”(https://distributed.readthedocs.io/en/latest/resources.html#worker-resources)? 使用情形是命中数据库的任务,我想限制在给定运行中能够命中数据库的进程数量,而不限制群集的总大小。
..
我让Dask来处理内存中无法容纳的大型向量数组,并使用SCRICKIT-LINE COSING_SIMPLIZATION来计算这些向量之间的余弦相似度,即: import dask.array as da from sklearn.metrics.pairwise import cosine_similarity vectors = da.from_array(vectors, 10000)
..
我是新来达斯克的,如果你觉得这个问题很愚蠢,请原谅。在DASK中,我正在使用一个包含大约50 GB数据的DASK数据帧。这些数据是字符串数据,我需要在将其提供给机器学习算法(使用线程进行快速处理)之前对其进行预处理(使用进程进行快速处理)。现在的问题是,当我根据进程设计集群时,数据帧操作是快的,但相对于线程来说,它是慢的(但线程使用机器学习是快的)。因此,我正在寻找一种可以从进程切换到线程环境的解
..
我想知道是否有可能在GroupBy Aggregation with Dask.之后从给定列中获得唯一项的数量。我在文档中没有看到任何类似的内容。它可以在 pandas 数据框上使用,并且真的很有用。我看到了一些与此相关的问题,但我不确定它是否已实施。 有人能给我一些提示吗? 推荐答案 若要展开this comment,可以直接在序列组上使用nunique: import
..
我有一个巨大的DataFrame,为了节省时间,我想使用Dask来处理它。问题是,当这个TypeError: can't pickle _thread._local objects错误一开始运行时,我就陷入了这个错误。有人能帮帮我吗? 我已经编写了一个函数,该函数根据df的行来处理存储在df中的数据,并用 out = df_query.progress_apply(lambda row
..
更新: 我能够执行转换。下一步是将其放回ddf。 我按照书中的建议所做的是: 日期已分析并存储为单独的变量。 使用 删除了原始日期列 ddf2=ddf.drop('date',axis=1) 使用Assign追加新的分析日期 ddf3=ddf2.assign(date=parsed_date) 新日期已添加为新列,最后一列。 问题1:有没有更有效的方
..
我是新接触DASK的,我不了解COMPUTE()方法在DASK中到底做什么?它是在调用对象的地方打印对象的方法吗?我已经阅读了其网站上的文档,但不确定是否理解了术语“具体的价值”和“懒惰的任务”。 您可以通过调用.Compute()方法或dask.Compute(...)将任何Dask集合转换为具体的值功能。此函数将一直阻塞,直到计算完成,从惰性Dask集直接转到本地内存中的具体值。
..
我想将一行追加到DaskDataFrames中的特定分区。我试过很多方法,但没有一个是可行的。有人能帮我这个忙吗。提前感谢 我试过- first_partition = df.partitions[0] new_dd = first_partiton.append(row) df.partitions[0] = new_dd 这不起作用 我甚至尝试使用map_artition
..
我有这个DASK数据框,最后一列是这个问题的重要信息: Dask DataFrame Structure: asks[0].amount asks[1].amount asks[2].amount asks[3].amount asks[4].amount asks[5].amount asks[6].amount asks[7].amount asks[8].a
..
如果我有一个具有未知分区的数据集,并且希望根据列对其进行排序并将其输出到Parquet,则在我看来,Dask至少会执行两次部分工作: import dask import dask.dataframe as dd def my_identity(x): """Does nothing, but shows up on the Dask dashboard""" return
..
我需要在DaskDataFrame上使用pd.Cut。 This answer指示map_artitions将通过将pd.Cut作为函数传递来工作。 似乎map_artitions一次只将一个分区传递给该函数。但是,pd.Cut需要访问我的df的整个列才能创建回收站。因此,我的问题是:这种情况下的map_Partitions实际上是对整个数据帧进行操作,还是使用这种方法会得到不正确的结
..
我正试图通过在DASK中滚动均值逻辑来复制下面的 pandas 群体。但停留在1)如何指定时间段(以天为单位)和2)如何将其分配回原始帧? df['avg3d']=df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean()) 获得如下错误: ValueError: index must be monotonic,V
..
我正在尝试使用以下代码合并1000多个CSV文件: path = r'path_to_files/' all_files = glob.glob(path + "/*.csv") import shutil with open('updated_thirteen_jan.csv','wb') as wfd: for f in all_files: with op
..
我有以下代码,可以从一个数组创建一个DaskDataFrame。问题是所有类型都被转换为Object。我试图指定元数据,但找不到方法。如何在FROM_ARRAY中指定META? b = np.array([(1.5, 2, 3, datetime(2000,1,1)), (4, 5, 6, datetime(2001, 2, 2))]) ddf = dd.from_array(b, colu
..
我有一个奇怪的问题,我希望能提供一些意见。基本上,我在AWS Pangeo Cloud上运行一台笔记本,并使用xr.open_mfdataset在S3(带有s3f)上打开一些GOES-16卫星数据。 如果我根本不使用DASK,这将非常有效,因为数据集在几分钟内就构建好了。 但是,如果我在打开文件之前创建dask.distributed客户端,open_mfdataset似乎永远挂起。
..
将一列随机数添加到DaskDataFrame的正确方法是什么?我显然可以使用map_partitions将列添加到每个分区,但我不确定当Dask并行化该计算时如何处理随机状态。(即,它是否会在所有工作进程中使用相同的随机状态,从而在每个工作进程中生成相同的随机数?) dask.array.random(https://docs.dask.org/en/latest/_modules/dask
..
我正在测试DaskDataFrames的apply()方法,并且正在运行以下代码: import pandas as pd import dask.dataframe as dd import time def enrich_str(str): val1 = f'{str}_1' val2 = f'{str}_2' val3 = f'{str}_
..
我试图实现类似于这些问题(Initializing state on dask-distributed workers,Setting up Dask worker with variable)的内容,其中我有一个(相对)大的模型,我希望在接受需要该模型的任务的工作线程子集上预初始化该模型。理想情况下,我甚至不希望客户端计算机具有该模型。 在发现这些问题之前,我最初的尝试是在共享模块work
..
kubectl run dask --image daskdev/dask为什么失败? # starting the container with docker to make sure it basically works ➜ ~ docker run --rm -it --entrypoint bash daskdev/dask:latest (base) root@5b34ce038
..