Why is Queue.join () here?

I am learning python threading module and wrote the following code to help myself understand

from Queue import Queue
import threading

lock = threading.Lock()
MAX_THREADS = 8
q = Queue()
count = 0

# some i/o process
def io_process(x):
    pass

# process that deals with shared resources
def shared_resource_process(x):
    pass

def func():
    global q, count
    while not q.empty():
        x = q.get()
        io_process(x)
        if lock.acquire():
            shared_resource_process(x)
            print '%s is processing %r' %(threading.currentThread().getName(), x)
            count += 1          
            lock.release()

def main():
    global q
    for i in range(40):
        q.put(i)

    threads = []
    for i in range(MAX_THREADS):
        threads.append(threading.Thread(target=func))

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    print 'multi-thread done.'
    print count == 40

if __name__ == '__main__':
    main()

and the output is stuck like this:

Thread-1 is processing 32
Thread-8 is processing 33
Thread-6 is processing 34
Thread-2 is processing 35
Thread-5 is processing 36
Thread-3 is processing 37
Thread-7 is processing 38
Thread-4 is processing 39

Please note that fingerprints in main () are not executed, which means that some threads are hanging / blocked?

then I modify the func () method by adding q.task_done ():

if lock.acquire():
            shared_resource_process(x)
            print '%s is processing %r' %(threading.currentThread().getName(), x)
            count += 1
            q.task_done()  # why is this necessary ?
            lock.release()

and now all threads terminate as I expected and get the correct output:

Thread-6 is processing 36
Thread-4 is processing 37
Thread-3 is processing 38
Thread-7 is processing 39
multi-thread done.
True

Process finished with exit code 0

I read the Queue.Queue document here and see that task_done () works with queue.join () to make sure all the items are in the queue. but since I did not call queue.join () in main (), why is task_done () necessary here in func ()? What is the cause of thread freeze / lock when skipping task_done () code?

+4
1

. , Queue , 8. :

  • Thread A q.empty, , . , False .
  • A q.get, B.
  • Thread B q.empty, , , False .
  • Thread B q.get , . B , q.empty True.
  • . q.empty 1, q.get next, , .

, time :

while not q.empty():
    time.sleep(0.1) # Force context switch
    x = q.get()

, , task_done.

, task_done ? Python 2 100 , . . PDF. , , task_done , , .

, get, . , get Queue.Empty, , :

from Queue import Queue, Empty

def func():
    global q, count
    while True:
        try:
            x = q.get(False)
        except Empty:
            break
        io_process(x)
        if lock.acquire():
            shared_resource_process(x)
            print '%s is processing %r' %(threading.currentThread().getName(), x)
            count += 1
            lock.release()
+3

Source: https://habr.com/ru/post/1671262/


All Articles