Dynamic processes in Python

I have a question about Python multiprocessing. I am trying to take a dataset, break it into chunks, and hand those chunks to simultaneously running processes. I need to convert large tables of data using simple calculations (e.g. electrical resistance → temperature for a thermistor).
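For context, each row only needs something like the following (a rough sketch using the β-model thermistor equation; the coefficients here are made-up examples, not my real calibration):

import math

def r_to_celsius(r, r0=10000.0, t0=298.15, beta=3950.0):
    # beta-model conversion: 1/T = 1/T0 + ln(R/R0)/beta, T in kelvin
    t = 1.0 / (1.0 / t0 + math.log(r / r0) / beta)
    return t - 273.15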

The code below almost works as desired, but it doesn't seem to spawn any new processes (or, if it does, only one at a time). I'm new to Python, so there is probably a simple solution to this.

Thanks in advance!

import datetime, math
from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = map(self.process, self.data)
        super(Worker, self).__init__()

if __name__ == '__main__':
    start = datetime.datetime.now()
    dataset = range(10000)  # null dataset
    processes = 3

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))

        if i + 1 == processes:
            remainder = len(dataset) % processes
        else:
            remainder = 0

        tmp = dataset[i * chunk:(i + 1) * chunk + remainder]
        exec('worker' + str(i) + ' = Worker(tmp)')
        exec('worker' + str(i) + '.start()')

    for i in range(processes):
        exec('worker' + str(i) + '.join()')
        # just a placeholder to make sure the initial values of the set are as expected
        exec('print worker' + str(i) + '.result[0]')

Feed the work through queues instead of slicing the dataset yourself: each worker keeps calling get_nowait() and exits when Queue.Empty is raised. For example:

import multiprocessing, Queue

class Worker(multiprocessing.Process):
    def process(self, x):
        # example data transform
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(Worker, self).__init__()

    def run(self):
        # keep pulling items until the input queue is exhausted
        try:
            while True:
                self.output.put(self.process(self.input.get_nowait()))
        except Queue.Empty:
            pass


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)
    for i in range(processes):
        Worker(input, output).start()

    for i in range(len(dataset)):
        print output.get()
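A caveat I'd add (not from the original answer): multiprocessing.Queue hands items to the underlying pipe through a background feeder thread, so get_nowait() can raise Queue.Empty even while items are still in flight. It usually works here because everything is enqueued before the workers start, but one sentinel per worker is a more robust shutdown signal. A minimal sketch, with SentinelWorker being my own hypothetical name:

import multiprocessing

SENTINEL = None  # assumes None never occurs in the real data

class SentinelWorker(multiprocessing.Process):
    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(SentinelWorker, self).__init__()

    def run(self):
        # blocking get() avoids the Empty race; a sentinel marks the end
        while True:
            item = self.input.get()
            if item is SENTINEL:
                break
            self.output.put(item * 2)  # stand-in transform

if __name__ == '__main__':
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()
    for x in range(10):
        input.put(x)
    workers = [SentinelWorker(input, output) for i in range(2)]
    for w in workers:
        input.put(SENTINEL)  # one sentinel per worker
        w.start()
    for i in range(10):
        print output.get()
    for w in workers:
        w.join()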

The problem is run. When you subclass Process (rather than passing a target function to its constructor), the rule is:

  • Everything that should execute in the child process goes into run.

__init__, on the other hand, runs in the parent process at the moment you construct the object, before start() is ever called. In your code the map() call therefore does all the work serially in the main process, one worker at a time, and the children, once started, have nothing left to do. This line:

exec('worker'+str(i)+' = Worker(tmp)')

already performs the whole computation, while this one:

exec('worker'+str(i)+'.start()')

launches a child that does no work at all. That is also why exec('print worker'+str(i)+'.result[0]') prints the values you expect: they were computed serially in the parent during construction, not by the child processes. (The exec calls themselves are unnecessary; a plain list of workers does the same job, as sketched below.)
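A sketch of the exec-free version (my rewording, not part of the original answer; it assumes the question's dataset, processes, and Worker are in scope):

import math

workers = []
for i in range(processes):
    chunk = int(math.floor(len(dataset) / float(processes)))
    remainder = len(dataset) % processes if i + 1 == processes else 0
    workers.append(Worker(dataset[i * chunk:(i + 1) * chunk + remainder]))
    workers[-1].start()

for w in workers:
    w.join()
    print w.result[0]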

Move the transform into run:

from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = []
        super(Worker, self).__init__()

    def run(self):
        # this is what executes in the child process after start()
        self.result = map(self.process, self.data)

EDIT:

... it turns out this still isn't enough to get the results back. When start() is called, the child process receives its own copy of the Worker object; run() assigns to self.result in the child's address space, and that assignment never propagates to the parent. After join(), the parent's worker.result is still the empty list set in __init__. The underlying rule: processes, unlike threads, do not share memory. To move results between them you need an interprocess channel such as a multiprocessing.Queue or a Pipe, as the other answers do.
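To make that concrete, here is a minimal sketch of returning each chunk's results over a Pipe (one connection pair per worker; this is my illustration, not code from the thread):

from multiprocessing import Process, Pipe

class Worker(Process):
    def __init__(self, data, conn):
        self.data = data
        self.conn = conn
        super(Worker, self).__init__()

    def run(self):
        # send the transformed chunk back to the parent instead of
        # assigning it to an attribute the parent will never see
        self.conn.send([(x * 2) / 3 for x in self.data])
        self.conn.close()

if __name__ == '__main__':
    pipes = []
    for i in range(3):
        parent_end, child_end = Pipe()
        Worker(range(i * 10, (i + 1) * 10), child_end).start()
        pipes.append(parent_end)
    for parent_end in pipes:
        print parent_end.recv()[0]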


If you'd rather not depend on get_nowait() catching the end of the queue, you can instead tell each worker in advance exactly how many items to pull, since the size of the dataset is known (the last worker takes the remainder, just as in your slicing code):

import math, multiprocessing

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output, chunksize):
        self.input = input
        self.output = output
        self.chunksize = chunksize
        super(Worker, self).__init__()

    def run(self):
        # pull exactly chunksize items; blocking get() is safe because
        # the parent fills the queue before starting any workers
        for x in range(self.chunksize):
            self.output.put(self.process(self.input.get()))


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))
        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        Worker(input, output, chunk + remainder).start()

    for i in range(len(dataset)):
        print output.get()
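Worth noting (my addition, not part of the thread): the standard library already packages this chunk-and-collect pattern. multiprocessing.Pool splits the iterable into chunks, farms them out to worker processes, and reassembles the results in order:

import multiprocessing

def process(x):
    # same example transform as above
    for i in range(15):
        x += (float(i) / 2.6)
    return x

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    results = pool.map(process, range(10), chunksize=3)
    pool.close()
    pool.join()
    print results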
