Python multiprocessing queues are slower than pool.map

I recently started experimenting with multiprocessing to speed things up. I wrote a script that does fuzzy string matching and calculates scores using different algorithms (I wanted to compare different matching methods). You can find the full source here: https://bitbucket.org/bergonzzi/fuzzy-compare/src . It takes 2 files as input, which are combined into pairs (each line of file1 with each line of file2), and for each pair fuzzy match scores are calculated.

I made 3 versions. Running with the example data provided in my repo (697,340 pairs after combining), I get the following timings:

  • Simple single process - 0:00:47
  • Multiprocessing with Pool.map() - 0:00:13
  • Multiprocessing with queues (producer/consumer pattern) - 0:01:04

I am trying to understand why my Pool.map() version is so much faster than my queue version, which is even slower than the simple single-process one.

My reasoning for trying queues is that the Pool.map() version holds on to all the results until everything is done and only writes them to a file at the end, which means that for big files it ends up eating a lot of memory. I am talking about this version (linked rather than pasted, because it is a lot of code).
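To give an idea, the Pool.map() version boils down to something like this (heavily simplified sketch, not the actual repo code; score_pair here just stands in for the real scoring function):

    # Simplified sketch of the Pool.map() approach described above.
    # score_pair stands in for the actual scoring code in the repo.
    pairs = [(x, y) for x in to_match for y in source_list]
    pool = multiprocessing.Pool(processes=4)

    # map() blocks until every pair has been scored, so all results sit in memory...
    results = pool.map(score_pair, pairs)

    # ...and only then is everything written out in one go.
    out = open(file_out, 'wb')
    out.write(header)
    for match in results:
        out.write(';'.join(match) + '\n')
    out.close()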

To solve this I refactored it into a producer/consumer pattern (or at least tried to). Here I first generate the jobs by combining both input files and put them into a queue that the consumers process (calculating the fuzzy matches). Finished jobs are put into another queue, and a single process grabs items from that queue and writes them to a file. This way, in theory, I should not need as much memory, since results are flushed to disk as they come in. It seems to work fine, but it is much slower. I also noticed that the 4 processes I spawn do not seem to use 100% CPU when I watch Activity Monitor on Mac OS X (which is not the case with the Pool.map() version).

Another thing I noticed is that my producer function seems to fill up the queue properly, but the consumer processes appear to wait until the queue is completely filled instead of starting to work as soon as the first item arrives. I am probably doing something wrong there...

For reference, here is the relevant code for the queue version (though it is better to look at the full code in the repo linked above).

Here is my producer function:

    def combine(list1, list2):
        '''
        Combine every item of list1 with every item of list2,
        normalize the strings and put the pair in the job queue.
        '''
        pname = multiprocessing.current_process().name
        for x in list1:
            for y in list2:
                # slugify is a function to normalize the strings
                term1 = slugify(x.strip(), separator=' ')
                term2 = slugify(y.strip(), separator=' ')
                job_queue.put_nowait([term1, term2])

This is the writer function:

    def writer(writer_queue):
        out = open(file_out, 'wb')
        pname = multiprocessing.current_process().name
        out.write(header)
        for match in iter(writer_queue.get, "STOP"):
            print("%s is writing %s") % (pname, str(match))
            line = str(';'.join(match) + '\n')
            out.write(line)
        out.close()

This is the worker function that performs the actual calculations (most of the code is stripped out since it adds no value here; the full source is in the repo):

    def score_it(job_queue, writer_queue):
        '''Calculate scores for a pair of words.'''
        pname = multiprocessing.current_process().name
        for pair in iter(job_queue.get_nowait, "STOP"):
            # do all the calculations and put the result into the writer queue
            writer_queue.put(result)

This is how I set up the processes:

    # Files
    to_match = open(args.file_to_match).readlines()
    source_list = open(args.file_to_be_matched).readlines()

    workers = 4
    job_queue = multiprocessing.Manager().Queue()
    writer_queue = multiprocessing.Manager().Queue()
    processes = []

    print('Start matching with "%s", minimum score of %s and %s workers') % (
        args.algorithm, minscore, workers)

    # Fill up job queue
    print("Filling up job queue with term pairs...")
    c = multiprocessing.Process(target=combine, name="Feeder",
                                args=(to_match, source_list))
    c.start()
    c.join()

    print("Job queue size: %s") % job_queue.qsize()

    # Start writer process
    w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
    w.start()

    for w in xrange(workers):
        p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
        p.start()
        processes.append(p)

    job_queue.put("STOP")

    for p in processes:
        p.join()

    writer_queue.put("STOP")

I have read a bit about multiprocessing sometimes being slower, and I know that this is due to the overhead of creating and managing new processes. Also, when the work to be done is not "big" enough, the effect of multiprocessing might not be visible. In this case, though, I think the work is fairly big, and the Pool.map() version seems to prove it, because it is much faster.

Am I doing something really wrong in the way I spawn all these processes and pass the queue objects around? How can this be optimized so that the results are written to a file as they are processed, in order to keep memory usage to a minimum while it runs?

Thanks!

1 answer

I think the problem with your timings is that your queue version is missing an optimization. You made a comment basically saying that your job_queue fills up completely before the worker processes start pulling jobs from it. I believe the reason for this is the c.join() you have in the # Fill up job queue section: it prevents the main thread from continuing until the job queue is filled. I would move c.join() to the end, after p.join(). You will also need to figure out a way to get your stop flags into the end of the queue. The combine function might be a good place to put this, something along the lines of adding one stop flag per worker after it has run out of data to combine.
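Not tested against your repo, but a rough sketch of what that rearrangement might look like, reusing the combine/score_it/writer pieces from your question; it assumes the workers block on job_queue.get rather than get_nowait (with get_nowait, a still-filling queue would raise Queue.Empty):

    # Sketch only: the feeder appends the stop flags itself and is joined last.
    def combine(list1, list2, workers):
        for x in list1:
            for y in list2:
                term1 = slugify(x.strip(), separator=' ')
                term2 = slugify(y.strip(), separator=' ')
                job_queue.put_nowait([term1, term2])
        # once all pairs are queued, add one stop flag per worker
        for _ in xrange(workers):
            job_queue.put("STOP")

    c = multiprocessing.Process(target=combine, name="Feeder",
                                args=(to_match, source_list, workers))
    c.start()          # do NOT join here, so workers can start consuming right away

    w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
    w.start()

    processes = []
    for _ in xrange(workers):
        p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
    c.join()           # join the feeder last
    writer_queue.put("STOP")
    w.join()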

One more note: you overwrite the variable w inside the for loop that starts the worker processes. As a matter of style/readability I would change w to a different variable name; if you are not going to use it, an underscore works well as a throwaway name, i.e.

 for w in xrange(workers): 

should become

 for _ in xrange(workers): 

In short, if you move c.join() to the end you should get more accurate timings. Currently the only part that runs in parallel is the fuzzy string matching itself. One of the advantages of having a producer/consumer setup is that the consumer processes do not have to wait for the producer to finish, so you end up using less memory.

