Async writes seem to be broken in Cassandra


Problem description

I have had issues with the spark-cassandra-connector (1.0.4, 1.1.0) when writing batches of 9 million rows to a 12-node Cassandra (2.1.2) cluster. I was writing with consistency ALL and reading with consistency ONE, but the number of rows read back differed from 9 million on every run (e.g., 8,865,753 or 8,753,213).

I checked the code of the connector and found no issues. Then I decided to write my own application, independent of Spark and the connector, to investigate the problem (the only dependency is the DataStax Java driver, version 2.1.3).
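
For context, here is a minimal sketch of the driver setup that the pseudo-code below assumes; the contact point, keyspace, and class name are placeholders, not taken from the actual project:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class AsyncWriteTest {
    public static void main(String[] args) {
        // Placeholder contact point and keyspace; the real application reads
        // them from its configuration files.
        try (Cluster cluster = Cluster.builder()
                                      .addContactPoint("10.0.0.1")
                                      .build();
             Session session = cluster.connect("test_keyspace")) {
            // ... sync or async write loop from the snippets below ...
        }
    }
}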

The full code, the startup scripts, and the configuration files can now be found on GitHub.

In pseudo-code, I wrote two different versions of the application. The sync one:

try (Session session = cluster.connect()) {

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        session.execute(bound);
    }

}

And the async one:

try (Session session = cluster.connect()) {

    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        while(futures.size()>=10 /* Max 10 concurrent writes */) {
            // Wait for the first issued write to terminate
            ResultSetFuture future = futures.get(0);
            future.get();
            futures.remove(0);
        }

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        futures.add(session.executeAsync(bound));
    }

    while(futures.size()>0) {
        // Wait for the other write requests to terminate
        ResultSetFuture future = futures.get(0);
        future.get();
        futures.remove(0);
    }
}

The latter is similar to the approach used by the connector when batching is disabled.

The two versions of the application work the same in all circumstances, except when the load is high.

For instance, when running the sync version with 5 threads on 9 machines (45 threads in total) writing 9 million rows to the cluster, I find all the rows in the subsequent read (performed with the spark-cassandra-connector).

If I run the async version with 1 thread per machine (9 threads), the execution is much faster but I cannot find all the rows in the subsequent read (the same problem that arose with the spark-cassandra-connector).

No exception was thrown by the code during the executions.

What could be the cause of the issue?

I am adding some other results (thanks for the comments):

  • Async version with 9 threads on 9 machines, with 5 concurrent writers per thread (45 concurrent writers): no issues
  • Sync version with 90 threads on 9 machines (10 threads per JVM instance): no issues

Issues seemed to start arising with async writes and a number of concurrent writers > 45 and <= 90, so I ran further tests to make sure the finding was right:

  • Replaced the "get" method of ResultSetFuture with "getUninterruptibly": same issues.
  • Async version with 18 threads on 9 machines, with 5 concurrent writers per thread (90 concurrent writers): no issues.
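
For clarity, the "getUninterruptibly" variant mentioned in the first bullet only changes how the calling thread waits on the oldest future; a minimal sketch of the modified wait loop, with the same structure as the async pseudo-code above:

while (futures.size() >= 10 /* Max 10 concurrent writes */) {
    // getUninterruptibly() blocks like get(), but cannot be interrupted and
    // does not throw checked exceptions; driver errors surface as unchecked
    // DriverException subclasses instead.
    ResultSetFuture future = futures.get(0);
    future.getUninterruptibly();
    futures.remove(0);
}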

The last finding shows that a high number of concurrent writers (90) is not in itself the problem, as the first tests had suggested. The problem is the high number of async writes issued on the same session.

With 5 concurrent async writes on the same session the issue is not present. If I increase the number of concurrent writes to 10, some operations get lost without any notification.
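
One way to make the per-session cap explicit, and to surface any write that fails without an exception reaching the caller, is to guard executeAsync with a Semaphore and attach a callback to every future (ResultSetFuture is a Guava ListenableFuture). The sketch below is only an illustration of that idea, built on the same placeholder schema as above; the permit count of 5 and the logging are assumptions, not part of the original application:

// Assumes session, pstm and keySource from the async snippet above.
// Needs java.util.concurrent.Semaphore plus Guava's Futures and
// FutureCallback (Guava is already a dependency of the driver).
final Semaphore inFlight = new Semaphore(5);   // at most 5 writes in flight per session

for (String partitionKey : keySource) {
    inFlight.acquireUninterruptibly();         // blocks once 5 writes are pending

    BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
    bound.setConsistencyLevel(ConsistencyLevel.ALL);

    ResultSetFuture future = session.executeAsync(bound);
    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet rs) {
            inFlight.release();                // free a slot for the next write
        }
        @Override
        public void onFailure(Throwable t) {
            inFlight.release();
            System.err.println("Write failed: " + t);   // make silent losses visible
        }
    });
}

// Wait until the last in-flight writes have completed.
inFlight.acquireUninterruptibly(5);

Functionally this is equivalent to the futures-list throttle above, but the failure callback guarantees that a lost write at least leaves a trace in the logs.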

It seems that the async writes are broken in Cassandra 2.1.2 (or the Cassandra Java driver) if you issue multiple (>5) writes concurrently on the same session.

Solution

Nicola and I communicated over email this weekend, and I thought I'd provide an update here with my current theory. I took a look at the GitHub project Nicola shared and experimented with an 8-node cluster on EC2.

I was able to reproduce the issue with 2.1.2, but did observe that after a period of time I could re-execute the spark job and all 9 million rows were returned.

What I seemed to notice was that while nodes were undergoing compaction I did not get all 9 million rows back. On a whim I took a look at the change log for 2.1 and noticed an issue, CASSANDRA-8429 ("Some keys unreadable during compaction"), that may explain this problem.

Seeing that the issue has been fixed and is targeted for 2.1.3, I reran the test against the cassandra-2.1 branch, ran the count job while compaction activity was happening, and got all 9 million rows back.

I'd like to experiment with this some more since my testing with the cassandra-2.1 branch was rather limited and the compaction activity may have been purely coincidental, but I'm hoping this may explain these issues.
