The best solution I have found (and it works only for some types of problems) is a client/server setup using Python's BaseManager and SyncManager classes from multiprocessing.managers. First, set up a server that serves a proxy class for the data.
DataServer.py
from multiprocessing.managers import SyncManager
import numpy
# Module-level store of the served data. The server's __main__ block fills
# it before serving, and DataProxy.getData reads from it.
gData = {}


class DataProxy(object):
    """Read-only accessor for ``gData``, registered with the manager so that
    clients can obtain a proxy to it."""

    def getData(self, key, default=None):
        """Return ``gData[key]``, or *default* if *key* is not present.

        Args:
            key: Key to look up in ``gData``.
            default: Value returned when *key* is missing (``None`` if not
                given).
        """
        # Only reads the module-level dict, so no ``global`` declaration is
        # needed.
        return gData.get(key, default)
if __name__ == '__main__':
    port = 5000
    # A single-argument print(...) behaves identically on Python 2 and 3,
    # so this script runs under either interpreter.
    print('Simulate loading some data')
    for i in range(1000):
        gData[i] = numpy.random.rand(1000)
    print('Serving data. Press <ctrl>-c to stop.')

    class myManager(SyncManager):
        pass

    # Expose DataProxy under the name clients pass to register('DataProxy').
    myManager.register('DataProxy', DataProxy)
    # An empty host string binds to all interfaces. On Python 3 the authkey
    # must be bytes (b'' is also a plain str on Python 2), and it must match
    # the key the clients connect with.
    mgr = myManager(address=('', port), authkey=b'DataProxy01')
    server = mgr.get_server()
    # Blocks forever, serving proxy requests from this process.
    server.serve_forever()
Run the above and leave it running. The following is the client class you use to access the data.
DataClient.py
from multiprocessing.managers import BaseManager
import psutil
class DataClient(object):
    """Connect to a running DataServer and expose its shared DataProxy.

    After construction, ``self.proxy`` is a manager proxy for the server's
    ``DataProxy``; its method calls are forwarded to the server.
    """

    def __init__(self, port):
        """Connect to the data server listening on localhost:*port*.

        Raises:
            AssertionError: if no running DataServer.py process is found.
        """
        # Fail fast with a readable message instead of a connection error.
        # NOTE: this is an assert, so it is skipped under ``python -O``.
        assert self._checkForProcess('DataServer.py'), 'Must have DataServer running'

        class myManager(BaseManager):
            pass

        # Register the name only (no callable): the object is created on the
        # server side and the client receives a proxy to it.
        myManager.register('DataProxy')
        # On Python 3 the authkey must be bytes; it must match the server's.
        self.mgr = myManager(address=('localhost', port), authkey=b'DataProxy01')
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    @staticmethod
    def _checkForProcess(name):
        """Return True if some running process appears to be running *name*.

        A process matches when its process name equals *name*, or when the
        basename of any of its command-line arguments equals *name*. The
        latter catches ``python DataServer.py``, whose process name is the
        interpreter's rather than the script's. Processes that exit or deny
        access while being inspected are skipped.
        """
        import os

        for proc in psutil.process_iter():
            try:
                if proc.name() == name:
                    return True
                if any(os.path.basename(arg) == name for arg in proc.cmdline()):
                    return True
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process vanished mid-scan or is not inspectable by this
                # user; it cannot be confirmed as the server, so ignore it.
                continue
        return False
Below is test code that tries this with multiprocessing.
TestMP.py
import time
import multiprocessing as mp
import numpy
from DataClient import *
# Per-worker-process state: filled in by init() (the Pool initializer) and
# read by worker().
gProxy = gMode = gDummy = None


def init(port, mode):
    """Pool initializer that sets this worker process's globals.

    Connects a DataClient to the server on *port* and keeps its proxy,
    records the benchmark *mode*, and creates a local 1000-element random
    array for the local-data mode.
    """
    global gProxy, gMode, gDummy
    client = DataClient(port)
    gProxy = client.proxy
    gMode = mode
    gDummy = numpy.random.rand(1000)
def worker(key):
    """Pool task: obtain an array for *key* and sum it repeatedly.

    In mode 0 the array is fetched from the server through ``gProxy``. In
    mode 1 the per-process ``gDummy`` array is used and *key* is ignored.
    Returns the sum of the array.

    Raises:
        AssertionError: if ``gMode`` is neither 0 nor 1.
    """
    # Globals are only read here, so no ``global`` declaration is needed.
    if gMode == 0:
        array = gProxy.getData(key)
    elif gMode == 1:
        array = gDummy
    else:
        # Raise explicitly instead of ``assert 0``: asserts are stripped under
        # ``python -O``, which would fall through to an UnboundLocalError on
        # ``array`` below.
        raise AssertionError('unknown mode: %s' % gMode)
    # Repeat the built-in sum 1000 times, presumably as synthetic CPU load
    # for the benchmark; only the last result is returned.
    for _ in range(1000):
        x = sum(array)
    return x
if __name__ == '__main__':
    port = 5000
    maxkey = 1000
    numpts = 100
    for mode in [1, 0]:
        for nprocs in [16, 1]:
            # A single-argument print(...) behaves identically on Python 2
            # and 3, so the script runs under either interpreter.
            if mode == 0:
                print('Using client/server and %d processes' % nprocs)
            if mode == 1:
                print('Using local data and %d processes' % nprocs)
            keys = [numpy.random.randint(0, maxkey) for _ in range(numpts)]
            pool = mp.Pool(nprocs, initializer=init, initargs=(port, mode))
            start = time.time()
            # Results are not needed; only the elapsed time is reported.
            pool.map(worker, keys, chunksize=1)
            print(' took %4.3f seconds' % (time.time() - start))
            # close() only stops new work from being submitted; join() waits
            # for the worker processes to exit, so each configuration's pool
            # is fully torn down before the next one starts.
            pool.close()
            pool.join()
When I run this on my machine, I get:
Using local data and 16 processes
took 0.695 seconds
Using local data and 1 processes
took 5.849 seconds
Using client/server and 16 processes
took 0.811 seconds
Using client/server and 1 processes
took 5.956 seconds
, , . . , x=sum(array)
. - , .
, , , , . , , ; .
, , python, DB ints .. , , , , , .