Slow django database operations on large (ish) dataset


Problem Description



I set up a system to filter the Twitter real-time sample stream. Obviously, the database writes are too slow to keep up with anything more complex than a couple of low-volume keywords. I implemented django-rq as a simple queuing system to push the tweets into a Redis-based queue as they come in, and that works great. My issue is on the other side. The context to this question is that I have a system running right now with 1.5m tweets awaiting analysis, and another 375,000 queued in Redis. At current rates of performance it will take me ~3 days to catch up if I turn off the streams, which I don't want to do. If I keep the streams running, then it'll take about a month, on my last estimates.

The database now has a couple of million rows across two main tables, and the writes are very slow. The optimal number of rq workers seems to be four, and that averages out at 1.6 queue tasks per second (the code being enqueued is below). I thought the issue might be the opening of a DB connection for every new queue task, so I set CONN_MAX_AGE to 60, but that hasn't improved anything.
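For reference, persistent connections in Django are configured per-database via CONN_MAX_AGE in the settings file; a minimal fragment would look like the following (engine, database name, and credentials here are placeholders, not taken from the question):

```python
# settings.py -- placeholder values; CONN_MAX_AGE is the relevant setting
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
        'NAME': 'harvester',    # placeholder
        'USER': 'dbuser',       # placeholder
        'PASSWORD': 'secret',   # placeholder
        'HOST': 'localhost',
        'CONN_MAX_AGE': 60,     # reuse each connection for up to 60 seconds
    }
}
```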

Having just tested this on localhost, I got in excess of 13 writes/second on a 2011 MacBook with Chrome etc. running, but there are only a few thousand rows in that database, which leads me to believe it's size-related. There are a couple of get_or_create calls I'm using (see below) which could be slowing things down, but I can't see any other way around them - I need to check whether the user exists, and I need to check whether the tweet already exists. (I could, I suspect, move the latter to a try/except, on the basis that tweets coming in from the live stream shouldn't already exist, for obvious reasons.) Would I get much performance gain out of that? As this is still running, I'm keen to optimise the code a bit and get some faster/more efficient workers in there so I can catch up! Would running a pre-vetting worker to batch things up work? (i.e. so I can batch-create users that don't exist, or something similar?)
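A pre-vetting worker along those lines could drain a batch from the queue, collect the distinct user id_strs, look them all up in one query, and bulk-create only the missing users. The dedup step itself is plain Python; the function and variable names below are my own illustration, not from the question:

```python
def split_new_users(tweets, existing_ids):
    """Given a batch of decoded tweets and the set of user id_strs already
    in the DB, return (new_user_dicts, seen_ids), with in-batch duplicates
    removed, so the new users can go into a single bulk insert."""
    new_users = []
    seen = set(existing_ids)
    for tweet in tweets:
        user = tweet["user"]
        uid = user["id_str"]
        if uid not in seen:
            seen.add(uid)
            new_users.append(user)
    return new_users, seen
```

In Django terms the surrounding calls would be roughly one `User.objects.filter(id_str__in=batch_ids).values_list("id_str", flat=True)` to build `existing_ids`, followed by a `bulk_create` of the returned new users - a sketch, not something I've benchmarked here.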

I'm running a 4-core/8GB RAM droplet on DigitalOcean, so this feels like some pretty terrible performance, and presumably code-related. Where am I going wrong here?
(I've posted this here rather than on Code Review, as I think it fits the Q&A format of SO - I'm trying to solve a specific code problem, rather than asking 'how can I do this better generally?')

Note: I'm working in Django 1.6, as this is code that I've had floating around for a while and wasn't confident about upgrading at the time - it's not public-facing, so unless there's a compelling reason right now (like this performance issue), I wasn't going to upgrade (for this project).

Stream Listener:

import json

import django_rq
import tweepy
from django.db import DataError


class StdOutListener(tweepy.StreamListener):
    def on_data(self, data):
        # Twitter returns data in JSON format - we need to decode it first
        decoded = json.loads(data)
        try:
            if decoded['lang'] == 'en':
                django_rq.enqueue(read_both, decoded)
        except KeyError, e:
            print "Error on Key", e
        except DataError, e:
            print "DataError", e
        return True

    def on_error(self, status):
        print status

Read User/Tweet/Both

def read_user(tweet):
    from harvester.models import User
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
    #We might get weird results where a user has changed their details, so first we check the UID.
    #print "MULTIPLE USER DEBUG", tweet["user"]["id_str"]
    try:
        current_user = User.objects.get(id_str=tweet["user"]["id_str"])
        created=False
        return current_user, created
    except ObjectDoesNotExist:
        pass
    except MultipleObjectsReturned:
        current_user = User.objects.filter(id_str=tweet["user"]["id_str"])[0]
        return current_user, False
    if not tweet["user"]["follow_request_sent"]:
        tweet["user"]["follow_request_sent"] = False
    if not tweet["user"]["following"]:
        tweet["user"]["following"] = False
    if not tweet["user"]["description"]:
        tweet["user"]["description"] = " "
    if not tweet["user"]["notifications"]:
        tweet["user"]["notifications"] = False

    #If that doesn't work, then we'll use get_or_create (as a fallback rather than save())
    from dateutil.parser import parse
    if not tweet["user"]["contributors_enabled"]:
        current_user, created = User.objects.get_or_create(
            follow_request_sent=tweet["user"]["follow_request_sent"],
            _json = {},
            verified = tweet["user"]["verified"],
            followers_count = tweet["user"]["followers_count"],
            profile_image_url_https = tweet["user"]["profile_image_url_https"],
            id_str = tweet["user"]["id_str"],
            listed_count = tweet["user"]["listed_count"],
            utc_offset = tweet["user"]["utc_offset"],
            statuses_count = tweet["user"]["statuses_count"],
            description = tweet["user"]["description"],
            friends_count = tweet["user"]["friends_count"],
            location = tweet["user"]["location"],
            profile_image_url= tweet["user"]["profile_image_url"],
            following = tweet["user"]["following"],
            geo_enabled = tweet["user"]["geo_enabled"],
            profile_background_image_url =tweet["user"]["profile_background_image_url"],
            screen_name = tweet["user"]["screen_name"],
            lang =  tweet["user"]["lang"],
            profile_background_tile = tweet["user"]["profile_background_tile"],
            favourites_count = tweet["user"]["favourites_count"],
            name = tweet["user"]["name"],
            notifications = tweet["user"]["notifications"],
            url = tweet["user"]["url"],
            created_at = parse(tweet["user"]["created_at"]),
            contributors_enabled = False,
            time_zone = tweet["user"]["time_zone"],
            protected = tweet["user"]["protected"],
            default_profile = tweet["user"]["default_profile"],
            is_translator = tweet["user"]["is_translator"]
        )
    else:
        current_user, created = User.objects.get_or_create(
            follow_request_sent=tweet["user"]["follow_request_sent"],
            _json = {},
            verified = tweet["user"]["verified"],
            followers_count = tweet["user"]["followers_count"],
            profile_image_url_https = tweet["user"]["profile_image_url_https"],
            id_str = tweet["user"]["id_str"],
            listed_count = tweet["user"]["listed_count"],
            utc_offset = tweet["user"]["utc_offset"],
            statuses_count = tweet["user"]["statuses_count"],
            description = tweet["user"]["description"],
            friends_count = tweet["user"]["friends_count"],
            location = tweet["user"]["location"],
            profile_image_url= tweet["user"]["profile_image_url"],
            following = tweet["user"]["following"],
            geo_enabled = tweet["user"]["geo_enabled"],
            profile_background_image_url =tweet["user"]["profile_background_image_url"],
            screen_name = tweet["user"]["screen_name"],
            lang =  tweet["user"]["lang"],
            profile_background_tile = tweet["user"]["profile_background_tile"],
            favourites_count = tweet["user"]["favourites_count"],
            name = tweet["user"]["name"],
            notifications = tweet["user"]["notifications"],
            url = tweet["user"]["url"],
            created_at = parse(tweet["user"]["created_at"]),
            contributors_enabled = tweet["user"]["contributors_enabled"],
            time_zone = tweet["user"]["time_zone"],
            protected = tweet["user"]["protected"],
            default_profile = tweet["user"]["default_profile"],
            is_translator = tweet["user"]["is_translator"]
        )
    #print "CURRENT USER:", type(current_user), current_user
    #current_user, created = User.objects.get_or_create(current_user)
    return current_user, created

def read_tweet(tweet, current_user):
    import logging
    logger = logging.getLogger('django')
    from datetime import date, datetime
    #print "Inside read_Tweet"
    from harvester.models import Tweet
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
    from django.db import DataError
    #We might get weird results where a user has changed their details, so first we check the UID.
    #print tweet_data["created_at"]
    from dateutil.parser import parse
    tweet["created_at"] = parse(tweet["created_at"])
    try:
        #print "trying tweet_data", tweet["id_str"]
        current_tweet = Tweet.objects.get(id_str=tweet["id_str"])
        created = False
        return current_tweet, created
    except ObjectDoesNotExist:
        pass
    except MultipleObjectsReturned:
        current_tweet = Tweet.objects.filter(id_str=tweet["id_str"])[0]
        return current_tweet, False
    try:
        current_tweet, created = Tweet.objects.get_or_create(
        truncated=tweet["truncated"],
        text=tweet["text"],
        favorite_count=tweet["favorite_count"],
        author = current_user,
        _json = {},
        source=tweet["source"],
        retweeted=tweet["retweeted"],
        coordinates = tweet["coordinates"],
        entities = tweet["entities"],
        in_reply_to_screen_name = tweet["in_reply_to_screen_name"],
        id_str = tweet["id_str"],
        retweet_count = tweet["retweet_count"],
        favorited = tweet["favorited"],
        user = tweet["user"],
        geo = tweet["geo"],
        in_reply_to_user_id_str = tweet["in_reply_to_user_id_str"],
        lang = tweet["lang"],
        created_at = tweet["created_at"],
        place = tweet["place"])
        print "DEBUG", current_user, current_tweet
        return current_tweet, created
    except DataError, e:
        #Catchall to pick up non-parsed tweets
        print "DEBUG ERROR", e, tweet
        return None, False

def read_both(tweet):
    current_user, created = read_user(tweet)
    current_tweet, created = read_tweet(tweet, current_user)

Solution

I eventually managed to cobble together an answer from some Redditors and a couple of other things.

Fundamentally, I was doing a double lookup on the id_str field, which wasn't indexed. I added db_index=True to that field on both the Tweet and User models, moved read_tweet to a try/except Tweet.objects.create approach (falling back to get_or_create if there's a problem), and saw a 50-60x speed improvement, with the workers now being scalable - if I add 10 workers, I get 10x the speed.
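The create-first/fallback-to-lookup pattern can be illustrated without the ORM. Below, a dict stands in for the indexed table and a custom exception plays the role of the database's duplicate-key error; all names are illustrative, not from the answer:

```python
class DuplicateKey(Exception):
    """Stand-in for the integrity error a unique index raises."""
    pass

def insert(store, id_str, row):
    # Stand-in for Tweet.objects.create(): fails if the key already exists
    if id_str in store:
        raise DuplicateKey(id_str)
    store[id_str] = row
    return row

def create_or_get(store, id_str, row):
    """Try the cheap create first; look up only on conflict. With
    live-stream tweets the conflict is rare, so the common path is one
    indexed insert instead of a read-then-write round trip."""
    try:
        return insert(store, id_str, row), True
    except DuplicateKey:
        return store[id_str], False
```

In Django terms the same shape is `try: Tweet.objects.create(...)` falling back to a lookup on `IntegrityError`, which only pays the extra query in the rare duplicate case.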

I currently have one worker happily processing 6 or so tweets a second. Next up, I'll add a monitoring daemon to check the queue size and add extra workers if it's still increasing.
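The scaling decision for such a daemon can be kept to a pure function: sample the queue length periodically and start a worker only while the backlog keeps growing. The function and thresholds below are my own sketch, not part of the answer:

```python
def workers_to_add(samples, min_growth=100, max_new=1):
    """Given successive queue-length samples (oldest first), return how many
    extra workers to start: max_new if the queue grew by at least min_growth
    over the sampling window, otherwise zero."""
    if len(samples) < 2:
        return 0
    growth = samples[-1] - samples[0]
    return max_new if growth >= min_growth else 0
```

Reading the queue length itself should be a single call with rq (a `Queue` object exposes a `count` of pending jobs), though I haven't wired that part up here.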

tl;dr - REMEMBER INDEXING!
