ZeroMQ: how to prioritize sockets in a .poll() method?

Consider the following code:

import threading, zmq, time

context = zmq.Context()
receivers = []
poller = zmq.Poller()

def thread_fn(number: int):
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:%d" % (6666 + number))
    for i in range(10):
        sender.send_string("message from thread %d" % number)

for i in range(3):
    new_receiver = context.socket(zmq.PULL)
    new_receiver.bind("tcp://*:%d" % (6666 + i))
    poller.register(new_receiver, zmq.POLLIN)
    receivers.append(new_receiver)
    # Pass i explicitly; a bare lambda would read i only when the thread runs.
    threading.Thread(target=thread_fn, args=(i,), daemon=True).start()

while True:
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break

    for i in range(3):
        if receivers[i] in socks:
            print("%d: process message %s" % (i, receivers[i].recv_string()))
            time.sleep(0.2)  # 'process' the data

The threads send their messages without interruption. The messages arrive in some random order at the corresponding PULL sockets, where they get "processed".

Note: normally you would connect to a single PULL socket, but this example is meant to provide more than one receiving socket.

Output:

0: process message message from thread 0
1: process message message from thread 1
0: process message message from thread 0
1: process message message from thread 1
2: process message message from thread 2
0: process message message from thread 0
1: process message message from thread 1
2: process message message from thread 2
....

Now I want to read from all sockets, as in the example, but I would like to give priority to one socket.

I.e., I want the output to be:

0: process message message from thread 0   <-- socket 0 processed first
0: process message message from thread 0
0: process message message from thread 0
0: process message message from thread 0
0: process message message from thread 0
1: process message message from thread 1
1: process message message from thread 1
2: process message message from thread 2
1: process message message from thread 1
2: process message message from thread 2
....

Of course, I can just poll the sockets separately with timeout=0, but I want to make sure that ZeroMQ is not doing this for me already.

So the questions are:

Q1: Is there a built-in way in ZeroMQ (other than .poll( timeout )) to prioritize sockets?

Q2:
?


() -chaos,

A1: TL;DR: zmq.select() (API / python)
A2: TL;DR a .
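
A1 points at zmq.select(). What follows is a minimal sketch of how that call could be used for strict prioritization, not a definitive implementation. The helpers make_pair and next_ready, the inproc endpoint names, and the rule that a socket's position in the list is its priority are all assumptions made for this illustration. zmq.select() only reports which sockets are readable, so the priority rule itself has to be applied by the caller.

```python
import zmq

ctx = zmq.Context()

def make_pair(name):
    """Hypothetical helper: a connected PUSH/PULL pair over inproc."""
    pull = ctx.socket(zmq.PULL)
    pull.bind("inproc://" + name)
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://" + name)
    return push, pull

def next_ready(receivers, timeout_s=0):
    """Return the highest-priority readable socket, or None.

    Assumption: position in `receivers` is the priority (index 0 first).
    zmq.select() takes its timeout in seconds.
    """
    readable, _, _ = zmq.select(receivers, [], [], timeout=timeout_s)
    for sock in receivers:
        if sock in readable:
            return sock
    return None

pairs = [make_pair("select-%d" % i) for i in range(3)]
senders = [push for push, _ in pairs]
receivers = [pull for _, pull in pairs]

# Queue messages in "wrong" order on purpose.
senders[2].send_string("from 2")
senders[1].send_string("from 1")
senders[0].send_string("from 0, first")
senders[0].send_string("from 0, second")

order = []
while True:
    sock = next_ready(receivers)
    if sock is None:          # nothing readable on any socket
        break
    order.append(sock.recv_string())

print(order)
ctx.destroy(linger=0)
```

Re-running the select before every single recv is what makes socket 0 win each time it has something queued. The price is that a continuous flow on socket 0 would starve the others.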

Q2 , , , ( ) modus operandi.

?

, , :

# --------------------------------------------------------
# FIRST scan all HI-PRIO socket(s) for incoming messages:
while True:
    # process 'em first, based on a ZeroMQ-socket behaviour-fixed ordering
    ...
    break
# --------------------------------------------------------
# NEXT  scan all LO-PRIO socket(s) for incoming messages:
while True:
    # process 'em, again, based on a ZeroMQ-socket behaviour-fixed ordering
    ...
    break
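
The two-phase scan above can be made concrete roughly as follows. This is only a sketch under stated assumptions: drain, scan_once, make_pair, the inproc endpoints and the per-pass limits (hi_limit, lo_limit) are hypothetical names and values chosen for the illustration. Non-blocking receives stand in for the elided processing steps.

```python
import zmq

def drain(sock, limit):
    """Receive up to `limit` queued messages from `sock` without blocking."""
    received = []
    for _ in range(limit):
        try:
            received.append(sock.recv_string(flags=zmq.NOBLOCK))
        except zmq.Again:     # nothing queued right now
            break
    return received

def scan_once(hi_prio, lo_prio, hi_limit=100, lo_limit=1):
    """One pass: serve HI-PRIO sockets first, then LO-PRIO ones.

    The limits are assumed values; they bound how long one pass can take.
    """
    handled = []
    for sock in hi_prio:
        handled += drain(sock, hi_limit)
    for sock in lo_prio:
        handled += drain(sock, lo_limit)
    return handled

def make_pair(ctx, name):
    """Hypothetical helper: a connected PUSH/PULL pair over inproc."""
    pull = ctx.socket(zmq.PULL)
    pull.bind("inproc://" + name)
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://" + name)
    return push, pull

ctx = zmq.Context()
push0, pull0 = make_pair(ctx, "prio-0")
push1, pull1 = make_pair(ctx, "prio-1")
for i in range(3):
    push1.send_string("lo %d" % i)
    push0.send_string("hi %d" % i)

first_pass = scan_once([pull0], [pull1])
second_pass = scan_once([pull0], [pull1])
print(first_pass)
print(second_pass)
ctx.destroy(linger=0)
```

With lo_limit=1 the low-priority socket still makes progress on every pass, one message at a time, while the high-priority socket is emptied first.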

, , , B - , , FSA- , DoS-, , , , .

, , , "" ( ), " " HI-PRIO.

Compare the Apollo Guidance Computer (AGC), MIT.

With the ZeroMQ python binding this can be done with Poller() instances, using a separate Poller() for each priority level:

    # ------------------------------------------------------------
    # Initialize separate engines for polling set(s)
    HiPRIOpoller = zmq.Poller()
    LoPRIOpoller = zmq.Poller()

    # ------------------------------------------------------------
    # Associate each socket with its polling set
    HiPRIOpoller.register( socket_0_pull, zmq.POLLIN )         # 0:

    LoPRIOpoller.register( ... ,          zmq.POLLIN )         # 1:
    LoPRIOpoller.register( ... ,          zmq.POLLIN )         # 2:
    ...

    # ------------------------------------------------------------
    # Detect, who is waiting in front of the closed door
    aListOfHiPRIOevents = HiPRIOpoller.poll( timeout = 0.200 ) # timeout is in [ms], documented as int; a fractional value may be truncated to 0
    aListOfLoPRIOevents = LoPRIOpoller.poll( timeout = 0 )     # no wait at all

    # ------------------------------------------------------------
    # Now AFTER you have a COMPLETE view what is waiting there
    # one
    # CAN & SHALL ADAPT order / scope of event-handling,
    #             IMMUNE to infinite-PRIO-event-flow.
    ...
    # ------------------------------------------------------------
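
A runnable sketch of the same two-poller layout is given here for illustration. The starvation guard (serving the LO-PRIO set when HI-PRIO is idle and on every lo_every-th round regardless) is one possible policy assumed for this example, not the only one. run_round, make_pair, lo_every and the inproc endpoints are hypothetical names.

```python
import zmq

ctx = zmq.Context()

def make_pair(name):
    """Hypothetical helper: a connected PUSH/PULL pair over inproc."""
    pull = ctx.socket(zmq.PULL)
    pull.bind("inproc://" + name)
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://" + name)
    return push, pull

push0, pull0 = make_pair("hi-0")
push1, pull1 = make_pair("lo-1")
push2, pull2 = make_pair("lo-2")

# Separate polling sets, one per priority level.
hi_poller = zmq.Poller()
hi_poller.register(pull0, zmq.POLLIN)
lo_poller = zmq.Poller()
lo_poller.register(pull1, zmq.POLLIN)
lo_poller.register(pull2, zmq.POLLIN)

def run_round(round_no, lo_every=3):
    """Handle one round; returns the messages processed in it."""
    hi_events = dict(hi_poller.poll(timeout=1))  # wait up to 1 ms
    lo_events = dict(lo_poller.poll(timeout=0))  # no wait at all
    handled = []
    if pull0 in hi_events:
        handled.append(pull0.recv_string())
    # Assumed policy: LO-PRIO runs when HI-PRIO is idle, and on every
    # lo_every-th round anyway, so an endless HI-PRIO flow cannot starve it.
    if not hi_events or round_no % lo_every == 0:
        for sock in (pull1, pull2):
            if sock in lo_events:
                handled.append(sock.recv_string())
    return handled

for i in range(4):
    push0.send_string("hi %d" % i)
for i in range(2):
    push1.send_string("lo1 %d" % i)
push2.send_string("lo2 0")

log = []
for round_no in range(1, 7):
    log += run_round(round_no)
print(log)
ctx.destroy(linger=0)
```

Because both polling sets are inspected before anything is handled, each round sees what is waiting on every socket and can decide the order and scope of handling.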

Poller.poll() returns a list of events. Each item is a tuple of the form ( socket, event ), where the first element is { a-0MQ-Socket-instance | integer-system-native-fd } and the second is the poll event mask ( POLLIN, POLLOUT ). aDictOfEVENTs = dict( aPoller.poll() ) turns the list into a mapping { aSocket: anEvent,... }.
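
Since the result is a plain list of tuples, it can be re-ordered before handling. The sketch below shows one way to do that, under assumptions: order_events and the rank mapping are hypothetical, and plain strings stand in for real sockets so the snippet runs without any network setup. With pyzmq the events list would come from aPoller.poll().

```python
POLLIN = 1  # same numeric value as zmq.POLLIN

def order_events(events, rank):
    """Sort (socket, event_mask) tuples so that a lower rank comes first.

    Sockets missing from `rank` go last. sorted() is stable, so sockets
    with equal rank keep the order in which the poll reported them.
    """
    return sorted(events, key=lambda item: rank.get(item[0], float("inf")))

# Stand-ins for what a poll might return, in arbitrary order.
events = [("socket_2", POLLIN), ("socket_0", POLLIN), ("socket_1", POLLIN)]
rank = {"socket_0": 0, "socket_1": 1, "socket_2": 1}

ordered = order_events(events, rank)
as_dict = dict(ordered)   # mapping form, { aSocket: anEvent, ... }

print(ordered)
```

The list form is the one to sort; the dict form is convenient for membership tests such as checking whether a given socket is ready.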

,

: MIT

, , . - , 60- , , ( ) .. ,


() -:

, .

, , .

ZERO-BUG, - , - , , , - - , , " " .



Source: https://habr.com/ru/post/1649383/

