I use ZeroMQ in Python and C ++ in many configurations, and I wonder which one is the most elegant way to interrupt recv() or poll() from another thread (for example, in the case of controlled program termination, and also if you want to stop listening without having to destroy the socket).
Unlike this question, I do not want to just avoid the infinitive wait, but I want to return immediately from recv() or poll() .
I know that I can simply provide a timeout and abort recv() as follows:
poller = zmq.Poller() poller.register(socket, zmq.POLLIN) while _running: if poller.poll(timeout=100) == []: # maybe handle unwanted timout here.. continue handle_message(socket.recv())
This will try the socket endlessly until _running is set to False from another thread - after a maximum of 100 ms I have finished.
But this is not very pleasant - I have a busy cycle and it is difficult to cope with real timeouts that may be the result of unwanted behavior. Also, I have to wait for a timeout, which is not critical in most cases, but .. you know what I mean.
Of course, I can ask for an additional abortion socket:
abort_socket = context.socket(zmq.SUB) abort_socket.setsockopt(zmq.SUBSCRIBE, b"") abort_socket.connect(<abort-publisher-endpoint>) poller = zmq.Poller() poller.register(socket, zmq.POLLIN) poller.register(abort_socket, zmq.POLLIN) while _running: poll_result = poller.poll(timeout=1000) if socket in poll_result: handle_message(socket.recv()) elif abort_socket in poll_result: break else:
But this approach also has disadvantages:
- a bit detailed - in the place where I cause the interrupt, I will need to create a publisher and use it to interrupt the reception
abort_socket can only be used from one thread , so I would have to make sure
So my question is : how is this done in a glorious way?
Can I somehow just use something like Python threading.Event or s.th. similar to other languages, not an interrupt, which can be passed to the polter as follows:
def listener_thread_fn(event) poller = zmq.Poller() poller.register(socket, zmq.POLLIN) poller.register(event, zmq.POLLIN) while _running: poll_result = poller.poll(timeout=1000) if socket in poll_result: handle_message(socket.recv()) elif event in poll_result: break else: # handle real timeout here pass
So, you just had to create theading.Event() in the first place, pass it to listener_thread_fn and call event.set() from any thread to abort.