Read, Compress, Write Using Multiprocessing

I am compressing files. The only process is suitable for some of them, but I am compressing thousands of them, and it may (and was) take several days, so Id would like to speed it up with multiprocessing. Ive read that I should avoid reading multiple files at the same time, and I assume that I should not have multiple processes at the same time. This is my current method, which runs separately:

import tarfile, bz2, os def compress(folder): "compresses a folder into a file" bz_file = bz2.BZ2File(folder+'.tbz', 'w') with tarfile.open(mode='w', fileobj = bz_file) as tar: for fn in os.listdir(folder): read each file in the folder and do some pre processing that will make the compressed file much smaller than without tar.addfile( processed file ) bz_file.close() return 

This transfers the folder and compresses all its contents into a single file. This simplifies their processing and more organization. If I just threw it into the pool, then Id has several processes that read and write all at once, so I want to avoid this. I can recycle it, so that only one process reads the files, but I still have a few emails:

 import multiprocessing as mp import tarfile, bz2, os def compress(file_list): folder = file_list[0] bz_file = bz2.BZ2File(folder+'.tbz', 'w') with tarfile.open(mode='w', fileobj = bz_file) as tar: for i in file_list[1:]: preprocess file data tar.addfile(processed data) bz_file.close() return cpu_count = mp.cpu_count() p = mp.Pool(cpu_count) for subfolder in os.listdir(main_folder): read all files in subfolder into memory, place into file_list place file_list into fld_list until fld_list contains cpu_count file lists. then pass to p.map(compress, fld_list) 

It still has several processes writing compressed files at the same time. Just a tarfile story about what kind of compression it starts to write to the hard drive. I can’t read all the files that I need to compress into memory, since I don’t have that much RAM, so it also has a problem with Im restarting Pool.map many times.

How can I read and write files in one process, but they all have compression in several processes, while avoiding restarting multiprocessing. Repeatedly?

+2
source share
1 answer

Instead of multiprocessing.Pool , use multiprocessing.Queue and create an Inbox and Outbox.

Start one process for reading in files and put the data in the queue of incoming messages and set a limit on the size of the queue so as not to fill your RAM. The example here compresses individual files, but it can be configured to process entire folders at the same time.

 def reader(inbox, input_path, num_procs): "process that reads in files to be compressed and puts to inbox" for fn in os.listdir(input_path): path = os.path.join(input_path, fn) # read in each file, put data into inbox fname = os.path.basename(fn) with open(fn, 'r') as src: lines = src.readlines() data = [fname, lines] inbox.put(data) # read in everything, add finished notice for all running processes for i in range(num_procs): inbox.put(None) # when a compressor sees a None, it will stop inbox.close() return 

But only half the question, the other part is to compress the file without writing it to disk. We provide a StringIO object StringIO compression functions instead of an open file; it is passed to the tarfile . After compression, we put the StringIO object in the outgoing message queue.

Except that we cannot do this, because StringIO objects cannot be pickled, only pickletable objects can be queued. However, the getvalue function getvalue can provide the content in a selectable format, so grab the content using getvalue, close the StringIO object, and then put the content in the source folder.

 from io import StringIO import tarfile def compressHandler(inbox, outbox): "process that pulls from inbox, compresses and puts to outbox" supplier = iter(inbox.get, None) # stops when gets a None while True: try: data = next(supplier) # grab data from inbox pressed = compress(data) # compress it ou_que.put(pressed) # put into outbox except StopIteration: outbox.put(None) # finished compressing, inform the writer return # and quit def compress(data): "compress file" bz_file = StringIO() fname, lines = dat # see reader def for package order with tarfile.open(mode='w:bz2', fileobj=bz_file) as tar: info = tarfile.TarInfo(fname) # store file name tar.addfile(info, StringIO(''.join(lines))) # compress data = bz_file.getvalue() bz_file.close() return data 

The recording process then extracts the contents from the outgoing message queue and writes them to disk. This function should know how many compression processes have been started, so he only knows to stop when he heard that each process is stopped.

 def writer(outbox, output_path, num_procs): "single process that writes compressed files to disk" num_fin = 0 while True: # all compression processes have finished if num_finished >= num_procs: break tardata = outbox.get() # a compression process has finished if tardata == None: num_fin += 1 continue fn, data = tardata name = os.path.join(output_path, fn) + '.tbz' with open(name, 'wb') as dst: dst.write(data) return 

Finally, there is a kit to put everything together

 import multiprocessing as mp import os def setup(): fld = 'file/path' # multiprocess setup num_procs = mp.cpu_count() # inbox and outbox queues inbox = mp.Queue(4*num_procs) # limit size outbox = mp.Queue() # one process to read reader = mp.Process(target = reader, args = (inbox, fld, num_procs)) reader.start() # n processes to compress compressors = [mp.Process(target = compressHandler, args = (inbox, outbox)) for i in range(num_procs)] for c in compressors: c.start() # one process to write writer = mp.Process(target = writer, args=(outbox, fld, num_procs)) writer.start() writer.join() # wait for it to finish print('done!') 
+3
source

Source: https://habr.com/ru/post/1271941/


All Articles