How to multithread reading dictionaries from a list and entering into database

Problem description

I am trying to multithread the following code and I just can't seem to get it working.

The following code (of which I removed most of the code just for illustrative purposes) currently works smoothly, but slowly (approximately 5 minutes for a list of 3600 tweets).

import dataset
import datetime
import json

with open("postgresConnecString.txt", 'r') as f:
    DB_CONNECTIONSTRING = f.readline()
DB = dataset.connect(DB_CONNECTIONSTRING)

def load_tweet(tweet, tweets_saved):
    """Takes a tweet (dictionary) and upserts its contents to a PostgreSQL database"""
    try:
        data = {'tweet_id': tweet['tweet_id'],
                'tweet_json': json.dumps(tweet)} # Dictionary that contains the data I need from the tweet
        DB['tweets'].upsert(data, ['tweet_id'])
        tweets_saved += 1
        if tweets_saved % 100 == 0:
            print('Saved ' + str(tweets_saved) + ' tweets')
        return tweets_saved
    except KeyError:
        return tweets_saved

if __name__ == "__main__":
    tweets = ['tweet1', 'tweet2']  # list of tweet dictionaries (placeholder)
    tweets_saved = 0
    for tweet in tweets:
        tweets_saved = load_tweet(tweet, tweets_saved)

As such, I was looking for an option to do this multithreaded. However, I have not yet found a way in which I can:

  • Multithread the extraction process;
  • Print a counter for every 100, 500, or 1000 tweets processed;

Going through this tutorial hasn't given me the understanding to do it yet: the concept of one class per thread, what I need to customize in that class, and implementing a Queue are a lot for me to grasp at the moment; I'm only just starting out.

  • Can anyone provide tips on how to make the script above multithreaded?
  • How many threads should I use? Python currently uses about 1% CPU and about 10% RAM while the script runs (my system specs)
  • How do I handle incrementing the counter (using a Lock()?) and print once the counter hits a multiple of 100? (See the sketch below.)
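One way this could look, as a minimal sketch (my illustration, not code from the original post): a ThreadPoolExecutor whose workers each get their own dataset connection through a thread-local get_db() helper, with a Lock-protected counter. The worker count and the get_db/save_tweet names are assumptions; DB_CONNECTIONSTRING and tweets come from the script above.

import json
import threading
from concurrent.futures import ThreadPoolExecutor

import dataset

thread_local = threading.local()
counter_lock = threading.Lock()
tweets_saved = 0

def get_db():
    # Lazily create one dataset connection per worker thread;
    # DB-API connections are generally not safe to share across threads.
    if not hasattr(thread_local, 'db'):
        thread_local.db = dataset.connect(DB_CONNECTIONSTRING)
    return thread_local.db

def save_tweet(tweet):
    global tweets_saved
    data = {'tweet_id': tweet['tweet_id'],
            'tweet_json': json.dumps(tweet)}
    get_db()['tweets'].upsert(data, ['tweet_id'])
    with counter_lock:  # serialize counter updates across threads
        tweets_saved += 1
        if tweets_saved % 100 == 0:
            print('Saved ' + str(tweets_saved) + ' tweets')

with ThreadPoolExecutor(max_workers=4) as pool:  # worker count is a guess; tune it
    list(pool.map(save_tweet, tweets))  # list() forces evaluation and surfaces exceptions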

As requested: here are the heaviest hitters from the profiler results (with dataset.upsert):

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    5898  245.133    0.042  245.133    0.042 :0(_connect)
    5898   12.137    0.002   12.206    0.002 :0(execute)

Here is a second try with 'dataset.insert' instead of 'dataset.upsert':

1386332 function calls (1382960 primitive calls) in 137.255 seconds

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  2955  122.646    0.042  122.646    0.042 :0(_connect)

Last (and definitely not least), here's the timing when running raw psycopg2 code.

63694 function calls (63680 primitive calls) in 2.203 seconds

Concluding: don't use dataset for performance (though writing the psycopg2 code took me 10 minutes, which is >> the 10 seconds it takes to write a dataset.upsert).
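For reference, a minimal sketch of what the raw psycopg2 batch upsert might look like (my reconstruction, not the poster's actual code): one connection, one batched statement via execute_values, one commit. ON CONFLICT requires PostgreSQL 9.5+ and a unique constraint on tweet_id, and the table/column names are assumed from the question.

import json
import psycopg2
from psycopg2.extras import execute_values

# DB_CONNECTIONSTRING is the same connection string used above; psycopg2
# accepts libpq URIs such as postgresql://user:pass@host/dbname.
conn = psycopg2.connect(DB_CONNECTIONSTRING)
with conn, conn.cursor() as cur:
    execute_values(
        cur,
        """INSERT INTO tweets (tweet_id, tweet_json) VALUES %s
           ON CONFLICT (tweet_id) DO UPDATE
           SET tweet_json = EXCLUDED.tweet_json""",
        [(tweet['tweet_id'], json.dumps(tweet)) for tweet in tweets])
# leaving the `with conn:` block commits the whole batch at once
conn.close()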

  • Now, for the original question: can I shave off the roughly 2 seconds per file with multithreading? How?

The full code can be found here.

Recommended answer

A few things that can be improved:

Run the whole batch in a single transaction. Using a transaction means the database is not required to actually commit (write the data to disk) on every single write; instead it can buffer uncommitted data in memory. This usually leads to more efficient resource usage.
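As a minimal sketch, reusing DB, tweets, and the data dictionary from the question (begin()/commit()/rollback() are dataset's transaction methods):

DB.begin()
try:
    for tweet in tweets:
        data = {'tweet_id': tweet['tweet_id'],
                'tweet_json': json.dumps(tweet)}
        DB['tweets'].upsert(data, ['tweet_id'])
except Exception:
    DB.rollback()
    raise
else:
    DB.commit()  # a single commit (disk write) for the entire batch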

Add a unique index on tweet_id. Without a unique index, you may be forcing the database to do a sequential scan on every upsert, which makes the bulk upsert scale as O(n**2).
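For example, via raw SQL through dataset's query() method (the index name is my choice; IF NOT EXISTS needs PostgreSQL 9.5+):

# Run once; afterwards each upsert is an index lookup rather than a
# sequential scan.
DB.query('CREATE UNIQUE INDEX IF NOT EXISTS tweets_tweet_id_idx '
         'ON tweets (tweet_id)')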

Split the inserts and updates, and use .insert_many() whenever you can rather than .upsert(). Before doing the bulk upsert, run a preflight query to find out which tweet_ids exist both in the database and in your list of tweets. Use .insert_many() for items that don't already exist in the database and plain .update() for those that do.
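A minimal sketch of that split, assuming the schema from the question (distinct() is dataset's helper for fetching unique column values):

# Preflight: collect the tweet_ids already stored in the table.
existing = {row['tweet_id'] for row in DB['tweets'].distinct('tweet_id')}

new_rows, update_rows = [], []
for tweet in tweets:
    data = {'tweet_id': tweet['tweet_id'],
            'tweet_json': json.dumps(tweet)}
    (update_rows if data['tweet_id'] in existing else new_rows).append(data)

DB['tweets'].insert_many(new_rows)   # one bulk INSERT for brand-new tweets
for data in update_rows:             # per-row UPDATE for existing ones
    DB['tweets'].update(data, ['tweet_id'])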
