I wrote a script to run several processes (simple unit tests) in parallel. It runs N jobs with at most num_workers processes running at a time.
My first implementation ran the processes in batches of num_workers and seemed to work fine (I used the `false` command here to test the behavior):
```python
import subprocess

errors = 0
num_workers = 10
N = 100
i = 0
while i < N:
    # Launch one batch of up to num_workers processes.
    processes = []
    for j in range(i, min(i + num_workers, N)):
        p = subprocess.Popen(['false'])
        processes.append(p)
    # Wait for the whole batch to finish before starting the next one.
    for p in processes:
        p.wait()
    exit_codes = [p.returncode for p in processes]
    errors += sum(int(e != 0) for e in exit_codes)
    i += num_workers
print(f"There were {errors}/{N} errors")
```
However, the tests do not all take the same amount of time, so the whole batch sometimes sits waiting for one slow test to finish. I rewrote it to assign a new task as soon as a running one completes:
```python
import subprocess
import os

errors = 0
num_workers = 40
N = 100
assigned = 0
completed = 0
processes = set()
while completed < N:
    # Keep launching new jobs until all N have been assigned.
    if assigned < N:
        p = subprocess.Popen(['false'])
        processes.add((assigned, p))
        assigned += 1
    # Once the worker pool is full (or nothing is left to assign),
    # block until some child finishes.
    if len(processes) >= num_workers or assigned == N:
        os.wait()
    # Collect every process that has finished.
    for i, p in frozenset(processes):
        if p.poll() is not None:
            completed += 1
            processes.remove((i, p))
            err = p.returncode
            print(i, err)
            if err != 0:
                errors += 1
print(f"There were {errors}/{N} errors")
```
However, this produces incorrect results for the last few processes. For example, the code above reports 98/100 errors instead of 100/100. I checked, and this has nothing to do with concurrency; the last 2 jobs reported exit code 0 for some reason.
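The wrong exit code can be reproduced in isolation with a single child, which points at the interaction between os.wait() and Popen.poll() rather than at the scheduling loop. A minimal sketch (it spawns a Python child via sys.executable instead of `false` so the snippet is self-contained; that substitution is mine):

```python
import os
import subprocess
import sys

# A child that exits with status 1, standing in for the `false` command.
p = subprocess.Popen([sys.executable, '-c', 'import sys; sys.exit(1)'])

# os.wait() reaps the child itself and consumes its exit status.
pid, status = os.wait()
print(pid == p.pid)            # True: os.wait() reaped our child
print(os.WEXITSTATUS(status))  # 1: the real exit code went to os.wait()

# Popen.poll() can no longer see that status: its internal waitpid()
# fails because the child is already reaped, and subprocess falls
# back to reporting 0.
print(p.poll())                # 0
```

So once os.wait() has reaped a child, the corresponding Popen object has no way to recover the real exit status.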
Why is this happening?