ZeroMQ: reset REQ/REP socket state

When you use the simple ZeroMQ REQ/REP pattern, you depend on a fixed send() → recv() / recv() → send() sequence. As this article describes, you get into trouble when a participant disconnects in the middle of a request: you cannot simply start over by receiving the next request from another connection, because the state machine forces you to send to the disconnected peer.
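To make the lockstep concrete, here is a minimal sketch, assuming pyzmq is installed (the helper name `demonstrate_req_lockstep` and the endpoint `inproc://lockstep-demo` are made up for illustration). It shows that a REQ socket refuses a second send() before the reply has been received; libzmq reports this with the EFSM error code.

```python
import zmq


def demonstrate_req_lockstep(endpoint="inproc://lockstep-demo"):
    """Return True if a REQ socket rejects a second send() with EFSM."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind(endpoint)
    req = ctx.socket(zmq.REQ)
    req.connect(endpoint)
    try:
        req.send(b"first request")       # allowed: REQ starts in the "send" state
        try:
            req.send(b"second request")  # not allowed: REQ now expects recv()
        except zmq.ZMQError as exc:
            # EFSM = "operation cannot be accomplished in current state"
            return exc.errno == zmq.EFSM
        return False
    finally:
        req.close(linger=0)
        rep.close(linger=0)
        ctx.term()
```

If the REP peer vanished after the first send(), this REQ socket would stay stuck in the "waiting for reply" state, which is exactly the problem described above.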

Has a more elegant way to solve this problem appeared since the article was written?

Is reconnecting the only way to solve this problem (other than not using REQ/REP and switching to a different pattern)?

5 answers

The good news is that with ZMQ 3.0 and later (the modern era) you can set a timeout on the socket. As noted elsewhere, you must do this after creating the socket but before connecting it:

zmq_req_socket.setsockopt( zmq.RCVTIMEO, 500 ) # milliseconds

Then, when you actually try to receive the reply (after sending the message to the REP socket), you can catch the error that is raised when the timeout is exceeded:

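    import logging
    import zmq

    try:
        # RCVTIMEO applies to recv(), so this is the call that times out.
        reply = zmq_req_socket.recv()
        recv_failed = False
    except zmq.Again:
        # No reply arrived within RCVTIMEO milliseconds.
        logging.warning("No reply before timeout; image send failed.")
        recv_failed = True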

But! When this happens, as noted elsewhere, your socket is left in an odd state because it is still waiting for a reply. So far I have not found anything that works reliably except restarting the socket. Note that if you disconnect() the socket and then connect() it again, it will still be in this bad state. So you need:

    def reset_my_socket():
        # Rebinding the module-level name requires a global declaration.
        global zmq_req_socket
        zmq_req_socket.close()
        zmq_req_socket = zmq_context.socket(zmq.REQ)
        zmq_req_socket.setsockopt(zmq.RCVTIMEO, 500)  # milliseconds
        zmq_req_socket.connect(zmq_endpoint)

You will also notice that because I close()d the socket, the receive timeout option was "lost", so it is important to set it again on the new socket.
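Putting the timeout and the reset together gives a retry loop. The sketch below assumes pyzmq; the helper names `make_req_socket` and `request_with_reset` are hypothetical. It closes the stuck REQ socket after every timeout and builds a fresh one with the options set again, which is essentially the "Lazy Pirate" pattern described in the ZeroMQ Guide.

```python
import zmq


def make_req_socket(ctx, endpoint, timeout_ms):
    """Create a REQ socket with a receive timeout and connect it."""
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # recv() raises zmq.Again after this
    sock.setsockopt(zmq.LINGER, 0)             # drop unsent messages on close()
    sock.connect(endpoint)
    return sock


def request_with_reset(ctx, endpoint, payload, timeout_ms=500, retries=3):
    """Send payload and wait for a reply, rebuilding the socket on each timeout.

    Returns the reply bytes, or None if every attempt timed out.
    """
    sock = make_req_socket(ctx, endpoint, timeout_ms)
    try:
        for _ in range(retries):
            sock.send(payload)
            try:
                return sock.recv()
            except zmq.Again:
                # The socket is stuck waiting for a reply: throw it away and
                # start over with a new one (options must be set again).
                sock.close()
                sock = make_req_socket(ctx, endpoint, timeout_ms)
        return None
    finally:
        sock.close()
```

Setting LINGER to 0 keeps the abandoned request from holding up context termination; whether re-sending is safe depends on the request being idempotent, which this sketch does not check.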

Hope this helps. And I hope this is not the best answer to this question. :)


Since the accepted answer seems so terrible to me, I did some research and found that everything we need is actually in the documentation.

.setsockopt() with the correct parameter can reset the socket's state machine without brutally destroying the socket and building another one on top of the previous dead body.

(I like that image.)

ZMQ_REQ_CORRELATE: match replies with requests
By default, REQ sockets rely on message ordering to match requests and replies, and this is usually sufficient. If this option is set to 1, the REQ socket prefixes outgoing messages with an extra frame containing a request id. This means the complete message is (request id, identity, 0, user frames…). The REQ socket discards all incoming messages that do not begin with these two frames.
Option value type: int
Option value unit: 0, 1
Default value: 0
Applicable socket types: ZMQ_REQ

ZMQ_REQ_RELAXED: relax the strict alternation between request and reply

By default, a REQ socket does not allow a new request to be initiated with zmq_send(3) until the reply to the previous one has been received. If set to 1, sending another message is allowed and disconnects the underlying connection to the peer from which the reply was expected, triggering a reconnection attempt on transports that support it. The request-reply state machine is reset, and the new request is sent to the next available peer.
If set to 1, also enable ZMQ_REQ_CORRELATE to ensure that requests and replies are matched correctly. Otherwise, a late reply to an aborted request may be reported as the reply to the superseding request.
Option value type: int
Option value unit: 0, 1
Default value: 0
Applicable socket types: ZMQ_REQ

Full documentation here
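In pyzmq these options are exposed as `zmq.REQ_RELAXED` and `zmq.REQ_CORRELATE` (they need libzmq 4.0 or later). A minimal sketch, with hypothetical helper names `make_relaxed_req` and `ask`: both options are set before connect(), as the documentation recommends, so a timed-out request can simply be followed by a new send() instead of rebuilding the socket.

```python
import zmq


def make_relaxed_req(ctx, endpoint, timeout_ms=500):
    """REQ socket that may send again while a reply is still pending."""
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.REQ_CORRELATE, 1)  # tag requests so stale replies are dropped
    sock.setsockopt(zmq.REQ_RELAXED, 1)    # allow send() without a prior recv()
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(endpoint)
    return sock


def ask(sock, payload):
    """Send once and return the reply, or None if it did not arrive in time."""
    sock.send(payload)
    try:
        return sock.recv()
    except zmq.Again:
        # No reset needed: with REQ_RELAXED the next send() abandons this request.
        return None
```

Enabling REQ_CORRELATE together with REQ_RELAXED follows the documentation's advice quoted above, so a late reply to an abandoned request is not mistaken for the answer to the next one.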


There is one solution to this: add timeouts to all calls. Since ZeroMQ by itself does not provide a simple timeout, I recommend using a subclass of the ZeroMQ socket that adds a timeout parameter to all the important calls.

So, instead of calling s.recv(), you call s.recv(timeout=5.0), and if the reply does not arrive within that 5-second window, it returns None and stops blocking. I made an attempt at this when I ran into the problem.
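A minimal sketch of such a subclass, assuming pyzmq (the class name `TimeoutSocket` is hypothetical and only recv() is covered). It uses `Socket.poll()`, which takes milliseconds, to wait for readability before receiving. Note that a None result does not reset a REQ socket: it is still waiting for the same reply.

```python
import zmq


class TimeoutSocket(zmq.Socket):
    """zmq.Socket whose recv() accepts an optional timeout in seconds."""

    def recv(self, flags=0, copy=True, track=False, timeout=None):
        if timeout is not None:
            # poll() returns a non-zero event mask if a message is ready.
            if not self.poll(int(timeout * 1000), zmq.POLLIN):
                return None  # nothing arrived in time; stop blocking
        return super().recv(flags, copy=copy, track=track)
```

Usage: create it directly with `TimeoutSocket(ctx, zmq.REQ)` and call `sock.recv(timeout=5.0)`; without the `timeout` argument it behaves like a normal blocking recv().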


I am actually looking into this right now because I am upgrading a legacy system.

I keep running into code that needs to know the state of the connection. However, I want to move to the messaging paradigm that the library promotes.

I found the following function: zmq_socket_monitor

What it does is monitor the socket passed to it and generate events, which are delivered to an "inproc" endpoint; at that point you can add handling code that actually does something with them.

There is also an example (actually test code) on GitHub.

I do not have specific code to share at the moment (maybe by the end of the week), but I intend to react to connects and disconnects so that I can perform whatever reset logic is required.

Hope this helps. Despite quoting the 4.2 documentation, I am using 4.0.4, which seems to have this functionality as well.

Note: I noticed that you mention Python above, but the question is tagged C++, which is where my answer comes from...
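For the Python side, pyzmq wraps zmq_socket_monitor() as `Socket.get_monitor_socket()`, which returns a PAIR socket that receives the events, and `zmq.utils.monitor.recv_monitor_message()` decodes them. A sketch, with the helper name `wait_for_event` being hypothetical; the place where a DISCONNECTED event is seen is where your own reset logic would go. Monitor events need a real transport such as TCP, not inproc.

```python
import zmq
from zmq.utils.monitor import recv_monitor_message


def wait_for_event(monitor, wanted, timeout_ms=2000):
    """Read monitor events until one in `wanted` arrives.

    Returns the event id, or None if `timeout_ms` passes without a new event.
    """
    while monitor.poll(timeout_ms):
        event = recv_monitor_message(monitor)  # dict with "event", "value", "endpoint"
        if event["event"] in wanted:
            return event["event"]
    return None
```

Usage: call `req.get_monitor_socket(zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED)` before `req.connect(...)`, then wait for `zmq.EVENT_DISCONNECTED` and rebuild or reset the REQ socket when it arrives.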


Update: I am updating this answer with this excellent resource: https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ Socket programming is complicated, so check out the links in that post.

None of the answers here seem accurate or useful. The OP is not looking for information about BSD socket programming. He is trying to figure out how to robustly handle failures of accept()ed client sockets in ZMQ on a REP socket, to prevent the server from hanging or crashing.

As already noted, this problem is complicated by the fact that ZMQ tries to pretend that the server's listen() socket is the same thing as the accept()ed sockets (and nowhere does the documentation describe how to set basic timeouts on such sockets).

My answer:

After much digging through the code, the only relevant socket options passed on to accept()ed sockets appear to be the keep-alive options inherited from the parent listen()er. The solution is therefore to set the following options on the listening socket before calling send or recv:

    #include <cstdio>
    #include <zmq.hpp>

    void zmq_setup(zmq::context_t** context, zmq::socket_t** socket, const char* endpoint) {
        // Free old references: the socket must go before its context.
        if (*socket != nullptr) {
            delete *socket;   // destructor closes the socket
            *socket = nullptr;
        }
        if (*context != nullptr) {
            delete *context;  // destructor terminates the context
            *context = nullptr;
        }
        *context = new zmq::context_t(1);
        *socket = new zmq::socket_t(**context, ZMQ_REP);

        // Enable TCP keep-alive.
        int is_tcp_keep_alive = 1;
        (*socket)->setsockopt(ZMQ_TCP_KEEPALIVE, &is_tcp_keep_alive, sizeof(is_tcp_keep_alive));
        // Only send 2 probes to check whether the client is still alive.
        int tcp_probe_no = 2;
        (*socket)->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, &tcp_probe_no, sizeof(tcp_probe_no));
        // How long (seconds) a connection must be idle before probing starts.
        int tcp_idle_timeout = 1;
        (*socket)->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, &tcp_idle_timeout, sizeof(tcp_idle_timeout));
        // Time in seconds between individual keep-alive probes.
        int tcp_probe_interval = 1;
        (*socket)->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, &tcp_probe_interval, sizeof(tcp_probe_interval));
        // Discard pending messages in the buffer on close.
        int linger_ms = 0;
        (*socket)->setsockopt(ZMQ_LINGER, &linger_ms, sizeof(linger_ms));
        // TCP user timeout for unacknowledged data; ZMQ_TCP_MAXRT is in milliseconds.
        int max_retransmit_ms = 2000;
        (*socket)->setsockopt(ZMQ_TCP_MAXRT, &max_retransmit_ms, sizeof(max_retransmit_ms));

        // Start internal enclave event server.
        printf("Host: Starting enclave event server\n");
        (*socket)->bind(endpoint);
    }

This makes the operating system aggressively check client sockets for timeouts and reap them when a client does not answer the keep-alive probes in time. As a result, the operating system will send SIGPIPE back to your program, and socket errors will bubble up through send / recv, unsticking a hung server. You then need to do two more things:

1. Handle SIGPIPE errors so that the program does not crash

    #include <signal.h>
    #include <zmq.hpp>

    // zmq_setup def here [...]

    int main(int argc, char** argv) {
        // Ignore SIGPIPE signals.
        signal(SIGPIPE, SIG_IGN);
        // ... rest of your code after
        // (Could potentially also restart the server
        // sock on N SIGPIPEs if you're paranoid.)

        // Start server socket. The pointers must start as nullptr,
        // because zmq_setup() frees any non-null old objects.
        const char* endpoint = "tcp://127.0.0.1:47357";
        zmq::context_t* context = nullptr;
        zmq::socket_t* socket = nullptr;
        zmq_setup(&context, &socket, endpoint);

        // Message buffers.
        zmq::message_t request;
        zmq::message_t reply;
        // ... rest of your socket code here
    }

2. Check the results of send and recv, and catch ZMQ errors.

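    // E.g. skip broken accepted sockets (sleep() comes from <unistd.h>).
    while (true) {
        try {
            // cppzmq's recv() throws zmq::error_t on errors and
            // returns false on EAGAIN.
            if (!socket->recv(&request))
                throw -1;
        } catch (...) {
            // Prevent any endless error loops killing CPU.
            sleep(1);
            // Reset ZMQ state machine.
            try {
                zmq::message_t blank_reply;
                socket->send(blank_reply);
            } catch (...) {
                // Ignore: the reset attempt itself may fail.
            }
            continue;
        }
        // ... handle the request and send the reply here
    }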

Notice the weird code that tries to send a reply when a socket fails? In ZMQ, the server's REP "socket" is the endpoint for another program that creates a REQ socket to this server. As a result, if you call recv on the REP socket while a client has hung, the server socket gets stuck in a broken receive loop, where it waits forever for a valid request.

To force an update on the receiving end, try sending a reply. ZMQ detects that the socket is broken and removes it from its queue. The server socket gets unstuck, and the next recv call returns a new client from the queue.
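The "always answer" idea can also be sketched in Python with pyzmq. One caution: pyzmq raises EFSM if a REP socket calls send() before a successful recv(), so this variant (the names `serve` and `handle` are hypothetical) only sends the blank reply after a request has actually been received, for example when the request handler fails. Polling with a timeout keeps the loop from blocking forever in recv().

```python
import zmq


def serve(rep, handle, max_requests=None, poll_ms=1000):
    """Run a REP loop that answers every request it receives.

    `handle` maps request bytes to reply bytes; if it raises, a blank reply
    is sent so the REP state machine stays in step. Stops after
    `max_requests` requests (None means run forever). Returns the count.
    """
    served = 0
    while max_requests is None or served < max_requests:
        # Wake up periodically (e.g. to check a shutdown flag) instead of
        # blocking indefinitely in recv().
        if not rep.poll(poll_ms, zmq.POLLIN):
            continue
        request = rep.recv()
        try:
            reply = handle(request)
        except Exception:
            reply = b""  # blank reply: REP must send before it can recv again
        rep.send(reply)
        served += 1
    return served
```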

To enable timeouts on an asynchronous client (in Python 3), the code should look something like this:

    import asyncio
    import zmq
    import zmq.asyncio

    ctx = zmq.asyncio.Context()

    async def req(endpoint):
        ms = 2000  # In milliseconds.
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.SNDTIMEO, ms)
        sock.setsockopt(zmq.RCVTIMEO, ms)
        sock.setsockopt(zmq.LINGER, ms)  # Discard pending messages this long after close().
        sock.setsockopt(zmq.CONNECT_TIMEOUT, ms)

        # Connect the socket.
        # Connections don't strictly happen here.
        # ZMQ waits until the socket is used (which is confusing, I know.)
        sock.connect(endpoint)

        # Send some bytes.
        await sock.send(b"some bytes")

        # Recv bytes and convert to unicode.
        msg = await sock.recv()
        sock.close()
        return msg.decode("utf-8")

Now you have defined failure scenarios for when something goes wrong.

By the way, if anyone is interested, the default TCP keep-alive idle time on Linux is 7200 seconds, or 2 hours. So you would wait a long time before a hung server does anything!
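The same keep-alive options can be set from pyzmq, and the numbers give a rough idea of how long a dead peer goes unnoticed: the idle time plus one interval per unanswered probe. A sketch, with `configure_keepalive` being a hypothetical helper; the formula is an approximation of kernel behaviour, not an exact guarantee.

```python
import zmq


def configure_keepalive(sock, idle_s=1, probes=2, interval_s=1):
    """Apply TCP keep-alive options (call before bind()/connect()).

    Returns a rough estimate, in seconds, of how long a silently dead peer
    goes undetected: idle time plus all unanswered probe intervals.
    """
    sock.setsockopt(zmq.TCP_KEEPALIVE, 1)
    sock.setsockopt(zmq.TCP_KEEPALIVE_IDLE, idle_s)
    sock.setsockopt(zmq.TCP_KEEPALIVE_CNT, probes)
    sock.setsockopt(zmq.TCP_KEEPALIVE_INTVL, interval_s)
    return idle_s + probes * interval_s
```

With the values used in the C++ code above this gives 1 + 2 × 1 = 3 seconds. With the Linux defaults (tcp_keepalive_time 7200 s, tcp_keepalive_probes 9, tcp_keepalive_intvl 75 s) the same estimate is 7200 + 9 × 75 = 7875 seconds.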


Disclaimer:

I tested this code and it seems to work, but ZMQ makes it rather hard to test because the client reconnects when it fails. If anyone wants to use this solution in production, I recommend writing a few basic unit tests first.

The server code could also be greatly improved with threads or polling so that multiple clients can be handled simultaneously. As it stands, a malicious client can temporarily tie up the server's resources (for the 3-second timeout), which is not ideal.


Source: https://habr.com/ru/post/978151/

