Hashing (SHA-1) multiple files concurrently using threads

I have N large files (at least 250 MB each) to hash. These files are spread across P physical disks.

I would like to hash them concurrently with at most K active threads, but I cannot hash more than M files per physical disk at the same time because it slows down the whole process (I ran a test hashing 61 files: with 8 threads it was slower than with 1 thread, and almost all of the files were on one disk).

I am wondering what would be the best approach to this:

  • I could use Executors.newFixedThreadPool(K),
  • then I would have to submit tasks using a counter to decide whether a new task should be added.

My code is:

    int K = 8; // maximum number of hashing threads
    int M = 1; // maximum number of concurrent hashes per disk
    Queue<Path> queue = null; // get the files to hash

    final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(K);
    final ConcurrentMap<FileStore, Integer> counter = new ConcurrentHashMap<>();
    final ConcurrentMap<FileStore, Integer> maxCounter = new ConcurrentHashMap<>();
    for (FileStore store : FileSystems.getDefault().getFileStores()) {
        counter.put(store, 0);
        maxCounter.put(store, M);
    }

    List<Future<Result>> result = new ArrayList<>();
    while (!queue.isEmpty()) {
        final Path current = queue.poll();
        final FileStore store = Files.getFileStore(current);
        if (counter.get(store) < maxCounter.get(store)) {
            result.add(newFixedThreadPool.submit(new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    counter.put(store, counter.get(store) + 1); // read-then-write, not atomic
                    String hash = null; // Hash the file
                    counter.put(store, counter.get(store) - 1); // read-then-write, not atomic
                    return new Result(current, hash);
                }
            }));
        } else {
            queue.offer(current); // disk is busy: put the file back and retry later
        }
    }

Setting aside the potentially non-thread-safe operations (such as the way I update the counter), is there a better way to achieve my goal?

I also think the loop here may be too expensive, as it can burn a lot of CPU (it is almost a busy loop).

2 answers

After quite some time, I found a solution that meets my needs: instead of an integer counter, an AtomicInteger, or anything similar, I use an ExecutorService, and each submitted task acquires a Semaphore that is shared by all files on the same disk.

Like this:

    ConcurrentMap<FileStore, Semaphore> map = new ConcurrentHashMap<>();
    ExecutorService es = Executors.newFixedThreadPool(10);
    for (Path path : listFile()) {
        final FileStore store = Files.getFileStore(path);
        final Semaphore semaphore = map.computeIfAbsent(store, key -> new Semaphore(getAllocatedCredits(store)));
        final int cost = computeCost(path);
        es.submit(() -> {
            semaphore.acquire(cost); // blocks until the disk has enough free permits
            try {
                ... some work ...
            } finally {
                semaphore.release(cost);
            }
        });
    }

    int getAllocatedCredits(FileStore store) { return 2; }
    int computeCost(Path path) { return 1; }

See the Java 8 documentation, in particular for computeIfAbsent and submit.
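To make the elided work concrete, here is a minimal, self-contained sketch of the same per-disk semaphore idea that actually computes SHA-1 digests. The names PerDiskHasher, sha1Hex and hashAll, the 64 KiB read buffer, and the fixed cost of one permit per file are illustrative assumptions, not part of the snippet above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

// Hypothetical helper class, written only for this illustration.
class PerDiskHasher {

    /** Streams the file through SHA-1 so a large file is never held in memory. */
    public static String sha1Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-1");
        byte[] buffer = new byte[1 << 16]; // 64 KiB, an arbitrary choice
        try (InputStream in = Files.newInputStream(file)) {
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    /**
     * Hashes all files with at most `threads` worker threads and at most
     * `permitsPerStore` files being read from the same FileStore at once.
     */
    public static Map<Path, String> hashAll(List<Path> files, int threads, int permitsPerStore)
            throws Exception {
        ConcurrentMap<FileStore, Semaphore> permits = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Map<Path, Future<String>> pending = new LinkedHashMap<>();
            for (Path file : files) {
                FileStore store = Files.getFileStore(file);
                Semaphore semaphore =
                        permits.computeIfAbsent(store, s -> new Semaphore(permitsPerStore));
                // The lambda returns a value, so it is submitted as a Callable
                // and is allowed to throw checked exceptions.
                pending.put(file, pool.submit(() -> {
                    semaphore.acquire();
                    try {
                        return sha1Hex(file);
                    } finally {
                        semaphore.release();
                    }
                }));
            }
            // Collect the results in submission order.
            Map<Path, String> hashes = new LinkedHashMap<>();
            for (Map.Entry<Path, Future<String>> entry : pending.entrySet()) {
                hashes.put(entry.getKey(), entry.getValue().get());
            }
            return hashes;
        } finally {
            pool.shutdown();
        }
    }
}
```

With the numbers from the question, the call would be hashAll(files, 8, 1). Note that a task waiting for a permit still occupies a pool thread, so if most files sit on one disk, the threads can end up queued behind that disk's semaphore while other disks stay idle.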

0
source

If the hardware configuration of the drives is not known at compile time and may be changed or upgraded later, it is tempting to use one thread pool per disk and make the number of threads in each pool user-configurable. I am not familiar with "newFixedThreadPool": is its thread count something that can be changed at runtime to tune performance?
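A rough sketch of what such per-disk pools could look like is below. The ExecutorService interface returned by newFixedThreadPool does not expose any resizing methods, so this sketch builds ThreadPoolExecutor instances directly, since that class does provide setCorePoolSize and setMaximumPoolSize. The class name PerDiskPools and the methods poolFor, submit and resize are hypothetical names chosen for the illustration.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, written only for this illustration.
class PerDiskPools {
    private final ConcurrentMap<FileStore, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
    private final int initialThreads;

    public PerDiskPools(int initialThreads) {
        this.initialThreads = initialThreads;
    }

    /** Returns the pool dedicated to the FileStore that holds the given file. */
    public ThreadPoolExecutor poolFor(Path file) throws IOException {
        FileStore store = Files.getFileStore(file);
        return pools.computeIfAbsent(store, s -> newPool(initialThreads));
    }

    /** Fixed-size pool with an unbounded queue: core size equals maximum size. */
    private static ThreadPoolExecutor newPool(int threads) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    /** Runs the task on the pool of the disk that holds the file. */
    public <T> Future<T> submit(Path file, Callable<T> task) throws IOException {
        return poolFor(file).submit(task);
    }

    /**
     * Changes the thread count of a running pool. The order of the two setters
     * matters because the core size must never exceed the maximum size.
     */
    public static void resize(ThreadPoolExecutor pool, int threads) {
        if (threads > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(threads); // grow: raise the ceiling first
            pool.setCorePoolSize(threads);
        } else {
            pool.setCorePoolSize(threads);    // shrink: lower the core size first
            pool.setMaximumPoolSize(threads);
        }
    }

    public void shutdown() {
        pools.values().forEach(ThreadPoolExecutor::shutdown);
    }
}
```

Compared with a single shared pool, this layout caps concurrency per disk without any counter or semaphore, at the cost of losing the global limit of K threads: the total thread count becomes the sum over all disks.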

0
source

Source: https://habr.com/ru/post/1379776/

