Mass File Threading

I'm still wrapping my brain around how concurrency works in Java. I understand that (if you subscribe to the OO Java 5 concurrency model) you implement a task as a Runnable or Callable with a run() or call() method (respectively), and that it is in your interest to parallelize as much of that implemented method as possible.

But I still don't understand something inherent to concurrent programming in Java:

  • How does a task's run() method get assigned the right amount of concurrent work to do?

As a concrete example, suppose I have an I/O-bound readMobyDick() method that reads the entire contents of Herman Melville's Moby Dick into memory from a file on the local system. And let's say I want this readMobyDick() method to be concurrent and handled by three threads, where:

  • Thread #1 reads the first 1/3 of the book into memory
  • Thread #2 reads the second 1/3 of the book into memory
  • Thread #3 reads the last 1/3 of the book into memory

Do I need to split Moby Dick into three files and pass each one to its own task, or do I just call readMobyDick() from inside the implemented run() method and (somehow) the Executor knows how to split the work among the threads?

I am a very visual learner, so any code examples of the right way to approach this are greatly appreciated! Thanks!

+6
4 answers

You have probably, by accident, picked the worst possible example of parallel work!

Reading in parallel from a single mechanical disk is actually slower than reading with a single thread, because you end up bouncing the mechanical head between different sections of the disk as each thread gets its turn to run. This is best left as a single-threaded activity.

Let's take another example that is similar to yours but may actually offer some benefit: suppose I want to search for occurrences of a specific word in a huge list of words (this list could even come from a disk file but, as I said, read by a single thread). Suppose that I can use 3 threads, as in your example, each searching 1/3 of the huge word list and keeping a local counter of how many times the searched word appeared.

In this case, you need to split the list into 3 parts, pass each part to a different object whose type implements Runnable, and implement the search in its run method.

The runtime itself has no clue how to do the partitioning or anything like that; you have to specify it yourself. There are many other partitioning strategies, each with its own strengths and weaknesses, but for now we can stick with static partitioning.

Check out the code:

    import java.util.ArrayList;
    import java.util.List;

    class SearchTask implements Runnable {
        private int localCounter = 0;
        private int start; // start index of search (inclusive)
        private int end;   // end index of search (exclusive)
        private List<String> words;
        private String token;

        public SearchTask(int start, int end, List<String> words, String token) {
            this.start = start;
            this.end = end;
            this.words = words;
            this.token = token;
        }

        public void run() {
            for (int i = start; i < end; i++) {
                if (words.get(i).equals(token)) localCounter++;
            }
        }

        public int getCounter() {
            return localCounter;
        }
    }

    // meanwhile in main :)
    List<String> words = new ArrayList<String>();
    // populate words
    // let's assume you have 30000 words

    // create tasks
    SearchTask task1 = new SearchTask(0, 10000, words, "John");
    SearchTask task2 = new SearchTask(10000, 20000, words, "John");
    SearchTask task3 = new SearchTask(20000, 30000, words, "John");

    // create threads, one per task
    Thread t1 = new Thread(task1);
    Thread t2 = new Thread(task2);
    Thread t3 = new Thread(task3);

    // start threads
    t1.start();
    t2.start();
    t3.start();

    // wait for threads to finish (join() can throw InterruptedException)
    t1.join();
    t2.join();
    t3.join();

    // collect results; safe to read after join()
    int counter = 0;
    counter += task1.getCounter();
    counter += task2.getCounter();
    counter += task3.getCounter();

This should work well. Note that in practical cases you would want to build a more generic partitioning scheme. Alternatively, you could use an ExecutorService and implement Callable instead of Runnable if you want to return a result.

So, an alternative example using more advanced constructs:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class SearchTask implements Callable<Integer> {
        private int localCounter = 0;
        private int start; // start index of search (inclusive)
        private int end;   // end index of search (exclusive)
        private List<String> words;
        private String token;

        public SearchTask(int start, int end, List<String> words, String token) {
            this.start = start;
            this.end = end;
            this.words = words;
            this.token = token;
        }

        public Integer call() {
            for (int i = start; i < end; i++) {
                if (words.get(i).equals(token)) localCounter++;
            }
            return localCounter;
        }
    }

    // meanwhile in main :)
    List<String> words = new ArrayList<String>();
    // populate words
    // let's assume you have 30000 words

    // create tasks
    List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
    tasks.add(new SearchTask(0, 10000, words, "John"));
    tasks.add(new SearchTask(10000, 20000, words, "John"));
    tasks.add(new SearchTask(20000, 30000, words, "John"));

    // create thread pool and start tasks
    // (invokeAll() and get() throw checked exceptions that main must handle or declare)
    ExecutorService exec = Executors.newFixedThreadPool(3);
    List<Future<Integer>> results = exec.invokeAll(tasks);

    // wait for tasks to finish and collect results
    int counter = 0;
    for (Future<Integer> f : results) {
        counter += f.get();
    }

    // release the pool's threads so the JVM can exit
    exec.shutdown();
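The hard-coded slices (0 to 10000, 10000 to 20000, 20000 to 30000) can be generalized to any list size and thread count. The following is only a sketch of one possible static partitioning scheme, not the answer author's code; the class name PartitionedSearch and its count method are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: counts occurrences of a token using static partitioning.
class PartitionedSearch {

    static int count(List<String> words, String token, int nThreads) {
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            int size = words.size();
            for (int t = 0; t < nThreads; t++) {
                // Slice t covers [start, end). Adjacent slices share a boundary,
                // so every index is searched exactly once, even when size is not
                // a multiple of nThreads. Long arithmetic avoids int overflow.
                final int start = (int) ((long) size * t / nThreads);
                final int end = (int) ((long) size * (t + 1) / nThreads);
                tasks.add(() -> {
                    int local = 0;
                    for (int i = start; i < end; i++) {
                        if (words.get(i).equals(token)) local++;
                    }
                    return local;
                });
            }
            // invokeAll blocks until every task has finished.
            int total = 0;
            for (Future<Integer> f : exec.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while searching", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("search task failed", e);
        } finally {
            exec.shutdown();
        }
    }
}
```

Calling PartitionedSearch.count(words, "John", 3) on the 30000-word list would produce the same three slices as the hand-written tasks. The list is only read by the tasks, so sharing it between threads is safe as long as nothing modifies it during the search.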
+14

You have chosen a bad example, as Tudor was kind enough to point out. Spinning-disk hardware is subject to the physical constraints of moving platters and heads, and the most efficient read implementation is to read each block in order, which reduces the need to move the head or wait for the disk to align.

That said, some operating systems do not always store data contiguously on disk, and for those who remember, defragmentation could provide a disk performance boost if your OS / file system did not do that job for you.

Since you mentioned wanting a program that would benefit, let me suggest a simple one: matrix addition.

Assuming you create one thread per core, you can trivially divide any two matrices to be added into N sets of rows (one for each thread). Matrix addition (if you recall) works as such:

 A + B = C 

or

    [ a11, a12, a13 ]   [ b11, b12, b13 ]   [ (a11+b11), (a12+b12), (a13+b13) ]
    [ a21, a22, a23 ] + [ b21, b22, b23 ] = [ (a21+b21), (a22+b22), (a23+b23) ]
    [ a31, a32, a33 ]   [ b31, b32, b33 ]   [ (a31+b31), (a32+b32), (a33+b33) ]

So, to distribute this across N threads, we simply take the row number modulo the number of threads to get the "thread ID" of the thread that will add that row.

    matrix with 20 rows across 3 threads
    row % 3 == 0 (for rows 0, 3, 6, 9, 12, 15, and 18)
    row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19)
    row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17)
    // row 20 doesn't exist, because we number rows from 0

Now each thread "knows" which rows it should handle, and the results "per row" can be computed trivially, because the results do not cross into another thread's domain of computation.

All that is needed now is a "result" data structure that tracks when the values have been computed; when the last value is set, the computation is complete. In this "fake" example of a matrix addition result with two threads, computing the answer with two threads takes approximately half the time.

    // the following assumes that threads don't get rescheduled to different cores, for
    // illustrative purposes only. Real threads are scheduled across cores due to
    // availability and attempts to prevent unnecessary core migration of a running thread.
    [ done, done, done ] // filled in at about the same time as row 2 (runs on core 3)
    [ done, done, done ] // filled in at about the same time as row 1 (runs on core 1)
    [ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3)
    [ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1)
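The row % N assignment can be sketched in Java roughly as follows. This is an illustration under stated assumptions rather than the answer author's implementation: the class name ParallelMatrixAdd is hypothetical, the matrices are plain int[][] arrays of equal dimensions, and completion is detected by joining the threads instead of using a separate "result" tracking structure.

```java
// Hypothetical sketch: matrix addition where thread t handles rows with row % nThreads == t.
class ParallelMatrixAdd {

    static int[][] add(int[][] a, int[][] b, int nThreads) {
        int rows = a.length;
        int[][] c = new int[rows][];
        Thread[] workers = new Thread[nThreads];

        for (int t = 0; t < nThreads; t++) {
            final int id = t; // this worker's "thread ID"
            workers[t] = new Thread(() -> {
                // Start at row id and step by nThreads: exactly the rows where
                // row % nThreads == id. No two workers touch the same row.
                for (int r = id; r < rows; r += nThreads) {
                    c[r] = new int[a[r].length];
                    for (int col = 0; col < a[r].length; col++) {
                        c[r][col] = a[r][col] + b[r][col];
                    }
                }
            });
            workers[t].start();
        }

        // join() both waits for each worker and makes its writes visible here.
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while adding", e);
            }
        }
        return c;
    }
}
```

Because each row is written by exactly one thread, the workers need no locking. Whether this actually runs faster than a single loop depends on the matrix size; for small matrices the cost of starting threads is likely to dominate.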

More complex problems can also be solved with multithreading, and different problems call for different techniques. I deliberately picked one of the simplest examples.

+1

"You implement a task as a Runnable or Callable with a run() or call() method (respectively), and it is in your interest to parallelize as much of that implemented method as possible."

A Task is a discrete unit of work. Loading a file into memory is a discrete unit of work, and therefore this activity can be delegated to a background thread. That is, the background thread performs the task of loading the file.
It is a discrete unit of work because it has no other dependencies needed to do its job (load the file) and has discrete boundaries.
What you are asking is to divide this task further. That is, one thread loads the first 1/3 of the file, another thread loads the second 1/3, and so on.
If you could divide the task into further subtasks, it would not be a task in the first place, by definition. Therefore, loading a file is a single task in its own right.

To give an example:
Say you have a graphical interface and you need to present the user with data from 5 different files. To present it, you also need to prepare some data structures to process the actual data.
All of these are separate tasks.
For instance, loading the files is 5 different tasks, so they can be performed by 5 different threads.
Preparing the data structures can be done by another thread.
Of course, the GUI runs in yet another thread.
All of this can happen at the same time.
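As a rough illustration of "one file, one task", the sketch below submits one Callable per file to a thread pool and collects the contents in the original order. The class name FileLoader and its loadAll method are hypothetical, and sizing the pool to the number of files is an assumption made to mirror the "5 files, 5 threads" example, not a recommendation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: each file load is one discrete task.
class FileLoader {

    // Returns the contents of every file, in the same order as paths.
    static List<byte[]> loadAll(List<Path> paths) {
        ExecutorService exec = Executors.newFixedThreadPool(Math.max(1, paths.size()));
        try {
            List<Callable<byte[]>> tasks = new ArrayList<>();
            for (Path p : paths) {
                // The whole file is read by a single thread; the task is not split further.
                tasks.add(() -> Files.readAllBytes(p));
            }
            List<byte[]> contents = new ArrayList<>();
            // invokeAll returns futures in the same order as the submitted tasks.
            for (Future<byte[]> f : exec.invokeAll(tasks)) {
                contents.add(f.get());
            }
            return contents;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while loading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("loading a file failed", e.getCause());
        } finally {
            exec.shutdown();
        }
    }
}
```

In a real GUI application the caller would not block the GUI thread on loadAll; it would run this in the background and hand the results back to the GUI thread when they are ready.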

+1

If your system supports high-throughput I/O, here is how you can do it:

How to read a file using multiple threads in Java when a high-throughput file system (3 GB/s) is available

Here is a solution for reading a single file with multiple threads.

Divide the file into N chunks, read each chunk in its own thread, and then merge them in order. Beware of lines that cross chunk boundaries. This is the basic idea, as suggested by user slaks.

Benchmarks of the implementation below, reading a single 20 GB file with multiple threads:

  • 1 thread: 50 seconds: 400 MB/s
  • 2 threads: 30 seconds: 666 MB/s
  • 4 threads: 20 seconds: 1 GB/s
  • 8 threads: 60 seconds: 333 MB/s
  • Equivalent Java 7 readAllLines(): 400 seconds: 50 MB/s

Note: this may only work on systems designed to support high-throughput I/O, and not on ordinary personal computers.

Here are the essentials of the code; for complete details, follow the link.

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.Charset;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import static java.lang.Math.toIntExact;

    public class FileRead implements Runnable {
        private FileChannel _channel;
        private long _startLocation;
        private int _size;
        int _sequence_number;

        public FileRead(long loc, int size, FileChannel chnl, int sequence) {
            _startLocation = loc;
            _size = size;
            _channel = chnl;
            _sequence_number = sequence;
        }

        @Override
        public void run() {
            try {
                System.out.println("Reading the channel: " + _startLocation + ":" + _size);

                // allocate memory
                ByteBuffer buff = ByteBuffer.allocate(_size);

                // read file chunk to RAM (positional read; a single call may read fewer bytes than requested)
                _channel.read(buff, _startLocation);

                // chunk to String
                String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));

                System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // args[0] is the path of the file to read
        // args[1] is the size of the thread pool; try different values to find the sweet spot
        public static void main(String[] args) throws Exception {
            FileInputStream fileInputStream = new FileInputStream(args[0]);
            FileChannel channel = fileInputStream.getChannel();
            long remaining_size = channel.size(); // total number of bytes in the file
            long chunk_size = remaining_size / Integer.parseInt(args[1]); // file_size / threads

            // thread pool
            ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));

            long start_loc = 0; // file pointer
            int i = 0; // loop counter
            while (remaining_size >= chunk_size) {
                // submits a new chunk to the pool
                executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
                remaining_size = remaining_size - chunk_size;
                start_loc = start_loc + chunk_size;
                i++;
            }

            // load the last remaining piece
            executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));

            // Tear Down (a minimal version: wait for the reads, then close the file)
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            fileInputStream.close();
        }
    }
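The code above reads each chunk but does not show the "combine them in order" step, and decoding each chunk to a String separately can split a multi-byte UTF-8 character that straddles a chunk boundary. The following is a sketch of one way to handle both, assuming the whole file fits in a single byte array (under 2 GB): every task reads its chunk straight into its own slice of a shared array, and the text is decoded once at the end. The class name ChunkedFileReader is hypothetical and this is not the linked implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: N positional reads into disjoint slices of one array.
class ChunkedFileReader {

    static String read(Path file, int nThreads) {
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = channel.size();
            // Assumption: the file fits in one array; toIntExact throws otherwise.
            byte[] all = new byte[Math.toIntExact(size)];

            List<Callable<Void>> tasks = new ArrayList<>();
            for (int t = 0; t < nThreads; t++) {
                int start = (int) (size * t / nThreads);
                int end = (int) (size * (t + 1) / nThreads);
                tasks.add(() -> {
                    // The slice's position in the array equals its offset in the file,
                    // so the chunks end up "combined in order" with no extra copy.
                    ByteBuffer slice = ByteBuffer.wrap(all, start, end - start);
                    long pos = start;
                    while (slice.hasRemaining()) {
                        // A positional read may return fewer bytes than requested.
                        int n = channel.read(slice, pos);
                        if (n < 0) throw new IOException("unexpected end of file");
                        pos += n;
                    }
                    return null;
                });
            }

            // invokeAll waits for all chunks; get() rethrows any read failure.
            for (Future<Void> f : exec.invokeAll(tasks)) {
                f.get();
            }
            // Decode once, after merging, so no character is split across chunks.
            return new String(all, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("chunk read failed", e.getCause());
        } finally {
            exec.shutdown();
        }
    }
}
```

Splitting into lines after the merge also sidesteps the problem of lines crossing chunk boundaries. As with the benchmarks above, whether more threads help depends entirely on the storage; on an ordinary single disk this is likely to be no faster than one thread.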
-1

Source: https://habr.com/ru/post/915932/

