Python does not free RAM when processing a large file in parallel

I have a 25 GB plaintext file with ~10 million lines, a few hundred words per line. Each line must be processed individually, and I am trying to split chunks off to a dozen workers to be processed in parallel. Currently I load a million lines at a time (for some reason this takes ~10 GB of RAM, even though it is only ~3 GB uncompressed on disk), split them evenly 12 ways, and map them to 12 workers using multiprocessing.Pool.

The problem is that when each of my 12 workers finishes processing its allocated data, its RAM is not freed, and usage grows by another ~10 GB on the next million-line iteration.

I have tried del'ing the previous data, resetting the previous data to an empty allocation, creating iterable variable names with eval(), calling gc.collect() after deletion, and moving the IO entirely into its own function, all with no luck and the exact same problem. Debugging shows that the Python interpreter only sees the expected data, and the data from the previous iteration is not accessible, so why isn't the RAM actually freed?

Below is my latest attempt at separating all the environments. It is not the most efficient, but "BigFileOnDisk" is on an SSD, so re-reading through the file each time is negligible compared to the actual data processing. Previously, the "read" functionality was inside the allocation function, deleting all data after the workers finished, with the same results.

import datetime
from multiprocessing import Pool


def allocation():
    fileCompleted = False
    currentLine = 0
    while not fileCompleted:
        lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
        list_of_values(function_object=worker, inputs=lineData, workers=12)


def read(numLines, startLine=0):
    lines = []
    with open(BigFileOnDisk, 'r') as fid:
        for currentLine, line in enumerate(fid):
            # skip everything before the requested start
            if currentLine < startLine:
                continue
            # chunk is full; the next call starts at this line
            if currentLine - startLine >= numLines:
                return lines, currentLine, False
            lines.append(line)
    # or if we've hit the end of the file
    return lines, startLine + len(lines), True


def worker(lines):
    outputPath = *root* + str(datetime.datetime.now().time())
    processedData = {}

    for line in lines:
        # process data
        pass

    del lines
    with open(outputPath, 'a') as fid:
        for item in processedData:
            fid.write(str(item) + ', ' + str(processedData[item]) + '\n')


def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    for n in range(workers):
        # integer boundaries, so no lines are dropped by rounding
        start = len(inputs) * n // workers
        end = len(inputs) * (n + 1) // workers
        inputs_split.append(inputs[start:end])

    p = Pool(workers)
    p.map(function_object, inputs_split)

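You never shut the worker processes down after they finish. In list_of_values, the processes created by Pool stay alive once the work is done (somewhat like zombies, but with a living parent process), and they still hold all of their data. You cannot see that data from the main process because it lives in other processes (for the same reason, gc.collect has no effect).

To free the memory held by the workers, close the Pool manually or use Pool in a with statement.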

def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    for n in range(workers):
        # integer boundaries, so no lines are dropped by rounding
        start = len(inputs) * n // workers
        end = len(inputs) * (n + 1) // workers
        inputs_split.append(inputs[start:end])

    with Pool(workers) as p:
        p.map(function_object, inputs_split)
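
If a with block does not fit the surrounding code, the pool can also be shut down by hand. On exit, the with form calls terminate() on the pool; the explicit alternative is close() followed by join(). The following is only a minimal sketch: square and map_and_release are hypothetical names standing in for the real worker and for list_of_values, and the input is toy data.

```python
import multiprocessing as mp


def square(x):
    """Hypothetical stand-in for the real per-chunk worker."""
    return x * x


def map_and_release(func, chunks, workers=2):
    """Map func over chunks, then shut the pool down explicitly."""
    pool = mp.Pool(workers)
    try:
        return pool.map(func, chunks)
    finally:
        pool.close()  # no further tasks will be submitted
        pool.join()   # wait for the worker processes to exit


if __name__ == "__main__":
    print(map_and_release(square, [1, 2, 3, 4]))
    # Lists the child processes that are still alive after the call.
    print(mp.active_children())
```

Once join() returns, the worker processes have exited, so the operating system can reclaim whatever memory they were holding. Checking multiprocessing.active_children() after each chunk is one way to confirm that no workers from earlier iterations are lingering.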

Source: https://habr.com/ru/post/1648441/

