Can dask parallelize reading from a CSV file?

I am converting a large text file to HDF storage in the hope of faster data access. The conversion works, but reading from the CSV file is not performed in parallel. It is very slow (about 30 minutes for a 1 GB text file on an SSD, so I assume it is not I/O-bound).

Is there a way to read it in multiple threads in parallel? In case it matters, I am currently forced to work under Windows.

    from dask import dataframe as ddf

    df = ddf.read_csv("data/Measurements*.csv",
                      sep=';',
                      parse_dates=["DATETIME"],
                      blocksize=1000000,
                      )
    df = df.categorize(['Type', 'Condition'])  # categorize returns a new frame
    df.to_hdf("data/data.hdf", "Measurements", mode='w')
2 answers

Yes, dask.dataframe can read in parallel. However, you have run into two problems:

pandas.read_csv only partially releases the GIL

By default, dask.dataframe parallelizes with threads, because most of pandas can run in parallel across multiple threads (it releases the GIL). pandas.read_csv is an exception, especially if your resulting dataframes use object dtypes for text.
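A minimal pandas-only sketch of the dtype issue (the data and column names are illustrative): parsing text columns directly as 'category' avoids building a Python string object per row, which is the work pandas does while holding the GIL.

```python
import io
import pandas as pd

# Illustrative data; in the question the columns come from Measurements*.csv.
csv = io.StringIO("Type;Condition;DATETIME\nA;ok;2017-01-01\nB;bad;2017-01-02\n")

# Without a dtype hint, text columns come back as object dtype (Python strings),
# which pandas builds while holding the GIL, so threads give little speedup.
df = pd.read_csv(csv, sep=';', dtype={"Type": "category", "Condition": "category"})
print(df.dtypes)  # Type and Condition parse as category rather than object
```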

dask.dataframe.to_hdf(filename) forces sequential computation

Writing to a single HDF file forces sequential computation (it is very difficult to write to a single file in parallel).

Edit: new solution

Today I would avoid HDF and use Parquet instead. I would probably use the multiprocessing or dask.distributed schedulers to avoid GIL problems on a single machine. The combination of these two should give you full linear scaling.

    import dask.dataframe as ddf
    from dask.distributed import Client

    client = Client()  # local distributed scheduler; sidesteps the GIL
    df = ddf.read_csv(...)
    df.to_parquet(...)

Solution

Since your dataset likely fits in memory, use dask.dataframe.read_csv to load in parallel with multiple processes, then immediately switch to pandas.

    import dask.dataframe as ddf
    import dask.multiprocessing

    df = ddf.read_csv("data/Measurements*.csv",  # read in parallel
                      sep=';',
                      parse_dates=["DATETIME"],
                      blocksize=1000000,
                      )
    df = df.compute(get=dask.multiprocessing.get)  # convert to pandas
    df['Type'] = df['Type'].astype('category')
    df['Condition'] = df['Condition'].astype('category')
    df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')

In response to @MRocklin's answer: in newer versions of dask, you can use df.compute(scheduler='processes') or df.compute(scheduler='threads') to convert to pandas using multiprocessing or multithreading:

    from dask import dataframe as ddf

    df = ddf.read_csv("data/Measurements*.csv",
                      sep=';',
                      parse_dates=["DATETIME"],
                      blocksize=1000000,
                      )
    df = df.compute(scheduler='processes')  # convert to pandas
    df['Type'] = df['Type'].astype('category')
    df['Condition'] = df['Condition'].astype('category')
    df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')

Source: https://habr.com/ru/post/1258379/

