I am using the DataStax Java driver 3.1.0 to connect to a Cassandra cluster, and the cluster version is 2.0.10. I am writing asynchronously at QUORUM consistency.
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    private final Semaphore concurrentQueries = new Semaphore(1000);

    public void save(String process, int clientid, long deviceid) {
        String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
        try {
            BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
            bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
            bs.setString(0, process);
            bs.setInt(1, clientid);
            bs.setLong(2, deviceid);

            concurrentQueries.acquire();
            ResultSetFuture future = session.executeAsync(bs);
            Futures.addCallback(future, new FutureCallback<ResultSet>() {
                @Override
                public void onSuccess(ResultSet result) {
                    concurrentQueries.release();
                    logger.logInfo("successfully written");
                }

                @Override
                public void onFailure(Throwable t) {
                    concurrentQueries.release();
                    logger.logError("error= ", t);
                }
            }, executorService);
        } catch (Exception ex) {
            logger.logError("error= ", ex);
        }
    }
My save method will be called from multiple threads at a very high rate. If I write faster than the Cassandra cluster can handle, it will start throwing errors, and I want every record to reach Cassandra without any loss.
Question:
I was thinking of using some sort of queue or buffer to hold pending queries (e.g. java.util.concurrent.ArrayBlockingQueue). "Buffer full" would mean clients must wait (backpressure). The buffer would also be used to re-enqueue failed requests. However, to be fairer, failed requests should probably be placed at the front of the queue so they are retried before newer requests. We also have to somehow cope with the situation where the queue is full and new failed requests arrive at the same time. A single-threaded worker would then take requests from the queue and send them to Cassandra. Since it does not have much to do, it is unlikely to become a bottleneck. This worker could also apply its own rate limit, e.g. time-based, with com.google.common.util.concurrent.RateLimiter.
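The bounded-buffer-plus-single-worker idea could be sketched roughly as below. This is a minimal, dependency-free sketch, not the actual implementation: the names (WriteBuffer, submit, permitsPerSecond) are hypothetical, and the simple sleep-based spacing only stands in for Guava's RateLimiter, which does smarter smoothing and burst handling.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: bounded buffer + single worker thread + simple time-based rate limit.
// The sleep-based pacing is a stand-in for Guava's RateLimiter.
class WriteBuffer {
    private final BlockingQueue<Runnable> queue;
    private final long intervalNanos;           // minimum spacing between writes
    final AtomicInteger processed = new AtomicInteger();

    WriteBuffer(int capacity, int permitsPerSecond) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.intervalNanos = 1_000_000_000L / permitsPerSecond;
        Thread worker = new Thread(this::drain, "cassandra-writer");
        worker.setDaemon(true);
        worker.start();
    }

    // Producers block here when the buffer is full -> natural backpressure.
    void submit(Runnable writeTask) throws InterruptedException {
        queue.put(writeTask);
    }

    private void drain() {
        long next = System.nanoTime();
        try {
            while (true) {
                Runnable task = queue.take();
                long sleep = next - System.nanoTime();
                if (sleep > 0) Thread.sleep(sleep / 1_000_000, (int) (sleep % 1_000_000));
                next = System.nanoTime() + intervalNanos;
                task.run();                     // here: session.executeAsync(...)
                processed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown
        }
    }
}
```

The capacity and permitsPerSecond arguments make both the backpressure point and the write rate tunable, which matches the "must be configurable" requirement.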
What is the best way to implement such a queue or buffer that can also apply a Guava rate limit while writing to Cassandra? If there is a better approach, let me know that too. I want to write to Cassandra at around 2000 queries per second (this must be configurable so that I can play with it to find the best setting).
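One way to handle the "retry failed requests before new ones" part is a separate, unbounded retry queue that the worker drains before the bounded main buffer; since it is unbounded, a full main buffer can never reject a retry. This is a hedged sketch under that assumption, with hypothetical names (RetryingDispatcher, onWriteFailed, nextRequest), not part of any driver API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the "failed requests first" policy: an unbounded retry queue is
// drained before the bounded main buffer, so retries are never rejected and
// are attempted before newer requests.
class RetryingDispatcher {
    private final BlockingQueue<String> mainQueue;
    private final BlockingQueue<String> retryQueue = new LinkedBlockingQueue<>();

    RetryingDispatcher(int capacity) {
        this.mainQueue = new ArrayBlockingQueue<>(capacity);
    }

    void submit(String request) throws InterruptedException {
        mainQueue.put(request);              // blocks when the buffer is full
    }

    // Called from the async failure callback; unbounded, so a full main
    // buffer can never cause a retry to be dropped.
    void onWriteFailed(String request) {
        retryQueue.add(request);
    }

    // The single worker calls this in a loop: retries take priority.
    String nextRequest() throws InterruptedException {
        String retry = retryQueue.poll();
        if (retry != null) return retry;
        // short timeout so newly arriving retries are noticed promptly
        return mainQueue.poll(50, TimeUnit.MILLISECONDS); // null when idle
    }
}
```

A trade-off to be aware of: because the retry queue is unbounded, sustained failures will grow it without limit, so in practice you would cap retries per request or track its size.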
As noted in the comments below, if memory keeps growing we can use a Guava Cache or CLHM (ConcurrentLinkedHashMap) to evict old entries and make sure my program does not run out of memory. We will have about 12 GB of memory on the box, and these records are very small, so I don't see this being a problem.
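For illustration only, the eviction policy that Guava Cache or CLHM would provide can be approximated with the JDK alone via LinkedHashMap.removeEldestEntry; note this analogue is not thread-safe, which is precisely what those libraries add. The class name and the size bound here are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only analogue of a size-bounded cache (the role Guava Cache / CLHM
// would play): insertion-ordered map that evicts its eldest entry once a
// maximum size is exceeded. Not thread-safe, unlike the real libraries.
class BoundedBuffer<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    BoundedBuffer(int maxEntries) {
        super(16, 0.75f, false);   // insertion order: oldest entries evict first
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```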