Multiprocessing writes to a pandas DataFrame

What I am trying to do with the code below is to read a list of lists, put each row through a function called checker, and then have log_result process the result of the checker function. I am trying to do this with multiprocessing, because the variable rows_to_parse actually has millions of rows, so using multiple cores should speed this process up considerably.

Currently the code is not working and Python is crashing.

Problems and questions that I have:

  • I want the existing DataFrame held in the df variable to keep its index throughout the process, because otherwise log_result will get confused about which row needs updating.
  • I am fairly sure that apply_async is not the right multiprocessing function for this job, because I believe the order in which the workers read and write df could corrupt it(?).
  • I think it may be necessary to set up a queue for writing to and reading from df, but I am not sure how I would do that.

Thanks for any help.

    import pandas as pd
    import multiprocessing
    from functools import partial

    def checker(a, b, c, d, e):
        match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) &
                   (df['d'] == d) & (df['e'] == e)]
        index_of_match = match.index.tolist()
        if len(index_of_match) == 1:    # one match in df
            return index_of_match
        elif len(index_of_match) > 1:   # not likely, because duplicates are removed before parsing
            return [index_of_match[0]]
        else:                           # no match; the returned row gets written to the df by log_result
            return [a, b, c, d, e]

    def log_result(result, dataf):
        if len(result) == 1:
            dataf.loc[result[0]]['e'] += 1
        else:                           # append new row to existing df
            new_row = pd.DataFrame([result], columns=cols)
            dataf = dataf.append(new_row, ignore_index=True)

    def apply_async_with_callback(parsing_material, dfr):
        pool = multiprocessing.Pool()
        for var_a, var_b, var_c, var_d, var_e in parsing_material:
            pool.apply_async(checker, args=(var_a, var_b, var_c, var_d, var_e),
                             callback=partial(log_result, dataf=dfr))
        pool.close()
        pool.join()

    if __name__ == '__main__':
        # setting up main dataframe
        cols = ['a', 'b', 'c', 'd', 'e']
        existing_data = [["YES", "A", "16052011", "13031999", 3],
                         ["NO", "Q", "11022003", "15081999", 3],
                         ["YES", "A", "22082010", "03012001", 9]]

        # main dataframe
        df = pd.DataFrame(existing_data, columns=cols)

        # new data
        rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                         ['YES', 'W', '17061992', '26032012', 6],
                         ['YES', 'G', '01122006', '07082014', 2],
                         ['YES', 'N', '06081992', '21052008', 9],
                         ['YES', 'Y', '18051995', '24011996', 6],
                         ['NO', 'Q', '11022003', '15081999', 3],
                         ['NO', 'O', '20112004', '28062008', 0],
                         ['YES', 'R', '10071994', '03091996', 8],
                         ['NO', 'C', '09091998', '22051992', 1],
                         ['YES', 'Q', '01051995', '02012000', 3],
                         ['YES', 'Q', '26022015', '26092007', 5],
                         ['NO', 'F', '15072002', '17062001', 8],
                         ['YES', 'I', '24092006', '03112003', 2],
                         ['YES', 'A', '22082010', '03012001', 9],
                         ['YES', 'I', '15072016', '30092005', 7],
                         ['YES', 'Y', '08111999', '02022006', 3],
                         ['NO', 'V', '04012016', '10061996', 1],
                         ['NO', 'I', '21012003', '11022001', 6],
                         ['NO', 'P', '06041992', '30111993', 6],
                         ['NO', 'W', '30081992', '02012016', 6]]

        apply_async_with_callback(rows_to_parse, df)
1 answer

Updating a DataFrame like this from multiple processes will not work:

    dataf = dataf.append(new_row, ignore_index=True)

For one thing, it is very inefficient: each append is O(n), so the whole thing is O(n^2). The preferred way is to concat all the pieces together in a single pass.
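
For illustration, a minimal sketch of the difference; the column names simply mirror the question's toy data and the loop size is arbitrary:

    import pandas as pd

    cols = ['a', 'b', 'c', 'd', 'e']
    rows = [['NO', 'A', '09061997', '06122003', 5]] * 1000

    # O(n^2): every append copies the whole frame built so far
    # (DataFrame.append was later deprecated and removed in pandas 2.0)
    slow = pd.DataFrame(columns=cols)
    for row in rows:
        slow = slow.append(pd.DataFrame([row], columns=cols), ignore_index=True)

    # O(n): build the pieces first, then combine them once
    fast = pd.concat([pd.DataFrame([row], columns=cols) for row in rows],
                     ignore_index=True)

    # better still: let pandas parse the list of lists directly
    best = pd.DataFrame(rows, columns=cols)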

More importantly, dataf is not locked for each update, so there is no guarantee that two operations will not conflict (I suspect this is what is crashing Python).

Finally, append is not in place, so the dataf variable is thrown away once the callback completes, and no changes are made to the parent dataf!
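
To see why the callback's changes are lost, here is a small sketch reusing the shape of the question's log_result (the sample values are arbitrary):

    import pandas as pd

    cols = ['a', 'b', 'c', 'd', 'e']

    def log_result(result, dataf):
        # rebinds the *local* name dataf; the caller's frame is untouched
        dataf = dataf.append(pd.DataFrame([result], columns=cols),
                             ignore_index=True)

    df = pd.DataFrame([['YES', 'A', '16052011', '13031999', 3]], columns=cols)
    log_result(['NO', 'Q', '11022003', '15081999', 3], df)
    print(len(df))  # still 1 - the appended row vanished with the local name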


We could use a multiprocessing list or dict: a list if you do not care about the order, a dict if you do (keyed by enumerate, for example), since you should note that the values are not returned from async in any particular order.
(Or we could create an object that implements a Lock ourselves; see Eli Bendersky.)
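
If order matters, the dict variant keyed by each row's position works; a hedged sketch follows, assuming the question's checker and rows_to_parse are already defined, with helper names of my own:

    import multiprocessing
    from functools import partial

    def log_result(result, i, d):
        d[i] = result  # callbacks run in the parent process, so this is a plain assignment

    if __name__ == '__main__':
        # assumes checker() and rows_to_parse from the question are defined above
        manager = multiprocessing.Manager()
        d = manager.dict()
        pool = multiprocessing.Pool()
        for i, row in enumerate(rows_to_parse):
            pool.apply_async(checker, args=tuple(row),
                             callback=partial(log_result, i=i, d=d))
        pool.close()
        pool.join()
        ordered_results = [d[i] for i in sorted(d.keys())]  # recover the original order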

So, the following changes would be made:

    df = pd.DataFrame(existing_data, columns=cols)
    # becomes
    df = pd.DataFrame(existing_data, columns=cols)
    d = multiprocessing.Manager().list([df])

    dataf = dataf.append(new_row, ignore_index=True)
    # becomes
    d.append(new_row)

Now, once the async work has completed, you have a multiprocessing list of DataFrames. You can concat these (with ignore_index) to get the desired result:

    pd.concat(d, ignore_index=True)

This should do the trick.
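
Putting the pieces together, a sketch of how the revised flow could look; it assumes the question's checker, df, cols and rows_to_parse from above, handles the "one match" branch by collecting indices and applying the increments after the pool joins, and the helper names are mine:

    import multiprocessing
    from functools import partial
    import pandas as pd

    def log_result(result, d, matched):
        if len(result) == 1:
            matched.append(result[0])  # index of an existing row; incremented after join
        else:
            d.append(pd.DataFrame([result], columns=cols))  # new row as a one-row frame

    def apply_async_with_callback(parsing_material, d, matched):
        pool = multiprocessing.Pool()
        for row in parsing_material:
            pool.apply_async(checker, args=tuple(row),
                             callback=partial(log_result, d=d, matched=matched))
        pool.close()
        pool.join()

    # inside the question's `if __name__ == '__main__':` block, after df and
    # rows_to_parse have been built:
    manager = multiprocessing.Manager()
    d = manager.list([df])          # the existing frame goes in first
    matched = manager.list()
    apply_async_with_callback(rows_to_parse, d, matched)

    final = pd.concat(list(d), ignore_index=True)  # one concat instead of many appends
    for idx in matched:
        final.loc[idx, 'e'] += 1    # the "one match" branch from the original log_result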


Note: creating a new one-row DataFrame at each step is also less efficient than letting pandas parse the list of lists into a DataFrame in one go. Hopefully this is just a toy example; in reality you want your chunks to be reasonably large to get any wins from multiprocessing (I have heard 50kb as a rule of thumb...), a row at a time is never going to win here.
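
A hedged sketch of dispatching larger chunks instead of single rows; chunk_size and check_chunk are made-up names, the question's checker and rows_to_parse are assumed, and the right chunk size is something to measure rather than take from the rough rule of thumb above:

    import multiprocessing

    def chunks(seq, size):
        for i in range(0, len(seq), size):
            yield seq[i:i + size]

    def check_chunk(chunk):
        # run the question's checker over a whole slice of rows in one task
        return [checker(*row) for row in chunk]

    if __name__ == '__main__':
        chunk_size = 10000  # tune so each task amortises the pickling/IPC overhead
        pool = multiprocessing.Pool()
        results_per_chunk = pool.map(check_chunk, chunks(rows_to_parse, chunk_size))
        pool.close()
        pool.join()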


Aside: you should avoid using globals (for example df) in your code; it is much cleaner to pass them around in your functions (in this case, as an argument to checker).
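
One hedged way to do that is to bind the frame with functools.partial when submitting work (the parameter name dataf is just illustrative; note that the frame is then pickled and sent to the worker with every task, so for a very large frame a Pool initializer may be a better fit):

    import multiprocessing
    from functools import partial

    def checker(a, b, c, d, e, dataf):
        # same matching logic as before, with the two "match" branches folded together
        match = dataf[(dataf['a'] == a) & (dataf['b'] == b) & (dataf['c'] == c) &
                      (dataf['d'] == d) & (dataf['e'] == e)]
        index_of_match = match.index.tolist()
        if index_of_match:
            return [index_of_match[0]]
        return [a, b, c, d, e]

    # when submitting:
    pool = multiprocessing.Pool()
    pool.apply_async(partial(checker, dataf=df),
                     args=(var_a, var_b, var_c, var_d, var_e),
                     callback=partial(log_result, dataf=dfr))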



