Cassandra cluster with bad insert performance and insert stability


Problem description


I have to store around 250 numerical values per second, per client, which is around 900k numbers per hour. It probably will not be a full-day recording (probably between 5-10 hours a day), but I will partition my data based on the client id and the day the reading is made. The maximum row length comes at about 22-23M, which is still manageable. Nevertheless, my schema looks like this:

CREATE TABLE measurement (
  clientid text,
  date text,
  event_time timestamp,
  value int,
  PRIMARY KEY ((clientid,date), event_time)
);


The keyspace has a replication factor of 2, just for testing; the snitch is GossipingPropertyFileSnitch and the replication strategy is NetworkTopologyStrategy. I know that a replication factor of 3 is more of a production standard.
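For reference, a keyspace with those settings could be created along these lines (a sketch; the keyspace name and the datacenter name `DC1` are assumptions, not given in the question):

```cql
-- NetworkTopologyStrategy with RF=2 in a single (assumed) datacenter
CREATE KEYSPACE measurements
  WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 2};
```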


Next up, I created a small cluster on the company's servers: three bare-metal virtualized machines with 2 CPUs x 2 cores, 16GB of RAM, and a lot of space. I'm on a gigabit LAN with them. The cluster is operational, according to nodetool.


Here is the code I'm using to test my setup:

    Cluster cluster = Cluster.builder()
            .addContactPoint("192.168.1.100")
            .addContactPoint("192.168.1.102")
            .build();
    Session session = cluster.connect();
    DateTime time = DateTime.now();
    BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue<>(50, true);

    try {
        ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts

        String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
        PreparedStatement preparedStatement = session.prepare(insertQuery);
        BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also

        //generating the entries
        for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
            time = time.plus(4); //4ms between each entry
            BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
            batch.add(bound);

            //The batch statement must have 65535 statements at most
            if (batch.size() >= 65534) {
                queryQueue.put(batch);
                batch = new BatchStatement(BatchStatement.Type.LOGGED); //keep the same batch type after flushing
            }
        }
        queryQueue.put(batch); //the last batch, perhaps shorter than 65535

        //storing the data
        System.out.println("Starting storing");
        while (!queryQueue.isEmpty()) {
            pool.execute(() -> {
                try {
                    long threadId = Thread.currentThread().getId();
                    System.out.println("Started: " + threadId);
                    BatchStatement statement = queryQueue.take();
                    long start2 = System.currentTimeMillis();
                    session.execute(statement);
                    System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
                } catch (Exception ex) {
                    System.out.println(ex.toString());
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(120, TimeUnit.SECONDS);
    } catch (Exception ex) {
        System.out.println(ex.toString());
    } finally {
        session.close();
        cluster.close();
    }


I came up with the code by reading posts here and on other blogs and websites. As I understand it, it is important for the client to use multiple threads, which is why I have done this. I also tried using async operations.


The bottom line is this: no matter which approach I use, one batch executes in 5-6 seconds, although it can take up to 10. It takes the same time whether I insert just one batch (so, only ~65k columns) or use a dumb single-threaded application. Honestly, I expected a bit more, especially since I get more or less similar performance on my laptop with a local instance.
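As a sanity check on those numbers, the implied serial throughput can be worked out with plain arithmetic (no driver code; the 5.5 s figure is just the midpoint of the observed 5-6 s per batch):

```java
public class ThroughputCheck {
    public static void main(String[] args) {
        int rows = 900_000;        // one hour of measurements
        int batchSize = 65_534;    // max statements per batch, as used above
        double secsPerBatch = 5.5; // midpoint of the observed 5-6 s per batch

        // number of batches needed, rounding up
        int batches = (rows + batchSize - 1) / batchSize;
        // time to store one hour of data if batches run serially
        double serialSeconds = batches * secsPerBatch;
        // effective insert rate within one batch
        double rowsPerSecond = batchSize / secsPerBatch;

        System.out.println(batches);       // 14
        System.out.println(serialSeconds); // 77.0
        System.out.printf("%.0f%n", rowsPerSecond); // 11915
    }
}
```

So even serially, an hour of data should land in roughly 77 seconds at ~12k rows/s per batch, which makes the lack of speedup from multithreading the suspicious part.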


The second, maybe more important, issue is the exceptions I am facing in an unpredictable manner. These two:


com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)


com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.102:9042 (com.datastax.driver.core.TransportException: [/192.168.1.102:9042] Connection has been closed), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Connection has been closed), /192.168.1.101:9042 (com.datastax.driver.core.TransportException: [/192.168.1.101:9042] Connection has been closed))


Bottom line: am I doing something wrong? Should I reorganize the way I load data, or change the schema? I tried reducing the row length (so I have 12-hour rows), but that didn't make a big difference.


============================== Update:


I was rude and forgot to paste an example of the code I used after the question was answered. It works reasonably well; however, I'm continuing my research with KairosDB and binary transfer with Astyanax. It looks like I can get much better performance with them than over CQL, although KairosDB can have some issues when it is overloaded (but I'm working on it) and Astyanax is a bit verbose for my taste. Nevertheless, here is the code; maybe I'm mistaken somewhere.


The number of semaphore slots has no effect on performance above 5000; it's almost constant.

    String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
    PreparedStatement preparedStatement = session.prepare(insertQuery);
    Semaphore semaphore = new Semaphore(15000);

    System.out.println("Starting " + Thread.currentThread().getId());
    DateTime time = DateTime.parse("2015-01-05T12:00:00");
    //generating the entries
    long start = System.currentTimeMillis();

    for (int i = 0; i < 900000; i++) {
        BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
        semaphore.acquire();
        ResultSetFuture resultSetFuture = session.executeAsync(statement);
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(@Nullable ResultSet resultSet) {
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Error: " + throwable.toString());
                semaphore.release();
            }
        });
        time = time.plus(4); //4ms between each entry
    }

Answer


What are your results using unlogged batching? Are you sure you want to use batch statements at all? https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e
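One point the linked article makes is that unlogged batches only help when all their statements hit the same partition; batches that span partitions just burden the coordinator. A minimal sketch of the grouping step, with plain strings standing in for bound statements so it runs without a cluster (the `clientid:date:event_time` key format is an illustration, not the driver's API):

```java
import java.util.*;

public class PartitionGrouping {
    // Group row keys of the form "<clientid>:<date>:<event_time>" by their
    // partition key (clientid, date), so each unlogged batch targets one partition.
    static Map<String, List<String>> groupByPartition(List<String> rows) {
        Map<String, List<String>> byPartition = new LinkedHashMap<>();
        for (String row : rows) {
            // partition key = everything up to the second ':'
            int cut = row.indexOf(':', row.indexOf(':') + 1);
            String partitionKey = row.substring(0, cut);
            byPartition.computeIfAbsent(partitionKey, k -> new ArrayList<>()).add(row);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "1:2014-01-01:000", "1:2014-01-01:004", "1:2014-01-02:000");
        Map<String, List<String>> grouped = groupByPartition(rows);
        System.out.println(grouped.keySet()); // [1:2014-01-01, 1:2014-01-02]
        System.out.println(grouped.get("1:2014-01-01").size()); // 2
    }
}
```

In the question's schema all 900k rows of one run share a single partition, so a per-partition unlogged batch is possible, but as the article argues, plain asynchronous single inserts (like the updated code above) usually perform as well or better.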

