Python: processing a file on multiple cores

I am currently trying to read a large file (80 million lines) and perform a compute-intensive matrix multiplication for each record. After calculating the value, I want to insert the result into a database. Because this process is so time-consuming, I want to split the file across several cores to speed it up.
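For context, the per-record work is roughly of this shape (a minimal sketch only; the record format, the matrix, and the reduction to a single value are placeholders, not my actual code):

    import numpy as np

    # Placeholder weight matrix; the real one depends on the actual problem.
    WEIGHTS = np.eye(3)

    def process(line):
        # Hypothetical record format: comma-separated floats, one record per line.
        vector = np.array([float(x) for x in line.split(',')])
        result = WEIGHTS @ vector   # the per-record matrix multiplication
        return float(result.sum())  # single value to be inserted into the database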

After some research, I found this promising approach that splits a file into n parts:

    def file_block(fp, number_of_blocks, block):
        '''
        A generator that splits a file into blocks and iterates
        over the lines of one of the blocks.
        '''
        assert 0 <= block < number_of_blocks
        assert 0 < number_of_blocks
        fp.seek(0, 2)                  # jump to the end of the file
        file_size = fp.tell()
        ini = file_size * block // number_of_blocks
        end = file_size * (1 + block) // number_of_blocks
        if ini <= 0:
            fp.seek(0)
        else:
            fp.seek(ini - 1)
            fp.readline()              # skip the partial line at the block start
        while fp.tell() < end:
            yield fp.readline()

Running sequentially, the function can be called as follows:

    if __name__ == '__main__':
        fp = open(filename)
        number_of_chunks = 4
        for chunk_number in range(number_of_chunks):
            print(chunk_number, 100 * '=')
            for line in file_block(fp, number_of_chunks, chunk_number):
                process(line)

While this works, I am encountering problems parallelizing this using multiprocessing:

    from multiprocessing import Pool, cpu_count

    fp = open(filename)
    number_of_chunks = 4
    li = [file_block(fp, number_of_chunks, chunk_number)
          for chunk_number in range(number_of_chunks)]
    p = Pool(cpu_count() - 1)
    p.map(processChunk, li)

This raises an error saying that generators cannot be pickled.
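The limitation can be reproduced in isolation: generator objects cannot be serialized with pickle, which is exactly what Pool.map has to do with its arguments before sending them to the worker processes:

    import pickle

    gen = (x for x in range(3))
    pickle.dumps(gen)  # raises TypeError: generators cannot be pickled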

I understand the error, but the obvious workaround of first iterating over the entire file to put all the lines in a list is too expensive.

In addition, I want each core to iterate over a block of lines, because it is more efficient to insert multiple rows into the database at once (instead of one by one, as with the typical map-over-lines approach).
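To illustrate the batched inserts (a minimal sketch assuming sqlite3 and a hypothetical results(value) table; any DB driver with executemany works the same way):

    import sqlite3

    def insert_batch(rows, db_path='results.db'):
        # One executemany call per batch instead of one INSERT per line.
        conn = sqlite3.connect(db_path)
        with conn:  # commits on success, rolls back on error
            conn.executemany('INSERT INTO results (value) VALUES (?)', rows)
        conn.close()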

Thank you for your help.

1 answer

Instead of creating the generators up front and passing them to the worker processes, create them inside the worker code:

    def processChunk(params):
        filename, chunk_number, number_of_chunks = params
        with open(filename, 'r') as fp:
            for line in file_block(fp, number_of_chunks, chunk_number):
                process(line)

    li = [(filename, i, number_of_chunks) for i in range(number_of_chunks)]
    p.map(processChunk, li)
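For completeness, here is a sketch of how this pairs with the batched inserts the question asks for; process and insert_batch are the hypothetical helpers from above, and the filename is a placeholder:

    from multiprocessing import Pool, cpu_count

    def processChunk(params):
        filename, chunk_number, number_of_chunks = params
        rows = []
        with open(filename, 'r') as fp:
            for line in file_block(fp, number_of_chunks, chunk_number):
                rows.append((process(line),))
        insert_batch(rows)                      # one bulk insert per chunk

    if __name__ == '__main__':
        filename = 'data.txt'                   # placeholder path
        number_of_chunks = cpu_count() - 1
        li = [(filename, i, number_of_chunks) for i in range(number_of_chunks)]
        with Pool(number_of_chunks) as p:
            p.map(processChunk, li)

Because each worker only receives a small picklable tuple and opens its own file handle, nothing needs to be serialized besides the parameters. For 80 million lines you may also want to flush rows in smaller batches rather than holding a whole chunk's results in memory.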

Source: https://habr.com/ru/post/1260152/

