How to efficiently submit jobs with large arguments to Dask?

I want to submit functions with Dask that take large arguments (big arrays). What is the best way to do this? I want to run the function many times with different (small) parameters.

Example (bad)

This uses the concurrent.futures-style interface; we could just as easily use the dask.delayed interface.

    import numpy as np
    from dask.distributed import Client

    x = np.random.random(size=100000000)  # 800MB array
    params = list(range(100))             # 100 small parameters

    def f(x, param):
        pass

    c = Client()
    futures = [c.submit(f, x, param) for param in params]

But this is slower than I expected, or runs into memory errors.

1 answer

OK, so the problem here is that every task contains the numpy array x, which is large. For each of the 100 tasks that we submit, we need to serialize x, send it to the scheduler, send it to a worker, and so on.

Instead, we scatter the array to the cluster once:

    [future] = c.scatter([x])

Now future is a token that points to the x array living on the cluster. We can submit tasks that refer to this remote future instead of to the numpy array on our local client.

    # futures = [c.submit(f, x, param) for param in params]     # sends x each time
    futures = [c.submit(f, future, param) for param in params]  # refers to remote x already on cluster

This is now much faster, and lets Dask manage data movement more efficiently.
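For completeness, here is a minimal end-to-end sketch of this pattern, assuming a hypothetical f that reduces the big array with each small parameter; the small per-parameter results are pulled back with Client.gather:

    import numpy as np
    from dask.distributed import Client

    def f(x, param):
        # hypothetical work: reduce the big array using one small parameter
        return (x * param).sum()

    c = Client()  # local cluster for illustration

    x = np.random.random(size=100000000)   # 800MB array
    [future] = c.scatter([x])              # move x to the cluster once

    futures = [c.submit(f, future, param) for param in range(100)]
    results = c.gather(futures)            # 100 small results back on the client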

Scatter data to all workers

If you expect to need the x array on all of the workers, then you may want to broadcast the array when you scatter it:

    [future] = c.scatter([x], broadcast=True)
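Alternatively, if the array has already been scattered to a single worker, dask.distributed also provides Client.replicate, which copies existing futures to more workers (by default, all of them); a short sketch:

    [future] = c.scatter([x])   # data initially lives on one worker
    c.replicate([future])       # copy it to every worker in the cluster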

Use Dask Delayed

Futures also work fine with dask.delayed. There is no performance benefit here, but some people prefer this interface:

    # futures = [c.submit(f, future, param) for param in params]
    from dask import delayed
    lazy_values = [delayed(f)(future, param) for param in params]
    futures = c.compute(lazy_values)
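The futures returned by c.compute can then be gathered back just like the ones from c.submit, for example:

    results = c.gather(futures)   # block until the delayed tasks finish, then fetch results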

Source: https://habr.com/ru/post/1013763/

