What I am trying to do with the code below is read a list of lists, run each entry through a function called checker, and then have log_result process what checker returns. I am trying to do this with multiprocessing, because the variable rows_to_parse actually holds millions of rows, so using multiple cores should speed this process up considerably.
Currently the code is not working and Python is crashing.
Problems and questions I have:

- I want the existing df held in the df variable to keep its index throughout the process, because otherwise log_result will get confused about which row needs updating.
- I am fairly sure that apply_async is not the right multiprocessing function for this job, because I believe the order in which the workers read and write df could corrupt it???
- I think I may need to set up a queue for reading and writing df, but I am not sure how I would do that.
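Independent of the multiprocessing concerns, one pitfall in log_result is worth illustrating with a pandas-only sketch: the line `dataf = dataf.append(...)` only rebinds the local name `dataf`, so the caller's DataFrame is never extended. The sketch below uses `pd.concat`, since `DataFrame.append` was removed in pandas 2.0; the helper function names are made up for the demo:

```python
import pandas as pd

def add_row_broken(dataf, row):
    # Rebinding the parameter only changes the local name; the caller's
    # DataFrame is untouched, which is why appended rows seem to vanish.
    dataf = pd.concat([dataf, pd.DataFrame([row], columns=dataf.columns)],
                      ignore_index=True)

def add_row_fixed(dataf, row):
    # Returning the new frame (and reassigning at the call site) works.
    return pd.concat([dataf, pd.DataFrame([row], columns=dataf.columns)],
                     ignore_index=True)

df = pd.DataFrame([["YES", "A", 3]], columns=["a", "b", "e"])
add_row_broken(df, ["NO", "Q", 5])
print(len(df))                        # still 1: the appended row was lost
df = add_row_fixed(df, ["NO", "Q", 5])
print(len(df))                        # 2
```

The same rebinding issue applies to a callback: mutating the DataFrame in place (e.g. with .loc) is visible to the caller, but reassigning the parameter is not.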
Thanks for any help.
    import pandas as pd
    import multiprocessing
    from functools import partial

    def checker(a, b, c, d, e):
        match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) &
                   (df['d'] == d) & (df['e'] == e)]
        index_of_match = match.index.tolist()
        if len(index_of_match) == 1:  # one match in df
            return index_of_match
        elif len(index_of_match) > 1:  # not likely, because duplicates are removed before the if __name__ == '__main__' block
            return [index_of_match[0]]
        else:
            # no match: returns a result which then gets processed by the
            # else branch in log_result, i.e. [a, b, c, d, e] gets written to the df
            return [a, b, c, d, e]

    def log_result(result, dataf):
        if len(result) == 1:
            # one match: increment 'e' for that row
            dataf.loc[result[0]]['e'] += 1
        else:
            # append new row to existing df
            new_row = pd.DataFrame([result], columns=cols)
            dataf = dataf.append(new_row, ignore_index=True)

    def apply_async_with_callback(parsing_material, dfr):
        pool = multiprocessing.Pool()
        for var_a, var_b, var_c, var_d, var_e in parsing_material:
            pool.apply_async(checker,
                             args=(var_a, var_b, var_c, var_d, var_e),
                             callback=partial(log_result, dataf=dfr))
        pool.close()
        pool.join()

    if __name__ == '__main__':
        # setting up main dataframe
        cols = ['a', 'b', 'c', 'd', 'e']
        existing_data = [["YES", "A", "16052011", "13031999", 3],
                         ["NO", "Q", "11022003", "15081999", 3],
                         ["YES", "A", "22082010", "03012001", 9]]

        # main dataframe
        df = pd.DataFrame(existing_data, columns=cols)

        # new data
        rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                         ['YES', 'W', '17061992', '26032012', 6],
                         ['YES', 'G', '01122006', '07082014', 2],
                         ['YES', 'N', '06081992', '21052008', 9],
                         ['YES', 'Y', '18051995', '24011996', 6],
                         ['NO', 'Q', '11022003', '15081999', 3],
                         ['NO', 'O', '20112004', '28062008', 0],
                         ['YES', 'R', '10071994', '03091996', 8],
                         ['NO', 'C', '09091998', '22051992', 1],
                         ['YES', 'Q', '01051995', '02012000', 3],
                         ['YES', 'Q', '26022015', '26092007', 5],
                         ['NO', 'F', '15072002', '17062001', 8],
                         ['YES', 'I', '24092006', '03112003', 2],
                         ['YES', 'A', '22082010', '03012001', 9],
                         ['YES', 'I', '15072016', '30092005', 7],
                         ['YES', 'Y', '08111999', '02022006', 3],
                         ['NO', 'V', '04012016', '10061996', 1],
                         ['NO', 'I', '21012003', '11022001', 6],
                         ['NO', 'P', '06041992', '30111993', 6],
                         ['NO', 'W', '30081992', '02012016', 6]]

        apply_async_with_callback(rows_to_parse, df)
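As a point of comparison rather than a fix for the multiprocessing code itself: the per-row lookup that checker performs can be vectorized with DataFrame.merge(..., indicator=True), which does the match-or-append work in a few whole-frame operations and may make multiprocessing unnecessary for this kind of job. A minimal sketch of that idea, using the same column layout (the sample values are a subset of the question's data, and pd.concat stands in for the deprecated DataFrame.append):

```python
import pandas as pd

cols = ['a', 'b', 'c', 'd', 'e']
df = pd.DataFrame([["YES", "A", "16052011", "13031999", 3],
                   ["NO", "Q", "11022003", "15081999", 3]], columns=cols)
new = pd.DataFrame([["NO", "Q", "11022003", "15081999", 3],    # already in df
                    ["YES", "W", "17061992", "26032012", 6]],  # not in df
                   columns=cols)

# indicator=True adds a '_merge' column; 'both' marks rows already present
# in df (how='left' keeps the row order of `new`)
is_dup = new.merge(df, on=cols, how='left', indicator=True)['_merge'] == 'both'

# rows that already exist: bump 'e' at the matching positions in df
match_idx = df.reset_index().merge(new[is_dup.values], on=cols)['index']
df.loc[match_idx, 'e'] += 1

# rows with no match: append them all in one go
df = pd.concat([df, new[~is_dup.values]], ignore_index=True)

print(df)  # 3 rows: the duplicate bumped 'e' to 4, the new row was appended
```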