I have to process a huge pandas.DataFrame (several tens of GB) row by row, where each per-row operation is fairly slow (a couple of tens of milliseconds). So I had the idea to split the frame into chunks and process each chunk in parallel using multiprocessing. This does speed up the task, but the memory consumption is a nightmare.
Although each child process should, in principle, only hold a tiny chunk of the data, it needs (almost) as much memory as the original parent process that contains the original DataFrame. Even deleting the already-used parts in the parent process does not help.
I wrote a minimal example that reproduces this behavior. All it does is create a large DataFrame of random numbers, split it into chunks of at most 100 rows, and print some information about each chunk during multiprocessing (here via an mp.Pool of size 4).
The worker function that is executed in parallel:
def just_wait_and_print_len_and_idx(df):
    """Waits for 1 second and prints df length and first and last index"""
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()
    time.sleep(1)
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))
A helper generator that splits the DataFrame into chunks:
def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # hand out a copy of the first `chunksize` rows ...
        yield df.iloc[:chunksize].copy()
        # ... and drop them from the original frame in place
        df.drop(df.index[:chunksize], inplace=True)
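To make the generator's side effect explicit, here is a tiny standalone illustration (the 6x2 demo frame and chunk size of 2 are made up for demonstration; it assumes df_chunking as defined above):

import numpy as np
import pandas as pd

# tiny made-up frame, just to show the generator's side effect
df_demo = pd.DataFrame(np.arange(12).reshape(6, 2))

for chunk in df_chunking(df_demo, chunksize=2):
    # each yielded chunk is an independent copy of at most `chunksize` rows
    print('chunk with indices', list(chunk.index))

# every yielded row has been dropped from the original frame in place
print(len(df_demo))  # -> 0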
And the main procedure:
def main():
    n_jobs = 4
    size = (10000, 1000)
    chunksize = 100
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)
    print('Starting MP')
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))
    pool.close()
    pool.join()
    print('DONE')
The standard output is as follows:
Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE
Problem:
The main process needs about 120 MB of memory. However, each child process of the pool needs just as much memory, even though it only holds about 1% of the original DataFrame (chunks of 100 rows out of an original 10,000 rows). Why?
Did I overlook something, or does DataFrame chunking with Python 3 multiprocessing inevitably copy the whole frame into every worker? Any hints are appreciated!
Here is the full script for copy and paste:
import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os


def just_wait_and_print_len_and_idx(df):
    """Waits for 1 second and prints df length and first and last index"""
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()
    time.sleep(1)
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))


def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        yield df.iloc[:chunksize].copy()
        df.drop(df.index[:chunksize], inplace=True)


def main():
    n_jobs = 4
    size = (10000, 1000)
    chunksize = 100
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)
    print('Starting MP')
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))
    pool.close()
    pool.join()
    print('DONE')


if __name__ == '__main__':
    main()
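For reference, one possible way to check the per-process memory yourself is psutil (this is only a sketch of a measurement helper, not how the figures above were obtained; psutil is an extra third-party dependency and print_rss is a hypothetical helper):

import os

import psutil  # third-party package, used here only for measurement


def print_rss(label):
    """Prints the resident set size (RSS) of the calling process in MB."""
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
    print('{}: PID {} uses {:.1f} MB'.format(label, os.getpid(), rss_mb))

Calling print_rss('parent') at the end of main() and print_rss('child') at the top of just_wait_and_print_len_and_idx() makes it easy to compare the parent's footprint with that of each pool worker.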