I am developing a script that creates objects at random, but I do not want duplicates. The objects are saved, and every time I create a new one, I check it against the existing ones. Since I want to do this for a large number of objects, I am now trying to parallelize it, but so far without success. I tried some solutions found on the internet (mainly here), but it is still not working.
The idea is that I start the pool and map my function over it. When a process finds a match, it sets a flag to 1. This flag is read by all processes; they can write it with a lock, and I need to return it at the end. So I used a Lock, and a Value that all processes can read (thus lock=False) to check whether a match was found in another process. Then I tried something different with an Event and checked whether it was set, but it still does not work. Then I tried raising a custom Exception, but I still did not manage to make the code work.
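For what it's worth, a custom exception raised inside a pool worker does propagate back to the parent's map() call, so the exception-based attempt can work in principle. A minimal sketch (note that map() only re-raises after the remaining submitted tasks have run, so by itself this is not an early abort):

```python
import multiprocessing as muproc

class Found(Exception):
    pass

def check(x):
    # Raising in a worker is pickled and re-raised in the parent's map() call
    if x == 13:
        raise Found
    return False

if __name__ == "__main__":
    with muproc.Pool() as pool:
        try:
            pool.map(check, range(100))
            print("no match")
        except Found:
            pool.terminate()  # stop the pool once the parent sees the exception
            print("match")
```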
Please note that I prefer to program in an OOP style, so defining a global variable is my last resort, because I consider globals unsafe (personal opinion).
Here is the MWE; I replaced my complex objects with int and my saved objects with range(10000) to make it easier to follow.
import multiprocessing as muproc

def ParallelCheck(me):
    print(" Testing {}...".format(me))
    lock = muproc.Lock()
    back = muproc.Value("i", 0, lock=False)
    ParChild = ParallelChild(me, lock, back)
    with muproc.Pool() as pool:
        try:
            pool.map(ParChild.run, range(10000))
        except AbortPool:
            pool.terminate()
            print("pool")
    return back.value

def Computation(me, neighbour):
    return me == neighbour

class ParallelChild(object):
    def __init__(self, me, lock, back):
        self.abort = muproc.Event()
        self.lock = lock
        self.me = me
        self.back = back

    def run(self, neighbour):
        print("run")
        if self.abort.is_set():
            print("Aborting")
            pass
        else:
            if Computation(self.me, neighbour):
                self.lock.acquire()
                self.abort.set()
                self.back.value = 1
                print("GOTCHA")
                self.lock.release()
                raise AbortPool
            else:
                print("...")

class AbortPool(Exception):
    def __init__(self):
        print("AbortPool raised!")

if __name__ == "__main__":
    values = [12000, 13, 7]
    for v in values:
        print("value={} match={}".format(v, ParallelCheck(v)))
Now it gives a RuntimeError:
me@stigepc4$ python3 mwe.py
Testing 12000...
Traceback (most recent call last):
File "mwe.py", line 63, in <module>
print("value={} match={}".format(v, ParallelCheck(v)))
File "mwe.py", line 16, in ParallelCheck
pool.map(ParChild.run, range(10000))
File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
File "/usr/lib/python3.4/multiprocessing/pool.py", line 383, in _handle_tasks
put(task)
File "/usr/lib/python3.4/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
File "/usr/lib/python3.4/multiprocessing/sharedctypes.py", line 128, in reduce_ctype
assert_spawning(obj)
File "/usr/lib/python3.4/multiprocessing/context.py", line 347, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: c_int objects should only be shared between processes through inheritance
I think this has something to do with Lock (a comment suggested Manager, but that did not work any better) or with Value, and now the question is how to get rid of this error...
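As far as I can tell, the RuntimeError comes from pool.map pickling ParChild.run, which drags the Lock and Value along with it; sharedctypes objects must reach workers by inheritance instead. A sketch of the usual workaround, passing the Value through the Pool initializer so each worker inherits it (this does use a module-level name, but only inside the worker processes, which the initializer pattern more or less requires):

```python
import multiprocessing as muproc

_back = None  # set per worker by _init; lives only in the worker processes

def _init(back):
    # Runs once in each worker; stores the inherited shared Value
    global _back
    _back = back

def _check(args):
    me, neighbour = args
    if me == neighbour:
        with _back.get_lock():  # Value was created with its default lock
            _back.value = 1
    return me == neighbour

def parallel_check(me, n=10000):
    back = muproc.Value("i", 0)  # lock=True by default
    with muproc.Pool(initializer=_init, initargs=(back,)) as pool:
        pool.map(_check, ((me, x) for x in range(n)))
    return back.value

if __name__ == "__main__":
    print(parallel_check(13))     # 13 is in range(10000) -> 1
    print(parallel_check(12000))  # no match -> 0
```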
Edit

To clarify, following the comments: I create objects one at a time and check each new one against the objects already saved; if it is too close to an existing one, it is rejected. At the moment this check runs sequentially, roughly like this...
for o in objects:
    if too_close(o, existing_objects):
        return 1
return 0
...and what I want is the parallel equivalent, where each process checks part of the existing objects and the other processes are aborted as soon as a match is found...
for o in objects:
    if too_close(o, some_existing_objects):
        return 1 and abort other processes
return 0
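If I understand the intent, one way to get that abort behaviour without any shared flag is to give each worker one chunk of the existing objects, consume the results with imap_unordered, and terminate the pool on the first hit. This is only a sketch: too_close here is a stand-in exact-match test, not the real proximity check, and the chunking scheme is illustrative.

```python
import multiprocessing as muproc

def too_close(args):
    # Stand-in for the real proximity test; here: exact match against a chunk
    o, chunk = args
    return any(o == e for e in chunk)

def parallel_too_close(o, existing, nproc=4):
    # One chunk of the existing objects per worker
    chunks = [existing[i::nproc] for i in range(nproc)]
    with muproc.Pool(nproc) as pool:
        for hit in pool.imap_unordered(too_close, ((o, c) for c in chunks)):
            if hit:
                pool.terminate()  # abort the chunks still being checked
                return 1
    return 0

if __name__ == "__main__":
    existing = list(range(10000))
    print(parallel_too_close(13, existing))     # 1
    print(parallel_too_close(12000, existing))  # 0
```

Because imap_unordered yields results as they arrive, a hit in any chunk is seen as soon as that worker finishes, rather than after the whole map completes.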