How to process input in parallel in Python without using processes?

I have a list of input data that I want to process in parallel. Processing each item takes time because it involves the network; CPU usage is not a problem.

I want to avoid the overhead of additional processes, since I have many items to process at once, and I don't want to set up interprocess communication.

# Sequential version -- what is the parallel equivalent of this?
import time

input_data = list(range(1, 8))
input_processor = time.sleep
# NOTE: in Python 3, map() is lazy: no item is processed until `results`
# is iterated (e.g. with list(results)).
results = map(input_processor, input_data)

My code already uses twisted.internet.defer, so a solution based on it would also be welcome.

+4
source share
3 answers

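You can write a Worker class that inherits from threading.Thread: each worker takes items from a shared deque, processes them, and appends the results to an output deque until the input is empty.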

from threading import Thread
from collections import deque
import time


# Create a new class that inherits from Thread
class Worker(Thread):

    def __init__(self, inqueue, outqueue, func):
        '''
        A worker thread that calls func on items taken from inqueue and
        appends each result to outqueue.

        Runs until inqueue is empty. Several workers may share the same
        inqueue/outqueue: deque.popleft() and deque.append() are
        thread-safe, so no extra locking is needed.

        :param inqueue: collections.deque of input items, consumed from the left
        :param outqueue: collections.deque receiving results in completion order
        :param func: callable applied to each input item
        '''
        super().__init__()
        self.inqueue = inqueue
        self.outqueue = outqueue
        self.func = func

    # Override Thread.run(); it is executed in the new thread when
    # worker.start() is called.
    def run(self):
        while True:
            # Pop first and treat IndexError as "queue drained". Testing
            # `while self.inqueue:` before popleft() is racy: another worker
            # may take the last item between the check and the pop.
            try:
                data = self.inqueue.popleft()
            except IndexError:
                break
            print('start')
            result = self.func(data)
            self.outqueue.append(result)
            print('finished')


def test(x):
    time.sleep(x)
    return 2 * x


if __name__ == '__main__':
    # Twelve one-second jobs, shared by three workers through one deque.
    jobs = deque(12 * [1])
    collected = deque()

    crew = []
    for _ in range(3):
        crew.append(Worker(jobs, collected, test))

    # Launch every worker, then block until each one has drained the queue.
    for member in crew:
        member.start()
    for member in crew:
        member.join()

    print(collected)

With 12 one-second jobs shared by 3 workers, this takes ca. 4 seconds.

You can also wrap this in a small Pool class with a map method:

from threading import Thread
from collections import deque
import time


class Pool:
    '''A minimal thread pool offering an order-preserving map().'''

    def __init__(self, n_threads):
        '''
        :param n_threads: number of Worker threads started per map() call;
                          must be at least 1
        :raises ValueError: if n_threads < 1 (with no workers, map() would
                            silently return an empty list)
        '''
        if n_threads < 1:
            raise ValueError('n_threads must be at least 1')
        self.n_threads = n_threads

    def map(self, func, data):
        '''
        Apply func to every item of data using n_threads Worker threads.

        Blocks until all items are processed. Like the builtin map(), the
        returned list follows the order of data, even though the workers
        finish items in arbitrary order.

        :param func: callable applied to each item
        :param data: iterable of input items
        :return: list of func(item) for item in data, in input order
        '''
        # Tag each item with its position; Worker appends results in
        # completion order, so the index is needed to restore input order.
        inqueue = deque(enumerate(data))
        result = deque()

        def indexed_func(pair):
            index, item = pair
            return index, func(item)

        workers = [Worker(inqueue, result, indexed_func) for _ in range(self.n_threads)]

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()

        # Sort on the index only, so the results never need to be comparable.
        return [value for _, value in sorted(result, key=lambda pair: pair[0])]


class Worker(Thread):

    def __init__(self, inqueue, outqueue, func):
        '''
        A worker thread that calls func on items taken from inqueue and
        appends each result to outqueue.

        Runs until inqueue is empty. Several workers may share the same
        inqueue/outqueue: deque.popleft() and deque.append() are
        thread-safe, so no extra locking is needed.

        :param inqueue: collections.deque of input items, consumed from the left
        :param outqueue: collections.deque receiving results in completion order
        :param func: callable applied to each input item
        '''
        super().__init__()
        self.inqueue = inqueue
        self.outqueue = outqueue
        self.func = func

    # Override Thread.run(); it is executed in the new thread when
    # worker.start() is called.
    def run(self):
        while True:
            # Pop first and treat IndexError as "queue drained". Testing
            # `while self.inqueue:` before popleft() is racy: another worker
            # may take the last item between the check and the pop.
            try:
                data = self.inqueue.popleft()
            except IndexError:
                break
            print('start')
            result = self.func(data)
            self.outqueue.append(result)
            print('finished')


def test(x):
    time.sleep(x)
    return 2 * x


if __name__ == '__main__':
    # Twelve one-second jobs run on a six-thread pool.
    jobs = [1] * 12
    doubled = Pool(6).map(test, jobs)
    print(doubled)
+1

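Alternatively, you can use multiprocessing. Note that it starts worker processes rather than threads, which the question wanted to avoid, but the pool API is simple: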

import multiprocessing as mp
import time

# Each call runs in a separate worker *process* of the pool.
input_processor = time.sleep
core_num = mp.cpu_count()

if __name__ == '__main__':
    # The __main__ guard is required where workers are started with "spawn"
    # (e.g. Windows): each child re-imports this module and must not build
    # its own pool.
    with mp.Pool(processes=core_num) as pool:
        # Pass the callable and its argument tuple separately; writing
        # apply_async(input_processor(i)) would run the call synchronously
        # in the parent and hand its return value to the pool.
        results = [pool.apply_async(input_processor, (i,)) for i in range(1, 7 + 1)]
        # get() blocks until each task is done. Collect everything before
        # leaving the with-block, which terminates the pool.
        result_final = [p.get() for p in results]
    # Pair each input with its result (result_final is 0-indexed).
    for n, value in zip(range(1, 7 + 1), result_final):
        print(n, value)

. .

Edit: the same approach wrapped in a reusable function:

def parallel_map(processor_count, input_data, func=None):
    '''
    Apply a function to every item of input_data in a process pool.

    :param processor_count: number of worker processes
    :param input_data: iterable of inputs; items are sent to worker
                       processes, so they must be picklable
    :param func: picklable callable applied to each item; defaults to the
                 module-level ``input_processor`` (the original behavior)
    :return: numpy array built by vstacking the inputs and the results;
             for scalar results, row 0 holds the inputs and row 1 the
             matching results, in input order
    '''
    from multiprocessing import Pool

    import numpy as np

    if func is None:
        func = input_processor
    # Materialize once: the data is iterated for submission and used again
    # in vstack, which would silently see nothing if given a one-shot iterator.
    input_data = list(input_data)
    with Pool(processes=processor_count) as pool:
        # apply_async takes the callable and an argument tuple; calling
        # func(i) here would run it synchronously in the parent process.
        pending = [pool.apply_async(func, (i,)) for i in input_data]
        # Collect inside the with-block: leaving it terminates the pool.
        result_final = np.array([p.get() for p in pending])
    return np.vstack((input_data, result_final))
0

Since you are already using Twisted, you can use a DeferredList:

http://twistedmatrix.com/documents/15.4.0/core/howto/defer.html#deferredlist

If input_processor is asynchronous (returns a Deferred):

def main():
    '''
    Start input_processor for every input and gather all outcomes.

    maybeDeferred wraps the call so that a plain return value, a returned
    Deferred, or a raised exception all end up as a Deferred.

    :return: the DeferredList over all calls, with gotResults attached as
             its callback; it receives a list of (success, value) tuples.
    '''
    input_data = [1, 2, 3, 4, 5, 6, 7]
    input_processor = asyn_function  # placeholder: your asynchronous function

    deferreds = [defer.maybeDeferred(input_processor, entry) for entry in input_data]
    # consumeErrors=True keeps failures of the individual Deferreds from
    # being logged as unhandled; gotResults still receives them as
    # (False, failure) entries.
    deferredList = defer.DeferredList(deferreds, consumeErrors=True)
    deferredList.addCallback(gotResults)
    return deferredList


def gotResults(results):
    '''
    DeferredList callback: print the outcome of every call.

    :param results: list of (success, value) tuples; value is the call's
                    result when success is true, otherwise a Failure whose
                    getErrorMessage() is printed.
    '''
    # Iterate the parameter itself (the original loop read an undefined
    # name `result` and raised NameError).
    for success, value in results:
        if success:
            print('Success:', value)
        else:
            print('Failure:', value.getErrorMessage())

If input_processor is a long-running / blocking function, you can use deferToThread instead of maybeDeferred:

def main():
    '''
    Run the blocking input_processor for every input in threads and
    gather all outcomes.

    deferToThread runs each call in Twisted's reactor thread pool and
    returns a Deferred for its result.

    :return: the DeferredList over all calls, with gotResults attached as
             its callback; it receives a list of (success, value) tuples.
    '''
    input_data = [1, 2, 3, 4, 5, 6, 7]
    input_processor = syn_function  # placeholder: your blocking function

    deferreds = [threads.deferToThread(input_processor, entry) for entry in input_data]
    # consumeErrors=True keeps failures of the individual Deferreds from
    # being logged as unhandled; gotResults still receives them as
    # (False, failure) entries.
    deferredList = defer.DeferredList(deferreds, consumeErrors=True)
    deferredList.addCallback(gotResults)
    return deferredList
0
source

Source: https://habr.com/ru/post/1613206/


All Articles