Value lock

I have a multi-threaded Java application that appends to a large number of files at dynamically generated paths (more than 100 thousand of them). I want to guard against simultaneous writes. Since all of the contention happens inside a single JVM, I cannot use FileLock s (those only protect against other processes).

Instead, I try to synchronize on Path objects as follows ( PathLocker is a singleton).

    public class PathLocker {

        private final ConcurrentMap<Path, ReentrantLock> pathLockMap = new ConcurrentHashMap<>();

        public void lock(Path path) {
            pathLockMap.computeIfAbsent(path, p -> new ReentrantLock()).lock();
        }

        public void unlock(Path path) {
            ReentrantLock reentrantLock = pathLockMap.get(path);
            if (!reentrantLock.hasQueuedThreads()) { // NPE OCCURS HERE
                pathLockMap.remove(path);
            }
            reentrantLock.unlock();
        }
    }

The only client code is as follows:

    Path path = findPath(directory, dataType, bucketEnd, referenceId);
    pathLocker.lock(path);
    try {
        try (FileWriter fileWriter = new FileWriter(path.toFile(), true)) {
            fileWriter.write(string);
        }
    } finally {
        pathLocker.unlock(path);
    }

However, this code fairly quickly throws a NullPointerException when it looks up reentrantLock inside PathLocker::unlock .

I do not understand how this NPE can happen. Obviously some other thread removed the mapping concurrently, but as I understand it, the only threads that could remove the lock are those that were queued on it, and they would therefore have acquired it first. What am I missing?

2 answers

There is a small window in which thread 1 has obtained the lock from computeIfAbsent but has not yet called lock() — so it is not queued on the lock — while thread 2, inside unlock, checks hasQueuedThreads() (which returns false) and removes the mapping. The NPE then occurs in thread 1 when it finishes and tries to unlock: pathLockMap.get(path) returns null.
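That interleaving can be replayed deterministically in a single thread. The following is a hypothetical demo (not from the original post, and the path is made up) that walks through the steps in order and shows the mapping is already gone by the time "thread 1" reaches unlock():

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

public class RaceReplay {

    // Replays the suspected interleaving step by step in one thread and
    // reports whether the mapping is already gone when "thread 1" would
    // reach unlock() -- exactly where the NPE is thrown.
    static boolean mappingGoneAtUnlock() {
        ConcurrentMap<Path, ReentrantLock> pathLockMap = new ConcurrentHashMap<>();
        Path path = Paths.get("some", "generated", "file");

        // Thread 2: creates the lock and acquires it.
        ReentrantLock lock = pathLockMap.computeIfAbsent(path, p -> new ReentrantLock());
        lock.lock();

        // Thread 1: computeIfAbsent returns the same lock, but the thread is
        // preempted BEFORE calling lock() -- so it is not queued on the lock.
        ReentrantLock seenByThread1 = pathLockMap.computeIfAbsent(path, p -> new ReentrantLock());

        // Thread 2, inside unlock(): no queued threads, so it removes the mapping.
        if (!lock.hasQueuedThreads()) {
            pathLockMap.remove(path);
        }
        lock.unlock();

        // Thread 1 resumes: it locks the now-orphaned lock, writes its file,
        // then calls unlock(), where pathLockMap.get(path) returns null.
        seenByThread1.lock();
        try {
            return pathLockMap.get(path) == null; // true -> NPE on hasQueuedThreads()
        } finally {
            seenByThread1.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("mapping gone at unlock: " + mappingGoneAtUnlock());
    }
}
```

Note that the replay is single-threaded only for determinism; in the real application the two sequences run on different threads and the window is simply a matter of unlucky scheduling.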

If my assumption is correct, you should add a double check in the unlock method.

    public void unlock(Path path) {
        ReentrantLock reentrantLock = pathLockMap.get(path);
        if (!reentrantLock.hasQueuedThreads()) {
            pathLockMap.remove(path);
            // double check: re-insert if a thread queued up in the meantime
            if (reentrantLock.hasQueuedThreads()) {
                pathLockMap.put(path, reentrantLock);
            }
        }
        reentrantLock.unlock();
    }

Another option is to simply delegate the removal to a time-based expiring cache, such as the one Guava provides.

    public class PathLocker {

        private static final int PROCESSING_TIME = 60;

        private final LoadingCache<Path, Lock> pathLockMap = CacheBuilder.newBuilder()
                .maximumSize(Long.MAX_VALUE)
                .expireAfterAccess(PROCESSING_TIME, TimeUnit.SECONDS)
                .build(new CacheLoader<Path, Lock>() {
                    public Lock load(Path path) {
                        return new ReentrantLock();
                    }
                });

        public void lock(Path path) {
            // getUnchecked: the loader cannot throw, so no checked ExecutionException
            Lock lock = pathLockMap.getUnchecked(path);
            lock.lock();
        }

        public void unlock(Path path) {
            /* Retrieving the lock again prevents expiration/eviction from the
               cache, since the countdown towards PROCESSING_TIME is restarted
               just as the other thread is unblocked for processing. */
            Lock lock = pathLockMap.getUnchecked(path);
            lock.unlock();
        }
    }

PROCESSING_TIME is the approximate processing time of a single task. The goal is to avoid eviction while a task is still being processed. The cache access made during unlock restarts the expiration timer for that entry.

Assuming a PROCESSING_TIME of 10:

  • Thread 1 obtains the lock at time 0 and begins processing.
  • Thread 2 retrieves the same lock at time 1, but must wait.
  • Thread 1 unlocks at time 10. That access refreshes the entry, so the cache will not evict it before time 20; Thread 2 can therefore process safely even though its own last cache access (inside lock) was back at time 1.
  • Thread 2 begins processing at time 10.
  • Thread 2 unlocks at time 20, refreshing the entry again.
  • The cache evicts the lock at time 30.
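For completeness, the removal can also be made safe without any timeout by reference-counting the threads interested in each path and removing the entry atomically with ConcurrentHashMap.compute, which serializes all updates for a given key. This is a minimal JDK-only sketch of that alternative; the class and field names are mine, not from either answer:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

public class RefCountedPathLocker {

    private static final class Entry {
        final ReentrantLock lock = new ReentrantLock();
        int users; // mutated only inside compute(), which is atomic per key
    }

    private final ConcurrentMap<Path, Entry> entries = new ConcurrentHashMap<>();

    public void lock(Path path) {
        // Create-or-reuse the entry and register interest in one atomic step,
        // so no other thread can remove it between lookup and lock().
        Entry e = entries.compute(path, (p, cur) -> {
            Entry entry = (cur == null) ? new Entry() : cur;
            entry.users++;
            return entry;
        });
        e.lock.lock();
    }

    public void unlock(Path path) {
        entries.compute(path, (p, cur) -> {
            cur.lock.unlock();
            cur.users--;
            return (cur.users == 0) ? null : cur; // returning null removes the mapping
        });
    }

    public static void main(String[] args) throws InterruptedException {
        RefCountedPathLocker locker = new RefCountedPathLocker();
        Path path = Paths.get("some", "file");
        int[] counter = {0}; // deliberately unsynchronized; guarded only by the locker
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    locker.lock(path);
                    counter[0]++;
                    locker.unlock(path);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("count = " + counter[0]); // 8000 if mutual exclusion held
    }
}
```

Because both lock and unlock mutate the counter inside compute for the same key, the window described in the first answer cannot occur: a thread that has registered interest keeps the entry alive until its own unlock runs.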

Source: https://habr.com/ru/post/1234981/

