I wrote a Python function that scans the file system using a provided directory template, with additional "actions" to be performed at each level. I then tried multithreading it, since some of the volumes are on network shares and I want to minimize I/O blocking. I started with the multiprocessing Pool class, since that was most convenient... (seriously, is there no Pool class for threading?). My function expands the provided FS template as far as possible, feeding the newly returned paths back into the pool until no new paths are returned. This worked perfectly when using the function and class directly, but now I'm trying to call the function from another class, and my program seems to hang. To simplify things I rewrote the function using Threads instead of Processes, and even wrote a simple ThreadPool class... same problem. Here's a very simplified version of the code that still exhibits the problem:
file test1.py:
------------------------------------------------

    import os
    import glob
    from multiprocessing import Pool

    def mapGlob(pool, paths, pattern):
        results = []
        paths = [os.path.join(p, pattern) for p in paths]
        for result in pool.map(glob.glob, paths):
            results += result
        return results

    def findAllMyPaths():
        pool = Pool(10)
        paths = ['/Volumes']
        follow = ['**', 'ptid_*', 'expid_*', 'slkid_*']
        for pattern in follow:
            paths = mapGlob(pool, paths, pattern)
        return paths

file test2.py:
------------------------------------------------

    from test1 import findAllMyPaths

    allmypaths = findAllMyPaths()
Now if I call
    >>> from test1 import findAllMyPaths
    >>> findAllMyPaths()
    ...long list of all the paths
this works fine, but if instead I try:
    >>> from test2 import allmypaths
Python hangs forever. The action functions are called (glob in this example), but they never return... I need help here... the parallel version runs much faster when it works correctly (6-20x faster, depending on which "actions" are performed at each point in the FS tree), so I'd really like to be able to use it.
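For reference, the simple ThreadPool class I mentioned above was along these lines (a rough sketch, not the exact code I ran; `SimpleThreadPool` is just an illustrative name):

```python
import queue
import threading

class SimpleThreadPool:
    """Minimal thread pool with a map() method, similar in spirit to
    multiprocessing.Pool but backed by daemon threads."""

    def __init__(self, num_workers):
        self.tasks = queue.Queue()
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()

    def _worker(self):
        while True:
            func, arg, results, index, done = self.tasks.get()
            try:
                results[index] = func(arg)  # store result at its input position
            finally:
                done.release()  # signal one finished task even if func raised
                self.tasks.task_done()

    def map(self, func, iterable):
        args = list(iterable)
        results = [None] * len(args)
        done = threading.Semaphore(0)
        for i, arg in enumerate(args):
            self.tasks.put((func, arg, results, i, done))
        for _ in args:
            done.acquire()  # block until every task has completed
        return results  # same order as the inputs, like Pool.map
```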
Also, if I change the mapping function to a non-parallel version:
    def mapGlob(pool, paths, pattern):
        results = []
        paths = [os.path.join(p, pattern) for p in paths]
        for path in paths:
            results += glob.glob(path)
        return results
everything works fine.
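For completeness, the thread-based variant I tried is roughly equivalent to the following sketch built on `concurrent.futures` (an assumption on my part that `ThreadPoolExecutor.map` can stand in for `Pool.map` here, since glob is I/O-bound):

```python
import glob
import os
from concurrent.futures import ThreadPoolExecutor

def map_glob_threaded(executor, paths, pattern):
    """Thread-based equivalent of mapGlob: glob each path/pattern pair
    concurrently and flatten the per-path match lists into one list."""
    joined = [os.path.join(p, pattern) for p in paths]
    results = []
    # executor.map yields one list of matches per joined path, in order
    for matches in executor.map(glob.glob, joined):
        results.extend(matches)
    return results
```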
Edit:
I turned on multiprocessing's debug logging to see if that would shed any light. In the case where it works, I get:
    [DEBUG/MainProcess] created semlock with handle 5
    [DEBUG/MainProcess] created semlock with handle 6
    [DEBUG/MainProcess] created semlock with handle 9
    [DEBUG/MainProcess] created semlock with handle 10
    [INFO/PoolWorker-1] child process calling self.run()
    [INFO/PoolWorker-2] child process calling self.run()
    [INFO/PoolWorker-3] child process calling self.run()
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-4] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [INFO/PoolWorker-7] child process calling self.run()
    [INFO/PoolWorker-9] child process calling self.run()
    [INFO/PoolWorker-8] child process calling self.run()
    [INFO/PoolWorker-10] child process calling self.run()
    [DEBUG/MainProcess] closing pool
    [SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
    [DEBUG/MainProcess] finalizing pool
    [DEBUG/MainProcess] helping task handler/workers to finish
    [DEBUG/MainProcess] removing tasks from inqueue until task handler finished
    [DEBUG/MainProcess] task handler got sentinel
    [DEBUG/MainProcess] task handler sending sentinel to result handler
    [DEBUG/MainProcess] task handler sending sentinel to workers
    [DEBUG/MainProcess] task handler exiting
    [DEBUG/MainProcess] result handler got sentinel
    [DEBUG/MainProcess] ensuring that outqueue is not full
    [DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
    [DEBUG/PoolWorker-2] worker got sentinel
and when it doesn't work, all I get is:
    [DEBUG/MainProcess] created semlock with handle 6
    [DEBUG/MainProcess] created semlock with handle 7
    [DEBUG/MainProcess] created semlock with handle 10
    [DEBUG/MainProcess] created semlock with handle 11
    [INFO/PoolWorker-1] child process calling self.run()
    [INFO/PoolWorker-2] child process calling self.run()
    [INFO/PoolWorker-3] child process calling self.run()
    [INFO/PoolWorker-8] child process calling self.run()
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-4] child process calling self.run()
    [INFO/PoolWorker-9] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [INFO/PoolWorker-7] child process calling self.run()
    [INFO/PoolWorker-10] child process calling self.run()