Why does this Python 0MQ script for distributed computing freeze with a fixed input size?

I recently started learning 0MQ . Earlier today I came across a blog, Python Multiprocessing with ZeroMQ . He talked about the fan drawing in the 0MQ manual that I read about, so he decided to give it a try.

Instead of just calculating the products of numbers by workers, as the source code does, I decided to try to get the fan to send large arrays to workers through 0mq messages. Below is the code that I used for my “experiments”.

As noted in the comment below, at any time when I tried to increase the string_length variable to a number greater than 3 MB, the code freezes.

Typical symptom: suppose we set string_length to 4 MB (i.e. 4194304), then perhaps the result manager receives the result from one worker, and then the code simply pauses. htop shows that 2 cores do not do much. The Etherape network traffic monitor also does not show traffic on the lo interface.

So far, after several hours, looking around, I could not understand what this is causing, and I would like to get a hint or two to the question of why and any solution on this issue. Thanks!

I am running Ubuntu 11.04 64bit on a Dell laptop with an Intel Core processor, 8 GB RAM, 80 GB Intel X25MG2 SSD, Python 2.7.1+, libzmq1 2.1.10-1chl1 ~ natty1, python-pyzmq 2.1.10-1chl1 ~ natty1

import time import zmq from multiprocessing import Process, cpu_count np = cpu_count() pool_size = np number_of_elements = 128 # Odd, why once the slen is bumped to 3MB or above, the code hangs? string_length = 1024 * 1024 * 3 def create_inputs(nelem, slen, pb=True): ''' Generates an array that contains nelem fix-sized (of slen bytes) random strings and an accompanying array of hexdigests of the former elements. Both are returned in a tuple. :type nelem: int :param nelem: The desired number of elements in the to be generated array. :type slen: int :param slen: The desired number of bytes of each array element. :type pb: bool :param pb: If True, displays a text progress bar during input array generation. ''' from os import urandom import sys import hashlib if pb: if nelem <= 64: toolbar_width = nelem chunk_size = 1 else: toolbar_width = 64 chunk_size = nelem // toolbar_width description = '%d random strings of %d bytes. ' % (nelem, slen) s = ''.join(('Generating an array of ', description, '...\n')) sys.stdout.write(s) # create an ASCII progress bar sys.stdout.write("[%s]" % (" " * toolbar_width)) sys.stdout.flush() sys.stdout.write("\b" * (toolbar_width+1)) array = list() hash4a = list() try: for i in range(nelem): e = urandom(int(slen)) array.append(e) h = hashlib.md5() h.update(e) he = h.hexdigest() hash4a.append(he) i += 1 if pb and i and i % chunk_size == 0: sys.stdout.write("-") sys.stdout.flush() if pb: sys.stdout.write("\n") except MemoryError: print('Memory Error: discarding existing arrays') array = list() hash4a = list() finally: return array, hash4a # The "ventilator" function generates an array of nelem fix-sized (of slen # bytes long) random strings, and sends the array down a zeromq "PUSH" # connection to be processed by listening workers, in a round robin load # balanced fashion. def ventilator(): # Initialize a zeromq context context = zmq.Context() # Set up a channel to send work ventilator_send = context.socket(zmq.PUSH) ventilator_send.bind("tcp://127.0.0.1:5557") # Give everything a second to spin up and connect time.sleep(1) # Create the input array nelem = number_of_elements slen = string_length payloads = create_inputs(nelem, slen) # Send an array to each worker for num in range(np): work_message = { 'num' : payloads } ventilator_send.send_pyobj(work_message) time.sleep(1) # The "worker" functions listen on a zeromq PULL connection for "work" # (array to be processed) from the ventilator, get the length of the array # and send the results down another zeromq PUSH connection to the results # manager. def worker(wrk_num): # Initialize a zeromq context context = zmq.Context() # Set up a channel to receive work from the ventilator work_receiver = context.socket(zmq.PULL) work_receiver.connect("tcp://127.0.0.1:5557") # Set up a channel to send result of work to the results reporter results_sender = context.socket(zmq.PUSH) results_sender.connect("tcp://127.0.0.1:5558") # Set up a channel to receive control messages over control_receiver = context.socket(zmq.SUB) control_receiver.connect("tcp://127.0.0.1:5559") control_receiver.setsockopt(zmq.SUBSCRIBE, "") # Set up a poller to multiplex the work receiver and control receiver channels poller = zmq.Poller() poller.register(work_receiver, zmq.POLLIN) poller.register(control_receiver, zmq.POLLIN) # Loop and accept messages from both channels, acting accordingly while True: socks = dict(poller.poll()) # If the message came from work_receiver channel, get the length # of the array and send the answer to the results reporter if socks.get(work_receiver) == zmq.POLLIN: #work_message = work_receiver.recv_json() work_message = work_receiver.recv_pyobj() length = len(work_message['num'][0]) answer_message = { 'worker' : wrk_num, 'result' : length } results_sender.send_json(answer_message) # If the message came over the control channel, shut down the worker. if socks.get(control_receiver) == zmq.POLLIN: control_message = control_receiver.recv() if control_message == "FINISHED": print("Worker %i received FINSHED, quitting!" % wrk_num) break # The "results_manager" function receives each result from multiple workers, # and prints those results. When all results have been received, it signals # the worker processes to shut down. def result_manager(): # Initialize a zeromq context context = zmq.Context() # Set up a channel to receive results results_receiver = context.socket(zmq.PULL) results_receiver.bind("tcp://127.0.0.1:5558") # Set up a channel to send control commands control_sender = context.socket(zmq.PUB) control_sender.bind("tcp://127.0.0.1:5559") for task_nbr in range(np): result_message = results_receiver.recv_json() print "Worker %i answered: %i" % (result_message['worker'], result_message['result']) # Signal to all workers that we are finsihed control_sender.send("FINISHED") time.sleep(5) if __name__ == "__main__": # Create a pool of workers to distribute work to for wrk_num in range(pool_size): Process(target=worker, args=(wrk_num,)).start() # Fire up our result manager... result_manager = Process(target=result_manager, args=()) result_manager.start() # Start the ventilator! ventilator = Process(target=ventilator, args=()) ventilator.start() 
+6
source share
1 answer

