Like this question, I am facing memory problems with distributed Dask. In my case, however, the explanation is not that the client is trying to collect a large amount of data.
The problem can be illustrated with a very simple task graph: a list of delayed operations generates random DataFrames of a fixed size of ~500 MB each (to simulate loading many partitions from files). The next operation in the graph takes the size of each DataFrame, and finally all sizes are reduced to a single total, i.e. the data that has to be returned to the client is small (see Appendix 1 for the full code).
For testing purposes, I run a local scheduler and a single single-threaded worker, limited to 2 GB, i.e.:
$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000
My expectation for this task graph is that the worker never needs much more than 500 MB of RAM, because running "get the size of the data" directly after "generate the data" should immediately leave only small data behind. However, I am observing that the worker needs considerably more memory than this:

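For reference, this is roughly how I sample the worker's resident memory from the client side while the graph runs. worker_rss_mb is a throwaway helper of mine; it assumes psutil is installed on the worker and that client is the Client instance from the code in Appendix 1:

import psutil

def worker_rss_mb():
    # Resident set size of the worker process, in MB.
    return psutil.Process().memory_info().rss / (2 ** 20)

# Client.run executes the function on every worker and returns a dict
# keyed by worker address.
print(client.run(worker_rss_mb))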
A factor of 2 suggests that the data is duplicated internally. Therefore, any attempt to bring the size of a partition close to the physical memory of a node results in a MemoryError or heavy swapping.
Any information to shed light on this is much appreciated. In particular:
- Do I have control over this data duplication, and can it be avoided? Or is the general rule of thumb to keep payloads well below 50% of the memory limit to account for data duplication? (See the rough sizing sketch after this list.)
- How does the worker memory-limit come into play here? From what I can tell, it only seems to act as a threshold for triggering GC (and/or spilling to disk?), rather than a hard cap.
- More generally, how should I reason about per-worker memory requirements for a given partition size, so that a Dask job like this one stays within its limit?
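For what it's worth, the back-of-the-envelope sizing I am currently using looks like this; it simply assumes the observed factor-of-2 duplication is a hard rule, which is exactly what I would like to have confirmed or refuted:

# Rough sizing, assuming peak worker memory ~= 2x the partition size.
memory_limit_bytes = 2e9          # value passed to dask-worker --memory-limit
duplication_factor = 2            # observed peak memory / partition size
safe_partition_bytes = memory_limit_bytes / duplication_factor
print("max safe partition size: ~{:.0f} MB".format(safe_partition_bytes / 2 ** 20))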
Appendix 1: Code
from __future__ import division, print_function

import numpy as np
import pandas as pd
from dask import delayed
from dask.distributed import Client


def simulate_df_partition_load(part_id):
    """
    Creates a random DataFrame of ~500 MB to simulate loading one partition from file.
    """
    num_rows = 5000000
    num_cols = 13
    df = pd.DataFrame()
    for i in range(num_cols):
        data_col = np.random.uniform(0, 1, num_rows)
        df["col_{}".format(i)] = data_col
        del data_col  # release the temporary column array
    print("[Partition {}] #rows: {}, #cols: {}, memory: {} MB".format(
        part_id, df.shape[0], df.shape[1],
        df.memory_usage().sum() / (2 ** 20)
    ))
    return df


client = Client('127.0.0.1:8786', set_as_default=True)

num_partitions = 2
lazy_dataframes = [
    delayed(simulate_df_partition_load)(part_id)
    for part_id in range(num_partitions)
]
# Take the (small) length of each partition and reduce everything to one total.
length_partitions = [df.shape[0] for df in lazy_dataframes]
dag = delayed(sum)(length_partitions)
length_total = dag.compute()
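The DAG shown in Appendix 2 can be rendered directly from the delayed object, along these lines (this assumes the graphviz package is installed):

# Render the task graph of the final reduction to a file.
dag.visualize(filename='dag.png')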
Appendix 2: DAG
