Communication pattern between clients and workers in ZMQ

I got a bit lost reading the ZeroMQ documentation when I came across these three socket combinations:

  • DEALER to ROUTER
  • DEALER to DEALER
  • ROUTER to ROUTER

I understand that DEALER and ROUTER are asynchronous replacements for the synchronous REQ / REP sockets and that they can connect to multiple nodes. What I do not understand is how a DEALER can stand in for both REQ and REP in DEALER to DEALER (and likewise a ROUTER in ROUTER to ROUTER).

I am looking for a pattern that allows an arbitrary number of clients to send jobs to an arbitrary number of workers (with load balancing) and that returns responses (and intermediate results) to the client asynchronously, with several messages going back per job. In addition, the client may need to request early termination of a job. I find the documentation a little thin in this regard (I am by no means an expert and may have missed the relevant section).

I am happy to figure out the details myself, but every time I think I have found a suitable pattern, I find another that seems equally suitable. For example, these three patterns look equally suitable to me: http://zguide.zeromq.org/page:all#ROUTER-Broker-and-REQ-Workers , http://zguide.zeromq.org/page:all#ROUTER-Broker-and-DEALER-Workers , http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker .

Any advice on the structure is welcome (which socket type to use for which component).

Update

This is what I came up with:

import multiprocessing
import zmq
import time

router_url_b = 'tcp://*:5560'
router_url = 'tcp://localhost:5560'

dealer_url_b = 'tcp://*:5561'
dealer_url = 'tcp://localhost:5561'


def broker():
    context = zmq.Context()
    router = context.socket(zmq.ROUTER)
    router.bind(router_url_b)

    dealer = context.socket(zmq.DEALER)
    dealer.bind(dealer_url_b)

    poll = zmq.Poller()
    poll.register(router, zmq.POLLIN)
    poll.register(dealer, zmq.POLLIN)

    while True:
        poll_result = dict(poll.poll())
        if router in poll_result:
            ident, msg = router.recv_multipart()
            print 'router: ident=%s, msg=%s' % (ident, msg)
            # print 'router received "%s" and ident %s' % (msg, ident)
            dealer.send_multipart([ident, msg])
            # dealer.send(msg)
        if dealer in poll_result:
            ident, msg = dealer.recv_multipart()
            print 'dealer: ident=%s, msg=%s' % (ident, msg)
            router.send_multipart([ident, msg])


def client(client_id):
    context = zmq.Context()
    req = context.socket(zmq.DEALER)
    # setting identity doesn't seem to make a difference
    req.setsockopt(zmq.IDENTITY, b"%s" % client_id)
    req.connect(router_url)

    req.send('work %d' % client_id)
    while True:
        msg = req.recv()
        print 'client %d received response: %s' % (client_id, msg)


def worker(worker_id):
    context = zmq.Context()
    # to allow asynchronous sending of responses.
    rep = context.socket(zmq.ROUTER)
    # not sure if this is required...
    # rep.setsockopt(zmq.IDENTITY, b"%s" % (10+worker_id))
    rep.connect(dealer_url)

    while True:
        msg = rep.recv_multipart()
        ident, msg = msg[:-1], msg[-1]
        print 'worker %d received: "%s", ident="%s"' % (worker_id, msg, ident)
        # do some work...
        time.sleep(10)
        rep.send_multipart(ident + ['result A from worker %d (%s)' % (worker_id, msg)])
        # do more work...
        time.sleep(10)
        rep.send_multipart(ident + ['result B from worker %d (%s)' % (worker_id, msg)])
    print 'finished worker', worker_id


def main():

    print 'creating workers'
    for i in xrange(2):
        p = multiprocessing.Process(target=worker, args=(i, ))
        p.daemon = True
        p.start()

    print 'creating clients'
    for i in xrange(5):
        p = multiprocessing.Process(target=client, args=(i, ))
        p.daemon = True
        p.start()

    broker()


if __name__ == '__main__':
    main()

, . , , - , . , - (pub/sub) , .

:

  • ?
  • IDENTITY? , , ( , ).
  • , : worker 1 received: "work 3", ident="['\x00\x80\x00A\xa7', '3']" worker 0 received: "work 4", ident="['\x00\x80\x00A\xa7', '4']" ident ? , , , - , . (, , )?

, . .

Clients use DEALER, the broker frontend ROUTER (async + many clients), the broker backend DEALER (async), and the workers DEALER (async; ROUTER is not needed there).
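As a rough illustration of why the worker's socket type matters for the message envelope, here is a toy model in plain Python 3 (no ZeroMQ calls; the three helper functions are hypothetical and only mimic what the sockets do to the frames). It assumes the chain from the question: client DEALER, broker ROUTER, broker DEALER, then a worker that is either a ROUTER or a DEALER. The identity values are copied from the output shown in the question.

```python
def router_recv(peer_identity, frames):
    """Model: a ROUTER prepends the identity of the sending peer."""
    return [peer_identity] + frames


def router_send(frames):
    """Model: a ROUTER pops the first frame and uses it as the destination."""
    return frames[0], frames[1:]


def dealer_passthrough(frames):
    """Model: a DEALER passes frames through unchanged in both directions."""
    return frames


client_id = b'3'                          # set by the client via zmq.IDENTITY
broker_dealer_id = b'\x00\x80\x00A\xa7'   # generated by the worker's ROUTER for its peer

# Request path: client DEALER -> broker ROUTER -> broker DEALER -> worker
at_broker = router_recv(client_id, dealer_passthrough([b'work 3']))
at_router_worker = router_recv(broker_dealer_id, dealer_passthrough(at_broker))
at_dealer_worker = dealer_passthrough(at_broker)

# Reply path from a ROUTER worker: each ROUTER on the way strips one frame.
reply = at_router_worker[:-1] + [b'result A']
hop1_dest, at_broker_dealer = router_send(reply)
hop2_dest, at_client = router_send(dealer_passthrough(at_broker_dealer))

print(at_router_worker)   # two identity frames plus the payload
print(at_dealer_worker)   # one identity frame plus the payload
print(at_client)          # payload only
```

In this model the ROUTER worker sees two identity frames because two ROUTER sockets sit between it and the client, while a DEALER worker would see only the client identity added by the broker.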

, REP, , . ; , .

You can also use the built-in zmq.proxy(router, dealer) in place of the while True loop in broker().
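For what it is worth, a minimal sketch of a broker built on zmq.proxy could look like the following. It is written for Python 3 with pyzmq, unlike the Python 2 code in the question. The names start_broker and demo, the inproc:// endpoints, the client identity and the 5 second receive timeout are all assumptions made for this illustration, and the worker is a DEALER here rather than the ROUTER used in the question.

```python
import threading
import zmq


def start_broker(context, frontend_url, backend_url):
    """Bind a ROUTER frontend and a DEALER backend and proxy between them."""
    frontend = context.socket(zmq.ROUTER)   # clients connect here
    frontend.bind(frontend_url)
    backend = context.socket(zmq.DEALER)    # workers connect here
    backend.bind(backend_url)

    def run():
        try:
            zmq.proxy(frontend, backend)    # blocks until the context is terminated
        except zmq.ContextTerminated:
            pass
        finally:
            frontend.close(linger=0)
            backend.close(linger=0)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


def demo():
    """One client, one worker, two replies for a single job (illustrative only)."""
    context = zmq.Context()
    broker = start_broker(context, 'inproc://frontend', 'inproc://backend')

    client = context.socket(zmq.DEALER)
    client.setsockopt(zmq.IDENTITY, b'client-1')
    client.setsockopt(zmq.RCVTIMEO, 5000)   # fail with zmq.Again instead of hanging
    client.connect('inproc://frontend')

    worker = context.socket(zmq.DEALER)
    worker.setsockopt(zmq.RCVTIMEO, 5000)
    worker.connect('inproc://backend')

    client.send(b'work 1')
    # The broker's ROUTER has prepended the client identity as the first frame.
    ident, msg = worker.recv_multipart()
    # Several replies per job: echo the identity frame back each time.
    worker.send_multipart([ident, b'result A for ' + msg])
    worker.send_multipart([ident, b'result B for ' + msg])
    replies = [client.recv(), client.recv()]

    client.close(linger=0)
    worker.close(linger=0)
    context.term()                          # makes zmq.proxy raise ContextTerminated
    broker.join()
    return ident, replies
```

Calling demo() runs one round trip inside a single process. With the tcp:// endpoints from the question the same start_broker function should work across processes, though that variant is not shown here.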

-, ZMQ. ROUTER to ROUTER. , . , , ( , ZMQ).


Source: https://habr.com/ru/post/1584337/

