Async writes seem to be broken in Cassandra


Problem description


I have had issues with the spark-cassandra-connector (1.0.4, 1.1.0) when writing batches of 9 million rows to a 12-node Cassandra (2.1.2) cluster. I was writing with consistency ALL and reading with consistency ONE, but the number of rows read was different from 9 million every time (8,865,753, 8,753,213, etc.).

I've checked the code of the connector and found no issues. Then I decided to write my own application, independent of Spark and the connector, to investigate the problem (the only dependency is datastax-driver-core, version 2.1.3).

The full code, the startup scripts and the configuration files can now be found on github.

In pseudo-code, I wrote two different versions of the application, the sync one:

try (Session session = cluster.connect()) {

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        session.execute(bound);
    }

}

And the async one:

try (Session session = cluster.connect()) {

    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        while(futures.size()>=10 /* Max 10 concurrent writes */) {
            // Wait for the first issued write to terminate
            ResultSetFuture future = futures.get(0);
            future.get();
            futures.remove(0);
        }

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        futures.add(session.executeAsync(bound));
    }

    while(futures.size()>0) {
        // Wait for the other write requests to terminate
        ResultSetFuture future = futures.get(0);
        future.get();
        futures.remove(0);
    }
}

The latter is similar to the one used by the connector in the no-batch configuration.
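
As a side note, an alternative way to cap the number of in-flight writes on one session, instead of polling the head of a futures list, is a counting semaphore released from a completion callback. The following is only a sketch against the same driver 2.1 API (it reuses the cluster and keySource placeholders from above; Futures and FutureCallback come from the Guava library bundled with the driver, and the limit of 10 mirrors the version above):

import java.util.concurrent.Semaphore;
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;

final int maxInFlight = 10;                     // same limit as in the futures-list version
final Semaphore permits = new Semaphore(maxInFlight);

try (Session session = cluster.connect()) {

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for (String partitionKey : keySource) {
        // Blocks as soon as maxInFlight writes are pending on this session
        permits.acquireUninterruptibly();

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        Futures.addCallback(session.executeAsync(bound), new FutureCallback<ResultSet>() {
            public void onSuccess(ResultSet rs) { permits.release(); }
            public void onFailure(Throwable t)  { permits.release(); /* record the failed write here */ }
        });
    }

    // Wait for the remaining in-flight writes to complete
    permits.acquireUninterruptibly(maxInFlight);
}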

The two versions of the application work the same in all circumstances, except when the load is high.

For instance, when running the sync version with 5 threads on 9 machines (45 threads) writing 9 million rows to the cluster, I found all the rows in the subsequent read (with spark-cassandra-connector).

If I run the async version with 1 thread per machine (9 threads), the execution is much faster, but I cannot find all the rows in the subsequent read (the same problem that arose with the spark-cassandra-connector).

No exception was thrown by the code during the executions.
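
For completeness: the posted code already drains the futures with get(), so a failed write would have surfaced as an exception. One way to make the driver's unchecked errors explicit is to drain with getUninterruptibly and catch them, as in this minimal sketch against the same driver 2.1 API (the error handling here is illustrative only):

import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;

// Drain the pending futures and report every write that is not acknowledged
while (!futures.isEmpty()) {
    ResultSetFuture future = futures.remove(0);
    try {
        future.getUninterruptibly();   // throws if the write failed, timed out, or no host was available
    } catch (QueryExecutionException e) {
        // e.g. WriteTimeoutException or UnavailableException
        System.err.println("Write not acknowledged: " + e);
    } catch (NoHostAvailableException e) {
        System.err.println("No host available for write: " + e);
    }
}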

What could be the cause of the issue?

I have added some other results (thanks for the comments):

  • Async version with 9 threads on 9 machines, with 5 concurrent writers per thread (45 concurrent writers): no issues
  • Sync version with 90 threads on 9 machines (10 threads per JVM instance): no issues

Issues seemed to start arising with async writes and a number of concurrent writers > 45 and <= 90, so I did other tests to make sure the findings were right:

  • Replaced the "get" method of ResultSetFuture with "getUninterruptibly": same issues.
  • Async version with 18 threads on 9 machines, with 5 concurrent writers per thread (90 concurrent writers): no issues.

The last finding shows that the high number of concurrent writers (90) is not the issue, contrary to what the first tests suggested. The problem is the high number of async writes using the same session.

With 5 concurrent async writes on the same session, the issue is not present. If I increase the number of concurrent writes to 10, some operations get lost without notification.

It seems that the async writes are broken in Cassandra 2.1.2 (or the Cassandra Java driver) if you issue multiple (>5) writes concurrently on the same session.

Solution

Nicola and I communicated over email this weekend, and I thought I'd provide an update here with my current theory. I took a look at the github project Nicola shared and experimented with an 8-node cluster on EC2.

I was able to reproduce the issue with 2.1.2, but did observe that after a period of time I could re-execute the spark job and all 9 million rows were returned.

What I seemed to notice was that while nodes were under compaction I did not get all 9 million rows. On a whim I took a look at the change log for 2.1 and came across an issue, CASSANDRA-8429 ("Some keys unreadable during compaction"), that may explain this problem.

Seeing that the issue has been fixed and is targeted for 2.1.3, I reran the test against the cassandra-2.1 branch, ran the count job while compaction activity was happening, and got 9 million rows back.

I'd like to experiment with this some more since my testing with the cassandra-2.1 branch was rather limited and the compaction activity may have been purely coincidental, but I'm hoping this may explain these issues.
