What is the best way to get backpressure for Cassandra Writes?


Question

I have a service that consumes messages off of a queue at a rate that I control. I do some processing and then attempt to write to a Cassandra cluster via the Datastax Java client. I have set up my Cassandra cluster with maxRequestsPerConnection and maxConnectionsPerHost. However, in testing I have found that when I have reached maxConnectionsPerHost and maxRequestsPerConnection, calls to session.executeAsync don't block.

What I am doing right now is using a new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before every async request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant since the driver is already tracking requests and connections internally.

Has anyone come up with a better solution to this problem?

One caveat: I would like a request to be considered outstanding until it has completed. This includes retries! The situation where I am getting retryable failures from the cluster (such as timeouts waiting for consistency) is the primary situation where I want to apply backpressure and stop consuming messages from the queue.

Problem:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

Current solution:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    // acquire a permit before issuing the request, so this method blocks
    // once the configured number of requests are in flight
    concurrentRequestsSemaphore.acquireUninterruptibly();
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    future.whenComplete((result, exception) -> concurrentRequestsSemaphore.release());
}
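The semaphore pattern above can be exercised without a Cassandra cluster. The sketch below is a self-contained, hypothetical stand-in: `executeAsyncSimulated` simulates `session.executeAsync` with a delayed `CompletableFuture`, and the permit count is an arbitrary small number standing in for `maxConnectionsPerHost * maxRequestsPerConnection`; none of these names come from the driver.

```java
import java.util.concurrent.*;

// Self-contained sketch of the bounded-in-flight semaphore pattern.
// The Cassandra call is simulated; names are illustrative, not from the driver.
public class BoundedWrites {
    static final int PERMITS = 4; // stands in for maxConnectionsPerHost * maxRequestsPerConnection
    static final Semaphore inFlight = new Semaphore(PERMITS);

    // Simulated session.executeAsync: completes after a short delay.
    static CompletableFuture<String> executeAsyncSimulated(int id, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(20); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            return "row-" + id;
        }, pool);
    }

    // Issues n "writes", blocking whenever PERMITS requests are already outstanding.
    // Returns the maximum number of requests observed in flight at once.
    public static int run(int n) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(PERMITS * 2);
        final int[] maxSeen = {0};
        for (int i = 0; i < n; i++) {
            inFlight.acquireUninterruptibly();      // backpressure point: blocks when saturated
            synchronized (maxSeen) {
                int current = PERMITS - inFlight.availablePermits();
                maxSeen[0] = Math.max(maxSeen[0], current);
            }
            executeAsyncSimulated(i, pool)
                .whenComplete((result, error) -> inFlight.release());
        }
        inFlight.acquire(PERMITS);                  // drain: wait for all writes to finish
        inFlight.release(PERMITS);
        pool.shutdown();
        return maxSeen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max in flight: " + run(20));
    }
}
```

Because the permit is acquired before the request is issued and released only in `whenComplete`, the in-flight count can never exceed the permit count, which is exactly the bound the question wants the driver to enforce.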

Also, can anyone see any obvious problems with this solution?

Answer

One possible idea, to avoid killing the cluster, is to throttle your calls to executeAsync: after a batch of 100 (or whatever number works best for your cluster and workload), sleep in the client code and make a blocking call on all 100 futures (or use the Guava library to transform a list of futures into a future of a list).

This way, after issuing 100 async queries, you force the client application to wait for all of them to succeed before proceeding further. If you catch an exception when calling future.get(), you can schedule a retry. Normally a retry is already attempted by the Java driver's default RetryPolicy.
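The batch-throttling idea can be sketched with plain CompletableFutures; with the real driver you would collect the ResultSetFutures and combine them with Guava's Futures.allAsList instead. The write itself is simulated here, and all names are illustrative.

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch of batch throttling: issue writes in batches of BATCH_SIZE and
// block on the whole batch before issuing the next one. The Cassandra
// write is simulated; with the driver you would use Guava's
// Futures.allAsList over ResultSetFutures instead of allOf/join.
public class BatchedWrites {
    static final int BATCH_SIZE = 100;

    static CompletableFuture<Integer> writeAsyncSimulated(int id, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> id, pool);
    }

    // Writes n messages in throttled batches; returns how many completed.
    public static int writeAll(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        int done = 0;
        for (int start = 0; start < n; start += BATCH_SIZE) {
            List<CompletableFuture<Integer>> batch = new ArrayList<>();
            for (int i = start; i < Math.min(start + BATCH_SIZE, n); i++) {
                batch.add(writeAsyncSimulated(i, pool));
            }
            // "future of a list": block until every write in this batch completes
            CompletableFuture.allOf(batch.toArray(new CompletableFuture[0])).join();
            done += batch.size();
        }
        pool.shutdown();
        return done;
    }

    public static void main(String[] args) {
        System.out.println(writeAll(250)); // prints 250
    }
}
```

Note this gives coarser backpressure than the semaphore approach in the question: the pipeline drains completely at each batch boundary instead of keeping a steady number of requests in flight.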

Regarding the back-pressure signal from the server: starting with CQL binary protocol V3, there is an error code that notifies the client that the coordinator is overloaded: https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951

From the client, you can get this overloaded information in 2 ways:
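The answer's enumeration of those two ways did not survive in this copy. Purely as an illustration of reacting to the overloaded signal (an assumption, not the answer's original list), one might inspect the failure in the completion callback and pause the consumer; here `OverloadedException` is a local stand-in class, not the driver's actual exception type.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: OverloadedException below is a local stand-in
// for whatever the driver raises on the protocol-V3 "overloaded" error
// code; the real type and the answer's intended mechanisms are not shown
// in the source.
public class OverloadHandling {
    static class OverloadedException extends RuntimeException {}

    static final AtomicBoolean paused = new AtomicBoolean(false);

    // Simulated write that fails with OverloadedException on demand.
    static CompletableFuture<String> writeAsyncSimulated(boolean overloaded) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (overloaded) f.completeExceptionally(new OverloadedException());
        else f.complete("ok");
        return f;
    }

    // On an overloaded failure, stop pulling messages from the queue.
    // Returns whether the consumer is paused after processing.
    public static boolean process(boolean overloaded) {
        writeAsyncSimulated(overloaded).whenComplete((result, error) -> {
            if (error instanceof OverloadedException) {
                paused.set(true); // backpressure: pause the queue consumer
            }
        });
        return paused.get();
    }
}
```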
