Cassandra cluster with bad insert performance and insert stability

Problem description

I have to store around 250 numerical values per second, per client, which is around 900k numbers per hour. It probably will not be a full-day recording (probably between 5 and 10 hours a day), but I will partition my data based on the client id and the day the reading is made. The maximum row length comes to about 22-23M columns, which is still manageable. Nevertheless, my schema looks like this:

CREATE TABLE measurement (
  clientid text,
  date text,
  event_time timestamp,
  value int,
  PRIMARY KEY ((clientid,date), event_time)
);

The keyspace has a replication factor of 2, just for testing; the snitch is GossipingPropertyFileSnitch and the replication strategy is NetworkTopologyStrategy. I know that a replication factor of 3 is more of a production standard.
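
For reference, the keyspace DDL looks roughly like this (a sketch only; the keyspace name and the datacenter name "DC1" are placeholders rather than my actual names):

//Sketch only: create the keyspace via cqlsh or a driver Session.
//"measurements" and the datacenter name "DC1" are placeholder assumptions;
//GossipingPropertyFileSnitch reads the real DC name from cassandra-rackdc.properties.
session.execute(
        "CREATE KEYSPACE IF NOT EXISTS measurements WITH replication = " +
        "{'class': 'NetworkTopologyStrategy', 'DC1': 2}"); //'DC1': 3 for production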

Next up, I created a small cluster on the company's servers: three bare-metal virtualized machines with 2 CPUs x 2 cores each, 16GB of RAM, and a lot of disk space. I am on a gigabit LAN with them. The cluster is operational, according to nodetool.

Here is the code I am using to test my setup:

import com.datastax.driver.core.*;
import org.joda.time.DateTime;
import java.util.concurrent.*;

Cluster cluster = Cluster.builder()
        .addContactPoint("192.168.1.100")
        .addContactPoint("192.168.1.102")
        .build();
Session session = cluster.connect();
DateTime time = DateTime.now();
BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue<>(50, true);

try {
    ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts

    String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
    PreparedStatement preparedStatement = session.prepare(insertQuery);
    BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also

    //generating the entries
    for (int i = 0; i < 900000; i++) { //900000 entries is an hour's worth of measurements
        time = time.plus(4); //4ms between each entry
        BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
        batch.add(bound);

        //a batch statement may hold 65535 statements at most
        if (batch.size() >= 65534) {
            queryQueue.put(batch);
            batch = new BatchStatement(BatchStatement.Type.LOGGED);
        }
    }
    queryQueue.put(batch); //the last batch, perhaps shorter than 65535

    //storing the data
    System.out.println("Starting storing");
    while (!queryQueue.isEmpty()) {
        pool.execute(() -> {
            try {
                long threadId = Thread.currentThread().getId();
                System.out.println("Started: " + threadId);
                //poll() rather than take(): the isEmpty() check above races with
                //the workers, so a surplus task must not block on an empty queue
                BatchStatement statement = queryQueue.poll();
                if (statement == null) {
                    return;
                }
                long start2 = System.currentTimeMillis();
                session.execute(statement);
                System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
            } catch (Exception ex) {
                System.out.println(ex.toString());
            }
        });
    }
    pool.shutdown();
    pool.awaitTermination(120, TimeUnit.SECONDS);
} catch (Exception ex) {
    System.out.println(ex.toString());
} finally {
    session.close();
    cluster.close();
}



I came up with this code by reading posts here and on other blogs and websites. As I understand it, it is important for the client to use multiple threads, which is why I have done so. I also tried using async operations.

The bottom-line result is this: no matter which approach I use, one batch executes in 5-6 seconds, although it can take up to 10. It takes the same amount of time whether I insert just one batch (so, only ~65k columns) or use a dumb single-threaded application. Honestly, I expected a bit more, especially since I get more or less similar performance on my laptop with a local instance.

The second, and maybe more important, issue is the exceptions I run into in an unpredictable manner. These two:

com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.102:9042 (com.datastax.driver.core.TransportException: [/192.168.1.102:9042] Connection has been closed), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Connection has been closed), /192.168.1.101:9042 (com.datastax.driver.core.TransportException: [/192.168.1.101:9042] Connection has been closed))
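
For context, two timeouts are in play here: the server-side write_request_timeout_in_ms in cassandra.yaml (2 seconds by default) and the client-side read timeout of the Java driver (12 seconds by default). A minimal sketch of raising the client-side one, assuming the DataStax 2.x driver used above:

//Sketch only: raising the driver's per-request read timeout above its 12s
//default; the 30000 ms value is an arbitrary assumption, not a recommendation.
Cluster cluster = Cluster.builder()
        .addContactPoint("192.168.1.100")
        .addContactPoint("192.168.1.102")
        .withSocketOptions(new SocketOptions().setReadTimeoutMillis(30000))
        .build();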

So, the bottom line: am I doing something wrong? Should I reorganize the way I load data, or change the schema? I tried reducing the row length (so I have 12-hour rows), but that didn't make a big difference; a sketch of that variant follows below.
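
For illustration, the 12-hour-row variant looked roughly like this (a sketch; the bucket column name and its 0/1 half-day convention are assumptions, not my exact schema):

//Sketch of a half-day partition: "bucket" is 0 for 00:00-11:59 and 1 for
//12:00-23:59 (an assumed convention), cutting the row length roughly in half.
session.execute(
        "CREATE TABLE measurement_12h (" +
        "  clientid text," +
        "  date text," +
        "  bucket int," +
        "  event_time timestamp," +
        "  value int," +
        "  PRIMARY KEY ((clientid, date, bucket), event_time))");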

==============================
Update:

I was rude and forgot to paste an example of the code I used after the question was answered. It works reasonably well; however, I am continuing my research with KairosDB and binary transfers with Astyanax. It looks like I can get much better performance with them than over CQL, although KairosDB can have some issues when it is overloaded (but I am working on it) and Astyanax is a bit verbose for my taste. Nevertheless, here is the code; I may have made a mistake somewhere.

The semaphore slot count has no effect on performance above about 5000; it stays almost constant.

String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
Semaphore semaphore = new Semaphore(15000); //caps the number of in-flight async inserts

System.out.println("Starting " + Thread.currentThread().getId());
DateTime time = DateTime.parse("2015-01-05T12:00:00");
//generating the entries
long start = System.currentTimeMillis();

for (int i = 0; i < 900000; i++) {

    BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
    semaphore.acquire(); //block until one of the slots frees up
    ResultSetFuture resultSetFuture = session.executeAsync(statement);
    Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {
            semaphore.release(); //insert acknowledged, free the slot
        }

        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("Error: " + throwable.toString());
            semaphore.release();
        }
    });
    time = time.plus(4); //4ms between each entry
}


Recommended answer

What are your results using unlogged batching? Are you sure you want to use batch statements at all?
https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e
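
For illustration, an unlogged batch skips the coordinator's batch log, and it only really pays off when every statement in the batch targets the same partition, which is the case here since all rows share the (clientid, date) key. A rough sketch, reusing the prepared statement from the question (the batch size of 100 is an arbitrary assumption):

//Sketch: small unlogged batches, all targeting one (clientid, date) partition.
//The size of 100 is an assumption; huge batches are what stress the coordinator.
BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
for (int i = 0; i < 900000; i++) {
    time = time.plus(4);
    batch.add(preparedStatement.bind("1", "2014-01-01", time.toDate(), 1));
    if (batch.size() >= 100) {
        session.execute(batch);
        batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
    }
}
if (batch.size() > 0) {
    session.execute(batch); //flush the remainder
}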
