Python Multiprocessing Modules

EDIT: Updated with environment information (see first section)

Environment

I am using Python 2.7

Ubuntu 16.04

Problem

I have an application that I simplified into a three-step process:

  • Collect data from multiple data sources (HTTP requests, system information, etc.)
  • Calculate metrics based on this data
  • Display these metrics in various formats

Each step must complete before the next one starts; however, each step consists of several subtasks that can run in parallel (for example, I can send three HTTP requests and read system logs at the same time while waiting for them to return).

I divided the stages into modules and subtasks into submodules, so my project hierarchy looks like this:

+ datasources
|-- __init__.py
|-- data_one.py
|-- data_two.py
|-- data_three.py
+ metrics
|-- __init__.py
|-- metric_one.py
|-- metric_two.py
+ outputs
|-- output_one.py
|-- output_two.py
- app.py

app.py looks something like this (pseudo-code for brevity):

import datasources
import metrics
import outputs

for datasource in dir(datasources):
    datasource.refresh()
for metric in dir(metrics):
    metric.calculate()
for output in dir(outputs):
    output.dump()

(I know dir() does not actually iterate over submodule objects like that; the real code discovers and imports the submodules, the loops above are just to show the flow.)
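
For what it is worth, the actual discovery could be done along these lines (a minimal sketch using pkgutil and importlib; the helper name submodules() is made up for illustration):

import importlib
import pkgutil

import datasources

def submodules(package):
    # Import and yield every submodule of the given package.
    for _, name, _ in pkgutil.iter_modules(package.__path__):
        yield importlib.import_module(package.__name__ + '.' + name)

for datasource in submodules(datasources):
    datasource.refresh()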

Each datasource module looks something like this:

data = []

def refresh():
    # Populate the module-level "data" member somehow
    global data
    data = [1, 2, 3]
    return

Each metric module looks something like this:

import datasources.data_one as data_one
import datasources.data_two as data_two

data = []

def calculate():
    # Use the datasources to compute the metric
    global data
    data = [sum(x) for x in zip(data_one.data, data_two.data)]
    return

To parallelize the first step (the datasources), I wrote something like this:

import threading

def run_thread(datasource):
    datasource.refresh()

threads = []
for datasource in dir(datasources):
    thread = threading.Thread(target=run_thread, args=(datasource,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()

This works: afterwards I can read datasources.x.data from anywhere and the refreshed values are there.

For the second step (calculate()), the work is CPU-bound rather than I/O-bound, so I understand that threads will not help here and I should use multiprocessing instead. I tried something like this:

import multiprocessing

def run_pool(calculate):
    calculate()

pool = multiprocessing.Pool()
pool.map(run_pool, [m.calculate for m in dir(metrics)])
pool.close()
pool.join()

This runs (I think? at least it does not raise anything), but afterwards, when I access:

metrics.metric_one.data

it is still [], as if calculate() had never run.

I assume that is because the pool workers are separate processes with their own memory, so the data assigned inside a worker never makes it back to the parent. Is that right, and if so, how should I structure this: should the subtasks return their results instead of storing them in module globals, should I use shared memory, or is there a better pattern altogether?

A quick note up front: you mention Python 2.7. Everything below carries over, but two details are worth flagging as they come up: functools.partial works fine on 2.7, while using a Pool as a with context manager is Python 3 only (on 2.7, create the pool and call close() and join() yourself, as in the first snippets below).

Two observations first:

First, threads will not help with the CPU-bound part. Because of the Global Interpreter Lock, only one thread executes Python bytecode at a time, so threading never gives you multicore parallelism for pure-Python computation; multiprocessing is the right tool for that step.

Second, worker processes do not share memory with the parent. Anything a worker assigns to a module-level global stays in that worker's copy of the module, so instead of mutating globals, have each subtask take its inputs as arguments and hand its results back as return values.
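
To make that concrete, here is a minimal, self-contained sketch (not your modules; the name refresh_in_worker is made up) showing that a global assigned inside a Pool worker never shows up in the parent:

import multiprocessing

data = []

def refresh_in_worker(_):
    # Runs in a child process, so this rebinds only that process's copy of "data".
    global data
    data = [1, 2, 3]
    return len(data)

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    print(pool.map(refresh_in_worker, [None]))  # [3] -- the worker did run
    pool.close()
    pool.join()
    print(data)                                 # [] -- the parent's global is untouched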

With that in mind, here is how I would restructure it:

app.py:

import multiprocessing

import datasources
import metrics
import outputs

pool = multiprocessing.Pool()
data_list = pool.map(lambda o: o.refresh(), dir(datasources))
pool.close()
pool.join()

pool = multiprocessing.Pool()
metrics_funcs = [(m, data_list) for m in dir(metrics)]
metrics_list = pool.map(lambda m: m[0].calculate(m[1]), metrics_funcs)
pool.close()
pool.join()

pool = multiprocessing.Pool()
output_funcs = [(o, data_list, metrics_list) for o in dir(outputs)]
output_list = pool.map(lambda o: o[0].dump(o[1], o[2]), output_funcs)
pool.close()
pool.join()

Each datasource module becomes:

def refresh():
    # Populate the "data" member somehow
    return [1, 2, 3]

Each metric module becomes:

def calculate(data_list):
    # Use the datasource results to compute the metric
    return [sum(x) for x in zip(*data_list)]
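
As a quick sanity check of the reshaped calculate(), assuming two datasources each returned a list of the same length:

data_list = [[1, 2, 3], [10, 20, 30]]
print(calculate(data_list))  # [11, 22, 33]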

And each output module becomes:

def dump(data_list, metrics_list):
    # do whatever; you now have all the information
    pass

The "trick" here (if you want to call it that) is that nothing lives in module-level globals any more: every stage takes its inputs as arguments and hands its results back as return values through pool.map, which is exactly the channel multiprocessing gives you. One wrinkle is that whatever you pass to map has to be picklable, and lambdas and bound methods are not (the lambda versions above are only there to show the shape). In practice you pass a plain top-level function plus something cheap like the module name, and bind the remaining arguments with functools.partial. For the output stage that could look like this:

from functools import partial

def do_dump(module_name, data_list, metrics_list):
    globals()[module_name].dump(data_list, metrics_list)

invoke = partial(do_dump, data_list=data_list, metrics_list=metrics_list)
with multiprocessing.Pool() as pool:
    output_list = pool.map(invoke, [o.__name__ for o in dir(outputs)])
    pool.close()
    pool.join()
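
To see the partial pattern work end to end, here is a self-contained sketch with a made-up dump_one worker standing in for the outputs modules; it skips the with-statement, so it also runs on 2.7 (Pool only became a context manager in Python 3.3):

from functools import partial
import multiprocessing

def dump_one(name, data_list, metrics_list):
    # Stand-in for outputs.<name>.dump(data_list, metrics_list)
    return '%s: %d datasources, %d metrics' % (name, len(data_list), len(metrics_list))

if __name__ == '__main__':
    data_list = [[1, 2, 3], [10, 20, 30]]
    metrics_list = [[11, 22, 33]]
    invoke = partial(dump_one, data_list=data_list, metrics_list=metrics_list)
    pool = multiprocessing.Pool()
    print(pool.map(invoke, ['output_one', 'output_two']))
    pool.close()
    pool.join()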

One caveat:

All of these lists come back in the same order as the modules they were built from, i.e. data_list[i] is the result of dir(datasources)[i].refresh(). If a metric needs specific datasources by name rather than by position, build a lookup dict in app.py:

data_list = ...
pool.close()
pool.join()
data_map = {name: data_list[i] for i, name in enumerate(dir(datasources))}

and pass data_map around instead of (or alongside) data_list. A metric that only cares about particular datasources can then pick them out by name:

d1 = data_map['data_one']
d2 = data_map['data_two']
return [sum(x) for x in zip(d1, d2)]

As a separate point: Process and Thread are not interchangeable in Python. Threads share the parent's memory, while each process gets its own address space, so whatever a child process writes to a module global never shows up in the parent; that is exactly what you are seeing.

If you really do want to share state between processes, look at multiprocessing.Array and the other shared-memory primitives, which give you objects that both sides can see.

See the docs here: https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes
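
A minimal sketch of that approach (a single child writing into a shared buffer that the parent reads after join()):

import multiprocessing

def refresh(shared):
    # Write into the shared buffer; the parent sees these values.
    for i, value in enumerate([1, 2, 3]):
        shared[i] = value

if __name__ == '__main__':
    shared = multiprocessing.Array('i', 3)  # three shared C ints, zero-initialised
    p = multiprocessing.Process(target=refresh, args=(shared,))
    p.start()
    p.join()
    print(list(shared))  # [1, 2, 3]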


Source: https://habr.com/ru/post/1680975/

