Python Multi-threading in a recordset


Question

I have a database record set (approx. 1000 rows) and I am currently iterating through it, running an extra DB query per record to pull in additional data.

Doing that pushes the overall processing time to roughly 100 seconds.

What I want to do is spread this work across 2-4 processes.

I am using Python 2.7 for AWS Lambda compatibility.

def handler(event, context):
    try:
        records = connection.get_users()
        mandrill_client = open_mandrill_connection()
        mandrill_messages = get_mandrill_messages()
        mandrill_template = 'POINTS weekly-report-to-user'

        start_time = time.time()
        messages = build_messages(mandrill_messages, records)
        print("OVERALL: %s seconds ---" % (time.time() - start_time))

        send_mandrill_message(mandrill_client, mandrill_template, messages)
        connection.close_database_connection()

        return "Process Completed"

    except Exception as e:
        print(e)

The following is the function that I want to run in threads:

def build_messages(messages, records):
    for record in records:
        record = dict(record)
        stream = get_user_stream(record)
        data = compile_loyalty_stream(stream)

        messages['to'].append({
            'email': record['email'],
            'type': 'to'
        })

        messages['merge_vars'].append({
            'rcpt': record['email'],
            'vars': [
                {
                    'name': 'total_points',
                    'content': record['total_points']
                },
                {
                    'name': 'total_week',
                    'content': record['week_points']
                },
                {
                    'name': 'stream_greek',
                    'content': data['el']
                },
                {
                    'name': 'stream_english',
                    'content': data['en']
                }
            ]
        })

    return messages

What I have tried is importing the multiprocessing library:

from multiprocessing.pool import ThreadPool

I created a pool inside the try block and mapped the function over it:

pool = ThreadPool(4)

messages = pool.map(build_messages_in, itertools.izip(itertools.repeat(mandrill_messages), records))

def build_messages_in(a_b):
    build_msg(*a_b)


def build_msg(a, b):
    return build_messages(a, b)

def get_user_stream(record):

    response = []
    i = 0

    for mod, mod_id, act, p, act_created in izip(record['models'], record['model_ids'], record['actions'],
                                                 record['points'], record['action_creation']):

        information = get_reference(mod, mod_id)

        if information:
            response.append({
                'action': act,
                'points': p,
                'created': act_created,
                'info': information
            })

            if (act == 'invite_friend') \
                    or (act == 'donate') \
                    or (act == 'bonus_500_general') \
                    or (act == 'bonus_1000_general') \
                    or (act == 'bonus_500_cancel') \
                    or (act == 'bonus_1000_cancel'):
                response[i]['info']['date_ref'] = act_created
                response[i]['info']['slug'] = 'attiki'

            if (act == 'bonus_500_general') \
                    or (act == 'bonus_1000_general') \
                    or (act == 'bonus_500_cancel') \
                    or (act == 'bonus_1000_cancel'):
                response[i]['info']['title'] = ''

            i += 1

    return response

Finally, I removed the for loop from the build_messages function.

What I get as a result is a 'NoneType' object is not iterable error.

Is this the right approach?

Answer

Your code is fairly involved, so you cannot be sure that multithreading will lead to any performance gain when applied at a high level. It is therefore worth digging down to the point that costs you the most latency and considering how to attack that specific bottleneck. See here for a longer discussion of threading limitations.
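
For example (a minimal sketch that only reuses the get_user_stream and compile_loyalty_stream names from your own code), you could time the two phases of the per-record work separately before adding any concurrency, to confirm whether the extra DB query or the message assembly is where the ~100 seconds actually go:

import time

def profile_build(records):
    ''' Rough timing of the two per-record phases, to locate the bottleneck '''
    query_time = 0.0
    build_time = 0.0
    for record in records:
        record = dict(record)

        t0 = time.time()
        stream = get_user_stream(record)       # the extra DB query per record
        query_time += time.time() - t0

        t0 = time.time()
        compile_loyalty_stream(stream)         # pure-Python message assembly
        build_time += time.time() - t0

    print("DB queries: %.1fs, message building: %.1fs" % (query_time, build_time))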

If, for example, as we discussed in the comments, you can pinpoint a single task that is taking a long time, then you could try to parallelise it using multiprocessing instead, to make use of more CPU power. Here is a generic example that is hopefully simple enough to follow; it mirrors your Postgres queries without going into your own code base, which I think would be an unfeasible amount of effort.

import multiprocessing as mp
import time
import random
import datetime as dt

MAILCHIMP_RESPONSE = [x for x in range(1000)]

def chunks(l, n):
    n = max(1, n)
    return [l[i:i + n] for i in range(0, len(l), n)]


def db_query():
    ''' Delayed response from database '''
    time.sleep(0.01)
    return random.random()


def do_queries(query_list):
    ''' The function that takes all your query ids and executes them 
    sequentially for each id '''
    results = []
    for item in query_list:
        query = db_query()
        # Your super-quick processing of the Postgres response
        processing_result = query * 2
        results.append([item, processing_result])
    return results


def single_processing():
    ''' As you do now - equivalent to get_reference '''
    result_of_process = do_queries(MAILCHIMP_RESPONSE)
    return result_of_process


def multi_process(chunked_data, queue):
    ''' Same as single_processing, except we put our results in queue rather
    than returning them '''
    result_of_process = do_queries(chunked_data)
    queue.put(result_of_process)


def multiprocess_handler():
    ''' Divide and conquer on our db requests. We split the mailchimp response
    into a series of chunks and fire our queries simultaneously. Thus, each
    concurrent process has a smaller number of queries to make '''

    num_processes = 4 # depending on cores/resources
    size_chunk = len(MAILCHIMP_RESPONSE) / num_processes
    chunked_queries = chunks(MAILCHIMP_RESPONSE, size_chunk)

    queue = mp.Queue() # This is going to combine all the results

    processes = [mp.Process(target=multi_process, 
                args=(chunked_queries[x], queue)) for x in range(num_processes)]

    for p in processes: p.start()

    divide_and_conquer_result = []
    for p in processes:
        divide_and_conquer_result.extend(queue.get())

    return divide_and_conquer_result


if __name__ == '__main__':
    start_single = dt.datetime.now()

    single_process = single_processing()

    print "Single process took {}".format(dt.datetime.now() - start_single)
    print "Number of records processed = {}".format(len(single_process))

    start_multi = dt.datetime.now()

    multi = multiprocess_handler()

    print "Multi process took {}".format(dt.datetime.now() - start_multi)
    print "Number of records processed = {}".format(len(multi))
