Python multiprocessing on a generator that reads files in

I am trying to read and process ~1000 files, but unfortunately it takes about three times as long to process a file as it does to read it from disk, so I would like to process these files as they are read in (and keep reading additional files in the meantime).

In an ideal world, I would have a generator that reads one file at a time, and I would pass this generator to a pool of workers that process items from the generator as they are (slowly) produced.

Here is an example:

import os
from multiprocessing import Pool

def process_file(file_string):
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

The only problem with the above code is that all the files are read into memory before the pool even starts working, which means I have to wait for the disk to read everything in, and it also consumes a huge amount of memory.

+4
2 answers

Pool.map and Pool.map_async listify the iterable passed to them, so your generator will always be fully realized before processing even begins.
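A quick way to see this behavior (a toy sketch, not the question's code; work and gen are made-up names): every "yielding" line prints before any "processing" line, because map builds the full list first.

import multiprocessing

def work(x):
    print('processing', x)
    return x * 2

def gen():
    for i in range(5):
        print('yielding', i)
        yield i

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        # All five "yielding" lines appear before the first "processing"
        # line, because map() listifies the generator up front.
        print(pool.map(work, gen()))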

The various Pool.imap* functions appear to process inputs as generators, so you can change:

results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

to:

# If you can process outputs one at a time, drop the list wrapper
# If you can process outputs without order mattering, imap_unordered will
# get you the best results
results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))

That gets you the same results without slurping all the inputs before processing starts but, AFAICT, the imap functions will still try to populate their internal queues as fast as they can; and since all the reading still happens in the parent process, with each file's contents shipped to a worker over IPC, you remain bottlenecked on a single process's disk and IPC throughput.

(On top of that, the parent may end up holding many whole files in memory at once.) In your position, I would move the read into the task itself: the workers read in parallel, far less data crosses the IPC boundary, and you never have more file contents in memory than you have workers. The end result would look like this:

import os
from multiprocessing import Pool

def process_file(path):
    with open(path, 'rb') as f:
        file_string = f.read()
    ... same as before ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
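If you can also process results in any order, the imap_unordered variant mentioned in the comment above yields each result as soon as any worker finishes it. A sketch under the same setup, reusing process_file from the snippet above (handle is a hypothetical downstream consumer, not part of the original answer):

import os
from multiprocessing import Pool

if __name__ == '__main__':
    path = 'some/path/'
    with Pool(processes=4) as pool:
        # Results arrive in completion order, not submission order,
        # so one slow file does not hold up the results behind it.
        for processed in pool.imap_unordered(
                process_file,
                (os.path.join(path, part) for part in os.listdir(path))):
            handle(processed)  # hypothetical consumer of each result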
+5

Move the reads into the worker processes. Don't read each file in the main process and push its contents through the pool; pass the filenames instead and let every worker open and read its own file.

map does consume the iterable up front, but here it is only a sequence of filenames, which is cheap to materialize. It also groups the inputs into chunks before dispatching them to the workers; passing chunksize=1 hands out one filename per task, so each worker grabs the next file as soon as it finishes the current one:

import os
from multiprocessing import Pool

def process_file(filename):
    with open(filename, 'rb') as fp:
        file_string = fp.read()
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (path+part for part in os.listdir(path)), chunksize=1)
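For context on that chunksize argument: by default map splits the input into batches of roughly len(iterable) / (4 * processes) and sends each batch to a single worker, while chunksize=1 dispatches one item per task, which balances the load better when files take very different times to process, at the cost of a little more IPC. A toy sketch (which_worker is a made-up function) that makes the batching visible:

import os
from multiprocessing import Pool

def which_worker(item):
    # Tag each item with the pid of the worker that processed it.
    return item, os.getpid()

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # With the default chunksize (2 here), consecutive items tend to
        # share a pid because they were shipped to a worker as one batch.
        print(pool.map(which_worker, range(16)))
        # With chunksize=1, items are handed out individually.
        print(pool.map(which_worker, range(16), chunksize=1))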
+2

Source: https://habr.com/ru/post/1618947/

