, , , input_rows insert() - multiprocessing.Pool.apply_async() , , insert() 100 100. , . pool.apply_async(insert, [input_rows]), ... , 100 multiprocessing.Pool.map() .
, "" . "" , multiprocessing.Pool imap() ( imap_unordered()) , ( ), , , , :
import arcpy
import itertools
import multiprocessing
def iterator_slice(iterator, length):
iterator = iter(iterator)
while True:
res = tuple(itertools.islice(iterator, length))
if not res:
break
yield res
def insert(rows):
with arcpy.da.InsertCursor("c:\temp2.gdb\test" fields=["*"]) as i_cursor:
for row in rows:
i_cursor.insertRow(row)
if __name__ == "__main__":
with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
pool = multiprocessing.Pool(processes=4)
for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
pass
pool.close()
(, s_cursor, 100)
... , . , imap_unordered() - ( , ) , , , - apply_async() - , , , , :
if __name__ == "__main__":
with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
pool = multiprocessing.Pool(processes=4)
cursor_iterator = iterator_slice(s_cursor, 100)
queue = []
while cursor_iterator or queue:
try:
queue.append(pool.apply_async(insert, [next(cursor_iterator)]))
except (StopIteration, TypeError):
cursor_iterator = None
while queue and (len(queue) >= pool._processes or not cursor_iterator):
process = queue.pop(0)
process.wait(0.1)
if not process.ready():
queue.append(process)
else:
pass
pool.close()
s_cursor , 100 ( insert() ).
UPDATE. , , . :
import random
import time
def get_counter(limit=100):
for i in range(limit):
if not i % 3:
print("Generated: {}".format(i))
yield i
def test_process(values):
time_to_wait = 1 + random.random() * 5
print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
time.sleep(time_to_wait)
print("Processed: {}".format(values))
:
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=2)
count = get_counter(30)
count_iterator = iterator_slice(count, 7)
queue = []
while count_iterator or queue:
try:
queue.append(pool.apply_async(test_process, [next(count_iterator)]))
except (StopIteration, TypeError):
count_iterator = None
while queue and (len(queue) >= pool._processes or not count_iterator):
process = queue.pop(0)
process.wait(0.1)
if not process.ready():
queue.append(process)
else:
pass
pool.close()
(, ):
Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)
, / , , , (/ -, ). , . , , , , - ( ) queue ( ), - while queue and (len(queue) >= pool._processes + 3 or not count_iterator):, 3 , , , .