Memory error with multiprocessing in Python

I am trying to do some expensive scientific computing using Python. I have to read a bunch of data stored in csv files and then process them. Since each process takes a lot of time and I have 8 processors, I tried to use the Pool method from Multiprocessing .

Here's how I structured a multiprocessor call:

  pool = Pool() vector_components = [] for sample in range(samples): vector_field_x_i = vector_field_samples_x[sample] vector_field_y_i = vector_field_samples_y[sample] vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps, vector_field_x_i, vector_field_y_i)) vector_components.append(vector_component) pool.close() pool.join() vector_components = map(lambda k: k.get(), vector_components) for vector_component in vector_components: CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv') 

I performed a dataset of 500 samples of size 100 ( x_dim ) per 100 ( y_dim ).

Until then, everything worked fine.

Then I get a dataset of 500 400 x 400 samples.

When I start, I get an error when calling get .

I also tried to run one 400 x 400 sample and got the same error.

 Traceback (most recent call last): File "__init__.py", line 33, in <module> VfD.samples_vector_field_decomposer(samples, x_dim, y_dim, x_steps, y_steps, vector_field_samples_x, vector_field_samples_y) File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in samples_vector_field_decomposer vector_components = map(lambda k: k.get(), vector_components) File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in <lambda> vector_components = map(lambda k: k.get(), vector_components) File "/export/home/pceccon/.pyenv/versions/2.7.5/lib/python2.7/multiprocessing/pool.py", line 554, in get raise self._value MemoryError 

What should I do?

Thanks in advance.

+5
source share
1 answer

At the moment, you save several lists in memory - vector_field_x , vector_field_y , vector_components , and then a separate copy of vector_components during the map call (i.e. when you actually end the memory). You can avoid having to copy the vector_components list by using pool.imap instead of pool.apply_async along with a manually created list. imap returns an iterator instead of a complete list, so you will never have all the results in memory.

Usually pool.map breaks the iterability passed to it into pieces, and sends those pieces to child processes, rather than sending one item at a time. This helps increase productivity. Since imap uses an iterator instead of a list, it does not know the full size of the iterative that you pass to it. Not knowing the size of the iterability, he does not know how much to make each piece, so it defaults to chunksize of 1, which will work, but may not work so well. To avoid this, you can give it a good chunksize argument, since you know that iterable has a length of sample . This may not really matter for your list of 500 items, but it’s worth experimenting with it.

Here is an example code that demonstrates all this:

 import multiprocessing from functools import partial def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields): vector_field_x_i = vector_fields[0] vector_field_y_i = vector_fields[1] # Do whatever is normally done here. if __name__ == "__main__": num_workers = multiprocessing.cpu_count() pool = multiprocessing.Pool(num_workers) # Calculate a good chunksize (based on implementation of pool.map) chunksize, extra = divmod(samples // 4 * num_workers) if extra: chunksize += 1 # Use partial so many arguments can be passed to vector_field_decomposer func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps) # We use a generator expression as an iterable, so we don't create a full list. results = pool.imap(func, ((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)), chunksize=chunksize) for vector in results: CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv') pool.close() pool.join() 

This should allow you to avoid MemoryError problems, but if not, you can try to run imap in smaller pieces of your common pattern and just make a few passes. I don’t think that you will have any problems because you do not create any additional lists except the vector_field_* lists with which you start.

+5
source

Source: https://habr.com/ru/post/1200846/


All Articles