How does Cassandra handle a blocking execute statement in the DataStax Java driver?


Question

Blocking execute method from com.datastax.driver.core.Session

public ResultSet execute(Statement statement);

Comment on this method:



This method blocks until at least some result has been received from the database. However, for SELECT queries, it does not guarantee that the result has been received in full. But it does guarantee that some response has been received from the database, and in particular guarantee that if the request is invalid, an exception will be thrown by this method.

Non-blocking execute method from com.datastax.driver.core.Session

public ResultSetFuture executeAsync(Statement statement);





This method does not block. It returns as soon as the query has been passed to the underlying network stack. In particular, returning from this method does not guarantee that the query is valid or has even been submitted to a live node. Any exception pertaining to the failure of the query will be thrown when accessing the {@link ResultSetFuture}.




I have two questions about them, and it would be great if you could help me understand them.

Question 1: Suppose I have n threads, each with the same number of records to send to the database. All of them keep sending insert queries to Cassandra using the blocking execute call. If I increase n, will that also reduce the time needed to insert all the records into Cassandra?

Will this cause performance problems for Cassandra? Does Cassandra have to make sure that, for every inserted record, all the nodes in the cluster know about the new record immediately in order to keep the data consistent? (I assume a Cassandra node won't even consider using the local machine time to control the record insertion time.)

Question 2: With non-blocking execute, how can I make sure that all of the insertions succeed? The only way I know is to wait on the ResultSetFuture to check the outcome of each insert query. Is there a better way? Is non-blocking execute more likely to fail than blocking execute?

Thank you very much for your help.

Recommended answer


If I have n threads, each with the same number of records to send to the database, and all of them keep sending insert queries to Cassandra using the blocking execute call, will increasing n also reduce the time needed to insert all the records into Cassandra?

To some extent. Let's set the client implementation details aside for a moment and look at things from the perspective of the number of concurrent requests, since you don't need a thread for each ongoing request if you use executeAsync. In my testing I have found that while there is a lot of value in having a high number of concurrent requests, there is a threshold beyond which returns diminish or performance starts to degrade. My general rule of thumb is (number of nodes * native_transport_max_threads (default: 128) * 2), but you may find better results with more or less.
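As a rough illustration, the rule of thumb above can be written as a one-line calculation. The class and method names here are hypothetical, and the result is only a starting point for tuning, not a guaranteed optimum.

```java
class ConcurrencyRuleOfThumb {
    // Default value of native_transport_max_threads quoted in the answer.
    static final int DEFAULT_NATIVE_TRANSPORT_MAX_THREADS = 128;

    // Suggested starting point for the number of in-flight requests:
    // nodes * native_transport_max_threads * 2.
    static int maxInFlight(int nodes, int nativeTransportMaxThreads) {
        return nodes * nativeTransportMaxThreads * 2;
    }

    public static void main(String[] args) {
        // A 3-node cluster with default settings: 3 * 128 * 2 = 768.
        System.out.println(maxInFlight(3, DEFAULT_NATIVE_TRANSPORT_MAX_THREADS));
    }
}
```

Measure throughput around this value and adjust up or down, as the answer suggests.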

The idea here is that there is not much value in enqueuing more requests than Cassandra will handle at a time. By reducing the number of in-flight requests, you limit unnecessary congestion on the connections between your driver client and Cassandra.


Question 2: With non-blocking execute, how can I make sure that all of the insertions succeed? The only way I know is to wait on the ResultSetFuture to check the outcome of each insert query. Is there a better way? Is non-blocking execute more likely to fail than blocking execute?

Waiting on the ResultSetFuture via get is one route, but if you are developing a fully async application, you want to avoid blocking as much as possible. Using Guava, your two best weapons are Futures.addCallback and Futures.transform.


  • Futures.addCallback allows you to register a FutureCallback that gets executed when the driver has received the response. onSuccess gets executed in the success case, onFailure otherwise.

  • Futures.transform allows you to effectively map the returned ResultSetFuture into something else. For example, if you only want the value of one column, you could use it to transform a ListenableFuture<ResultSet> into a ListenableFuture<String> without having to block in your code on the ResultSetFuture and then get the String value.
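The shape of both techniques can be sketched with the JDK alone. This is only an analogy, not the driver's API: CompletableFuture stands in for ResultSetFuture, whenComplete plays the role of Futures.addCallback, and thenApply plays the role of Futures.transform. The executeAsync method, the "name" column, and the counters are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CallbackSketch {
    static final AtomicInteger successes = new AtomicInteger();
    static final AtomicInteger failures = new AtomicInteger();

    // Hypothetical stand-in for session.executeAsync(...): it never throws at
    // call time; an invalid (empty) query only fails the returned future.
    static CompletableFuture<Map<String, String>> executeAsync(String query) {
        CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
        if (query.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("invalid query"));
        } else {
            future.complete(Collections.singletonMap("name", "alice"));
        }
        return future;
    }

    // Analogue of Futures.addCallback: react to the outcome without blocking.
    static <T> CompletableFuture<T> watch(CompletableFuture<T> future) {
        return future.whenComplete((result, error) -> {
            if (error == null) {
                successes.incrementAndGet(); // corresponds to onSuccess
            } else {
                failures.incrementAndGet();  // corresponds to onFailure
            }
        });
    }

    // Analogue of Futures.transform: map the row future to a single column value.
    static CompletableFuture<String> nameOf(CompletableFuture<Map<String, String>> future) {
        return future.thenApply(row -> row.get("name"));
    }
}
```

With the real driver, the same structure would use Guava's FutureCallback and Function on the ResultSetFuture instead.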


In the context of writing a dataloader program, you could do something like the following:


  1. To keep things simple, use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of in-flight requests). Whenever you go to submit a query using executeAsync, acquire a permit. You should really only need one thread (but may want to introduce a pool sized to the number of CPU cores) that acquires permits from the Semaphore and executes queries. It will just block on acquire until a permit is available.
  2. Use Futures.addCallback for the future returned from executeAsync. The callback should call Semaphore.release() in both the onSuccess and onFailure cases. Releasing a permit allows your thread in step 1 to continue and submit the next request.
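The two steps above can be sketched as follows, using only the JDK. This is a minimal sketch under stated assumptions: the executeAsync function passed to load is a hypothetical stand-in for session.executeAsync, CompletableFuture stands in for ResultSetFuture, and whenComplete stands in for Futures.addCallback. The ThrottledLoader class itself is illustrative and not part of the driver.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ThrottledLoader {
    private final int maxInFlight;
    private final Semaphore permits;
    private final AtomicInteger succeeded = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();

    ThrottledLoader(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    // Submits every query while keeping at most maxInFlight requests pending,
    // then waits until all outstanding requests have completed.
    void load(List<String> queries, Function<String, CompletableFuture<Void>> executeAsync) {
        for (String query : queries) {
            // Step 1: block here until a permit is available.
            permits.acquireUninterruptibly();
            CompletableFuture<Void> future;
            try {
                future = executeAsync.apply(query);
            } catch (RuntimeException e) {
                // The submission itself failed: count it and give the permit back.
                failed.incrementAndGet();
                permits.release();
                continue;
            }
            // Step 2: record the outcome, then release the permit on success and on failure.
            future.whenComplete((result, error) -> {
                if (error == null) {
                    succeeded.incrementAndGet();
                } else {
                    failed.incrementAndGet();
                }
                permits.release();
            });
        }
        // Holding every permit means no request is still in flight.
        permits.acquireUninterruptibly(maxInFlight);
        permits.release(maxInFlight);
    }

    int succeeded() { return succeeded.get(); }
    int failed() { return failed.get(); }
}
```

Counting failures in the callback also addresses the second question: each insert's outcome is observed without the submitting thread ever blocking on an individual future.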


To further improve throughput, you might want to consider using BatchStatement and submitting requests in batches. This is a good option if you keep your batches small (50-250 is a good number) and if your inserts in a batch all share the same partition key.
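One way to prepare such batches is to group the rows by partition key first and then cut each group into small chunks. The helper below is a hypothetical, JDK-only sketch of that grouping step; with the driver, each inner list would then be added to its own BatchStatement.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class PartitionBatcher {
    // Groups rows by partition key, then splits each group into batches of at
    // most maxBatchSize rows, so every batch holds rows from a single partition.
    static <T, K> List<List<T>> batchesByPartition(
            List<T> rows, Function<T, K> partitionKey, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        // LinkedHashMap keeps partitions in first-seen order.
        Map<K, List<T>> byKey = new LinkedHashMap<>();
        for (T row : rows) {
            byKey.computeIfAbsent(partitionKey.apply(row), k -> new ArrayList<>()).add(row);
        }
        List<List<T>> batches = new ArrayList<>();
        for (List<T> group : byKey.values()) {
            for (int start = 0; start < group.size(); start += maxBatchSize) {
                int end = Math.min(start + maxBatchSize, group.size());
                batches.add(new ArrayList<>(group.subList(start, end)));
            }
        }
        return batches;
    }
}
```

A maxBatchSize in the 50-250 range follows the guidance above.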
