How can I efficiently transpose a 67 GB file/Dask dataframe without loading it entirely into memory?

Problem description

I have 3 rather large files (67 GB, 36 GB, 30 GB) that I need to train models on. However, the features are rows and the samples are columns. Since Dask hasn't implemented transpose and stores DataFrames partitioned by row, I need to write something to do this myself. Is there a way I can transpose efficiently without loading everything into memory?
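
For contrast, the obvious approach that does not work here is to read the whole table with pandas and call .T, since that needs the entire file (plus the transposed copy) in RAM. A minimal sketch of that baseline, reusing the ~/VeryLarge.tsv name from the code below:

import pandas as pd

# Naive baseline for reference only: this loads the full 67 GB table into memory,
# which is exactly what won't fit on this machine.
df = pd.read_csv('~/VeryLarge.tsv', sep='\t')
df.T.to_csv('transposed.csv', sep='\t')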

I've got 16 GB of RAM at my disposal and am using a Jupyter notebook. I have written some rather slow code, but would really appreciate a faster solution. At its current speed, the code below will take a month to finish all the files. The slowest step, by a few orders of magnitude, is the awk call.

import os
import subprocess

import dask.dataframe as dd
import pandas as pd
from IPython.display import display, clear_output

df = dd.read_csv('~/VeryLarge.tsv', sep='\t')
# ensure the per-column output directory exists
os.makedirs(os.path.expanduser('~/columns'), exist_ok=True)

with open('output.csv', 'wb') as fout:
    for i in range(1, len(df.columns)+1):
        print('AWKing')
        # read a column from the original data and store it elsewhere
        x = "awk '{print $"+str(i)+"}' ~/VeryLarge.tsv > ~/file.temp"
        subprocess.check_call([x], shell=True)

        print('Reading')
        # load and transpose the column
        col = pd.read_csv('~/file.temp')
        row = col.T
        display(row)

        print('Deleting')
        # remove the temporary file created
        os.remove(os.path.expanduser('~/file.temp'))

        print('Storing')
        # store the row in its own csv just to be safe. not entirely necessary
        row.to_csv('~/columns/col_{:09d}'.format(i), header=False)

        print('Appending')
        # append the row (transposed column) to the new file
        with open(os.path.expanduser('~/columns/col_{:09d}'.format(i)), 'rb') as fin:
            for line in fin:
                fout.write(line)

        clear_output()
        # just a measure of progress
        print(i/len(df.columns))

The data itself is 10 million rows (features) and 2,000 columns (samples). It just needs to be transposed. Currently, it looks like this:
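
A toy illustration of that orientation (the feature/sample names and values below are made up; the real files are tab-separated, with roughly 10 million feature rows and 2,000 sample columns):

import pandas as pd

# Current layout: one row per feature, one column per sample (hypothetical values).
wide = pd.DataFrame(
    {'sample_1': [0.1, 0.4, 0.7], 'sample_2': [0.2, 0.5, 0.8]},
    index=['feature_1', 'feature_2', 'feature_3'],
)
print(wide)

# Desired layout: one row per sample, one column per feature.
print(wide.T)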

Recommended answer

I've modified my original script to deploy on any number of CPUs. It runs much faster since the work is spread across multiple worker processes, and I deployed it on AWS. I used a 96-core machine, which completed the task in about 8 hours. I was quite surprised, since that is nearly linear scaling! The idea is to make a repetitive task distributable; you can then assign the tasks to CPUs. Here the parallelization is done with pool.map().

Using this script from the command line is quite simple:

python3 transposer.py -i largeFile.tsv

You can specify other arguments as well if required.

import argparse
import subprocess
from contextlib import closing
from multiprocessing import Pool
from os import cpu_count

import pandas as pd
from IPython.display import display

parser = argparse.ArgumentParser(description='Transpose csv')
parser.add_argument('-i', '--infile', help='Path to input file',
                    default=None)
parser.add_argument('-s', '--sep', help='input separator',
                    default='\t')

args = parser.parse_args()
infile = args.infile
sep = args.sep
# read only a few rows to discover the number of columns
df = pd.read_csv(infile, sep=sep, nrows=3)

def READ_COL(item):
    print(item)
    outfile = 'outfile{}.temp'.format(item)
    if item != 0:
        # extract one column with awk into a temporary file
        x = "awk '{print $"+str(item)+"}' "+infile+" > "+outfile
        subprocess.check_call([x], shell=True)
        # load the column, transpose it, and store it as a single-row csv
        col = pd.read_csv(outfile)
        row = col.T
        display(row)
        row.to_csv('col_{:09d}.csv'.format(item), header=False)
        # clean up the temporary file
        subprocess.check_call(['rm '+outfile], shell=True)
        print(item/len(df.columns))

with closing(Pool(processes=cpu_count())) as pool:
    pool.map(READ_COL, list(range(1, len(df.columns)+1)))

After this, you should have a number of files, each containing one transposed column. You just need to join them together with cat or another command-line tool. I just ran cat col_* > full_file_transposed.csv
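
If you prefer to do that final concatenation from Python instead of the shell, here is a minimal sketch (assuming the col_*.csv files written by the script above are in the working directory):

import glob
import shutil

# Concatenate the per-column outputs in name order; the zero-padded
# col_{:09d}.csv file names preserve the original column order.
with open('full_file_transposed.csv', 'wb') as fout:
    for part in sorted(glob.glob('col_*.csv')):
        with open(part, 'rb') as fin:
            shutil.copyfileobj(fin, fout)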
