Processing huge CSV file using Python and multithreading

Question

I have a function that yields lines from a huge CSV file lazily:

def get_next_line():
    # sample_csv holds the path to the huge CSV file
    with open(sample_csv, 'r') as f:
        for line in f:
            yield line

def do_long_operation(row):
    print('Do some operation that takes a long time')

I need to use threads so that I can call do_long_operation on each record I get from the above function.

Most examples on the Internet look like this, and I am not sure whether I am on the right path.

import threading

rows = get_next_line()
thread_list = []
for i in range(8):
    # pseudocode from the examples: hand each thread the next row
    t = threading.Thread(target=do_long_operation, args=(next(rows),))
    thread_list.append(t)

for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

My questions are:

  1. How do I start only a finite number of threads, say 8?

  2. How do I make sure that each of the threads will get a row from get_next_line?

Answer

You could use a thread pool from multiprocessing and map your tasks to a pool of workers:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool
from random import randint
from time import sleep


def process_line(l):
    print(l, "started")
    sleep(randint(0, 3))  # simulate a long-running operation
    print(l, "done")


def get_next_line():
    with open("sample.csv", 'r') as f:
        for line in f:
            yield line


f = get_next_line()

t = Pool(processes=8)

# map pulls the lines from the generator and distributes them
# across the eight workers
t.map(process_line, f)
t.close()
t.join()

This will create eight workers and submit your lines to them, one by one. As soon as a worker is "free", it will be allocated a new task.
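
Note that Pool.map reads the whole generator into a list before dispatching it, so a truly huge file may not fit in memory. A lazier variant (a minimal sketch, assuming the same sample.csv and the process_line above) is imap_unordered, which streams lines to the workers without building that list up front:

from multiprocessing.pool import ThreadPool as Pool

pool = Pool(processes=8)
# imap_unordered feeds lines to the workers as it iterates the
# generator and yields results in completion order; chunksize
# batches lines to cut per-task overhead.
for _ in pool.imap_unordered(process_line, get_next_line(), chunksize=16):
    pass
pool.close()
pool.join()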

There is a commented-out import statement, too. If you comment out the ThreadPool import and use Pool from multiprocessing instead, you will get subprocesses instead of threads, which may be more efficient in your case.
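
A minimal sketch of that process-based variant (assuming the same sample.csv; with processes, process_line has to live at module level so it can be pickled, and the __main__ guard keeps child processes from re-running the pool setup):

from multiprocessing import Pool
from random import randint
from time import sleep


def process_line(l):
    # real subprocesses sidestep the GIL, which helps CPU-bound work
    sleep(randint(0, 3))
    return l.strip()


def get_next_line():
    with open("sample.csv", 'r') as f:
        for line in f:
            yield line


if __name__ == '__main__':
    with Pool(processes=8) as pool:
        # imap consumes the generator lazily instead of building a list
        for result in pool.imap(process_line, get_next_line(), chunksize=16):
            pass  # collect or log each processed result here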
