How to parallelize computation on "big data" dictionary of lists?


Problem description

I have a question here regarding doing calculations on a Python dictionary. In this case, the dictionary has millions of keys, and the lists are similarly long. There seems to be disagreement about whether one could use parallelization here, so I'll ask the question more explicitly. Here is the original question:

Optimizing parsing of massive python dictionary, multi-threading

This is a toy (small) python dictionary:

example_dict1 = {'key1':[367, 30, 847, 482, 887, 654, 347, 504, 413, 821],
    'key2':[754, 915, 622, 149, 279, 192, 312, 203, 742, 846], 
    'key3':[586, 521, 470, 476, 693, 426, 746, 733, 528, 565]}

Let's say I need to parse the values of the lists, which I've implemented into the following simple (toy) function:

def manipulate_values(input_list):
    return_values = []
    for i in input_list:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return return_values

Now, I can easily parse the values of this dictionary as follows:

for key, value in example_dict1.items():
    example_dict1[key] = manipulate_values(value)

resulting in the following:

example_dict1 = {'key1': [134676, 887, 717396, 232311, 786756, 427703, 120396, 254003, 170556, 674028], 
     'key2': [568503, 837212, 386871, 22188, 77828, 36851, 97331, 41196, 550551, 715703], 
     'key3': [343383, 271428, 220887, 226563, 480236, 181463, 556503, 537276, 278771, 319212]}

Question: Why couldn't I use multiple threads to do this calculation, e.g. three threads, one for key1, key2, and key3? Would concurrent.futures.ProcessPoolExecutor() work here?

Original question: Are there better ways to optimize this task to be quick?

Solution

Python threads will not really help you process this in parallel, since they all execute on the same one "real CPU thread" (the GIL lets only one thread run Python bytecode at a time); Python threads are helpful when you deal with I/O-bound work such as asynchronous HTTP calls.
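
To make that concrete, here is a small sketch (my own illustration, not part of the original answer) that runs the same kind of pure-Python squaring loop sequentially and through a ThreadPoolExecutor; because of the GIL, the threaded version gives essentially no speedup for CPU-bound code:

import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound(values):
    # same pure-Python work as manipulate_values: square each element, subtract 13
    return [v ** 2 - 13 for v in values]

chunks = [list(range(200_000)) for _ in range(4)]

start = time.perf_counter()
sequential = [cpu_bound(c) for c in chunks]
print(f"sequential: {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(cpu_bound, chunks))
print(f"threads:    {time.perf_counter() - start:.2f}s")  # about the same: the GIL serializes the work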

About ProcessPoolExecutor, from the docs:

concurrent.futures.ProcessPoolExecutor()

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

It can help you if you need heavy CPU processing; you can use:

import concurrent.futures


def manipulate_values(k_v):
    k, v = k_v
    return_values = []
    for i in v :
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return k, return_values


with concurrent.futures.ProcessPoolExecutor() as executor:
        example_dict = dict(executor.map(manipulate_values, example_dict1.items()))
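
One caveat worth adding (my note, not from the original answer): with the spawn start method, which is the default on Windows and on recent macOS, the worker processes re-import your script, so the executor call must sit under an if __name__ == "__main__": guard, and manipulate_values must stay a module-level, picklable function (a lambda or nested function would fail to pickle). A guarded version of the snippet above:

import concurrent.futures

example_dict1 = {'key1': [367, 30, 847, 482, 887, 654, 347, 504, 413, 821],
                 'key2': [754, 915, 622, 149, 279, 192, 312, 203, 742, 846],
                 'key3': [586, 521, 470, 476, 693, 426, 746, 733, 528, 565]}

def manipulate_values(k_v):
    # module-level (hence picklable) worker: square each element and subtract 13
    k, v = k_v
    return k, [i ** 2 - 13 for i in v]

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        example_dict = dict(executor.map(manipulate_values, example_dict1.items()))
    print(example_dict)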


Here is a simple benchmark, using a simple for loop to process your data versus using ProcessPoolExecutor; my scenario assumes that each item to be processed needs ~50 ms of CPU time:

you can see the real benefit from ProcessPoolExecutor if the CPU time per item to be processed is high

from simple_benchmark import BenchmarkBuilder
import time
import concurrent.futures

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    time.sleep(0.05)
    return k, v

def manipulate_values2(v):
    time.sleep(0.05)
    return v

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2((key, value))


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 10):
        size = 2**exp
        yield size, {i: [i] * 10_000 for i in range(size)}

r = b.run()
r.plot()

If you do not set the number of workers for ProcessPoolExecutor, the default number of workers will be equal to the number of processors on your machine (for the benchmark I used a PC with 8 CPUs).
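
If you want to control that explicitly instead of relying on the default, you can pass max_workers to the constructor; a minimal sketch (pow and the sizes here are just placeholders):

import os
import concurrent.futures

if __name__ == "__main__":
    print("CPUs reported by the OS:", os.cpu_count())  # the default pool size is derived from this
    # cap the pool at 4 worker processes instead of one per CPU
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(pow, range(8), [2] * 8)))  # pow is a picklable builtin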


But in your case, with the data provided in your question, processing one item takes ~3 µs:

%timeit manipulate_values([367, 30, 847, 482, 887, 654, 347, 504, 413, 821])
2.32 µs ± 25.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

in which case the benchmark will look like this:

So it is better to use a simple for loop if the CPU time for one item to be processed is low.
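
If you still want to parallelize many cheap items, one knob worth knowing about (not used in the original answer) is the chunksize argument of executor.map: it batches several items per task, so the per-item inter-process overhead is paid less often. A minimal sketch, with an illustrative worker named square_values:

import concurrent.futures

def square_values(k_v):
    k, v = k_v
    return k, [i ** 2 - 13 for i in v]

if __name__ == "__main__":
    d = {i: [i] * 10_000 for i in range(256)}
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # chunksize=32 sends 32 (key, list) items per round trip to a worker
        result = dict(executor.map(square_values, d.items(), chunksize=32))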


A good point raised by @user3666197 is the case when you have huge items/lists. I benchmarked both approaches using lists of 1_000_000 random numbers each:

As you can see, in this case it is more suitable to use ProcessPoolExecutor:

from simple_benchmark import BenchmarkBuilder
import time
import concurrent.futures
from random import choice

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)

    return k, return_values

def manipulate_values2(v):
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return return_values

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 5):
        size = 2**exp
        yield size, {i: [choice(range(1000)) for _ in range(1_000_000)] for i in range(size)}

r = b.run()
r.plot()

This is expected, since processing one item takes ~209 ms:

l = [367] * 1_000_000
%timeit manipulate_values2(l)
# 209 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
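
Not part of the original benchmark, but worth keeping in mind with lists this large: each (key, list) pair is pickled to a worker and the result pickled back, so inter-process transfer eats into the parallel speedup. A quick way to gauge that overhead for one value:

import pickle
import time

l = [367] * 1_000_000

start = time.perf_counter()
blob = pickle.dumps(l)   # what gets sent to the worker process
pickle.loads(blob)       # what the parent does with the returned result
elapsed = time.perf_counter() - start
print(f"pickle round trip: {elapsed * 1000:.1f} ms for a {len(blob) / 1e6:.1f} MB payload")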


Still, the fastest option is to use numpy.arrays with the simple for loop solution:

from simple_benchmark import BenchmarkBuilder
import time
import concurrent.futures
import numpy as np

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    return k,  v ** 2 - 13

def manipulate_values2(v):
    return v ** 2 - 13

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 7):
        size = 2**exp
        yield size, {i: np.random.randint(0, 1000, size=1_000_000) for i in range(size)}

r = b.run()
r.plot()

It is expected that the simple for loop will be faster, since processing one numpy.array takes < 1 ms:

def manipulate_values2(input_list):
    return input_list ** 2 - 13

l = np.random.randint(0, 1000, size=1_000_000)
%timeit manipulate_values2(l)
# 951 µs ± 5.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
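
Applied back to the dictionary from the question, a minimal sketch of that fastest variant (my own wrap-up of the same idea): convert each list to a numpy array once, then let the vectorized expression do the work:

import numpy as np

example_dict1 = {'key1': [367, 30, 847, 482, 887, 654, 347, 504, 413, 821],
                 'key2': [754, 915, 622, 149, 279, 192, 312, 203, 742, 846],
                 'key3': [586, 521, 470, 476, 693, 426, 746, 733, 528, 565]}

# one C-level vectorized operation per key instead of a Python loop per element
result = {key: np.asarray(value) ** 2 - 13 for key, value in example_dict1.items()}
print(result['key1'][:3])   # [134676    887 717396]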
