What is the best way to get backpressure for Cassandra writes?

I have a service that consumes messages from a queue at a rate that I control. I do some processing and then try to write to a Cassandra cluster through the DataStax Java client. I have configured my Cassandra cluster with maxRequestsPerConnection and maxConnectionsPerHost. However, in testing I found that once I have reached maxConnectionsPerHost and maxRequestsPerConnection, calls to session.executeAsync do not block.

Right now I use new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before each asynchronous request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant, since the driver already tracks requests and connections internally.

Has anyone come up with a better solution to this problem?

One caveat: I would like a request to be considered outstanding until it has completed. This includes retries! The case where I get retryable failures from the cluster (for example, timeouts waiting for consistency) is the main case where I want backpressure and want to stop consuming messages from the queue.

Problem:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

Current solution:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    concurrentRequestsSemaphore.acquireUninterruptibly();
    future.whenComplete((result, exception) -> concurrentRequestsSemaphore.release());
}

Also, can anyone see any obvious problems with this solution?
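One thing that stands out in the snippet above is that the permit is acquired only after executeAsync has already been called, so the request is in flight before the limit is checked. A minimal sketch of a variant that acquires first is shown here. It uses only the JDK: the `BoundedAsync` class and its `submit` method are hypothetical names, and the `Supplier` stands in for a call such as session.executeAsync wrapped into a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Caps the number of in-flight asynchronous calls with a semaphore (hypothetical helper). */
final class BoundedAsync {
    private final Semaphore permits;

    BoundedAsync(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /**
     * Blocks the caller until a permit is free, then starts the call.
     * The permit is returned when the future completes, normally or exceptionally.
     */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> call) {
        // Backpressure point: the consumer thread waits here when the limit is reached.
        permits.acquireUninterruptibly();
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            // The call never started, so hand the permit back before rethrowing.
            permits.release();
            throw e;
        }
        // Release on success and on failure alike.
        future.whenComplete((result, error) -> permits.release());
        return future;
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

With this shape, processMessage would call something like `bounded.submit(() -> completableFromListenable(session.executeAsync(...)))`, and the rate at which messages are consumed follows the rate at which writes complete.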

+4
2 answers

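One possible idea is to "throttle" your calls to executeAsync. For example, after a batch of 100 requests (or whatever number works best for your cluster and workload), make a blocking call on all 100 futures (or use Guava to turn a list of futures into a future of a list).

This way, after issuing 100 asynchronous queries, the client application is forced to wait for all of them to finish before going further. If you catch an exception when calling future.get(), you can schedule a retry. Normally a retry has already been attempted by the default retry policy (RetryPolicy) of the Java driver.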
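The batching idea above (issue a batch of about 100 requests, then block on all of them and collect failures for retry) can be sketched with JDK classes only. This is an illustration under assumptions, not the driver's API: `BatchThrottle` and `runInBatches` are hypothetical names, and the `call` function stands in for executeAsync wrapped into a CompletableFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Runs asynchronous calls in fixed-size batches (hypothetical helper). */
final class BatchThrottle {
    /**
     * Starts the calls in batches of batchSize and waits for each batch to
     * finish before starting the next. Returns the items whose call failed,
     * so the caller can schedule a retry for them.
     */
    static <T> List<T> runInBatches(List<T> items, int batchSize,
                                    Function<T, CompletableFuture<?>> call) {
        List<T> failed = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            List<T> batch = items.subList(start, Math.min(start + batchSize, items.size()));
            List<CompletableFuture<?>> futures = new ArrayList<>();
            for (T item : batch) {
                futures.add(call.apply(item));
            }
            // Block until every future in the batch is done. The aggregate
            // failure is swallowed here because each future is inspected below.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .handle((ok, error) -> null)
                    .join();
            for (int i = 0; i < batch.size(); i++) {
                if (futures.get(i).isCompletedExceptionally()) {
                    failed.add(batch.get(i));
                }
            }
        }
        return failed;
    }
}
```

Compared with a semaphore, this design lets the slowest request in each batch hold up the whole batch, so throughput is lower, but the blocking point is simple to reason about.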

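As for a backpressure signal from the server: starting with CQL binary protocol v3, there is an error code that notifies the client that the coordinator is overloaded: https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951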

:

+4

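"Right now I use new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before each asynchronous request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant, since the driver already tracks requests and connections internally."

That is a pretty reasonable approach, since it lets new requests fill in as others complete. You can tie releasing a permit to completion of the future.

The reason the driver does not do this itself is that it tries to block as little as possible and to fail fast instead. Unfortunately, this pushes some of the responsibility onto the client.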

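In the usual case it is not a good idea to send that many requests to a host at once. C* has a native_transport_max_threads setting (default 128) that controls the number of threads handling requests at a time. It would be better to throttle yourself at 2 * that number per host. (See the related question about Cassandra and the DataStax Java driver for more details.)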
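The sizing rule mentioned above (2 * native_transport_max_threads per host, with 128 as the stated default) is simple arithmetic, sketched here for concreteness. `ThrottleSizing` and its members are hypothetical names, and the value 128 is taken from the answer rather than read from any cluster configuration.

```java
/** Computes an in-flight request limit from the server thread setting (hypothetical helper). */
final class ThrottleSizing {
    /** Default of native_transport_max_threads as stated in the answer (an assumption here). */
    static final int DEFAULT_NATIVE_TRANSPORT_MAX_THREADS = 128;

    /** Suggested cap on concurrent requests sent to a single host. */
    static int maxInFlightPerHost(int nativeTransportMaxThreads) {
        if (nativeTransportMaxThreads <= 0) {
            throw new IllegalArgumentException("nativeTransportMaxThreads must be positive");
        }
        return 2 * nativeTransportMaxThreads;
    }
}
```

With the default, this gives 2 * 128 = 256 permits per host, which could be passed to a `java.util.concurrent.Semaphore` in place of maxConnectionsPerHost * maxRequestsPerConnection.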

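"I would like a request to be considered outstanding until it has completed. This includes retries! The case where I get retryable failures from the cluster (for example, timeouts waiting for consistency) is the main case where I want backpressure and want to stop consuming messages from the queue."

The driver will not complete the future until the request has succeeded, exhausted its retries, or failed for some other reason. So you can hold on to the semaphore permit until the future has completed or failed.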

+2

Source: https://habr.com/ru/post/1628186/

