Large numpy arrays in shared memory for multiprocessing: is there something wrong with this approach?

Multiprocessing is a great tool, but it's not easy to use large chunks of memory with it. You can load chunks in each process and return the results to disk, but sometimes you need to store the results in memory. And on top, use the numpy functionality.

I read / searched a lot and came up with answers:

Use numpy array in shared memory for multiprocessing

Share a read-only large continuous-memory array between multiprocessing processes

Python Multiprocessor Multicore Arrays

How to pass large numpy arrays between python subprocesses without saving to disk?

Etc etc. etc.

Everyone has disadvantages: Not-so-mainstream ( sharedmem ) sharedmem ; global storage of variables; not so easy to read code, pipes, etc. etc.

My goal was to easily use numpy in my workers, without worrying about conversions, etc.

After much testing, I came up with this . And it works on my ubuntu 16, python 3.6, 16GB, 8 main machines. I have made many shortcuts compared to previous approaches. There is no global general state, there are no pure pointers to memory that need to be converted to numpy inside workers, large numpy arrays are passed as process arguments, etc.

the Pastebin link above , but here I will give a few snippets.

Some imported goods:

 import numpy as np import multiprocessing as mp import multiprocessing.sharedctypes import ctypes 

Select some common mem and wrap it in a numpy array:

 def create_np_shared_array(shape, dtype, ctype) . . . . shared_mem_chunck = mp.sharedctypes.RawArray(ctype, size) numpy_array_view = np.frombuffer(shared_mem_chunck, dtype).reshape(shape) return numpy_array_view 

Create a shared array and put something in it

 src = np.random.rand(*SHAPE).astype(np.float32) src_shared = create_np_shared_array(SHAPE,np.float32,ctypes.c_float) dst_shared = create_np_shared_array(SHAPE,np.float32,ctypes.c_float) src_shared[:] = src[:] # Some numpy ops accept an 'out' array where to store the results 

Create a process:

 p = mp.Process(target=lengthly_operation,args=(src_shared, dst_shared, k, k + STEP)) p.start() p.join() 

Below are some results (see full code):

 Serial version: allocate mem 2.3741257190704346 exec: 17.092209577560425 total: 19.46633529663086 Succes: True Parallel with trivial np: allocate mem 2.4535582065582275 spawn process: 0.00015354156494140625 exec: 3.4581971168518066 total: 5.911908864974976 Succes: False Parallel with shared mem np: allocate mem 4.535916328430176 (pure alloc:4.014216661453247 copy: 0.5216996669769287) spawn process: 0.00015664100646972656 exec: 3.6783478260040283 total: 8.214420795440674 Succes: True 

I also made cProfile (why 2 extra seconds when sharing a shared mem?) And realized that there are some calls to tempfile.py , {method 'write' of '_io.BufferedWriter' objects} .

Questions

  • Am I doing something wrong?
  • Are the (large) arrays pickled back and forth and I got nothing from the speed? Note that the 2nd run (using regular np arrays, validation fails)
  • Is there a way to improve timings, code clarity, etc.? (in relation to the multiprocessor paradigm)

Notes

  • I cannot work with process pools because mem must be inherited in fork and not sent as a parameter.
+5
source share

Source: https://habr.com/ru/post/1272724/


All Articles