Multithreaded Python FS Crawler

I wrote a Python function that walks the file system using a provided directory template, with additional "actions" to perform at each level. I then tried multithreading it, since some of the volumes are on network shares and I want to minimize I/O blocking. I started with the multiprocessing Pool class, since that was most convenient... (seriously, is there no Pool class for threading?). My function expands the provided FS template as far as possible, feeding the newly returned paths back into the pool until no new paths come back. I got this working perfectly when using the function and class directly, but now I'm trying to call the function from another class and my program seems to hang. To simplify things, I rewrote the function using threads instead of processes, and even wrote a simple ThreadPool class... same problem. Here is a heavily simplified version of the code that still shows the same problem:

file test1.py:

    import os
    import glob
    from multiprocessing import Pool

    def mapGlob(pool, paths, pattern):
        results = []
        paths = [os.path.join(p, pattern) for p in paths]
        for result in pool.map(glob.glob, paths):
            results += result
        return results

    def findAllMyPaths():
        pool = Pool(10)
        paths = ['/Volumes']
        follow = ['**', 'ptid_*', 'expid_*', 'slkid_*']
        for pattern in follow:
            paths = mapGlob(pool, paths, pattern)
        return paths

file test2.py:

    from test1 import findAllMyPaths

    allmypaths = findAllMyPaths()

Now if I call:

    >>> from test1 import findAllMyPaths
    >>> findAllMyPaths()
    ...long list of all the paths

this works fine, but if I try:

    >>> from test2 import allmypaths

Python hangs forever. The action functions (glob, in this example) get called, but they never return... I need help... The parallel version is much faster when it works correctly (6-20X faster, depending on which "actions" are performed at each point of the FS tree), so I'd like to be able to use it.

Also, if I change the mapping function to a non-parallel version:

    def mapGlob(pool, paths, pattern):
        results = []
        paths = [os.path.join(p, pattern) for p in paths]
        for path in paths:
            results += glob.glob(path)
        return results

everything works fine.
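(Editor's aside, addressing the parenthetical question in the opening paragraph: a thread-based pool does exist, as the largely undocumented `multiprocessing.pool.ThreadPool`, which shares the `Pool` API but runs workers as threads in the same interpreter, so nothing is pickled and no child process re-imports the main module. A minimal sketch of the same mapping step on top of it, using a hypothetical `map_glob` name:)

```python
import glob
import os
from multiprocessing.pool import ThreadPool  # thread-backed pool with the same API as Pool

def map_glob(pool, paths, pattern):
    # Expand one level of the directory template: join the pattern onto
    # every known path, then glob each candidate in a worker thread.
    # Threads suit this I/O-bound work and avoid pickling entirely.
    candidates = [os.path.join(p, pattern) for p in paths]
    results = []
    for matched in pool.map(glob.glob, candidates):
        results.extend(matched)
    return results

pool = ThreadPool(10)
# e.g. map_glob(pool, ['/Volumes'], '**')
```

Because the workers are threads, this version also sidesteps the interactive-interpreter restriction discussed in the edit and answers below.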

Edit:

I turned on multiprocessing's debug logging to see whether that would help. In the case where it works, I get:

    [DEBUG/MainProcess] created semlock with handle 5
    [DEBUG/MainProcess] created semlock with handle 6
    [DEBUG/MainProcess] created semlock with handle 9
    [DEBUG/MainProcess] created semlock with handle 10
    [INFO/PoolWorker-1] child process calling self.run()
    [INFO/PoolWorker-2] child process calling self.run()
    [INFO/PoolWorker-3] child process calling self.run()
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-4] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [INFO/PoolWorker-7] child process calling self.run()
    [INFO/PoolWorker-9] child process calling self.run()
    [INFO/PoolWorker-8] child process calling self.run()
    [INFO/PoolWorker-10] child process calling self.run()
    [DEBUG/MainProcess] closing pool
    [SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
    [DEBUG/MainProcess] finalizing pool
    [DEBUG/MainProcess] helping task handler/workers to finish
    [DEBUG/MainProcess] removing tasks from inqueue until task handler finished
    [DEBUG/MainProcess] task handler got sentinel
    [DEBUG/MainProcess] task handler sending sentinel to result handler
    [DEBUG/MainProcess] task handler sending sentinel to workers
    [DEBUG/MainProcess] task handler exiting
    [DEBUG/MainProcess] result handler got sentinel
    [DEBUG/MainProcess] ensuring that outqueue is not full
    [DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
    [DEBUG/PoolWorker-2] worker got sentinel -- exiting
    [DEBUG/PoolWorker-1] worker got sentinel -- exiting
    [INFO/PoolWorker-2] process shutting down
    [DEBUG/PoolWorker-7] worker got sentinel -- exiting
    [INFO/PoolWorker-1] process shutting down
    [INFO/PoolWorker-7] process shutting down
    [DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
    [INFO/PoolWorker-7] process exiting with exitcode 0
    [DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
    [INFO/PoolWorker-1] process exiting with exitcode 0
    [DEBUG/PoolWorker-5] worker got sentinel -- exiting
    [DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
    [INFO/PoolWorker-5] process shutting down
    [DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
    [DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
    [INFO/PoolWorker-2] process exiting with exitcode 0
    [INFO/PoolWorker-5] process exiting with exitcode 0
    [DEBUG/PoolWorker-6] worker got sentinel -- exiting
    [INFO/PoolWorker-6] process shutting down
    [DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
    [INFO/PoolWorker-6] process exiting with exitcode 0
    [DEBUG/PoolWorker-4] worker got sentinel -- exiting
    [DEBUG/PoolWorker-9] worker got sentinel -- exiting
    [INFO/PoolWorker-9] process shutting down
    [DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
    [INFO/PoolWorker-9] process exiting with exitcode 0
    [INFO/PoolWorker-4] process shutting down
    [DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
    [INFO/PoolWorker-4] process exiting with exitcode 0
    [DEBUG/PoolWorker-10] worker got sentinel -- exiting
    [INFO/PoolWorker-10] process shutting down
    [DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
    [INFO/PoolWorker-10] process exiting with exitcode 0
    [DEBUG/PoolWorker-8] worker got sentinel -- exiting
    [INFO/PoolWorker-8] process shutting down
    [DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
    [INFO/PoolWorker-8] process exiting with exitcode 0
    [DEBUG/PoolWorker-3] worker got sentinel -- exiting
    [INFO/PoolWorker-3] process shutting down
    [DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
    [INFO/PoolWorker-3] process exiting with exitcode 0
    [DEBUG/MainProcess] terminating workers
    [DEBUG/MainProcess] joining task handler
    [DEBUG/MainProcess] joining result handler
    [DEBUG/MainProcess] joining pool workers

and in the case where it hangs, this is all I get:

    [DEBUG/MainProcess] created semlock with handle 6
    [DEBUG/MainProcess] created semlock with handle 7
    [DEBUG/MainProcess] created semlock with handle 10
    [DEBUG/MainProcess] created semlock with handle 11
    [INFO/PoolWorker-1] child process calling self.run()
    [INFO/PoolWorker-2] child process calling self.run()
    [INFO/PoolWorker-3] child process calling self.run()
    [INFO/PoolWorker-8] child process calling self.run()
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-4] child process calling self.run()
    [INFO/PoolWorker-9] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [INFO/PoolWorker-7] child process calling self.run()
    [INFO/PoolWorker-10] child process calling self.run()
2 answers

Not a complete solution, but I found a way to make the code work in either form: called from the interpreter or from code in a running script. I think the problem is related to the following note in the multiprocessing docs:

Functionality within this package requires that the __main__ module be importable by the children. This is covered in Programming guidelines, however it is worth pointing out here. This means that some examples, such as the multiprocessing.Pool examples, will not work in the interactive interpreter.

I'm not sure why this restriction exists, or why I can sometimes use the pool from the interactive interpreter and sometimes not, but oh well...

To get around it, I do the following in any module that might use multiprocessing:

    import __main__

    __SHOULD_MULTITHREAD__ = False
    if hasattr(__main__, '__file__'):
        __SHOULD_MULTITHREAD__ = True

The rest of the code in that module can then check this flag to decide whether to use the pool or just execute without parallelization. Done this way, I can still use and test the parallel functions in modules from the interactive interpreter; they just run much slower.
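A more conventional alternative to the flag above, drawn from the Programming guidelines section the quoted docs note refers to, is to keep all Pool creation and use out of module import time and behind an `if __name__ == '__main__'` guard. A self-contained sketch of that pattern (simplified crawler, hypothetical `expand`/`find_paths` names):

```python
import glob
import os
from multiprocessing import Pool

def expand(path):
    # Worker function: defined at module top level so child processes
    # can find it when they re-import this module.
    return glob.glob(path)

def find_paths(roots, patterns, workers=4):
    # Walk the template one pattern level at a time, globbing in parallel.
    pool = Pool(workers)
    try:
        paths = list(roots)
        for pattern in patterns:
            candidates = [os.path.join(p, pattern) for p in paths]
            paths = [m for matches in pool.map(expand, candidates) for m in matches]
        return paths
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    # The pool is only created when this file runs as a script, never as a
    # side effect of an import -- a likely cause of the test2.py hang.
    print(find_paths(['/tmp'], ['*']))
```

With this layout, `from thismodule import find_paths` is always safe, because importing it does no pool work at all.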


If I'm not mistaken, shouldn't test2.py look like this:

    from test1 import findAllMyPaths

    allmypaths = findAllMyPaths

and then

    >>> from test2 import allmypaths
    >>> allmypaths()

Source: https://habr.com/ru/post/1337054/
