
"Unable to sort error <type" _csv.reader '> when using multiprocessing on Windows

I am writing a multiprocessing program to process a large .CSV file in parallel, on Windows.

I found this great example for a similar problem. When running it under Windows, I get an error that csv.reader is not picklable.

I suppose I could open the CSV file in the reader subprocess and just send it the file name from the parent process. However, I would like to pass an already opened CSV file (as the code is meant to do), with a specific state, i.e. really use a shared object.

Any idea how to do this under Windows, or what is missing here?

This is the code (I have only reformatted it for readability):

"""A program that reads integer values from a CSV file and writes out their sums to another CSV file, using multiple processes if desired. """ import csv import multiprocessing import optparse import sys NUM_PROCS = multiprocessing.cpu_count() def make_cli_parser(): """Make the command line interface parser.""" usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV", __doc__, """ ARGUMENTS: INPUT_CSV: an input CSV file with rows of numbers OUTPUT_CSV: an output file that will contain the sums\ """]) cli_parser = optparse.OptionParser(usage) cli_parser.add_option('-n', '--numprocs', type='int', default=NUM_PROCS, help="Number of processes to launch [DEFAULT: %default]") return cli_parser class CSVWorker(object): def __init__(self, numprocs, infile, outfile): self.numprocs = numprocs self.infile = open(infile) self.outfile = outfile self.in_csvfile = csv.reader(self.infile) self.inq = multiprocessing.Queue() self.outq = multiprocessing.Queue() self.pin = multiprocessing.Process(target=self.parse_input_csv, args=()) self.pout = multiprocessing.Process(target=self.write_output_csv, args=()) self.ps = [ multiprocessing.Process(target=self.sum_row, args=()) for i in range(self.numprocs)] self.pin.start() self.pout.start() for p in self.ps: p.start() self.pin.join() i = 0 for p in self.ps: p.join() print "Done", i i += 1 self.pout.join() self.infile.close() def parse_input_csv(self): """Parses the input CSV and yields tuples with the index of the row as the first element, and the integers of the row as the second element. The index is zero-index based. The data is then sent over inqueue for the workers to do their thing. At the end the input thread sends a 'STOP' message for each worker. """ for i, row in enumerate(self.in_csvfile): row = [ int(entry) for entry in row ] self.inq.put( (i, row) ) for i in range(self.numprocs): self.inq.put("STOP") def sum_row(self): """ Workers. Consume inq and produce answers on outq """ tot = 0 for i, row in iter(self.inq.get, "STOP"): self.outq.put( (i, sum(row)) ) self.outq.put("STOP") def write_output_csv(self): """ Open outgoing csv file then start reading outq for answers Since I chose to make sure output was synchronized to the input there is some extra goodies to do that. Obviously your input has the original row number so this is not required. """ cur = 0 stop = 0 buffer = {} # For some reason csv.writer works badly across threads so open/close # and use it all in the same thread or else you'll have the last # several rows missing outfile = open(self.outfile, "w") self.out_csvfile = csv.writer(outfile) #Keep running until we see numprocs STOP messages for works in range(self.numprocs): for i, val in iter(self.outq.get, "STOP"): # verify rows are in order, if not save in buffer if i != cur: buffer[i] = val else: #if yes are write it out and make sure no waiting rows exist self.out_csvfile.writerow( [i, val] ) cur += 1 while cur in buffer: self.out_csvfile.writerow([ cur, buffer[cur] ]) del buffer[cur] cur += 1 outfile.close() def main(argv): cli_parser = make_cli_parser() opts, args = cli_parser.parse_args(argv) if len(args) != 2: cli_parser.error("Please provide an input file and output file.") c = CSVWorker(opts.numprocs, args[0], args[1]) if __name__ == '__main__': main(sys.argv[1:]) 

When running it under Windows, I get the following traceback:

Traceback (most recent call last):
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 130, in <module>
    main(sys.argv[1:])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 127, in main
    c = CSVWorker(opts.numprocs, args[0], args[1])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 44, in __init__
    self.pin.start()
  File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Python27\lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Python27\lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 401, in save_reduce
    save(args)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError
2 answers

The problem you are running into is caused by using methods of your CSVWorker class as the process targets; that class has members that cannot be pickled, and those open files are simply never going to work.

What you want to do is break that class into two classes: one that coordinates all the worker subprocesses, and another that actually does the computational work. The worker processes take file names as arguments and open the individual files as needed, or at least wait until their work methods are called before opening the files. They can also take multiprocessing.Queue instances as arguments or as instance members; those are safe to pass around.

To a certain extent you are already doing this: your write_output_csv method opens its file in the subprocess, but your parse_input_csv method expects to find an already opened, prepared reader as an attribute of self. Do it the other way consistently and you should be in good shape.
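A minimal sketch of that restructuring, keeping the same queue-and-STOP protocol as the original code (the name CSVCoordinator and the module-level functions are illustrative, not from the original post, and the row-reordering buffer is omitted for brevity):

# Sketch only: the worker functions receive file names and queues, so nothing
# unpicklable is ever sent to a child process under the Windows spawn model.
import csv
import multiprocessing

def parse_input_csv(infile_name, inq, numprocs):
    # The input file is opened *inside* the child process.
    with open(infile_name, "rb") as f:
        for i, row in enumerate(csv.reader(f)):
            inq.put((i, [int(entry) for entry in row]))
    for _ in range(numprocs):
        inq.put("STOP")

def sum_row(inq, outq):
    for i, row in iter(inq.get, "STOP"):
        outq.put((i, sum(row)))
    outq.put("STOP")

def write_output_csv(outfile_name, outq, numprocs):
    # The output file is likewise opened in the child that uses it.
    # NOTE: rows are written as they arrive, not in input order.
    with open(outfile_name, "wb") as f:
        writer = csv.writer(f)
        for _ in range(numprocs):
            for i, val in iter(outq.get, "STOP"):
                writer.writerow([i, val])

class CSVCoordinator(object):
    """Only coordinates the processes; holds no open files or csv objects."""
    def __init__(self, numprocs, infile, outfile):
        inq = multiprocessing.Queue()
        outq = multiprocessing.Queue()
        pin = multiprocessing.Process(target=parse_input_csv,
                                      args=(infile, inq, numprocs))
        pout = multiprocessing.Process(target=write_output_csv,
                                       args=(outfile, outq, numprocs))
        workers = [multiprocessing.Process(target=sum_row, args=(inq, outq))
                   for _ in range(numprocs)]
        for p in [pin, pout] + workers:
            p.start()
        for p in [pin] + workers + [pout]:
            p.join()

Because only picklable things (file names, queues, plain numbers) cross the process boundary, this starts cleanly on Windows.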


Since multiprocessing depends on serializing and de-serializing objects when passing them as parameters between processes, and your code relies on passing an instance of CSVWorker around the processes (the instance referred to as "self"), you got this error - because neither csv readers nor open files can be pickled.

You mentioned that your CSV is big, so reading all the data into a list probably isn't an option for you - you have to think of a way to pass one line of your input CSV at a time to each worker, retrieve the processed line from each worker, and perform all the file I/O in the main process.

It looks like multiprocessing.Pool would be a better way to write your application - check out the multiprocessing documentation at http://docs.python.org/library/multiprocessing.html - and try using a process pool with pool.map to process your CSV. It also takes care of preserving order, which eliminates a lot of the complicated logic in your code.
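A rough sketch of that approach, with all file I/O kept in the parent process (the file names and the chunksize value are illustrative, not from the original post):

# Sketch only: workers receive one parsed row at a time and return its sum;
# imap yields results in input order, so no reordering buffer is needed.
import csv
import multiprocessing

def sum_row(indexed_row):
    i, row = indexed_row
    return i, sum(int(entry) for entry in row)

def main(input_csv, output_csv, numprocs=multiprocessing.cpu_count()):
    pool = multiprocessing.Pool(processes=numprocs)
    with open(input_csv, "rb") as fin, open(output_csv, "wb") as fout:
        writer = csv.writer(fout)
        for i, total in pool.imap(sum_row, enumerate(csv.reader(fin)),
                                  chunksize=100):
            writer.writerow([i, total])
    pool.close()
    pool.join()

if __name__ == '__main__':
    main("input.csv", "output.csv")

Only small, picklable tuples travel between processes, and because imap returns results in the order of its input, the manual buffering from the original write_output_csv is unnecessary.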


Source: https://habr.com/ru/post/1386436/

