Why is my data insertion in my Cassandra database sometimes stable and sometimes slow?


Problem description

This is the query I use to check whether the current data ID is already present in the Cassandra database:

row = session.execute("SELECT * FROM articles where id = %s", [id]) 

Messages are consumed from Kafka, and for each one I determine whether it already exists in the Cassandra database. If it does not exist, it should be inserted; if it does exist, it should not be inserted:

messages = consumer.get_messages(count=25)

if len(messages) == 0:
    print 'IDLE'
    sleep(1)
    continue

for message in messages:
    try:
        message = json.loads(message.message.value)
        data = message['data']
        if data:
            for article in data:
                source = article['source']
                id = article['id']
                title = article['title']
                thumbnail = article['thumbnail']
                #url = article['url']
                text = article['text']
                print article['created_at'],type(article['created_at'])
                created_at = parse(article['created_at'])
                last_crawled = article['last_crawled']
                channel = article['channel']#userid
                category = article['category']
                #scheduled_for = created_at.replace(minute=created_at.minute + 5, second=0, microsecond=0)
                scheduled_for=(datetime.utcnow() + timedelta(minutes=5)).replace(second=0, microsecond=0)
                row = session.execute("SELECT * FROM articles where id = %s", [id])
                if len(list(row)) == 0:
                    # decode the numeric id into its base62 form for the URL
                    ids = [id[0:2], id[2:9], id[9:16]]
                    idstr = ''
                    for argv in ids:
                        num = int(argv)
                        idstr = idstr + encode(num)
                    url='http://weibo.com/%s/%s?type=comment' % (channel,idstr)
                    session.execute("INSERT INTO articles(source, id, title,thumbnail, url, text, created_at, last_crawled,channel,category) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s)", (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))
                    session.execute("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (%s, %s, %s,%s) USING TTL 86400", (source,'article', scheduled_for, id))
                    log.info('%s %s %s %s %s %s %s %s %s %s' % (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))

    except Exception, e:
        log.exception(e)
        #log.info('error %s %s' % (message['url'],body))
        print e
        continue

I want each ID to map to exactly one table row, and that is how it should stay. But as soon as I add different scheduled_for times for the same unique ID, my system crashes. Adding the if len(list(row)) == 0: check was the right idea, but my system became very slow after that.

Here is my table description:

DROP TABLE IF EXISTS schedules;

CREATE TABLE schedules (
 source text,
 type text,
 scheduled_for timestamp,
 id text,
 PRIMARY KEY (source, type, scheduled_for, id)
);

This scheduled_for is changeable. Here is a concrete example:

Hao article 2016-01-12 02:09:00+0800 3930462206848285
Hao article 2016-01-12 03:09:00+0801 3930462206848285
Hao article 2016-01-12 04:09:00+0802 3930462206848285
Hao article 2016-01-12 05:09:00+0803 3930462206848285

Here is my articles CQL schema:

CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title text,
    url text,
    PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';

CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);


Recommended answer

Looking at your schema and the way you use it, I would assume the secondary index on the ID field is creating problems and slowing down queries. You can find more detail on why secondary indexes are often a bad idea in many places just by googling it (this source is a good start, as is the DataStax documentation page). Basically, when you use a secondary index in a 5-node cluster you must hit every node to find the item you are looking for, whereas when you use the primary key every node knows which node holds the data.

Secondary indexes are particularly bad when the indexed data has high cardinality (performance drops as you add more items), and your ID is different for every article. They are fine for low cardinality, such as indexing data by day of the week (you know there will only ever be 7 days in a week, so you can predict the size of the index table), or by category in your case, if you have a finite number of categories.
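
To make the fan-out concrete, here is a minimal sketch of the two access paths against your schema (article_id is just an illustrative placeholder for an ID value):

# Partition-key query: the coordinator knows exactly which replicas own
# the `source` partition, so only those nodes are contacted.
session.execute("SELECT * FROM articles WHERE source = %s", [source])

# Secondary-index query: the index on `id` is local to each node, so the
# coordinator may have to fan out to every node in the cluster.
session.execute("SELECT * FROM articles WHERE id = %s", [article_id])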

I would advise creating one more table, article_by_id, which will be a reverse index to your articles table. You can use a lightweight transaction and do an INSERT ... IF NOT EXISTS into that table first. If the operation returns true (meaning the insert went through, so the record was not present before), you can do the regular INSERT into your articles table; if it returns false (meaning the data was not inserted because it already exists), you can skip the INSERT into the articles table.

Here is the table (I would suggest using UUID instead of text for the ID, but I created the table based on your articles table):

CREATE TABLE article_by_id (
    id text,
    source text,
    created_at timestamp,
    PRIMARY KEY (id)
) WITH comment = 'Article by id.';

This way you can always find all parts of your primary key based on just the ID. If the ID is your input parameter, selecting from this table will give you source and created_at.
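
For instance, a read that starts from only the ID can first resolve the full key from the reverse index and then address the articles partition directly (a sketch reusing your session; article_id is an illustrative placeholder):

# Resolve the full primary key of `articles` from the ID alone.
rows = list(session.execute(
    "SELECT source, created_at FROM article_by_id WHERE id = %s", [article_id]))
if rows:
    # Address the partition directly; no secondary index involved.
    article = session.execute(
        "SELECT * FROM articles WHERE source = %s AND created_at = %s AND id = %s",
        (rows[0].source, rows[0].created_at, article_id))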

Here is the insert query, which will return true or false:

INSERT INTO article_by_id(id, source, created_at) VALUES (%s,%s, %s) IF NOT EXISTS; 
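
In your consumer loop, the whole read-then-write check then collapses into one conditional write. A minimal sketch, assuming the session and parsed article fields from your code, and a DataStax Python driver recent enough that the result set exposes was_applied:

result = session.execute(
    "INSERT INTO article_by_id (id, source, created_at) "
    "VALUES (%s, %s, %s) IF NOT EXISTS",
    (id, source, created_at))

if result.was_applied:
    # The row was absent, so this is the first time we see this article:
    # do the regular inserts.
    session.execute(
        "INSERT INTO articles(source, id, title, thumbnail, url, text, created_at, last_crawled, channel, category) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        (source, id, title, thumbnail, url, text, created_at, scheduled_for, channel, category))
    session.execute(
        "INSERT INTO schedules(source, type, scheduled_for, id) VALUES (%s, %s, %s, %s) USING TTL 86400",
        (source, 'article', scheduled_for, id))
# If was_applied is False the article already exists and both inserts are skipped.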

One more tip: if you can derive the key from some data in your entity that never changes, you do not need the second table. For example, if source and created_at uniquely identify an article in your system and never change, you can remove id from the key and use your original table.
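
In that case the deduplication becomes a single conditional insert straight into the original table. A sketch, keeping your current schema for illustration and assuming an incoming duplicate carries the same source, created_at, and id:

result = session.execute(
    "INSERT INTO articles(source, id, title, thumbnail, url, text, created_at, last_crawled, channel, category) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) IF NOT EXISTS",
    (source, id, title, thumbnail, url, text, created_at, last_crawled, channel, category))
# was_applied is False when the article was already present.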