The problem is that your fan slot (PUSH) closes before it is sent. You have 1s sleep at the end of the fan function, which is not enough to send 384 MB of messages. That is why you have a threshold that you have, if the dream were shorter, then the threshold would be lower.

However, LINGER is supposed to prevent such things, so I would say it with zeromq: PUSH does not seem to respect LINGER.

The fix for your specific example (without adding an indefinitely long sleep) would be to use the same FINISH signal to stop your fan as your workers. Thus, you guarantee that your fan will remain as long as necessary.

Revised Fan:

 def ventilator(): # Initialize a zeromq context context = zmq.Context() # Set up a channel to send work ventilator_send = context.socket(zmq.PUSH) ventilator_send.bind("tcp://127.0.0.1:5557") # Set up a channel to receive control messages control_receiver = context.socket(zmq.SUB) control_receiver.connect("tcp://127.0.0.1:5559") control_receiver.setsockopt(zmq.SUBSCRIBE, "") # Give everything a second to spin up and connect time.sleep(1) # Create the input array nelem = number_of_elements slen = string_length payloads = create_inputs(nelem, slen) # Send an array to each worker for num in range(np): work_message = { 'num' : payloads } ventilator_send.send_pyobj(work_message) # Poll for FINISH message, so we don't shutdown too early poller = zmq.Poller() poller.register(control_receiver, zmq.POLLIN) while True: socks = dict(poller.poll()) if socks.get(control_receiver) == zmq.POLLIN: control_message = control_receiver.recv() if control_message == "FINISHED": print("Ventilator received FINSHED, quitting!") break # else: unhandled message 
+6
source

Source: https://habr.com/ru/post/906262/


All Articles