Why is my data insertion in my Cassandra database sometimes stable and sometimes slow?


Problem description

This is the query I use to check whether a given data ID is already present in the Cassandra database:

row = session.execute("SELECT * FROM articles where id = %s", [id]) 

I consume messages from Kafka and then check whether each message already exists in the Cassandra database: if it does not exist, I perform an insert; if it does exist, the data should not be inserted again.

# Imports implied by the snippet; consumer, session, log and the base62
# encode() helper are assumed to be configured elsewhere.
import json
from time import sleep
from datetime import datetime, timedelta
from dateutil.parser import parse

# Inside the main consumer polling loop:
messages = consumer.get_messages(count=25)

if len(messages) == 0:
    print 'IDLE'
    sleep(1)
    continue

for message in messages:
    try:
        message = json.loads(message.message.value)
        data = message['data']
        if data:
            for article in data:
                source = article['source']
                id = article['id']
                title = article['title']
                thumbnail = article['thumbnail']
                #url = article['url']
                text = article['text']
                print article['created_at'],type(article['created_at'])
                created_at = parse(article['created_at'])
                last_crawled = article['last_crawled']
                channel = article['channel']#userid
                category = article['category']
                #scheduled_for = created_at.replace(minute=created_at.minute + 5, second=0, microsecond=0)
                scheduled_for=(datetime.utcnow() + timedelta(minutes=5)).replace(second=0, microsecond=0)
                row = session.execute("SELECT * FROM articles where id = %s", [id])
                if len(list(row)) == 0:
                    # id not seen before: base62-encode the id parts to build the url
                    ids = [id[0:2], id[2:9], id[9:16]]
                    idstr=''
                    for argv in ids:
                        num = int(argv)
                        idstr=idstr+encode(num)
                    url='http://weibo.com/%s/%s?type=comment' % (channel,idstr)
                    session.execute("INSERT INTO articles(source, id, title,thumbnail, url, text, created_at, last_crawled,channel,category) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s)", (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))
                    session.execute("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (%s, %s, %s,%s) USING TTL 86400", (source,'article', scheduled_for, id))
                    log.info('%s %s %s %s %s %s %s %s %s %s' % (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))

    except Exception, e:
        log.exception(e)
        #log.info('error %s %s' % (message['url'],body))
        print e
        continue

Each ID should have exactly one row in the table, which is what I want. As soon as I add different scheduled_for times for the same unique ID, my system crashes. Adding the if len(list(row))==0: check was the right idea, but my system has been very slow since then.

Here is my table definition:

DROP TABLE IF EXISTS schedules;

CREATE TABLE schedules (
 source text,
 type text,
 scheduled_for timestamp,
 id text,
 PRIMARY KEY (source, type, scheduled_for, id)
);

The scheduled_for value is changeable. Here is a concrete example:

Hao article 2016-01-12 02:09:00+0800 3930462206848285
Hao article 2016-01-12 03:09:00+0801 3930462206848285
Hao article 2016-01-12 04:09:00+0802 3930462206848285
Hao article 2016-01-12 05:09:00+0803 3930462206848285

Here is my articles CQL schema:

CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title text,
    url text,
    PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';

CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);

Answer

Looking at your schema and the way you use it, I would assume the secondary index on the ID field is what is creating problems and slowing down your queries. You can find more detail on why secondary indexes are often a bad fit in many places just by googling it (this source is a good start, as is the DataStax documentation page). Basically, when you query a secondary index in a 5-node cluster you must hit every node to find the item you are looking for, whereas with a primary key each node knows which node holds the data.

Secondary indexes are particularly bad when the indexed data has high cardinality (performance drops as you add more items), and here the ID is different for every article. They are fine for low-cardinality data, such as indexing by day of the week (you know there will only ever be 7 days, so you can predict the size of the index table) or, in your case, by category if you have a finite number of categories.
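
As a hedged illustration (the category index name below is hypothetical; the ID lookup is the one from the question):

# Bounded: at most one index partition per distinct category.
session.execute("CREATE INDEX articles_category_idx ON crawler.articles (category)")

# Unbounded: one index entry per article, so this lookup must be
# fanned out to every node in the cluster.
session.execute("SELECT * FROM articles WHERE id = %s", [id])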

I would advise creating one more table, article_by_id, as a reverse index to your articles table. You can use a Lightweight Transaction and do INSERT ... IF NOT EXISTS into that table first: if the operation returns true (the insert went through, so the record was not present before), do the regular INSERT into your articles table; if it returns false (the data was not inserted because it already exists), skip the INSERT into the articles table.

Here is the table (I would suggest using a UUID instead of text for the ID, but I created the table based on your articles table):

CREATE TABLE article_by_id (
    id text,
    source text,
    created_at timestamp,
    PRIMARY KEY (id)
) WITH comment = 'Article by id.';

This way you can always recover every part of the key from just the ID. If the ID is your input parameter, selecting from this table gives you source and created_at.
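
For example, a minimal sketch (reusing the session object from the question) of recovering the full articles row when only the ID is known:

# Sketch: look up the reverse index first, then hit the articles table
# by its full primary key (source, created_at, id).
row = session.execute(
    "SELECT source, created_at FROM article_by_id WHERE id = %s", [id])[0]
article = session.execute(
    "SELECT * FROM articles WHERE source = %s AND created_at = %s AND id = %s",
    (row.source, row.created_at, id))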

And here is the insert query that will return true or false:

INSERT INTO article_by_id(id, source, created_at) VALUES (%s,%s, %s) IF NOT EXISTS; 
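
A minimal sketch of wiring this into the consumer loop, using the variable names from the question (the first column of an LWT result row is the [applied] boolean):

# Sketch: one LWT replaces the SELECT plus len(list(row)) == 0 check.
result = session.execute(
    "INSERT INTO article_by_id (id, source, created_at) "
    "VALUES (%s, %s, %s) IF NOT EXISTS",
    (id, source, created_at))
if result[0][0]:  # [applied]: True only when the row did not exist yet
    session.execute(
        "INSERT INTO articles (source, id, title, thumbnail, url, text, "
        "created_at, last_crawled, channel, category) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
        (source, id, title, thumbnail, url, text,
         created_at, scheduled_for, channel, category))
    session.execute(
        "INSERT INTO schedules (source, type, scheduled_for, id) "
        "VALUES (%s, %s, %s, %s) USING TTL 86400",
        (source, 'article', scheduled_for, id))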

One more tip: if you can derive the key from some immutable data in your entity, you do not need the second table at all. For example, if source and created_at uniquely identify an article in your system and never change, you can drop id and use your original table.
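
A hedged sketch of that variant: if (source, created_at) alone identifies an article, the LWT can target the original table directly and article_by_id is unnecessary:

# Sketch: LWT straight on the articles table; no reverse-index table needed
# when (source, created_at) uniquely identifies the article.
result = session.execute(
    "INSERT INTO articles (source, created_at, id, title, thumbnail, url, "
    "text, last_crawled, channel, category) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) IF NOT EXISTS",
    (source, created_at, id, title, thumbnail, url,
     text, scheduled_for, channel, category))
already_seen = not result[0][0]  # False means this article was just inserted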

