Multiprocessing with common queue and termination options

I have this original function that I want to convert to multiprocessing:

```python
def optimal(t0, tf, frequences, delay, ratio=0):
    First = True
    for s in delay:
        delay = 0  # delay between signals
        timelines = list()
        for i in range(len(frequences)):
            timelines.append(time_builder(frequences[i], t0 + delay, tf))
            delay += s

        trio_overlap = trio_combination(timelines, ratio)

        valid = True
        for items in trio_overlap.values():
            if len(list(set(items))) == len(items):
                continue
            else:
                valid = False

        if not valid:
            continue

        overlap = duo_combination(timelines)
        optimal = ...  # depending on conditions

    return optimal
```

If `valid = True` after the test, it calculates an optimization parameter named `optim_param` and tries to minimize it. If it falls below a certain threshold, `optim_param < 0.3`, I break out of the loop and take that value as my answer.

My problem is that as my model develops, its complexity grows, and single-threaded computation takes too long. I would like to run the calculations in parallel. Since each process has to compare the result for its value of `s` against the current optimum, I tried to implement a queue.
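To make the queue idea concrete, here is a minimal sketch of that pattern (not the asker's actual code): one process per candidate `s`, each pushing `(s, optim_param)` onto a shared `multiprocessing.Queue`, with the parent tracking the best result and stopping early once the threshold is beaten. The cost function `worker` is a made-up stand-in for the real model.

```python
import multiprocessing as mp

def worker(s, queue):
    # Hypothetical cost function standing in for the real model:
    # pretend the optimization parameter for step s is (s - 3)**2.
    optim_param = (s - 3) ** 2
    queue.put((s, optim_param))

def parallel_search(steps, threshold):
    """Spawn one process per candidate s, collect (s, optim_param)
    pairs from a shared queue, and stop reading once a result beats
    the threshold."""
    queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(s, queue)) for s in steps]
    for p in procs:
        p.start()

    best_s = best_param = None
    for _ in procs:
        s, param = queue.get()
        if best_param is None or param < best_param:
            best_s, best_param = s, param
        if best_param < threshold:  # good enough: stop waiting
            break

    for p in procs:
        p.terminate()  # crude shutdown of any stragglers
        p.join()
    return best_s, best_param

if __name__ == "__main__":
    print(parallel_search(range(8), 0.3))
```

The `terminate()` at the end is the ugly part of this approach: leftover workers are killed rather than allowed to finish cleanly, which is exactly the problem the answer below addresses.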

This is my first multiprocessing experience, and even though I think I'm on the right track, my code seems dirty and incomplete. Can I get some help?

Thanks :D

1 answer

Instead of manually creating a process for each case, consider using `Pool.imap_unordered`. The trick is then how to shut down cleanly once an acceptable result is obtained: you can implement that by passing a generator that exits early when a flag it checks on each cycle is set. The main program reads from the iterator, keeps the best result, and sets the flag when the result is good enough. The final trick is to slow down the (internal) reading from the generator, to prevent a large backlog of scheduled tasks that would have to be waited on (or, uncleanly, killed) after a good result is found. Given the number of processes in the pool, that pacing can be achieved with a semaphore.

Here is an example (with trivial analysis) to demonstrate:

```python
import multiprocessing, threading, os

def interrupted(data, sem, interrupt):
    for x in data:
        yield x
        sem.acquire()   # wait until the main loop has consumed a result
        if interrupt:   # flag set: stop submitting new tasks
            break

def analyze(x):
    return x**2

np = os.cpu_count()
pool = multiprocessing.Pool(np)
sem = threading.Semaphore(np - 1)
token = []  # mutable flag: empty list is falsy, non-empty is truthy
vals = pool.imap_unordered(analyze, interrupted(range(-10, 10), sem, token))
pool.close()  # optional: lets the processes exit faster
best = None
for res in vals:
    if best is None or res < best:
        best = res
    if best < 5:
        token.append(None)  # make the flag truthy
    sem.release()
pool.join()
print(best)
```

There are, of course, other ways of sharing the semaphore and the interrupt flag with the generator; this method uses an ugly data type (a mutable list as the flag), but has the virtue of using no global variables (or even closures).
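For illustration, here is one such variant (a sketch, not part of the original answer): the mutable-list flag is replaced by a `threading.Event`, which makes the intent more explicit at no extra cost. The structure is otherwise identical to the example above.

```python
import multiprocessing, threading, os

def analyze(x):
    return x ** 2

def interrupted(data, sem, stop):
    # Same pacing trick, with a threading.Event as the stop flag.
    for x in data:
        yield x
        sem.acquire()       # wait until the main loop has consumed a result
        if stop.is_set():   # clean early exit once a good result is known
            break

def search():
    np = os.cpu_count()
    stop = threading.Event()
    sem = threading.Semaphore(np - 1)
    pool = multiprocessing.Pool(np)
    vals = pool.imap_unordered(analyze, interrupted(range(-10, 10), sem, stop))
    pool.close()
    best = None
    for res in vals:
        if best is None or res < best:
            best = res
        if best < 5:
            stop.set()
        sem.release()
    pool.join()
    return best

if __name__ == "__main__":
    print(search())
```

Because `imap_unordered` delivers results in completion order, the final `best` depends on how far the pool got before the flag was seen; it is guaranteed to be below the threshold, but not necessarily the global minimum.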


Source: https://habr.com/ru/post/1275517/

