Multiprocessing Pool with an Iterator

I would like to use a multiprocessing pool with an iterator, executing a function in worker processes on chunks of N elements at a time, until the iterator is exhausted.

import arcpy
from multiprocessing import Pool

def insert(rows):
    with arcpy.da.InsertCursor("c:\temp2.gdb\test", fields=["*"]) as i_cursor:
        #i_cursor is an iterator
        for row in rows:
            i_cursor.insertRow(row)

input_rows = []
count = 0
pool = Pool(4)
with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
    #s_cursor is an iterator
    for row in s_cursor:
        if (count < 100):
            input_rows.append(row)
            count += 1
        else:
            #send 100 rows to the insert function in a new thread
            pool.apply_async(insert, input_rows)
            #reset count and input_rows
            count = 1
            input_rows = [row]


pool.join()
pool.close()

My question is: is this script the correct way to do this? Is there a better way?

Maybe something is wrong with this script, because I get the following AssertionError at pool.join():

Traceback (most recent call last):
  File "G:\Maxime\truncate_append_pool.py", line 50, in <module>
    pool.join()
  File "C:\App\Python27\ArcGIS10.3\lib\multiprocessing\pool.py", line 460, in join
    assert self._state in (CLOSE, TERMINATE)
AssertionError
1 answer

First of all, you're passing input_rows to your insert() function incorrectly - multiprocessing.Pool.apply_async() unpacks the arguments it is given, so insert() gets called with 100 separate arguments instead of one argument holding a list of 100 rows, and it fails with a TypeError in the worker (silently, since the AsyncResult is never checked). Calling it as pool.apply_async(insert, [input_rows]) would fix that... although if all you want is to dispatch fixed-size batches that you've already collected, multiprocessing.Pool.map() over a list of batches would be simpler anyway. (The AssertionError itself comes from calling pool.join() before pool.close() - join() requires the pool to be closed or terminated first.)
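
For illustration, here is a minimal sketch of those fixes applied to the question's batching loop - the wrapping list around the batch, the leftover partial batch, and close() before join(). The insert() stand-in and the fake row source are made up here so the snippet runs without arcpy:

from multiprocessing import Pool

def insert(rows):
    # stand-in for the arcpy insert; just report the batch size
    print("inserting {} rows".format(len(rows)))

if __name__ == "__main__":
    pool = Pool(4)
    input_rows = []
    for row in range(250):  # fake row source instead of the SearchCursor
        input_rows.append(row)
        if len(input_rows) == 100:
            # note the wrapping list: insert() gets ONE argument, the whole batch
            pool.apply_async(insert, [input_rows])
            input_rows = []
    if input_rows:  # don't forget the last, partial batch
        pool.apply_async(insert, [input_rows])
    pool.close()  # close first...
    pool.join()   # ...then join, otherwise join() raises the AssertionError above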

, "" . "" , multiprocessing.Pool imap() ( imap_unordered()) , ( ), , , , :

import arcpy
import itertools
import multiprocessing

# a utility function to get us a slice of an iterator, as an iterator
# when working with iterators, maximum laziness is preferred
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def insert(rows):
    with arcpy.da.InsertCursor("c:\temp2.gdb\test", fields=["*"]) as i_cursor:
        for row in rows:
            i_cursor.insertRow(row)

if __name__ == "__main__":  # guard for multi-platform use
    with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
            pass  # do whatever you want with your result (return from your process function)
        pool.close()  # all done, close cleanly

(iterator_slice(s_cursor, 100) hands the pool slices of the cursor, 100 rows at a time)
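
Here's a runnable sketch of the same imap_unordered() pattern without arcpy; the square_sum() worker and the range() source are made-up placeholders:

import itertools
import multiprocessing

def iterator_slice(iterator, length):  # same slicer as above
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def square_sum(values):
    # made-up worker: sum of squares of one slice
    return sum(v * v for v in values)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    data = iter(range(1000))  # any iterator will do, e.g. a cursor
    for result in pool.imap_unordered(square_sum, iterator_slice(data, 100)):
        print(result)  # results arrive in completion order, not input order
    pool.close()
    pool.join()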

That would be the cleanest approach. Keep in mind, though, that imap_unordered() hands the iterable off to the pool's internal task-feeder thread, which can run well ahead of the workers, so if you want strict control over how much of the cursor gets read in advance you'll have to go back to apply_async() and do the scheduling yourself, something like:

if __name__ == "__main__":
    with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        cursor_iterator = iterator_slice(s_cursor, 100)  # slicer from above, for convenience
        queue = []  # a queue for our current worker async results, a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(insert, [next(cursor_iterator)])) 
            except (StopIteration, TypeError):  # no more data, clear the slice iterator (TypeError covers next(None) after it's cleared)
                cursor_iterator = None
            # wait for a free worker or until all remaining finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                process.wait(0.1)  # let it breathe a little, 100ms should be enough
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()

This way s_cursor is read only when a worker frees up to take the next 100 rows (give or take, depending on how long each insert() call runs).

UPDATE: Since it was questioned whether this really runs lazily, here's a small demonstration. First, a generator that prints whenever it produces values, and a dummy worker function that just sleeps for a random while:

import random
import time

# just an example generator to prove lazy access by printing when it generates
def get_counter(limit=100):
    for i in range(limit):
        if not i % 3:  # print every third generation to reduce verbosity
            print("Generated: {}".format(i))
        yield i

# our process function, just prints what passed to it and waits for 1-6 seconds
def test_process(values):
    time_to_wait = 1 + random.random() * 5
    print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
    time.sleep(time_to_wait)
    print("Processed: {}".format(values))

And now the same scheduling loop as above, with this test generator plugged in:

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)  # let's use just 2 workers
    count = get_counter(30)  # get our counter iterator set to iterate from 0-29
    count_iterator = iterator_slice(count, 7)  # we'll process them in chunks of 7
    queue = []  # a queue for our current worker async results, a deque would be faster
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear the slice iterator (TypeError covers next(None) after it's cleared)
            count_iterator = None
        # wait for a free worker or until all remaining workers finish
        while queue and (len(queue) >= pool._processes or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # a sub-process has not finished execution
                queue.append(process)  # add it back to the queue
            else:
                # you can use process.get() to get the result if needed
                pass
    pool.close()

The output (yours will differ because of the random wait times):

Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)

As you can see, generation happens in step with the processing instead of all up front - that's the lazy part - and how the generated and processed chunks interleave depends on the random wait times and the number of workers. If you'd like to keep a few slices ready ahead of the workers, so that the generator (or cursor) never becomes the bottleneck, you can loosen the inner check: e.g. while queue and (len(queue) >= pool._processes + 3 or not count_iterator): keeps 3 extra slices queued up before the loop starts waiting for a free worker.
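
If this pattern gets reused, the scheduling loop can be wrapped up once as a generator. The following is only a sketch of such a hypothetical helper - lazy_apply() and its lookahead parameter are made up here, not part of multiprocessing:

def lazy_apply(pool, func, slices, lookahead=0):
    # Feed argument slices to pool.apply_async(), keep at most
    # pool._processes + lookahead slices in flight, and yield each
    # worker's return value as it completes. Hypothetical helper.
    slices = iter(slices)
    queue = []
    while slices is not None or queue:
        if slices is not None:
            try:
                queue.append(pool.apply_async(func, [next(slices)]))
            except StopIteration:  # source exhausted
                slices = None
        while queue and (len(queue) >= pool._processes + lookahead or slices is None):
            process = queue.pop(0)
            process.wait(0.1)  # give it a moment to finish
            if not process.ready():
                queue.append(process)  # not done yet, re-queue it
            else:
                yield process.get()  # hand back the worker's return value

Consuming it would then look like for result in lazy_apply(pool, test_process, iterator_slice(get_counter(30), 7), lookahead=3): pass, followed by pool.close() - nothing is scheduled until the generator is iterated.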


Source: https://habr.com/ru/post/1679049/

