What is the best way to get backpressure for Cassandra Writes?


Question

I have a service that consumes messages off a queue at a rate that I control. I do some processing and then attempt to write to a Cassandra cluster via the DataStax Java client. I have set up my Cassandra cluster with maxRequestsPerConnection and maxConnectionsPerHost. However, in testing I have found that when I have reached maxConnectionsPerHost and maxRequestsPerConnection, calls to session.executeAsync don't block.

What I am doing right now is using a new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before every async request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant, since the driver already tracks requests and connections internally.

Has anyone come up with a better solution to this problem?

One caveat: I would like a request to be considered outstanding until it has completed, and that includes retries! The situation where I get retryable failures from the cluster (such as timeouts waiting for consistency) is the primary situation where I want to apply backpressure and stop consuming messages from the queue.

Problem:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

Current solution:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    // Acquire before issuing the request, so the caller blocks once
    // maxConnectionsPerHost * maxRequestsPerConnection writes are in flight.
    concurrentRequestsSemaphore.acquireUninterruptibly();
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    // Release the permit only when the future completes, which for the driver
    // includes any retries it performs internally.
    future.whenComplete((result, exception) -> concurrentRequestsSemaphore.release());
}
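The semaphore pattern above can be sketched self-contained, with a delayed stub standing in for the driver's async call. `ThrottledWriter`, `write`, and the 10 ms delay are all illustrative, not part of the DataStax API; the point is acquiring a permit before issuing the request and releasing it only when the future completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stub writer: each "write" completes after a short delay, in place of
// session.executeAsync(...). Names here are hypothetical.
class ThrottledWriter {
    private final Semaphore permits;
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    private final AtomicInteger inFlight = new AtomicInteger();
    volatile int maxObservedInFlight = 0;

    ThrottledWriter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Acquire a permit BEFORE issuing the request, so the caller blocks once
    // maxInFlight writes are outstanding; release only when the future completes.
    CompletableFuture<Void> write(String message) {
        permits.acquireUninterruptibly();
        int current = inFlight.incrementAndGet();
        if (current > maxObservedInFlight) maxObservedInFlight = current; // caller thread only
        CompletableFuture<Void> written = new CompletableFuture<>();
        pool.schedule(() -> written.complete(null), 10, TimeUnit.MILLISECONDS);
        return written.whenComplete((result, error) -> {
            inFlight.decrementAndGet();
            permits.release();
        });
    }

    void shutdown() { pool.shutdown(); }
}
```

Because the permit is taken before the request is issued, the number of concurrent writes can never exceed the permit count, which is the property the original version (acquiring after executeAsync) does not strictly guarantee.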

Also, can anyone see any obvious problems with this solution?

Solution

One possible idea, so as not to overwhelm the cluster, is to throttle your calls to executeAsync: e.g. after a batch of 100 (or whatever number is best for your cluster and workload), sleep in the client code and do a blocking call on all 100 futures (or use the Guava library to transform a list of futures into a future of a list).

This way, after issuing 100 async queries, you force the client application to wait for all of them to succeed before proceeding further. If you catch an exception when calling future.get(), you can schedule a retry; normally a retry has already been attempted by the Java driver's default RetryPolicy.
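The batching idea can be approximated with the JDK's CompletableFuture.allOf in place of Guava's future-of-list transform. A minimal sketch, where `writeAsync`, `BatchedWriter`, and `BATCH` are stand-ins for illustration (the real call would be session.executeAsync):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Issue up to BATCH async writes, then block on the whole batch before
// consuming more messages.
class BatchedWriter {
    static final int BATCH = 100;

    // Stand-in for session.executeAsync(preparedStatement.bind(message)).
    static CompletableFuture<Void> writeAsync(String message) {
        return CompletableFuture.completedFuture(null);
    }

    // Returns the number of full batches flushed (for observability).
    static int processAll(List<String> messages) {
        int flushes = 0;
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        for (String m : messages) {
            inFlight.add(writeAsync(m));
            if (inFlight.size() == BATCH) {
                // Block until every future in the batch completes, analogous
                // to blocking on Guava's Futures.allAsList result.
                CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
                inFlight.clear();
                flushes++;
            }
        }
        // Drain any partial final batch.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        return flushes;
    }
}
```

Note that join() rethrows any failure as a CompletionException, so a production version would catch it per future to decide which writes to retry.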

Regarding a back-pressure signal from the server: starting with CQL binary protocol v3, there is an error code that notifies the client that the coordinator is overloaded: https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951

From the client, you can get this overloaded information in two ways:
