Java concurrency: put / get in collections

I connect to an external service through an interactive session plus a private feed (an InputStream), each running on its own thread. In the interactive session I send outgoing messages and receive synchronous responses: objects with several fields, among them an identifier and a "status" confirming success or failure. At the same time, I receive messages on the private feed for that identifier with additional status updates. I currently store the status per ID in a ConcurrentHashMap. It is imperative that I preserve the correct sequence of events for these objects, but I currently have a race condition: sometimes I process and apply an update from the private feed before I have received and processed the synchronous response in the interactive session, which leaves an outdated, incorrect status for the identifier.

Ideally, I would like a collection with a PutIfKeyExistOrWait method (with a timeout) that updates the value only if the key exists and otherwise waits; I could use it when processing objects from the feed.

Does anyone know of a suitable collection, or can anyone offer an alternative solution to my problem? Thanks.

3 answers

You already have the ConcurrentHashMap iDAndStatus, which stores the identifier and the latest status. However, I would allow only the thread that handles the interactive session with the service to create new entries in this map.

When a message arrives from the feed and its identifier already exists in iDAndStatus, simply update the status. If the key does not exist, temporarily store the ID / status update in another data structure, pendingFeedUpdates.

Each time a new entry is created in iDAndStatus, check pendingFeedUpdates for any updates to the new identifier.

I'm not sure which synchronized data structure to use for pendingFeedUpdates: you need to look up by ID, but there can be many messages per identifier, and you want to preserve the message order. Maybe a synchronized HashMap that maps each ID to some kind of ordered queue?
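A minimal sketch of this idea, under some assumptions: statuses are plain Strings, the class and method names (OrderedStatusStore, onSessionResponse, onFeedUpdate) are hypothetical, and a single monitor guards both maps instead of a ConcurrentHashMap, so that the "does the key exist?" check and the write that follows it happen as one step.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: String stands in for the real status type.
class OrderedStatusStore {
    private final Map<Long, String> idAndStatus = new HashMap<>();
    private final Map<Long, Queue<String>> pendingFeedUpdates = new HashMap<>();

    // Called by the interactive-session thread: the only place entries are created.
    public synchronized void onSessionResponse(long id, String status) {
        idAndStatus.put(id, status);
        // Replay feed updates that arrived early, in arrival order.
        Queue<String> pending = pendingFeedUpdates.remove(id);
        if (pending != null) {
            for (String update : pending) {
                idAndStatus.put(id, update);
            }
        }
    }

    // Called by the feed thread: updates existing entries, parks everything else.
    public synchronized void onFeedUpdate(long id, String status) {
        if (idAndStatus.containsKey(id)) {
            idAndStatus.put(id, status);
        } else {
            pendingFeedUpdates.computeIfAbsent(id, k -> new ArrayDeque<>()).add(status);
        }
    }

    public synchronized String getStatus(long id) {
        return idAndStatus.get(id);
    }
}
```

With two independently synchronized maps, a feed update could be parked just after the session thread has finished checking pendingFeedUpdates and would then never be replayed; holding one lock across both maps avoids that window, at the cost of some contention.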


You can try to encapsulate the logic for handling this situation in the values of your map, for example:

  • If the feed thread is the first to add a value for a specific id, that value is considered incomplete and the thread waits until it is completed
  • If the interactive session thread is not the first to add a value, it marks the incomplete value as complete
  • Incomplete values are treated as absent when read from the map.

This solution relies on the atomicity of putIfAbsent().

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class StatusMap {
        private final Map<Long, StatusHolder> map = new ConcurrentHashMap<Long, StatusHolder>();

        // Incomplete holders are reported as absent.
        public Status getStatus(long id) {
            StatusHolder holder = map.get(id);
            if (holder == null || holder.isIncomplete()) {
                return null;
            } else {
                return holder.getStatus();
            }
        }

        public void newStatusFromInteractiveSession(long id, Status status) {
            StatusHolder holder = StatusHolder.newComplete(status);
            if ((holder = map.putIfAbsent(id, holder)) != null) {
                holder.makeComplete(status); // Holder already exists, complete it
            }
        }

        // Object.wait() can be interrupted, so the exception is propagated to the caller.
        public void newStatusFromFeed(long id, Status status) throws InterruptedException {
            StatusHolder incomplete = StatusHolder.newIncomplete();
            StatusHolder holder = null;
            if ((holder = map.putIfAbsent(id, incomplete)) == null) {
                holder = incomplete; // New holder added, wait for its completion
                holder.waitForCompletion();
            }
            holder.updateStatus(status);
        }
    }

    public class StatusHolder {
        private volatile Status status;
        private volatile boolean incomplete;
        private final Object lock = new Object();

        private StatusHolder(Status status, boolean incomplete) { ... }

        public static StatusHolder newComplete(Status status) {
            return new StatusHolder(status, false);
        }

        public static StatusHolder newIncomplete() {
            return new StatusHolder(null, true);
        }

        public boolean isIncomplete() {
            return incomplete;
        }

        public void makeComplete(Status status) {
            synchronized (lock) {
                this.status = status;
                incomplete = false;
                lock.notifyAll(); // Wake the feed thread blocked in waitForCompletion()
            }
        }

        public void waitForCompletion() throws InterruptedException {
            synchronized (lock) {
                while (incomplete) lock.wait(); // Loop guards against spurious wakeups
            }
        }

        ...
    }
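The question asks for a wait with a timeout, which the holder above does not offer. One possible timed variant is sketched here; it swaps wait / notifyAll for a java.util.concurrent.CountDownLatch, and TimedHolder with its String status is a hypothetical stand-in for StatusHolder and Status, not part of the answer's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for StatusHolder, with a bounded wait.
class TimedHolder {
    private final CountDownLatch completed = new CountDownLatch(1);
    private volatile String status;

    // Called by the interactive-session thread.
    void makeComplete(String status) {
        this.status = status;   // write happens-before countDown() releases waiters
        completed.countDown();
    }

    // Called by the feed thread. Returns true if the holder was completed
    // within the timeout, false on timeout or interruption.
    boolean waitForCompletion(long timeout, TimeUnit unit) {
        try {
            return completed.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    String getStatus() {
        return status;
    }
}
```

What the feed thread should do when the wait returns false (drop the update, retry, or log it) depends on the application and is left open here.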

I would suggest looking at the synchronized wrappers in Collections (Collections.synchronizedList and the related methods): http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Collections.html#synchronizedList%28java.util.List%29

This may solve your problem. Another option, depending on how the calls are made, is a synchronized method, which allows thread-safe execution and makes the whole operation atomic. See http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html

The third option is to enforce concurrency control in the application, following an optimistic or a pessimistic approach depending on what you are trying to achieve. This is the most difficult of the three, but it gives you more control and can be combined with the previous options.
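As a rough illustration of the optimistic approach, the sketch below reads the current value, computes the new one, and writes it back only if nobody changed the entry in between, retrying otherwise. It relies on ConcurrentMap.replace(key, oldValue, newValue); the class name OptimisticStatusMap, its methods, and the String status are assumptions made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.UnaryOperator;

// Hypothetical sketch of optimistic updates on an existing key.
class OptimisticStatusMap {
    private final ConcurrentMap<Long, String> statuses = new ConcurrentHashMap<>();

    // Creates the entry only if it does not exist yet.
    void create(long id, String initial) {
        statuses.putIfAbsent(id, initial);
    }

    // Applies the change only if the key exists; returns false otherwise.
    // Retries when another thread modified the value between read and write.
    boolean updateIfPresent(long id, UnaryOperator<String> change) {
        while (true) {
            String current = statuses.get(id);
            if (current == null) {
                return false;
            }
            String next = change.apply(current);
            if (statuses.replace(id, current, next)) {
                return true;
            }
        }
    }

    String get(long id) {
        return statuses.get(id);
    }
}
```

A pessimistic version would instead take a lock around the read and the write, for example with a synchronized method.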

It depends on the specific implementation.


Source: https://habr.com/ru/post/1383662/
