Dead simple example of using multiprocessing Queue, Pool and Lock

I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html , but I'm still struggling with multiprocessing Queue, Pool and Lock. For now, I was able to build the example below.

As for the Queue and the Pool, I'm not sure if I understood the concept correctly, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list contains 8 items in this example). What should I use: a Pool that creates 2 processes which can handle two different queues (2 max), or should I just use Queue to process 2 inputs at a time? The Lock would be for printing the outputs correctly.

    import multiprocessing
    import time

    data = (
        ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
    )

    def mp_handler(var1):
        for indata in var1:
            p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
            p.start()

    def mp_worker(inputs, the_time):
        print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
        time.sleep(int(the_time))
        print " Process %s\tDONE" % inputs

    if __name__ == '__main__':
        mp_handler(data)
+44
python multiprocessing
Jan 02 '14 at 16:45
5 answers

The best solution for your problem is to use a Pool. Using a Queue and a separate "queue feeding" function is probably overkill.

Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:

    import multiprocessing
    import time

    data = (
        ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
    )

    def mp_worker((inputs, the_time)):
        print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
        time.sleep(int(the_time))
        print " Process %s\tDONE" % inputs

    def mp_handler():
        p = multiprocessing.Pool(2)
        p.map(mp_worker, data)

    if __name__ == '__main__':
        mp_handler()

Note that the mp_worker() function now accepts a single argument (a tuple of the two previous arguments), because the map() function chunks up your input data into sublists, and each sublist is given as a single argument to your worker function.

Output:

    Processs a  Waiting 2 seconds
    Processs b  Waiting 4 seconds
    Process a   DONE
    Processs c  Waiting 6 seconds
    Process b   DONE
    Processs d  Waiting 8 seconds
    Process c   DONE
    Processs e  Waiting 1 seconds
    Process e   DONE
    Processs f  Waiting 3 seconds
    Process d   DONE
    Processs g  Waiting 5 seconds
    Process f   DONE
    Processs h  Waiting 7 seconds
    Process g   DONE
    Process h   DONE
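A side note: the tuple parameter unpacking in the worker signature (def mp_worker((inputs, the_time))) only works on Python 2. A rough Python 3 equivalent, a sketch using Pool.starmap to unpack each pair, would look like this:

    import multiprocessing
    import time

    data = (
        ('a', '2'), ('b', '4'), ('c', '6'), ('d', '8'),
        ('e', '1'), ('f', '3'), ('g', '5'), ('h', '7'),
    )

    def mp_worker(inputs, the_time):
        # starmap() unpacks each tuple into separate arguments for us
        print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
        time.sleep(int(the_time))
        print(" Process %s\tDONE" % inputs)

    def mp_handler():
        with multiprocessing.Pool(2) as p:
            p.starmap(mp_worker, data)

    if __name__ == '__main__':
        mp_handler()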



Edit, as per @Thales' comment below:

If you want to limit each pool to a single pair at a time, so that your processes run in tandem pairs, ala:

A waiting, B waiting | A done, B done | C waiting, D waiting | C done, D done | ...

then change the handler function to launch a pool (of 2 processes) for each pair of data:

    def mp_handler():
        subdata = zip(data[0::2], data[1::2])
        for task1, task2 in subdata:
            p = multiprocessing.Pool(2)
            p.map(mp_worker, (task1, task2))

Now your output:

    Processs a  Waiting 2 seconds
    Processs b  Waiting 4 seconds
    Process a   DONE
    Process b   DONE
    Processs c  Waiting 6 seconds
    Processs d  Waiting 8 seconds
    Process c   DONE
    Process d   DONE
    Processs e  Waiting 1 seconds
    Processs f  Waiting 3 seconds
    Process e   DONE
    Process f   DONE
    Processs g  Waiting 5 seconds
    Processs h  Waiting 7 seconds
    Process g   DONE
    Process h   DONE
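A small variation on that handler (my own sketch, not part of the original answer): a single Pool can be reused for every pair instead of constructing a new one on each iteration; map() still blocks until both tasks of the pair are done, so the tandem behaviour is kept:

    def mp_handler():
        p = multiprocessing.Pool(2)
        for task1, task2 in zip(data[0::2], data[1::2]):
            # map() blocks until both tasks of this pair have finished
            p.map(mp_worker, (task1, task2))
        p.close()
        p.join()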
+73
Jan 02 '14 at

Here is my personal go-to example for this topic:

Gist here (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

    import multiprocessing
    import sys

    THREADS = 3

    # Used to prevent multiple threads from mixing their output
    GLOBALLOCK = multiprocessing.Lock()


    def func_worker(args):
        """This function will be called by each thread.
        This function can not be a class method.
        """
        # Expand list of args into named args.
        str1, str2 = args
        del args

        # Work
        # ...

        # Serial-only portion
        GLOBALLOCK.acquire()
        print(str1)
        print(str2)
        GLOBALLOCK.release()


    def main(argp=None):
        """Multiprocessing Spawn Example
        """
        # Create the number of threads you want
        pool = multiprocessing.Pool(THREADS)

        # Define two jobs, each with two args.
        func_args = [
            ('Hello', 'World',),
            ('Goodbye', 'World',),
        ]

        try:
            # Spawn up to 9999999 jobs, I think this is the maximum possible.
            # I do not know what happens if you exceed this.
            pool.map_async(func_worker, func_args).get(9999999)
        except KeyboardInterrupt:
            # Allow ^C to interrupt from any thread.
            sys.stdout.write('\033[0m')
            sys.stdout.write('User Interrupt\n')
        pool.close()

    if __name__ == '__main__':
        main()
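An optional tweak (my suggestion, not part of the gist): the lock can also be used as a context manager, so it is released even if one of the prints raises:

    def func_worker(args):
        str1, str2 = args
        # ... parallel work goes here ...
        with GLOBALLOCK:  # acquired here, released automatically on exit
            print(str1)
            print(str2)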
+6
Apr 24 '16 at 6:33

This may not be 100% related to the question, but in my search for an example of using multiprocessing with a queue, this question shows up first on Google.

This is a basic example class that you can instantiate, put items into its queue, and wait until the queue is finished. That's all I needed.

    from multiprocessing import JoinableQueue
    from multiprocessing.context import Process


    class Renderer:
        queue = None

        def __init__(self, nb_workers=2):
            self.queue = JoinableQueue()
            self.processes = [Process(target=self.upload) for i in range(nb_workers)]
            for p in self.processes:
                p.start()

        def render(self, item):
            self.queue.put(item)

        def upload(self):
            while True:
                item = self.queue.get()
                if item is None:
                    break

                # process your item here

                self.queue.task_done()

        def terminate(self):
            """ wait until queue is empty and terminate processes """
            self.queue.join()
            for p in self.processes:
                p.terminate()

    r = Renderer()
    r.render(item1)
    r.render(item2)
    r.terminate()
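One thing worth pointing out (my observation, not part of the answer): upload() already breaks on a None sentinel, but terminate() never sends one and kills the workers instead. A gentler shutdown method (hypothetical, call it stop()) could drain the queue, send one sentinel per worker and join them:

    def stop(self):
        """Drain the queue, then ask each worker to exit and wait for it."""
        self.queue.join()
        for _ in self.processes:
            self.queue.put(None)   # one sentinel per worker
        for p in self.processes:
            p.join()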
+3
Feb 23 '16 at 9:47

Here is an example from my code (for a thread pool, but just change the class name and you'll have a process pool):

    def execute_run(rp):
        ... do something

    pool = ThreadPoolExecutor(6)
    for mat in TESTED_MATERIAL:
        for en in TESTED_ENERGIES:
            for ecut in TESTED_E_CUT:
                rp = RunParams(
                    simulations, DEST_DIR, PARTICLE, mat, 960, 0.125, ecut, en
                )
                pool.submit(execute_run, rp)
    pool.join()

Basically:

  • pool = ThreadPoolExecutor(6) creates a pool for 6 threads
  • Then you have a bunch of for loops that add tasks to the pool
  • pool.submit(execute_run, rp) adds a task to the pool; the first argument is the function to be called in a thread/process, and the remaining arguments are passed to that function.
  • pool.join() waits until all tasks are completed (see the sketch after this list).
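A caveat worth adding: the standard-library concurrent.futures.ThreadPoolExecutor has no join() method, so the snippet above presumably relies on a different pool class. With concurrent.futures the equivalent pattern is shutdown(wait=True), most easily via a with block; here is a minimal, self-contained sketch (the task function and delays are made up for illustration):

    from concurrent.futures import ThreadPoolExecutor
    import time

    def execute_run(delay):
        # Stand-in for the real work: just sleep for `delay` seconds.
        time.sleep(delay)
        return delay

    # The with block calls shutdown(wait=True) on exit, i.e. it waits
    # for every submitted task to finish.
    with ThreadPoolExecutor(6) as pool:
        futures = [pool.submit(execute_run, d) for d in (1, 2, 3)]

    print([f.result() for f in futures])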
0
Jan 2 '14 at 17:19

For anyone using editors like Komodo Edit (win10), add sys.stdout.flush() to:

    def mp_worker((inputs, the_time)):
        print " Process %s\tWaiting %s seconds" % (inputs, the_time)
        time.sleep(int(the_time))
        print " Process %s\tDONE" % inputs
        sys.stdout.flush()

or as the first line:

    if __name__ == '__main__':
        sys.stdout.flush()

This helps you see what is going on while the script runs, instead of staring at a black command-line box.
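On Python 3 the same effect can be had per call with the flush keyword of print(); a rough sketch of the worker (my adaptation, not from the answer) would be:

    def mp_worker(args):
        inputs, the_time = args
        print(" Process %s\tWaiting %s seconds" % (inputs, the_time), flush=True)
        time.sleep(int(the_time))
        print(" Process %s\tDONE" % inputs, flush=True)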

0
Nov 12 '17 at 15:45


