Limiting Queue Length Using PyZMQ

I want to limit the amount of memory consumed by my ZeroMQ message queues in a Python application. I know that setting the high water mark limits how much is queued on the sender side, but is there a way to control how much is queued on the receiver side? The Python ZeroMQ binding seems to set it to unlimited.

My test case: I have two Python terminals that I use for testing. One of them is the receiver:

    Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
    [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import zmq
    >>> context = zmq.Context()
    >>> socket = context.socket(zmq.PULL)
    >>> socket.setsockopt(zmq.RCVBUF, 256)
    >>> socket.bind("tcp://127.0.0.1:12345")

The other is the sender:

    Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
    [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import zmq
    >>> context = zmq.Context()
    >>> socket = context.socket(zmq.PUSH)
    >>> socket.setsockopt(zmq.SNDBUF, 2048)
    >>> socket.setsockopt(zmq.HWM, 1)
    >>> socket.connect("tcp://127.0.0.1:12345")
    >>> num = 0
    >>> while True:
    ...     print num
    ...     socket.send(str(num))
    ...     num = num + 1
    ...

I run socket.recv() on the receiver side a couple of times to make sure the queue is working, but other than that I let the two terminals just sit there. The send loop never seems to block, and the receiver's memory usage seems to keep growing.

3 answers

Contrary to the ZeroMQ documentation, the high water mark must be set on both the PUSH side and the PULL side. As soon as I changed the PULL side, it worked better. New PULL code:

    Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
    [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import zmq
    >>> context = zmq.Context()
    >>> socket = context.socket(zmq.PULL)
    >>> socket.setsockopt(zmq.RCVBUF, 256)
    >>> socket.setsockopt(zmq.HWM, 1)
    >>> socket.bind("tcp://127.0.0.1:12345")
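
To see how the limits on the two sides combine, here is a minimal sketch that counts how many non-blocking sends succeed before the queue fills. It assumes a newer pyzmq built against ZeroMQ 3.x or later, where the single zmq.HWM option was split into zmq.SNDHWM and zmq.RCVHWM. The helper name count_sends_until_full and the inproc endpoint name are made up for this illustration, and inproc is used only to keep kernel TCP buffers out of the picture.

```python
import zmq


def count_sends_until_full(snd_hwm, rcv_hwm, limit=5000):
    """Return how many non-blocking sends succeed before the queue is full.

    Hypothetical helper for illustration; `limit` only guards against
    an endless loop if the queue never fills.
    """
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    push = ctx.socket(zmq.PUSH)
    try:
        # Set the high water marks before bind/connect so that they
        # apply to the connection created below.
        pull.setsockopt(zmq.RCVHWM, rcv_hwm)
        push.setsockopt(zmq.SNDHWM, snd_hwm)
        pull.bind("inproc://hwm-demo")
        push.connect("inproc://hwm-demo")

        sent = 0
        while sent < limit:
            try:
                # NOBLOCK raises zmq.Again instead of blocking when full.
                push.send(b"x", zmq.NOBLOCK)
            except zmq.Again:
                break
            sent += 1
        return sent
    finally:
        # Drop anything still queued so that term() returns promptly.
        push.close(linger=0)
        pull.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print("both HWMs set to 1:", count_sends_until_full(1, 1))
    print("only SNDHWM set to 1:", count_sends_until_full(1, 1000))
```

The receiver never calls recv() here, so every accepted message stays queued. Leaving the receive side at a large value lets far more messages pile up than setting both sides to 1, which matches the observation above. Over tcp the numbers will be higher and less predictable, because the kernel socket buffers (SNDBUF and RCVBUF) hold additional data on top of the ZeroMQ queues.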

Actually, the documentation says the following:

"When a ZMQ_PUSH socket enters an exceptional state due to having reached the high water mark for all downstream nodes, or if there are no downstream nodes at all, then any zmq_send(3) operations on the socket shall block until the exceptional state ends or at least one downstream node becomes available for sending; messages are not discarded."

http://api.zeromq.org/2-1:zmq-socket

This directly indicates that you can (and should) set a high water mark on the downstream nodes (that is, the PULL side). It possibly also implies that setting it on the PUSH side has no effect, although I suspect that is not the case, because the limit still matters when downstream nodes are available but messages arrive faster than they can be sent.
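
The blocking behavior described in the quote can be observed without hanging the interpreter by giving the send a timeout. The following is a rough sketch, assuming a pyzmq version that exposes zmq.SNDTIMEO (ZeroMQ 3.x or later); the function name send_blocks_without_peers is invented for this example. A PUSH socket with no downstream nodes blocks on send, and with a send timeout set the call raises zmq.Again once the timeout expires.

```python
import zmq


def send_blocks_without_peers(timeout_ms=100):
    """Return True if send() on an unconnected PUSH socket times out.

    Hypothetical helper for illustration only.
    """
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    try:
        # Without a timeout, send() would block until a peer appears.
        push.setsockopt(zmq.SNDTIMEO, timeout_ms)
        try:
            push.send(b"hello")  # no downstream node is connected
        except zmq.Again:
            return True  # the send blocked and then timed out
        return False
    finally:
        push.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print("send blocked:", send_blocks_without_peers())
```

The same zmq.Again exception is what a sender sees when all downstream queues have reached their high water mark and the send is non-blocking or has a timeout.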


I use the following socket options to limit the size of the ZeroMQ queue to one message: zmq.SNDHWM on the sender and zmq.RCVHWM on the receiver.

Here is an example with ZMQ PUSH/PULL:

Sender (zmq.PUSH):

    import zmq

    def create_push_socket(ip, port):
        """Create a PUSH socket that queues at most one outgoing message."""
        try:
            context = zmq.Context()
            socket = context.socket(zmq.PUSH)
            # Set the send high water mark before connecting.
            socket.setsockopt(zmq.SNDHWM, 1)
            zmq_address = "tcp://{}:{}".format(ip, port)
            socket.connect(zmq_address)
            return socket
        except zmq.ZMQError as exp:
            print(exp)
            return False

    sock = create_push_socket('127.0.0.1', 5558)
    if sock:
        sock.send_json({'a': 1})

Receiver (zmq.PULL):

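    import logging
    import time

    import zmq

    # The standard logging module stands in for the original logger here.
    logger = logging.getLogger(__name__)

    def listen():
        sock = None
        try:
            context = zmq.Context()
            sock = context.socket(zmq.PULL)
            sock.setsockopt(zmq.RCVHWM, 1)
            sock.setsockopt(zmq.CONFLATE, 1)  # Keep only the last message.
            sock.bind("tcp://*:5558")
        except zmq.ZMQError:
            logger.exception("Failed to set up the PULL socket")
            sock = None  # Fall through to the retry branch below.

        configs = None
        while configs is None:
            if sock:
                configs = sock.recv_json()
                time.sleep(1e-1)
            else:
                time.sleep(5)
                return listen()  # Retry recursively.
        return configs

    listen()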

Source: https://habr.com/ru/post/1397674/