Why does ZeroMQ PGM multicast get stuck partway and stop accepting further packets?

ZeroMQ (version zeromq-4.1.6): the PGM multicast receiver gets stuck partway through a transfer, even though the sender keeps sending packets without any problem.

If we restart the receiver, the application receives packets again, but that is not a real solution. I tried different ZMQ_RATE values on both the sender and the receiver side.

Question:

The sender sends almost 300,000 packets with the socket parameters below, but the receiver gets stuck partway through and does not receive all of them. If we add a Sleep( 2 ) — a 2 ms pause between sends — we sometimes receive all packets, but the transfer then takes much more time.

Environment setup:

(The sender and receiver are connected on the same subnet using the D-Link switch. The data transfer rate is 1 Gb / s)

Sender: JZMQ ( ZMQ C library, openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
Packet size - 1024 bytes
ZMQ_RECOVERY_IVL - 2 Minutes
Send Flag - 0 ( blocking mode )
Sleep( 2ms ) - sometimes its working without any issue but taking more time for transfer.
Platform - Windows

Receiver: ZMQ C++ ( ZMQ C library, openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
ZMQ_RCVTIMEO - 3 Secs
receive Flag - 0 ( blocking mode )
Platform - Windows

What could be the problem?

Is ZeroMQ PGM-multicast not a stable library?

JZMQ Sender:
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PUB);
socket.setRate(80000);
socket.setRecoveryInterval(60*60);
socket.setSendTimeOut(-1);
socket.setSendBufferSize(1024*64);
socket.bind("pgm://local_IP;239.255.0.20:30001");

byte[] bytesToSend = new byte[1024];
int count = 0;
while(count < 300000) {
    socket.send(bytesToSend, 0);
    count++;
}

------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"


int main(int argc, char* argv[]) {
    try {

         zmq::context_t context(1);

      // Socket to talk to server
         printf ("Connecting to server...");

         zmq::socket_t *s1 = new zmq::socket_t(context, ZMQ_SUB);

         int recvTimeout = 3000;
         s1->setsockopt(ZMQ_RCVTIMEO,&recvTimeout,sizeof(int));

         int recvRate = 80000;
         s1->setsockopt(ZMQ_RATE,&recvRate,sizeof(int));

         int recsec = 60 * 60;
      // s1->setsockopt(ZMQ_RECOVERY_IVL,&recsec,sizeof(recsec));

         s1->connect("pgm://local_IP;239.255.0.20:30001");

         s1->setsockopt (ZMQ_SUBSCRIBE, NULL, 0);

         printf ("done. \n");
         int seq=0;
         while(true) {

               zmq::message_t msgbuff;

               int ret = s1->recv(&msgbuff,0);
               if(!ret)
               {
                   printf ("Received not received timeout\n");
                   continue;
               }

               printf ("Seq(%d) Received data size=%d\n",seq,msgbuff.size());
               ++seq;
         }
    }
    catch( zmq::error_t &e )   {
           printf ("An error occurred: %s\n", e.what());
           return 1;
    }
    catch( std::exception &e ) {
           printf ("An error occurred: %s\n", e.what());
           return 1;
    }
    return 0;
}
+4
source share
1 answer

Is PGM stable?
FYI: works with v 2.1.1, and today we have stable 4.2. +

Yes — the PGM/EPGM transports are stable; they have been used in production for years, in deployments spanning { localhost | home-subnet | remote-network(s) | remote-host(s) }.


[PUB] - :

, , SLOC, - :

ØMQ PUB sockets silently drop outgoing messages once the ZMQ_SNDHWM high-water mark is reached — this alone can account for 60-70% of the observed losses.

, , [PUB] - , - .

O/S:

The pgm:// transport uses raw IP and therefore needs elevated (admin/root) privileges on the host O/S; where raw PGM is not possible, use the epgm:// (PGM-over-UDP encapsulation) transport instead.


[SUB]:

- [PUB] - / [SUB] -receiver:

------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//                          MODs: https://stackoverflow.com/q/44526517/3666197

#include "stdafx.h"                                                     // MSVC precompiled header — must stay first

#include <cstdint>                                                      // int64_t / uint64_t for zmq_setsockopt
#include <stdio.h>

#include <chrono>                                                       // since C++ 11
#include "zmq.hpp"
typedef std::chrono::high_resolution_clock              nanoCLK;

#define ZMQ_IO_THREAD_POOL_SIZE                         8

#define ZMQ_AFINITY_PLAIN_ROUNDROBIN_UNMANAGED_RISKY    0
#define ZMQ_AFINITY_LO_PRIO_POOL                        0 | 1
#define ZMQ_AFINITY_HI_PRIO_POOL                        0 | 0 | 2
#define ZMQ_AFINITY_MC_EPGM_POOL                        0 | 0 | 0 | 4 | 8 | 0 | 0 | 64 | 128


int main( int argc, char* argv[] ) {

    auto RECV_start = nanoCLK::now();
    auto RECV_ret   = nanoCLK::now();
    auto RECV_last  = nanoCLK::now();
    auto TEST_start = nanoCLK::now();

    try {
           zmq::context_t context( ZMQ_IO_THREAD_POOL_SIZE );           printf ( "Connecting to server..." );
           int            major,  minor,  patch;
           zmq::version( &major, &minor, &patch );                      printf ( "Using ZeroMQ( %d.%d.%d )", major, minor, patch );

           zmq::socket_t *s1 = new zmq::socket_t( context, ZMQ_SUB );   // Socket to talk to server

           int zmqLinger   =       0,          // [  ms]
               zmqAffinity =       0,          // [   #]  mapper bitmap-onto-IO-thread-Pool (ref. #define-s above )

               recvBuffer  =       2 * 123456, // [   B]
               recvMaxSize =    9876,          // [   B]
               recvHwMark  =  123456,          // [   #]  max number of MSGs allowed to be Queued per connected Peer

               recvRate    =   80000 * 10,     // [kbps]
               recvTimeout =    3000,          // [  ms]  before ret EAGAIN { 0: NO_BLOCK | -1: INF | N: wait [ms] }
               recoverMSEC =      60 * 60      // [  ms]
               ;

           s1->setsockopt ( ZMQ_AFFINITY,     &zmqAffinity, sizeof(int) );
           s1->setsockopt ( ZMQ_LINGER,       &zmqLinger,   sizeof(int) );
           s1->setsockopt ( ZMQ_MAXMSGSIZE,   &recvMaxSize, sizeof(int) );
           s1->setsockopt ( ZMQ_RCVBUF,       &recvBuffer,  sizeof(int) );
           s1->setsockopt ( ZMQ_RCVHWM,       &recvHwMark,  sizeof(int) );
           s1->setsockopt ( ZMQ_RCVTIMEO,     &recvTimeout, sizeof(int) );
           s1->setsockopt ( ZMQ_RATE,         &recvRate,    sizeof(int) );
     //    s1->setsockopt ( ZMQ_RECOVERY_IVL, &recoverMSEC, sizeof(int) );

           s1->connect ( "pgm://local_IP;239.255.0.20:30001" );
           s1->setsockopt ( ZMQ_SUBSCRIBE, NULL, 0 );                   printf ( "done. \n" );

           int seq = 0;
           while( true ) {
                  zmq::message_t         msgbuff;                  RECV_start = nanoCLK::now(); RECV_last = RECV_ret;
                  int   ret = s1->recv( &msgbuff, 0 );             RECV_ret   = nanoCLK::now();
                  if ( !ret )                                           printf ( "[T0+ %14d [ns]]: [SUB] did not receive any message within set timeout(%d). RC == %d LOOP_ovhd == %6d [ns] RECV_wait == %10d [ns]\n", std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(),           recvTimeout, ret, std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_last ).count(), std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count() );
                  else                                                  printf ( "[T0+ %14d [ns]]: [SUB] did now receive   a message SEQ#(%6d.) DATA[%6d] B. RC == %d LOOP_ovhd == %6d [ns] RECV_wait == %10d [ns]\n", std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), ++seq, msgbuff.size(), ret, std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_last ).count(), std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count() );
           }
    }
    catch( zmq::error_t   &e ) {                                        printf ( "[T0+ %14d [ns]]: [EXC.ZMQ] An error occurred: %s\nWill RET(1)", std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), e.what() );
           return 1;
    }
    catch( std::exception &e ) {                                        printf ( "[T0+ %14d [ns]]: [EXC.std] An error occurred: %s\nWill RET(1)", std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), e.what() );
           return 1;
    }
    return 0;
}
0

Source: https://habr.com/ru/post/1679163/


All Articles