Unable to modify shared memory object using pool

I play with python multiprocessing and shared memory. I can use the shared memory object with Process, but not with Pool. I have added a callback for mine Pool, and the callback is not called either.

from multiprocessing import Array, Pool, Process

def flip(x,a):
    a[x] = 0 if a[x] else 1
    return (x, a[x])

def cb(result):
    print(result)

if __name__ == '__main__':

    # size of array
    N = 10

    # shared array - N bytes - unsynchronized - initialized to zeros
    a = Array('B', N, lock=False)

    # flip values to ones using Process
    processes = [Process(target=flip, args=(x, a)) for x in range(N)]
    for p in processes: p.start()
    for p in processes: p.join()
    print([a[i] for i in range(N)])    

    # flip values back to zeros using Pool
    pool = Pool(processes=4)
    for x in range(N):
        pool.apply_async(flip, args=(x, a), callback=cb)
    pool.close()
    pool.join()
    print([a[i] for i in range(N)])

I expect my shared array to print once with all 1, and then single lines printed callback, and the array again with all 0, but instead we get this:

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Why Poolare tasks not being performed?

Removing shared memory for a minimal example;

def f(x):
    return x

def cb(result):
    print('cb',result)

if __name__ == '__main__':

    pool = Pool(processes=4)
    pool.apply_async(f, range(10), callback=cb)
    pool.close()
    pool.join()

I expect this to print numbers from 0 to 9 on separate lines, but it doesn't print anything.

If I replaced the call apply_syncdirectly above,

pool.apply_async(f, args=[10], callback=cb)

I get a conclusion

cb 10

[10] range(10), [1,2,3], [(1),(2),(3)] ([1],[2],[3]) .

+4
1

multiprocessing, . , N N-.

:

1) . . flip_many() partition()

2) . . flip_one()

.

from multiprocessing import Array, Pool, Process

def flip_many(start_idx, end_idx):
    for idx in range(start_idx, end_idx + 1):
        a[idx] = not(a[idx])

def flip_one(idx):
    a[idx] = not(a[idx])
    return idx, a[idx]

def cb(result):
    print(result)

def partition(range_, n):
    start, end = range_
    size = (end - start) // n
    ranges = []
    for _ in range(n):
        ranges.append((start, start+size-1))
        start += size
    if ranges[-1][1] != end-1:
        ranges[-1] = (ranges[-1][0], end-1)
    return ranges    

if __name__ == '__main__':

    # size of array
    N = 10
    N_procs = 2
    ranges = partition( (0, N), N_procs )

    # shared array - N bytes - unsynchronized - initialized to zeros
    a = Array('B', N, lock=False)
    print([a[i] for i in range(N)], "elements of array initialized to 0")    

    # flip values to ones using Process

    processes = []
    for i in range(N_procs):
        p = Process(target=flip_many, args=(*ranges[i], ))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print([a[i] for i in range(N)], "First flip by N processes, should be 1")    

    # flip values back to zeros using Pool
    pool = Pool()
    indices = range(N)
    pool.map(flip_one, indices)
    print([a[i] for i in range(N)], "Second flip by the pool.map ... 0")

    pool.map(flip_one, indices, chunksize=N // N_procs)
    print([a[i] for i in range(N)], "Third flip by the pool.map ... 1")

    pool.map_async(flip_one, indices, callback=cb)
    print([a[i] for i in range(N)], "Fourth flip by the pool.map_async ... 0")
    print("    Due to the async nature, flip not reflected until .join()")
    print("    But the callback returns the correct results:")

    pool.close()
    pool.join()
    print([a[i] for i in range(N)], "Content after the join... 0")
0

Source: https://habr.com/ru/post/1659116/


All Articles