Multiprocessing or multithreading? Parallelizing a simple calculation over millions of iterations and storing the results in a single data structure

I have a dictionary D of {string: list} entries, and I compute a function f(D[s1], D[s2]) → float for a pair of strings (s1, s2) in D.

In addition, I have written my own matrix class, LabeledNumericMatrix, which supports assignments such as m[ID1, ID2] = 1.0.

I need to compute f(x, y) and store the result in m[x, y] for all 2-tuples in a set of strings S, including the case s1 = s2. This is easy to code as a loop, but the code takes quite a while to run once the size of S grows to large values such as 10,000 or more.

None of the results that I store in my labeled matrix m depend on each other. It therefore seems trivial to parallelize this calculation using Python's multithreading or multiprocessing facilities. However, since CPython does not truly allow me to run the calculation of f(x, y) and the storage of m[x, y] simultaneously through threading, multiprocessing seems to be my only choice. On the other hand, I do not think multiprocessing is designed to pass data structures of around 1 GB between processes, such as my labeled matrix containing 10000x10000 elements.

Can someone advise (a) whether I should avoid trying to parallelize my algorithm, and (b) if parallelization is feasible, how to do it, preferably in CPython?

+6
source share
5 answers

First option: a server process

Create a server process. It is part of the multiprocessing package and allows concurrent access to a data structure. This way every process accesses the data structure itself, locking out the other processes while it does so.

From the documentation:

Server process

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.
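
To make the first option concrete, here is a minimal sketch, assuming Python 3. The names score_pair and main, the sample data, and the dot product standing in for f are all illustrative assumptions, not part of the question. Both the input dictionary and the result dictionary live in the manager's server process, and the workers reach them only through proxies.

```python
import multiprocessing


def score_pair(shared_data, results, s1, s2):
    """Worker: read two vectors through the proxy and store one result."""
    # Hypothetical stand-in for f: a plain dot product.
    value = float(sum(a * b for a, b in zip(shared_data[s1], shared_data[s2])))
    results[(s1, s2)] = value


def main():
    data = {"a": [1, 2], "b": [3, 4]}  # hypothetical sample input
    with multiprocessing.Manager() as manager:
        shared_data = manager.dict(data)  # held by the server process
        results = manager.dict()          # workers write here via proxies
        workers = [
            multiprocessing.Process(target=score_pair,
                                    args=(shared_data, results, s1, s2))
            for s1 in data for s2 in data
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        # Copy out of the proxy before the manager shuts down.
        return dict(results)


if __name__ == "__main__":
    print(main())
```

Every read and write on a proxy is a round trip to the server process, so this layout is convenient rather than fast; one process per pair is used here only to keep the sketch short.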

Second option: a pool of workers

Create a pool of workers, an input queue, and a result queue.

  • The main process, acting as a producer, feeds the input queue with pairs (s1, s2).
  • Each worker process reads a pair from the input queue and writes the result to the result queue.
  • The main process reads the results from the result queue and writes them into the results dictionary.
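
The three steps above can be sketched as follows, assuming Python 3. The function names, the None sentinel used to stop the workers, and the dot product standing in for f are assumptions made for illustration.

```python
import multiprocessing


def f(x, y):
    """Hypothetical stand-in for the real scoring function."""
    return float(sum(a * b for a, b in zip(x, y)))


def worker(data, task_queue, result_queue):
    """Consume (s1, s2) pairs until the None sentinel arrives."""
    for s1, s2 in iter(task_queue.get, None):
        result_queue.put(((s1, s2), f(data[s1], data[s2])))


def main(data, num_workers=2):
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=worker,
                                args=(data, task_queue, result_queue))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()

    # Producer: feed every pair, then one sentinel per worker.
    pairs = [(s1, s2) for s1 in data for s2 in data]
    for pair in pairs:
        task_queue.put(pair)
    for _ in workers:
        task_queue.put(None)

    # Collect every result before joining, so no worker blocks on a full pipe.
    results = {}
    for _ in pairs:
        pair, score = result_queue.get()
        results[pair] = score
    for w in workers:
        w.join()
    return results


if __name__ == "__main__":
    print(main({"a": [1, 2], "b": [3, 4]}))
```

Here each worker receives its own copy of data when it starts, so only the small (pair, score) tuples travel through the queues.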

Third option: divide the work into independent tasks

Your data is independent: f(D[s_i], D[s_j]) is an isolated problem, independent of any f(D[s_k], D[s_l]). In addition, the computation time of each pair should be roughly equal, or at least of the same order of magnitude.

Divide the task into n sets of inputs, where n is the number of computing units (cores, or even computers) that you have. Give each input set to a different process, and join the outputs.
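
A minimal sketch of this split, assuming Python 3 and multiprocessing.Pool. DATA, f, and the helper names are hypothetical. The input dictionary is a module-level global so that each worker has its own copy and only the lists of pairs and scores are sent between processes.

```python
import multiprocessing

DATA = {"a": [1, 2], "b": [3, 4], "c": [5, 6]}  # hypothetical sample input


def f(x, y):
    """Hypothetical stand-in for the real scoring function."""
    return float(sum(a * b for a, b in zip(x, y)))


def split(items, n):
    """Split items into n interleaved sets of nearly equal size."""
    return [items[i::n] for i in range(n)]


def score_chunk(pairs):
    """Score one input set; runs inside a worker process."""
    return [((s1, s2), f(DATA[s1], DATA[s2])) for s1, s2 in pairs]


def main(n=2):
    pairs = [(s1, s2) for s1 in DATA for s2 in DATA]
    with multiprocessing.Pool(processes=n) as pool:
        partial_results = pool.map(score_chunk, split(pairs, n))
    # Join the outputs of all workers into one dictionary.
    return {pair: score for chunk in partial_results for pair, score in chunk}


if __name__ == "__main__":
    print(main())
```

Interleaving (items[i::n]) rather than cutting contiguous blocks is a design choice that helps balance the load if the cost of f varies along the list.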

+6
source

You will definitely not get a performance boost from threading: it is the wrong tool for CPU-bound tasks.

So the only possible choice is multiprocessing, but since you have a large data structure, I would suggest something like mmap (rather low level, but built in) or Redis (a pleasant, high-level API, but it has to be installed and configured).
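
As a rough illustration of the mmap suggestion, here is a sketch assuming Python 3, a dense n x n matrix of 8-byte floats stored row by row in a file, and hypothetical helper names. Each process would map the same file and write only its own cells, so the matrix itself is never pickled or sent between processes.

```python
import mmap
import os
import struct
import tempfile

CELL = struct.Struct("d")  # one 8-byte float per matrix cell


def create_matrix_file(path, n):
    """Create a file large enough for an n x n float matrix."""
    with open(path, "wb") as fh:
        fh.truncate(n * n * CELL.size)


def write_cells(path, n, cells):
    """Map the file once and write (row, col, value) triples into it."""
    with open(path, "r+b") as fh:
        with mmap.mmap(fh.fileno(), 0) as view:
            for row, col, value in cells:
                CELL.pack_into(view, (row * n + col) * CELL.size, value)


def read_cell(path, n, row, col):
    """Read a single cell back from the file."""
    with open(path, "r+b") as fh:
        with mmap.mmap(fh.fileno(), 0) as view:
            return CELL.unpack_from(view, (row * n + col) * CELL.size)[0]


if __name__ == "__main__":
    matrix_path = os.path.join(tempfile.mkdtemp(), "matrix.bin")
    create_matrix_file(matrix_path, 3)
    write_cells(matrix_path, 3, [(0, 1, 0.5), (1, 0, 0.5)])
    print(read_cell(matrix_path, 3, 0, 1))
```

Mapping string IDs to row and column numbers is left out here; a plain dictionary built once from the ID list would do.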

+2
source

Have you profiled your code? Is it only the computation of f that is too expensive, or is it storing the results in the data structure (or perhaps both)?

If f dominates, you should make sure that no algorithmic improvements are possible before you start worrying about parallelization. You might gain more speed by turning part or all of the function into a C extension, possibly using Cython. And if you do go with multiprocessing, I don't see why you would need to transfer the entire data structure between processes.

If storing the results in the matrix is too expensive, you can speed up your code by using a more efficient data structure (such as array.array or numpy.ndarray). Unless you have designed and implemented your own matrix class very carefully, it will almost certainly be slower than those.
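
To illustrate the array.array suggestion, here is a minimal sketch assuming Python 3. FlatLabeledMatrix is a hypothetical stand-in, not the asker's LabeledNumericMatrix: labels are translated to integer positions once, and all values sit in one flat array of C doubles.

```python
from array import array


class FlatLabeledMatrix:
    """Hypothetical labeled matrix backed by a single flat array of doubles."""

    def __init__(self, row_ids, col_ids):
        self._row = {label: i for i, label in enumerate(row_ids)}
        self._col = {label: j for j, label in enumerate(col_ids)}
        self._ncols = len(self._col)
        # One zero-initialized double per cell, stored row by row.
        self._cells = array("d", [0.0]) * (len(self._row) * self._ncols)

    def _offset(self, key):
        row_label, col_label = key
        return self._row[row_label] * self._ncols + self._col[col_label]

    def __setitem__(self, key, value):
        self._cells[self._offset(key)] = value

    def __getitem__(self, key):
        return self._cells[self._offset(key)]


if __name__ == "__main__":
    m = FlatLabeledMatrix(["ID1", "ID2"], ["ID1", "ID2"])
    m["ID1", "ID2"] = 1.0
    print(m["ID1", "ID2"], m["ID2", "ID1"])
```

The same indexing scheme carries over to numpy.ndarray, where the flat offset is replaced by a two-dimensional index.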

+1
source

Thank you all for your answers.

I have created a solution (not "the" solution) to the proposed problem, and since others may find it useful, I am posting the code here. My solution is a hybrid of options 1 and 3 suggested by Adam Matan. The code contains line numbers from my vi session, which will help in the discussion below.

   12  # System libraries needed by this module.
   13  import numpy, multiprocessing, time
   14
   15  # Third-party libraries needed by this module.
   16  import labeledMatrix
   17
   18  # ----- Begin code for this module. -----
   19  from commonFunctions import debugMessage
   20
   21  def createSimilarityMatrix( fvFileHandle, fvFileParser, fvSimScorer, colIDs, rowIDs=None,
   22                              exceptionType=ValueError, useNumType=numpy.float, verbose=False,
   23                              maxProcesses=None, processCheckTime=1.0 ):
   24      """Create a labeled similarity matrix from vectorial data in [fvFileHandle] that can be
   25      parsed by [fvFileParser].
   26      [fvSimScorer] should be a function that can return a floating point value for a pair of vectors.
   27
   28      If the matrix [rowIDs] are not specified, they will be the same as the [colIDs].
   29
   30      [exceptionType] will be raised when a row or column ID cannot be found in the vectorial data.
   31      [maxProcesses] specifies the number of CPUs to use for calculation; default value is all available CPUs.
   32      [processCheckTime] is the interval for checking activity of CPUs (if completed calculation or not).
   33
   34      Return: a LabeledNumericMatrix with corresponding row and column IDs."""
   35
   36      # Setup row/col ID information.
   37      useColIDs = list( colIDs )
   38      useRowIDs = rowIDs or useColIDs
   39      featureData = fvFileParser( fvFileHandle, retainIDs=(useColIDs+useRowIDs) )
   40      verbose and debugMessage( "Retrieved %i feature vectors from FV file." % len(featureData) )
   41      featureIDs = featureData.keys()
   42      absentIDs = [ ID for ID in set(useColIDs + useRowIDs) if ID not in featureIDs ]
   43      if absentIDs:
   44          raise exceptionType, "IDs %s not found in feature vector file." % absentIDs
   45      # Otherwise, proceed to creation of matrix.
   46      resultMatrix = labeledMatrix.LabeledNumericMatrix( useRowIDs, useColIDs, numType=useNumType )
   47      calculateSymmetric = True if set( useRowIDs ) == set( useColIDs ) else False
   48
   49      # Setup data structures needed for parallelization.
   50      numSubprocesses = multiprocessing.cpu_count() if maxProcesses==None else int(maxProcesses)
   51      assert numSubprocesses >= 1, "Specification of %i CPUs to calculate similarity matrix." % numSubprocesses
   52      dataManager = multiprocessing.Manager()
   53      sharedFeatureData = dataManager.dict( featureData )
   54      resultQueue = multiprocessing.Queue()
   55      # Assign jobs evenly through number of processors available.
   56      jobList = [ list() for i in range(numSubprocesses) ]
   57      calculationNumber = 0 # Will hold total number of results stored.
   58      if calculateSymmetric: # Perform calculations with n(n+1)/2 pairs, instead of n^2 pairs.
   59          remainingIDs = list( useRowIDs )
   60          while remainingIDs:
   61              firstID = remainingIDs[0]
   62              for secondID in remainingIDs:
   63                  jobList[ calculationNumber % numSubprocesses ].append( (firstID, secondID) )
   64                  calculationNumber += 1
   65              remainingIDs.remove( firstID )
   66      else: # Straight processing one at a time.
   67          for rowID in useRowIDs:
   68              for colID in useColIDs:
   69                  jobList[ calculationNumber % numSubprocesses ].append( (rowID, colID) )
   70                  calculationNumber += 1
   71
   72      verbose and debugMessage( "Completed setup of job distribution: %s." % [len(js) for js in jobList] )
   73      # Define a function to perform calculation and store results
   74      def runJobs( scoreFunc, pairs, featureData, resultQueue ):
   75          for pair in pairs:
   76              score = scoreFunc( featureData[pair[0]], featureData[pair[1]] )
   77              resultQueue.put( ( pair, score ) )
   78          verbose and debugMessage( "%s: completed all calculations." % multiprocessing.current_process().name )
   79
   80
   81      # Create processes to perform parallelized computing.
   82      processes = list()
   83      for num in range(numSubprocesses):
   84          processes.append( multiprocessing.Process( target=runJobs,
   85              args=( fvSimScorer, jobList[num], sharedFeatureData, resultQueue ) ) )
   86      # Launch processes and wait for them to all complete.
   87      import Queue # For Queue.Empty exception.
   88      for p in processes:
   89          p.start()
   90      assignmentsCompleted = 0
   91      while assignmentsCompleted < calculationNumber:
   92          numActive = [ p.is_alive() for p in processes ].count( True )
   93          verbose and debugMessage( "%i/%i complete; Active processes: %i" % \
   94              ( assignmentsCompleted, calculationNumber, numActive ) )
   95          while True: # Empty queue immediately to avoid underlying pipe/socket implementation from hanging.
   96              try:
   97                  pair, score = resultQueue.get( block=False )
   98                  resultMatrix[ pair[0], pair[1] ] = score
   99                  assignmentsCompleted += 1
  100                  if calculateSymmetric:
  101                      resultMatrix[ pair[1], pair[0] ] = score
  102              except Queue.Empty:
  103                  break
  104          if numActive == 0: finished = True
  105          else:
  106              time.sleep( processCheckTime )
  107      # Result queue emptied and no active processes remaining - completed calculations.
  108      return resultMatrix
  109  ## end of createSimilarityMatrix()

Lines 36-47 are simply preliminaries related to the problem definition that was part of the original question. The setup for multiprocessing, to get around the CPython GIL, is in lines 49-56, and lines 57-70 divide the tasks evenly among the subprocesses. The code in lines 57-70 is used instead of itertools.product because, once the list of row/column IDs reaches 40,000 or so, the product ends up taking an enormous amount of memory.
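
For comparison, here is a sketch of the same n(n+1)/2 pair enumeration written with itertools, assuming Python 3. The itertools iterators produce pairs lazily, so memory grows only when all pairs are collected into a list at once; handing them out in batches keeps just one batch alive at a time. The name pair_batches and the batch size are illustrative assumptions, and this is an alternative sketch rather than what the code above does.

```python
from itertools import combinations_with_replacement, islice


def pair_batches(ids, batch_size):
    """Yield lists of up to batch_size unordered pairs, including (x, x).

    Only one batch is held in memory at a time.
    """
    pairs = combinations_with_replacement(ids, 2)
    while True:
        batch = list(islice(pairs, batch_size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    for batch in pair_batches(["a", "b", "c"], 4):
        print(batch)
```

Each batch could then be given to a worker as one task, for example through a pool's imap_unordered method, so the full list of pairs is never materialized.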

The actual calculation to be performed is in lines 74-78, and this is where the shared dictionary of ID -> vector entries and the shared result queue are used.

Lines 81-85 set up the actual Process objects, though they have not been started yet.

In my first attempt (not shown here), the "try ... resultQueue.get() and assign ... except ..." code was located outside the outer control loop (the one that runs while not all calculations are finished). When I ran that version of the code on a unit test with a 9x9 matrix, there were no problems. However, on moving up to 200x200 or larger, I found that the code hung, even though nothing in the code had changed between runs.

According to this discussion (http://bugs.python.org/issue8426) and the official documentation for multiprocessing, the use of multiprocessing.Queue can hang if the underlying implementation does not have a very large pipe/socket size. The code given here as my solution therefore empties the queue periodically while checking whether the processes have completed (see lines 91-106), so that the child processes can keep putting new results into it and the pipe does not fill up.

When I tested the code on larger 1000x1000 matrices, I noticed that the computation finished well ahead of the queue handling and matrix assignment. Using cProfile, I found that one bottleneck was the default polling interval processCheckTime=1.0 (line 23), and lowering this value improved the speed with which results were obtained (see the bottom of this post for timing examples). This may be useful information for other people new to multiprocessing in Python.
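
The drain-while-waiting pattern described above can be isolated in a few lines. This is a sketch assuming Python 3, where a non-blocking get() on an empty multiprocessing.Queue raises queue.Empty from the standard queue module (Queue.Empty in Python 2). The names drain and collect and the poll_interval parameter are hypothetical.

```python
import queue
import time


def drain(result_queue, results):
    """Move every result currently available into results; return the count."""
    moved = 0
    while True:
        try:
            pair, score = result_queue.get(block=False)
        except queue.Empty:
            return moved
        results[pair] = score
        moved += 1


def collect(processes, result_queue, expected, poll_interval=0.01):
    """Keep emptying the queue until all expected results have arrived.

    The processes are joined only afterwards, so a child is never left
    blocked while trying to flush results into a full pipe.
    """
    results = {}
    while len(results) < expected:
        if drain(result_queue, results) == 0:
            time.sleep(poll_interval)  # nothing ready yet; wait briefly
    for p in processes:
        p.join()
    return results
```

As written, collect waits forever if a worker dies before delivering its share, so a production version would also want to check p.is_alive() or use a timeout.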

Overall, this may not be the best possible implementation, but it does provide a starting point for further optimization. As is often said, optimization through parallelization requires proper analysis and thought.

Timing examples, all with 8 CPUs (t is processCheckTime, in seconds).

200x200 (20100 calculations/assignments)

t = 1.0: run time 18 s

t = 0.01: run time 3 s

500x500 (125250 calculations/assignments)

t = 1.0: run time 86 s

t = 0.01: run time 23 s

In case anyone wants to copy and paste the code, here is a unit test that I used during development. Obviously, the code for the labeled matrix class is missing here, and the fingerprint reader/scorer code is not included (though it is quite simple to roll your own). Of course, I am happy to share that code as well if it would help someone.

  def unitTest():
      import cStringIO, os
      from fingerprintReader import MismatchKernelReader
      from fingerprintScorers import FeatureVectorLinearKernel
      exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
      exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK" + os.linesep )
      exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
      exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
      exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
      exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
      exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
      exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
      exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
      exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
      exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
      exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )
      columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
                    "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
      m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
                                  verbose=True, )
      m.SetOutputPrecision( 6 )
      print m

  ## end of unitTest()
0
source

As a follow-up to my last comment attached to the code posted on March 21, I found that multiprocessing.Pool + SQLite (pysqlite2) was unusable for my particular task, because two problems occurred:

(1) Using the default connection, every worker process other than the first executed its insert query only once.
(2) When I changed the connection keywords to check_same_thread=False, the full pool of workers was used, but then only some queries succeeded and some failed. When each worker also executed time.sleep(0.01), the number of failed queries was reduced, but not to zero.
(3) Less importantly, I could hear my hard disk reading and writing frantically, even for a small job list of 10 insert queries.

I then resorted to MySQL-Python, and things worked out much better. Granted, one has to set up the MySQL server daemon, a user, and a database for that user, but those steps are relatively simple.
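
One possible way around such write contention, which is not what this answer ended up doing, is to let the worker processes only compute and return their values while a single process owns the SQLite connection and performs every insert. The sketch below assumes Python 3 and the standard sqlite3 module; the table name, column names, and function names are hypothetical, and the rows are hard-coded where a real program would take them from the workers.

```python
import sqlite3


def store_results(connection, rows):
    """Insert (label, value) rows from the single writing process."""
    with connection:  # commits on success, rolls back on an exception
        connection.executemany(
            "INSERT INTO results (label, value) VALUES (?, ?)", rows)


def main():
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE results (label TEXT, value REAL)")
    # In a real run these rows would come back from the worker processes,
    # for example as the return values of a pool's map call.
    store_results(connection, [("worker-1", 0.25), ("worker-2", 0.75)])
    count = connection.execute("SELECT COUNT(*) FROM results").fetchone()[0]
    connection.close()
    return count


if __name__ == "__main__":
    print(main())
```

Batching the inserts into one transaction with executemany also avoids a separate commit, and the disk activity that comes with it, for every single row.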

Here is an example of the code that worked for me. Obviously, it can be optimized, but it conveys the basic idea for those who are looking for how to start using multiprocessing.

  from multiprocessing import Pool, current_process
  import MySQLdb
  from numpy import random


  if __name__ == "__main__":

      numValues = 50000
      tableName = "tempTable"
      useHostName = ""
      useUserName = ""  # Insert your values here.
      usePassword = ""
      useDBName = ""

      # Setup database and table for results.
      dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
      topCursor = dbConnection.cursor()
      # Assuming table does not exist, will be eliminated at the end of the script.
      topCursor.execute( 'CREATE TABLE %s (oneText TEXT, oneValue REAL)' % tableName )
      topCursor.close()
      dbConnection.close()

      # Define simple function for storing results.
      def work( storeValue ):
          #print "%s storing value %f" % ( current_process().name, storeValue )
          try:
              dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
              cursor = dbConnection.cursor()
              cursor.execute( "SET AUTOCOMMIT=1" )
              try:
                  query = "INSERT INTO %s VALUES ('%s',%f)" % ( tableName, current_process().name, storeValue )
                  #print query
                  cursor.execute( query )
              except:
                  print "Query failed."

              cursor.close()
              dbConnection.close()
          except:
              print "Connection/cursor problem."


      # Create set of values to assign
      values = random.random( numValues )

      # Create pool of workers
      pool = Pool( processes=6 )
      # Execute assignments.
      for value in values: pool.apply_async( func=work, args=(value,) )
      pool.close()
      pool.join()

      # Cleanup temporary table.
      dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
      topCursor = dbConnection.cursor()
      topCursor.execute( 'DROP TABLE %s' % tableName )
      topCursor.close()
      dbConnection.close()
0
source

Source: https://habr.com/ru/post/908878/

