ZMQ: No subscription message on the XPUB socket for multiple subscribers (Last Value Caching pattern)

I implemented the LVC example ( http://zguide.zeromq.org/php:chapter5#Last-Value-Caching ), but cannot get a second subscriber to register at the backend.

The first time a subscriber comes on board, the event[0] == b'\x01' condition is met and the cached value is sent, but the second subscriber (same topic) does not even register ( if backend in events: is never true). Everything else works fine: data is passed from the publisher to all subscribers.

What could be the reason for this? Is the backend connected correctly? Is this pattern only supposed to work with the first subscriber?

Update

When I subscribe the second subscriber to a different topic, I get the correct behavior (i.e. \x01 when subscribing). This really seems to work only for the first subscriber. Is this a bug in ZeroMQ?

Update 2

Here's a minimal working example showing that the LVC pattern is not working (at least not the way it is implemented here).

    # subscriber.py
    import zmq


    def main():
        ctx = zmq.Context.instance()
        sub = ctx.socket(zmq.SUB)
        sub.connect("tcp://127.0.0.1:5558")

        # Subscribe to every single topic from publisher
        print 'subscribing (sub side)'
        sub.setsockopt(zmq.SUBSCRIBE, b"my-topic")

        poller = zmq.Poller()
        poller.register(sub, zmq.POLLIN)
        while True:
            try:
                events = dict(poller.poll(1000))
            except KeyboardInterrupt:
                print("interrupted")
                break

            # Any new topic data we cache and then forward
            if sub in events:
                msg = sub.recv_multipart()
                topic, current = msg
                print 'received %s on topic %s' % (current, topic)


    if __name__ == '__main__':
        main()

And here is the broker (as in the example, but with a bit more detail and an integrated publisher).

    # broker.py
    # from http://zguide.zeromq.org/py:lvcache
    import zmq
    import threading
    import time


    class Publisher(threading.Thread):
        def __init__(self):
            super(Publisher, self).__init__()

        def run(self):
            time.sleep(10)
            ctx = zmq.Context.instance()
            pub = ctx.socket(zmq.PUB)
            pub.connect("tcp://127.0.0.1:5557")

            cnt = 0
            while True:
                msg = 'hello %d' % cnt
                print 'publisher is publishing %s' % msg
                pub.send_multipart(['my-topic', msg])
                cnt += 1
                time.sleep(5)


    def main():
        ctx = zmq.Context.instance()
        frontend = ctx.socket(zmq.SUB)
        frontend.bind("tcp://*:5557")
        backend = ctx.socket(zmq.XPUB)
        backend.bind("tcp://*:5558")

        # Subscribe to every single topic from publisher
        frontend.setsockopt(zmq.SUBSCRIBE, b"")

        # Store last instance of each topic in a cache
        cache = {}

        # We route topic updates from frontend to backend, and
        # we handle subscriptions by sending whatever we cached,
        # if anything:
        poller = zmq.Poller()
        poller.register(frontend, zmq.POLLIN)
        poller.register(backend, zmq.POLLIN)

        # launch a publisher
        p = Publisher()
        p.daemon = True
        p.start()

        while True:
            try:
                events = dict(poller.poll(1000))
            except KeyboardInterrupt:
                print("interrupted")
                break

            # Any new topic data we cache and then forward
            if frontend in events:
                msg = frontend.recv_multipart()
                topic, current = msg
                cache[topic] = current
                backend.send_multipart(msg)

            ### This is where it fails for the 2nd subscriber.
            ### There is never even an event from the backend
            ### in events when the 2nd subscriber is subscribing.

            # When we get a new subscription we pull data from the cache:
            if backend in events:
                print 'message from subscriber'
                event = backend.recv()
                # Event is one byte 0=unsub or 1=sub, followed by topic
                if event[0] == b'\x01':
                    topic = event[1:]
                    print ' => subscribe to %s' % topic
                    if topic in cache:
                        print ("Sending cached topic %s" % topic)
                        backend.send_multipart([ topic, cache[topic] ])
                elif event[0] == b'\x00':
                    topic = event[1:]
                    print ' => unsubscribe from %s' % topic


    if __name__ == '__main__':
        main()

Running this code (1 x broker.py , 2 x subscriber.py ) shows that the first subscriber registers at the broker as expected ( \x01 and cache lookup), but the second subscriber does not get registered the same way. Interestingly, the second subscriber is hooked up to the pub/sub channel, since after a while (10 seconds) both subscribers receive data from the publisher.

It is very strange. Perhaps some of my libraries are out of date. Here is what I got:

    Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46)
    [GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import zmq
    >>> zmq.__version__
    '14.1.1'

    $ brew info zeromq
    zeromq: stable 4.0.5 (bottled), HEAD
    High-performance, asynchronous messaging library
    http://www.zeromq.org/
    /usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) *
      Poured from bottle
    From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb
    ==> Dependencies
    Build: pkg-config ✔
    Optional: libpgm ✘, libsodium ✘

Update 3

This behavior can also be observed in zeromq 4.1.2 and pyzmq-14.7.0 (with or without libpgm and libsodium installed).

Update 4

Another observation suggests that the first subscriber is somehow handled differently: the first subscriber is the only one that unsubscribes from the XPUB socket ( backend ) in the expected way, by sending its subscription topic preceded by \x00 . The other subscribers (I tried more than 2) stayed silent on the backend channel (although they did receive messages).

Update 5

I hope I'm not going down a rabbit hole, but I looked into the czmq bindings and ran my Python example in C. The results are the same, so I think this is not a problem with the bindings, but with libzmq .

I also verified that the second subscriber is sending a subscribe message, and I can indeed see it on the wire:

First subscribe:

    0000  02 00 00 00 45 00 00 3f  98 be 40 00 40 06 00 00   ....E..? ..@.@...
    0010  7f 00 00 01 7f 00 00 01  fa e5 15 b6 34 f0 51 c3   ........ ....4.Q.
    0020  05 e4 8b 77 80 18 31 d4  fe 33 00 00 01 01 08 0a   ...w..1. .3......
    0030  2a aa d1 d2 2a aa cd e9  00 09 01 6d 79 2d 74 6f   *...*... ...my-to
    0040  70 69 63                                           pic

Second subscribe message, with the differences (relative to the one above) marked and explained. The same data is sent in the subscribe frame.

                                   identification
                                   v
    0000  02 00 00 00 45 00 00 3f  ed be 40 00 40 06 00 00   ....E..? ..@.@...
                               src port        sequence number
                                      v        v  v  v  v
    0010  7f 00 00 01 7f 00 00 01  fa e6 15 b6 17 da 02 e7   ........ ........
          Acknowledgement number
          v  v  v  v           v  <- window scaling factor
    0020  71 4b 33 e6 80 18 31 d5  fe 33 00 00 01 01 08 0a   qK3...1. .3......
     timestamp value        timestamp echo reply
                v  v        v  v   |<-------- data -------
    0030  2a aa f8 2c 2a aa f4 45  00 09 01 6d 79 2d 74 6f   *..,*..E ...my-to
          ------>|
    0040  70 69 63                                           pic
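The data portion of either capture can be checked by hand with a few lines of Python. This is only a sketch: it assumes the ZMTP 3.0 short-frame layout (one flags byte, one length byte, then the body) and the XPUB convention of a leading 0x01 for subscribe. `decode_subscription_frame` is a hypothetical helper name, not part of pyzmq, and the code is written for Python 3.

```python
# Payload bytes copied from offset 0x38 of the captures above.
PAYLOAD = bytes.fromhex("0009016d792d746f706963")


def decode_subscription_frame(payload):
    """Decode one short ZMTP frame carrying a (un)subscribe request.

    Assumed layout: flags byte, 1-byte length, body. Long frames
    (flag bit 0x02) are not handled in this sketch.
    """
    flags, size = payload[0], payload[1]
    if flags & 0x02:
        raise ValueError("long frames are not handled in this sketch")
    body = payload[2:2 + size]
    if size == 0 or len(body) != size:
        raise ValueError("truncated or empty frame")
    is_subscribe = body[:1] == b"\x01"   # 0x01 = subscribe, 0x00 = unsubscribe
    return is_subscribe, body[1:]


is_subscribe, topic = decode_subscription_frame(PAYLOAD)
print(is_subscribe, topic)
```

Under these assumptions both captures carry the same request: a subscribe for the topic my-topic.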
1 answer

I found the solution to this problem, and even though I read the docs front to back and back to front, I had not seen it. The key is XPUB_VERBOSE . Add this line after the backend initialization and everything works fine:

 backend.setsockopt(zmq.XPUB_VERBOSE, True) 
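In the context of the broker from the question, the fix and the subscription handling could look roughly like the sketch below. `make_backend` and `cached_reply` are hypothetical helper names introduced only for illustration, and the endpoint is the one used in the question. The event type is compared via a slice ( `event[:1]` ) because indexing a bytes object yields an int on Python 3, so this form behaves the same on Python 2 and 3.

```python
def make_backend(ctx, endpoint="tcp://*:5558"):
    """Create the XPUB backend with duplicate-subscription filtering disabled."""
    import zmq  # pyzmq; imported lazily so cached_reply() is usable without it

    backend = ctx.socket(zmq.XPUB)
    # Report every subscription, not just the first one per topic
    backend.setsockopt(zmq.XPUB_VERBOSE, 1)
    backend.bind(endpoint)
    return backend


def cached_reply(event, cache):
    """Return [topic, value] to send for a subscribe event, else None."""
    if event[:1] != b"\x01":    # 0x00 is an unsubscribe: nothing to send
        return None
    topic = event[1:]
    if topic not in cache:      # nothing published on this topic yet
        return None
    return [topic, cache[topic]]
```

Inside the poll loop this would be used as `reply = cached_reply(backend.recv(), cache)` followed by `backend.send_multipart(reply)` when `reply` is not None.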

Here is an excerpt from the official documentation :

ZMQ_XPUB_VERBOSE : provide all subscription messages on XPUB sockets. Sets the XPUB socket behavior on new subscriptions and unsubscriptions. A value of 0 is the default and passes only new subscription messages to upstream. A value of 1 passes all subscription messages upstream.

    Option value type: int
    Option value unit: 0, 1
    Default value: 0
    Applicable socket types: ZMQ_XPUB
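To make the difference between the two values concrete, here is a toy model in plain Python. It only mimics the behavior described in the excerpt (duplicate subscriptions are swallowed unless the verbose flag is set). It is not how libzmq is implemented, it ignores unsubscriptions, and `ToyXpub` is a made-up name.

```python
class ToyXpub(object):
    """Toy stand-in for how an XPUB socket reports subscriptions."""

    def __init__(self, verbose=False):
        self.verbose = verbose
        self.known_topics = set()
        self.events = []            # what the broker would read via recv()

    def subscribe(self, topic):
        is_new = topic not in self.known_topics
        self.known_topics.add(topic)
        # Default: report only the first subscription per topic.
        # Verbose: report every subscription.
        if is_new or self.verbose:
            self.events.append(b"\x01" + topic)


quiet = ToyXpub(verbose=False)
loud = ToyXpub(verbose=True)
for sock in (quiet, loud):
    sock.subscribe(b"my-topic")     # first subscriber
    sock.subscribe(b"my-topic")     # second subscriber, same topic
    sock.subscribe(b"other-topic")  # a subscriber on a different topic

print(len(quiet.events), len(loud.events))
```

In this model the non-verbose socket stays silent for the second subscriber on my-topic but still reports the subscriber on a different topic, which matches the observations in the question.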

Pieter Hintjens has more information about this on his blog . This is the relevant section:

A few months ago, we added a neat little option ( ZMQ_XPUB_VERBOSE ) in XPUB that disables filtering of duplicate subscriptions. This now works for any number of subscribers. We use it as follows:

    void *publisher = zsocket_new (ctx, ZMQ_XPUB);
    zsocket_set_xpub_verbose (publisher, 1);
    zsocket_bind (publisher, "tcp://*:6001");

The description of the LVC pattern should be updated to reflect this setting, since the pattern will not work otherwise.


Source: https://habr.com/ru/post/989199/