I recently started experimenting with multiprocessing to speed up a script of mine. It does fuzzy string matching and calculates scores using different algorithms (I wanted to compare different matching methods). You can find the full source here: https://bitbucket.org/bergonzzi/fuzzy-compare/src . It takes 2 files as input, which are combined into pairs (each line of file1 with each line of file2), and for each pair the fuzzy match scores are calculated.
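Just to make the pairing step concrete, here's a tiny sketch of what I mean (toy lists standing in for the two input files):

```python
import itertools

list1 = ["apple", "orange"]          # stands in for the lines of file1
list2 = ["aple", "orage", "banana"]  # stands in for the lines of file2

# every line of file1 combined with every line of file2
pairs = list(itertools.product(list1, list2))
print(len(pairs))  # len(list1) * len(list2) -> 6 pairs
```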
I made 3 versions. Running them on the example data provided in my repo (which, after pairing, consists of 697,340 items), I get the following timings:
- Simple single-process version - 0:00:47
- Multiprocessing with Pool.map() - 0:00:13
- Multiprocessing with queues (producer/consumer pattern) - 0:01:04
I'm trying to understand why my Pool.map() version is so much faster than my queue version, which is even slower than the simple single-process one.
My reasoning for trying queues in the first place is that the Pool.map() version holds on to all the results until everything is finished and only writes them to a file at the end, which means that for big files it ends up eating a lot of memory. I'm talking about this version (link to it, because it's a lot of code to paste here).
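For context, the Pool.map() version is structured roughly like this (simplified sketch, not the exact code from the repo; score_pair, the sample lists and the output file name are just placeholders):

```python
import itertools
import multiprocessing


def score_pair(pair):
    # in the real code this runs the fuzzy matching algorithms on the pair
    term1, term2 = pair
    return (term1, term2, 0)  # placeholder score


if __name__ == '__main__':
    list1 = ["apple", "orange"]           # stands in for file1's lines
    list2 = ["aple", "orage", "banana"]   # stands in for file2's lines

    pool = multiprocessing.Pool(4)
    # map() only returns once every pair has been scored,
    # so all the results sit in memory at the same time...
    results = pool.map(score_pair, itertools.product(list1, list2))
    pool.close()
    pool.join()

    # ...and the output file is only written in one go at the very end
    with open('matches.csv', 'w') as out:
        for row in results:
            out.write(';'.join(str(x) for x in row) + '\n')
```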
To solve this, I refactored it into a producer/consumer pattern (or at least tried to). Here I first build the jobs by combining both input files and put them in a queue that the consumer processes work through (calculating the fuzzy matches). Finished jobs are put on another queue. Then I have a single process that grabs the done items from that queue and writes them to a file. This way, in theory, I shouldn't need as much memory, since the results are flushed to disk as they come. It seems to work fine, but it's much slower. I've also noticed that the 4 processes I spawn don't seem to max out the CPU when I watch them in Activity Monitor on Mac OS X (unlike with the Pool.map() version).
Another thing I notice is that my producer function seems to fill up the queue properly, but the consumer processes seem to wait until the queue is completely filled instead of starting to work as soon as the first item arrives. I'm probably doing something wrong there...
For reference, here is the relevant code from the queue version (though it's probably better to look at the full code in the repo linked above).
Here is my producer function:
```python
def combine(list1, list2):
    '''
    Combine every item of list1 with every item of list2,
    normalize and put the pair in the job queue.
    '''
    pname = multiprocessing.current_process().name
    for x in list1:
        for y in list2:
            # normalization stripped out here (full body in the repo);
            # the pair ends up on the global job_queue
            job_queue.put([x, y])
```
This is the writer function:
```python
def writer(writer_queue):
    out = open(file_out, 'wb')
    pname = multiprocessing.current_process().name
    out.write(header)
    for match in iter(writer_queue.get, "STOP"):
        print("%s is writing %s" % (pname, str(match)))
        line = str(';'.join(match) + '\n')
        out.write(line)
    out.close()
```
This is the worker function that does the actual calculations (I've stripped out most of the code since it doesn't add anything here; the full source is in the repo):
```python
def score_it(job_queue, writer_queue):
    '''Calculate scores for pair of words.'''
    pname = multiprocessing.current_process().name
    for pair in iter(job_queue.get_nowait, "STOP"):
        # scoring code stripped out here (full version in the repo);
        # the results get put on writer_queue
        pass
```
This is how I set up the processes:
```python
# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()

workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []

print('Start matching with "%s", minimum score of %s and %s workers' % (
    args.algorithm, minscore, workers))

# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder",
                            args=(to_match, source_list))
c.start()
c.join()
print("Job queue size: %s" % job_queue.qsize())

# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()

for w in xrange(workers):
    p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
    p.start()
    processes.append(p)

job_queue.put("STOP")

for p in processes:
    p.join()

writer_queue.put("STOP")
```
I've read a bit about multiprocessing sometimes being slower, and I know that's down to the overhead of creating and managing new processes. Also, when the job being done isn't "big" enough, the benefit of multiprocessing might not be visible. In this case though, I think the job is pretty big, and the Pool.map() version seems to prove it, because it's so much faster.
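A rough back-of-envelope with my own numbers: 697,340 pairs in 47 seconds single-process works out to roughly 67 µs of work per pair, and Pool.map() brings the total down to about 13 seconds, i.e. a ~3.6x speedup with 4 workers. So the per-item work seems heavy enough for multiprocessing to pay off, and process overhead alone shouldn't explain the queue version taking over a minute.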
Am I doing something really wrong in how I manage all these processes and pass around the queue objects? How can this be optimised so that the results are written to the file as they're processed, to keep the memory needed while it runs to a minimum?
Thanks!