Why does ZeroMQ PUB log messages without connected subscribers? (Well, "disabled" SUB-s)

I see strange behavior using . ZMQ_PUB

I have a manufacturer that has .connect()-s for different processes that are .bind()on ZMQ_SUBsockets.

All subscribers .bind(), publisher .connect()-s.

When the manufacturer starts, it creates a socket ZMQ_PUBand .connect()-s it for different processes. Then he immediately starts sending messages in a regular period.

As expected, if there are no connected subscribers, it will turn off all messages until the subscriber starts.

The flow works fine, then when the subscriber starts up, he receives messages from this moment.

Now the problem is:

  • I disconnect the subscriber (stopping the process).
  • At the moment there are no active subscribers, since I stopped the only one. The manufacturer continues to send messages that should be discarded, since there are no connected subscribers anymore ...
  • I restart the original subscriber, he communicates, the publisher reconnects ... and the subscriber receives all messages created during this time.

So, I see that the producer set all the messages while the subscriber was turned off. As soon as the socket reconnected, as the subscription process restarted, it sent all messages in the queue.

As I understood from here , the publisher should delete all sent messages when there are no connected subscribers:

ZeroMQ Examples

"The publisher has no connected subscribers, then it will simply delete all messages."

Why is this happening?

, ++ linux .

, , . , .

,


UPDATE:

!!!!!
. , ZMQ_LINGER 0, .
ZMQ:IMMEDIATE, , , ZMQ:IMMEDIATE . ZMQ_LINGER.
3

UPDATE: , . - , uri, , :

$./sub tcp://127.0.0.1: 50001

- , uris , :

./pub tcp://127.0.0.1: 50001 tcp://127.0.0.1: 50002

5 , . FIN/ACK, , TIME_WAIT. SYN, ( , ZMQ_PUB , )

, . , , .

, : ( ), , . 5 . , . , . , /, . , . , , , , " encarnation", .

, ZMQ_PUB ( ) , , .

, .

Pub:

#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>

#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc < 2 )
    {
        fprintf( stderr, "Usage : %s <remoteUri1> [remoteUri2...]\n",   
        basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_PUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int i;
    try
    {
        for ( i = 1; i < argc; i++ )
        {
            printf( "Connecting to [%s]\n", argv[i] );
            {
                pSocket->connect( argv[i] );
            }
        }
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't connect socket to %s. Aborting...\n", argv[i] );
        exit ( EXIT_FAILURE );
    }

    printf( "Publisher Up and running... sending messages\n" );
    fflush(NULL);

    int msgCounter = 0;
    do
    {
        try
        {
            char msgBuffer[1024];
            sprintf( msgBuffer, "Message #%d", msgCounter++ );
            zmq::message_t outTask( msgBuffer, strlen( msgBuffer ) + 1 );
            printf("Sending message [%s]\n", msgBuffer );
            pSocket->send ( outTask );
            sleep( 1 );
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    }
    while ( true );

    exit ( EXIT_SUCCESS );
}

#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>

#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc != 2 )
    {
        fprintf( stderr, "Usage : %s <localUri>\n", basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_SUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }
    try
    {
        pSocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
        pSocket->bind( pLocalUri.c_str() );
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't bind socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int msgCounter = 0;
    printf( "Subscriber Up and running... waiting for messages\n" );
    fflush( NULL );

    do
    {
        try
        {
            zmq::message_t inTask;
            pSocket->recv ( &inTask );
            printf( "Message received : [%s]\n", inTask.data() );
            fflush( NULL );
            msgCounter++;
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    }
    while ( msgCounter < 5 );

    // pSocket->setsockopt( ZMQ_UNSUBSCRIBE, "", 0 ); NOT UNSUBSCRIBING
    pSocket->close();
    exit ( EXIT_SUCCESS );
}
+4
2

: ?

SUB ( "" ).

, , SUB -, .bind() - .connect() -media, , - "" .

, PUB -side , , SUB - ( PUB -side), "" - , ZeroMQ (a PUB -side duty) a ( , ) SUB -, PUB -side ( - , - CPU - concurrency - ( | ) .).

, ...

, SUB ( ZMQ_LINGER + .close() - ) PUB -side "" - "" - "" PUB "-FSA", "" "" -ed SUB - , .

"Distributed-FSA" " ". KILL - , "-FSA" , , .


?

, , . ZeroMQ , ( ) .


FSA ( FSA -FSA-s)

, , , , FSA-FSA - , .Context() 1:1 PUB/SUB, KILL - FSA-s SUB, , PUB -side. TCP- ( PUB -side, SUB -side) [ ESTABLISHED] [ CLOSED].


FSA-FSA

( FSA TCP-)

PUB -side:

enter image description here


.socket( .. ) FSA:

enter image description here


SUB -side:

enter image description here

(Courtesy nanomsg).

+2

, , .

1:

, :

  • bind
  • connect

'coz, , , , , .

2:

-, :

  • (SIGINT)
  • :
    • unsubscribe
    • close
    • 0

UPDATE:

, , . zeromq, .

, . ROUTER . 'Coz ROUTERsockets , REQ/REP . Async , . n/w uuid ..

UPDATE:

zeromq, PUB/SUB , Publisher ( ), ( ).

, ZMQ_IMMEDIATE ZMQ_DELAY_ATTACH_ON_CONNECT

en queue, .

+1

Source: https://habr.com/ru/post/1667020/


All Articles