Python multiprocessing pool is interrupted if value

I am developing a script where I create objects at random, but I do not want duplicates. They are saved and every time I create a new one, I check it for existing ones. Since I want to do this for a large number of objects, I am now trying to parallelize it, but so far without success. I tried some solutions found on the Internet (mainly here), but still not working.

The idea is that I start the pool and bind my function to it. When the process finds a match, it sets the value to 1. This value is read by all processes, they can write it with a lock, and I need to return at the end. So I did Lock, and Valuethat all processes can read the value (thus lock=False) to check whether a match in another process found. Then I tried something different with Eventand checked if it was installed, but it still does not work ... Then I tried to raise a special one Exception, but still did not manage to make the code successful.

Please, I prefer to program OOP, so I will avoid it until my last resource defines a variable global, because I consider them unsure (personal opinion).

Here is the MWE, I replaced my complex objects with intand my saved objects with range(10000)to help you understand.

#!/usr/bin/env python3

import multiprocessing as muproc


def ParallelCheck(me):
    print(" Testing {}...".format(me))
    #manager = muproc.Manager()
    #lock = manager.Lock()
    lock = muproc.Lock()
    back = muproc.Value("i", 0, lock=False)
    ParChild = ParallelChild(me, lock, back)
    with muproc.Pool() as pool:
        try:
            pool.map(ParChild.run, range(10000))
        except AbortPool:
            pool.terminate()
            print("pool")
    return back.value


def Computation(me, neighbour):
    return me == neighbour



class ParallelChild(object):
    def __init__(self, me, lock, back):
        self.abort = muproc.Event()
        self.lock = lock
        self.me = me
        self.back = back

    def run(self, neighbour):
        print("run")
        if self.abort.is_set():
            print("Aborting")
            pass
        else:
            if Computation(self.me, neighbour):
                self.lock.acquire()
                self.abort.set()
                self.back.value = 1
                print("GOTCHA")
                self.lock.release()
                raise AbortPool
            else:
                print("...")


class AbortPool(Exception):
    #pass
    def __init__(self):
        ## Just to check
        print("AbortPool raised!")



if __name__ == "__main__":
    values = [12000, 13, 7]
    for v in values:
        print("value={} match={}".format(v, ParallelCheck(v)))

Now it gives RunTimeError:

me@stigepc4$ python3 mwe.py 
 Testing 12000...
Traceback (most recent call last):
  File "mwe.py", line 63, in <module>
    print("value={} match={}".format(v, ParallelCheck(v)))
  File "mwe.py", line 16, in ParallelCheck
    pool.map(ParChild.run, range(10000))
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 383, in _handle_tasks
    put(task)
  File "/usr/lib/python3.4/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.4/multiprocessing/sharedctypes.py", line 128, in reduce_ctype
    assert_spawning(obj)
  File "/usr/lib/python3.4/multiprocessing/context.py", line 347, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: c_int objects should only be shared between processes through inheritance

I think this has something to do with Lock(although the comment Manager, but it didn’t work better) or with Value, and now the idea is how to get rid of it ...

Edit

, , , , , , . , , . , , , . , , , , ...

2

, ...

for o in objects:
    if too_close(o, existing_objects):
        return 1
return 0

... - ...

for o in objects:
    if too_close(o, some_existing_objects):
        return 1 and abort other processes
return 0
+4
1

, script. - . , .

python, , , pool.map , - , , . , .

. , , .

, ( ). , . range(10000) , . 4 2500.

, , . , , , , . , , , .

, ( , , ). , , .

, multiprocessing.Event... , , .

, bj0, ...

, :

  • : 7s
  • : 910s
  • , : 97s

... , , ...

MWE:

#!/usr/bin/env python3
import multiprocessing as muproc

def ParallelCheck(me):
    print(" Testing {}...".format(me))
    global abort
    abort.clear()
    ParChild = ParallelChild(me)
    jobs = []
    N = 4
    for i in range(N):
        jobs.append(muproc.Process(target = ParChild.run, args=(range(i * 2500, (i+1) * 2500),)))
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
    if abort.is_set():
        print("MATCH FOUND")
        return 1
    else:
        print(" no match...")
        return 0


def Computation(me, neighbour):
    return me == neighbour


class ParallelChild(object):
    def __init__(self, me):
        self.me = me

    def run(self, neighbours):
        global abort
        for neighbour in neighbours:
            print("{} vs {} by {}".format(self.me, neighbour, self.CurProc()))
            if abort.is_set():
                print("Aborting {}".format(self.CurProc()))
                return 0
            else:
                if Computation(self.me, neighbour):
                    abort.set()
                    print("GOTCHA {}".format(self.CurProc()))
                    return 1

    def CurProc(self):
        return muproc.current_process()._identity[0]



if __name__ == "__main__":
    abort = muproc.Event()
    values = [12000, 130, 7]
    for v in values:
        print("value={} match={}".format(v, ParallelCheck(v)))
+1

Source: https://habr.com/ru/post/1607983/


All Articles