Python Multi-threading in a recordset
Problem description
I have a database recordset (approx. 1000 rows) and I am currently iterating through it, pulling in more data with an extra DB query for each record.
Doing that raises the overall processing time to roughly 100 seconds.
What I want to do is split the work across 2-4 processes.
I am using Python 2.7 for AWS Lambda compatibility.
```python
def handler(event, context):
    try:
        records = connection.get_users()
        mandrill_client = open_mandrill_connection()
        mandrill_messages = get_mandrill_messages()
        mandrill_template = 'POINTS weekly-report-to-user'
        start_time = time.time()
        messages = build_messages(mandrill_messages, records)
        print("OVERALL: %s seconds ---" % (time.time() - start_time))
        send_mandrill_message(mandrill_client, mandrill_template, messages)
        connection.close_database_connection()
        return "Process Completed"
    except Exception as e:
        print(e)
```
The following is the function I want to run in threads:
```python
def build_messages(messages, records):
    for record in records:
        record = dict(record)
        stream = get_user_stream(record)
        data = compile_loyalty_stream(stream)
        messages['to'].append({
            'email': record['email'],
            'type': 'to'
        })
        messages['merge_vars'].append({
            'rcpt': record['email'],
            'vars': [
                {
                    'name': 'total_points',
                    'content': record['total_points']
                },
                {
                    'name': 'total_week',
                    'content': record['week_points']
                },
                {
                    'name': 'stream_greek',
                    'content': data['el']
                },
                {
                    'name': 'stream_english',
                    'content': data['en']
                }
            ]
        })
    return messages
```
What I tried was importing ThreadPool from the multiprocessing library:
```python
from multiprocessing.pool import ThreadPool
```
I then created a pool inside the try block and mapped the function over it:
```python
pool = ThreadPool(4)
messages = pool.map(build_messages_in, itertools.izip(itertools.repeat(mandrill_messages), records))

def build_messages_in(a_b):
    build_msg(*a_b)

def build_msg(a, b):
    return build_messages(a, b)
```
```python
def get_user_stream(record):
    response = []
    i = 0
    for mod, mod_id, act, p, act_created in izip(record['models'], record['model_ids'], record['actions'],
                                                 record['points'], record['action_creation']):
        information = get_reference(mod, mod_id)
        if information:
            response.append({
                'action': act,
                'points': p,
                'created': act_created,
                'info': information
            })
            if (act == 'invite_friend') \
                    or (act == 'donate') \
                    or (act == 'bonus_500_general') \
                    or (act == 'bonus_1000_general') \
                    or (act == 'bonus_500_cancel') \
                    or (act == 'bonus_1000_cancel'):
                response[i]['info']['date_ref'] = act_created
                response[i]['info']['slug'] = 'attiki'
            if (act == 'bonus_500_general') \
                    or (act == 'bonus_1000_general') \
                    or (act == 'bonus_500_cancel') \
                    or (act == 'bonus_1000_cancel'):
                response[i]['info']['title'] = ''
            i += 1
    return response
```
Finally, I removed the for loop from the build_messages function.
What I get as a result is TypeError: 'NoneType' object is not iterable.
Is this the right approach?
Recommended answer
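The immediate error in the question comes from build_messages_in. It calls build_msg(*a_b) but never returns the result, so every call returns None and pool.map collects a list of None values. Most likely the 'NoneType' object is not iterable error then surfaces when that list is used as the messages. There is a second problem: every thread also appends to the same shared dict.

Below is a minimal sketch of a working ThreadPool version. It assumes each worker handles exactly one record and returns its entries, and that the main thread merges the entries afterwards. fetch_stream_data, build_one and build_messages_threaded are hypothetical names. fetch_stream_data stands in for get_user_stream plus compile_loyalty_stream and simulates the query with a sleep. Threads are a reasonable fit because the per-record work is mostly waiting on the database, and CPython releases the GIL during blocking calls like time.sleep.

```python
from multiprocessing.pool import ThreadPool
import time


def fetch_stream_data(record):
    # Hypothetical stand-in for get_user_stream + compile_loyalty_stream;
    # the sleep simulates waiting on the database
    time.sleep(0.01)
    return {'el': 'stream (el) for %s' % record['email'],
            'en': 'stream (en) for %s' % record['email']}


def build_one(record):
    ''' Build the 'to' and 'merge_vars' entries for ONE record and return
    them (a missing return is what makes pool.map collect None) '''
    record = dict(record)
    data = fetch_stream_data(record)
    to_entry = {'email': record['email'], 'type': 'to'}
    merge_entry = {
        'rcpt': record['email'],
        'vars': [
            {'name': 'total_points', 'content': record['total_points']},
            {'name': 'total_week', 'content': record['week_points']},
            {'name': 'stream_greek', 'content': data['el']},
            {'name': 'stream_english', 'content': data['en']},
        ],
    }
    return to_entry, merge_entry


def build_messages_threaded(messages, records, workers=4):
    pool = ThreadPool(workers)
    try:
        # map() blocks until every record is done and keeps the input order
        results = pool.map(build_one, records)
    finally:
        pool.close()
        pool.join()
    # Only the main thread touches the shared messages dict
    for to_entry, merge_entry in results:
        messages['to'].append(to_entry)
        messages['merge_vars'].append(merge_entry)
    return messages


if __name__ == '__main__':
    sample = [{'email': 'user%d@example.com' % i, 'total_points': i * 10,
               'week_points': i} for i in range(100)]
    start = time.time()
    out = build_messages_threaded({'to': [], 'merge_vars': []}, sample)
    print("%d messages in %.2f seconds" % (len(out['to']), time.time() - start))
```

This fixes the error, but whether it speeds up the real code depends on the database side. Depending on the driver, sharing one connection across threads may serialize the queries or may not be safe at all. One connection per worker is the safer assumption.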
Your code is fairly involved, so you can't be sure that applying multithreading at a high level will lead to any performance gain. In CPython, the GIL lets only one thread execute Python bytecode at a time. Threads therefore only help while they are waiting, for example on a database response. It's worth drilling down to the step that contributes the most latency and then deciding how to approach that specific bottleneck. See here for a longer discussion of threading limitations.
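One way to find the step that contributes the most latency is the standard-library cProfile module. The sketch below profiles a hypothetical per-record loop and returns the top entries sorted by cumulative time. fake_db_query, fake_compile, process_records and profile_report are illustrative stand-ins, not functions from the question. cProfile's default timer measures wall-clock time, so time spent blocked on a query is attributed to the query function.

```python
import cProfile
import pstats
import time

try:
    from StringIO import StringIO  # Python 2.7: pstats writes byte strings
except ImportError:
    from io import StringIO  # Python 3


def fake_db_query():
    # Hypothetical stand-in for one database round trip (simulated latency)
    time.sleep(0.005)


def fake_compile(n):
    # Hypothetical stand-in for pure-Python post-processing of a result
    return sum(i * i for i in range(n))


def process_records(count):
    # Mimics the per-record loop: one query plus some processing per record
    for _ in range(count):
        fake_db_query()
        fake_compile(1000)


def profile_report(func, *args):
    ''' Run func(*args) under cProfile and return the top entries as text '''
    profiler = cProfile.Profile()
    profiler.runcall(func, *args)
    out = StringIO()
    stats = pstats.Stats(profiler, stream=out)
    stats.sort_stats('cumulative').print_stats(10)
    return out.getvalue()


if __name__ == '__main__':
    print(profile_report(process_records, 50))
```

If most of the cumulative time sits under the query function, the loop is I/O-bound and overlapping the queries (with threads or processes) can help. If it sits in pure-Python processing, threads will not help in CPython, and separate processes are the better fit.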
If, as we discussed in the comments, you can pinpoint a single task that takes a long time, you could instead try to parallelize it with multiprocessing to make use of more of your CPU. Here is a generic example that is hopefully simple enough to map onto your Postgres queries without going into your own code base. Doing that would honestly be an unfeasible amount of effort.
```python
import multiprocessing as mp
import time
import random
import datetime as dt

MAILCHIMP_RESPONSE = [x for x in range(1000)]


def chunks(l, n):
    ''' Split list l into consecutive pieces of at most n items '''
    n = max(1, n)
    return [l[i:i + n] for i in range(0, len(l), n)]


def db_query():
    ''' Delayed response from database '''
    time.sleep(0.01)
    return random.random()


def do_queries(query_list):
    ''' The function that takes all your query ids and executes them
    sequentially for each id '''
    results = []
    for item in query_list:
        query = db_query()
        # Your super-quick processing of the Postgres response
        processing_result = query * 2
        results.append([item, processing_result])
    return results


def single_processing():
    ''' As you do now - equivalent to get_reference '''
    result_of_process = do_queries(MAILCHIMP_RESPONSE)
    return result_of_process


def multi_process(chunked_data, queue):
    ''' Same as single_processing, except we put our results in queue rather
    than returning them '''
    result_of_process = do_queries(chunked_data)
    queue.put(result_of_process)


def multiprocess_handler():
    ''' Divide and conquer on our db requests. We split the mailchimp response
    into a series of chunks and fire our queries simultaneously. Thus, each
    concurrent process has a smaller number of queries to make '''
    num_processes = 4  # depending on cores/resources
    # Ceiling division (works in Python 2 and 3), so no items are dropped
    # when the length is not an exact multiple of num_processes
    size_chunk = -(-len(MAILCHIMP_RESPONSE) // num_processes)
    chunked_queries = chunks(MAILCHIMP_RESPONSE, size_chunk)
    queue = mp.Queue()  # This is going to combine all the results
    # One process per chunk
    processes = [mp.Process(target=multi_process, args=(chunk, queue))
                 for chunk in chunked_queries]
    for p in processes:
        p.start()
    divide_and_conquer_result = []
    # Drain the queue before joining; results arrive in completion order,
    # not input order
    for _ in processes:
        divide_and_conquer_result.extend(queue.get())
    for p in processes:
        p.join()
    return divide_and_conquer_result


if __name__ == '__main__':
    start_single = dt.datetime.now()
    single_process = single_processing()
    print("Single process took {}".format(dt.datetime.now() - start_single))
    print("Number of records processed = {}".format(len(single_process)))
    start_multi = dt.datetime.now()
    multi = multiprocess_handler()
    print("Multi process took {}".format(dt.datetime.now() - start_multi))
    print("Number of records processed = {}".format(len(multi)))
```
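One caveat, given the AWS Lambda requirement. The AWS Compute Blog post on parallel processing in Python with Lambda reports that multiprocessing.Queue and multiprocessing.Pool fail there because the execution environment has no /dev/shm. According to the same post, multiprocessing.Process and multiprocessing.Pipe do work. The sketch below adapts the example above along those lines, using one Pipe per process instead of a shared Queue. split_chunks, worker and pipe_handler are illustrative names, and db_query/do_queries reuse the simulated query.

```python
import multiprocessing as mp
import random
import time


def db_query():
    ''' Simulated delayed response from the database (as in the example above) '''
    time.sleep(0.01)
    return random.random()


def do_queries(query_list):
    ''' Run the simulated query sequentially for each id in this chunk '''
    return [[item, db_query() * 2] for item in query_list]


def split_chunks(items, num_chunks):
    ''' Split items into at most num_chunks consecutive pieces, dropping nothing '''
    size = max(1, -(-len(items) // num_chunks))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


def worker(chunk, conn):
    ''' Child process: send this chunk's results through the pipe, then close it '''
    conn.send(do_queries(chunk))
    conn.close()


def pipe_handler(items, num_processes=4):
    processes, parent_conns = [], []
    for chunk in split_chunks(items, num_processes):
        parent_conn, child_conn = mp.Pipe()
        p = mp.Process(target=worker, args=(chunk, child_conn))
        p.start()
        processes.append(p)
        parent_conns.append(parent_conn)
    results = []
    # Receive before joining, and in chunk order, so results keep the input order
    for parent_conn in parent_conns:
        results.extend(parent_conn.recv())
    for p in processes:
        p.join()
    return results


if __name__ == '__main__':
    out = pipe_handler(list(range(100)))
    print("Number of records processed = {}".format(len(out)))
```

Reading the pipes in chunk order keeps the results in input order, which the Queue version does not guarantee. Outside Lambda, multiprocessing.Pool.map would handle the same splitting and collection for you.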