Multiprocessing is a great tool, but using large chunks of memory with it is not straightforward. You can load a chunk in each process and dump the results to disk, but sometimes you need to keep the results in memory, and on top of that use numpy functionality on them.
I read and searched a lot and came across these answers:
Use numpy array in shared memory for multiprocessing
Share a read-only large continuous-memory array between multiprocessing processes
Python Multiprocessor Multicore Arrays
How to pass large numpy arrays between python subprocesses without saving to disk?
Etc., etc.
Each of them has drawbacks: relying on a not-so-mainstream package (sharedmem); storing variables globally; hard-to-read code; pipes; and so on.
My goal was to use numpy in my workers seamlessly, without worrying about conversions and the like.
After much testing, I came up with this. It works on my Ubuntu 16, Python 3.6 machine with 16 GB of RAM and 8 cores. I took a lot of shortcuts compared with the approaches above: there is no global state, no raw memory pointers that have to be converted to numpy inside the workers, the large numpy arrays are passed as process arguments, and so on.
The full code is at the Pastebin link above, but here I will give a few snippets.
Some imports:
    import numpy as np
    import multiprocessing as mp
    import multiprocessing.sharedctypes
    import ctypes
Allocate some shared memory and wrap it in a numpy array:
    def create_np_shared_array(shape, dtype, ctype):
        """Create a numpy array backed by raw shared memory."""
        size = int(np.prod(shape))  # total number of elements
        shared_mem_chunk = mp.sharedctypes.RawArray(ctype, size)
        # np.frombuffer creates a view over the shared buffer, so no copy is made
        numpy_array_view = np.frombuffer(shared_mem_chunk, dtype).reshape(shape)
        return numpy_array_view
Create a shared array and put something in it:
    src = np.random.rand(*SHAPE).astype(np.float32)
    src_shared = create_np_shared_array(SHAPE, np.float32, ctypes.c_float)
    dst_shared = create_np_shared_array(SHAPE, np.float32, ctypes.c_float)
    src_shared[:] = src[:]
Create a process:
    p = mp.Process(target=lengthly_operation, args=(src_shared, dst_shared, k, k + STEP))
    p.start()
    p.join()
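The worker itself is in the full code; below is only a minimal sketch of the kind of function that goes into target (the signature matches the args above, but the computation is a placeholder, not the real lengthly_operation):

    # Sketch of the worker: it receives the shared-memory-backed numpy views directly
    # and writes its results into its own slice of dst. The math is just a placeholder.
    def lengthly_operation(src, dst, start, end):
        for i in range(start, end):
            dst[i] = np.sqrt(src[i]) * 2.0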
Below are some results (see full code):
    Serial version:
        allocate mem:  2.3741257190704346
        exec:          17.092209577560425
        total:         19.46633529663086
        Succes: True

    Parallel with trivial np:
        allocate mem:  2.4535582065582275
        spawn process: 0.00015354156494140625
        exec:          3.4581971168518066
        total:         5.911908864974976
        Succes: False

    Parallel with shared mem np:
        allocate mem:  4.535916328430176 (pure alloc: 4.014216661453247, copy: 0.5216996669769287)
        spawn process: 0.00015664100646972656
        exec:          3.6783478260040283
        total:         8.214420795440674
        Succes: True
I also ran cProfile (why the ~2 extra seconds when allocating the shared memory?) and noticed some calls to tempfile.py and to {method 'write' of '_io.BufferedWriter' objects}.
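For reference, the profile can be reproduced with something like the following (a sketch; the exact driver sits in the full code):

    import cProfile

    # Profile only the shared-memory allocation to see where the extra ~2 seconds go.
    cProfile.run('create_np_shared_array(SHAPE, np.float32, ctypes.c_float)',
                 sort='cumulative')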
Questions
- Am I doing something wrong?
- Are the (large) arrays pickled back and forth anyway, so that I gain nothing in speed? Note that in the 2nd run (using regular np arrays) the validation fails.
- Is there a way to improve the timings, code clarity, etc.? (with respect to the multiprocessing paradigm)
Notes
- I cannot use process pools, because the memory has to be inherited at fork and not passed as a parameter (see the sketch below).
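Instead of a pool, the chunks are spread over plain Process objects, roughly like this (a sketch, not the exact code from the full script; STEP-sized chunks over the first axis as in the snippets above):

    # Sketch: split the first axis into STEP-sized chunks and give each its own process.
    workers = []
    for k in range(0, SHAPE[0], STEP):
        p = mp.Process(target=lengthly_operation,
                       args=(src_shared, dst_shared, k, min(k + STEP, SHAPE[0])))
        p.start()
        workers.append(p)
    for p in workers:
        p.join()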