I have a service that consumes messages from a queue at a rate that I control. I do some processing and then try to write to a Cassandra cluster through the DataStax Java client. I have set up my Cassandra cluster with maxRequestsPerConnection and maxConnectionsPerHost. However, in testing I found that once I have reached maxConnectionsPerHost and maxRequestsPerConnection, calls to session.executeAsync do not block.
Right now I use a new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before each asynchronous request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant, since the driver is already tracking requests and connections internally.
Has anyone come up with a better solution to this problem?
One caveat: I would like a request to be considered outstanding until it has completed. This includes retries! The situation where I am getting retryable failures from the cluster (for example, timeouts waiting for consistency) is the main situation in which I want to apply backpressure and stop consuming messages from the queue.
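To make the caveat concrete, here is a minimal sketch of what "outstanding until completed, including retries" could look like when the retries are done in application code. It deliberately does not use the driver: RetryingLimiter, submit, maxInFlight and maxAttempts are hypothetical names made up for illustration, and the Supplier stands in for whatever call starts the asynchronous request. Whether retries performed inside the driver are already covered by the future that executeAsync returns is not something this sketch settles.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical helper: a single permit covers a request and all of its retries. */
public class RetryingLimiter {
    private final Semaphore permits;
    private final int maxAttempts;

    public RetryingLimiter(int maxInFlight, int maxAttempts) {
        this.permits = new Semaphore(maxInFlight);
        this.maxAttempts = maxAttempts;
    }

    /** Blocks the caller while maxInFlight requests are still outstanding. */
    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> call) {
        permits.acquireUninterruptibly();
        CompletableFuture<T> result = new CompletableFuture<>();
        // Release exactly once, when the final outcome (after any retries) is known.
        result.whenComplete((value, error) -> permits.release());
        attempt(call, 1, result);
        return result;
    }

    private <T> void attempt(Supplier<CompletableFuture<T>> call, int n,
                             CompletableFuture<T> result) {
        CompletableFuture<T> attemptFuture;
        try {
            attemptFuture = call.get();
        } catch (RuntimeException e) {
            // A synchronous failure must also complete the result, or the permit leaks.
            result.completeExceptionally(e);
            return;
        }
        attemptFuture.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (n < maxAttempts) {
                attempt(call, n + 1, result); // retry while still holding the permit
            } else {
                result.completeExceptionally(error);
            }
        });
    }

    public int availablePermits() {
        return permits.availablePermits();
    }
}
```

With this shape, a request that keeps timing out holds its permit for all of its attempts, so the consumer slows down exactly when the cluster is struggling. The retry here is immediate and unconditional, which is a simplification; a real version would probably want a delay and a check on which errors are worth retrying.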
Problem:
// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}
Current solution:
constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}
processMessage(message) {
    // block here until a permit is free, before issuing the request
    concurrentRequestsSemaphore.acquireUninterruptibly();
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    // release the permit once the request has completed, successfully or not
    future.whenComplete((result, exception) -> concurrentRequestsSemaphore.release());
}
Also, can anyone see any obvious problems with this solution?