Is PGM stable?
FYI: this works with v2.1.1, while the current stable release is 4.2. +
, , PGM/EPGM, , , / , { localhost | home-subnet | remote-network(s) | remote-host(s) }.
[PUB] - :
, , SLOC, - :
ØMQ , ZMQ_SNDHWM, 60-70% .
, , [PUB] - , - .
O/S:
pgm IP-. . , PGM, epgm, .
[SUB]:
- [PUB] - / [SUB] -receiver:
------------------------------------------------
#include "stdafx.h"
#include <stdio.h>
#include <chrono> // since C++ 11
#include <cstdint>
#include "zmq.hpp"
// Clock used for every timestamp taken in this file.
using nanoCLK = std::chrono::high_resolution_clock;

// Number of I/O threads requested when the zmq::context_t is constructed.
#define ZMQ_IO_THREAD_POOL_SIZE 8

// Candidate ZMQ_AFFINITY bit masks (presumably bit N selects I/O thread N of
// the pool above — none of the non-zero masks is referenced in the visible
// code, TODO confirm intended use).
// Every mask body is parenthesized so that the macro expands as one value:
// without the parentheses `MASK & x`, `MASK == x` or `~MASK` would bind to the
// last operand of the `|` chain only and silently compute the wrong result.
#define ZMQ_AFINITY_PLAIN_ROUNDROBIN_UNMANAGED_RISKY ( 0 )
#define ZMQ_AFINITY_LO_PRIO_POOL ( 0 | 1 )
#define ZMQ_AFINITY_HI_PRIO_POOL ( 0 | 0 | 2 )
#define ZMQ_AFINITY_MC_EPGM_POOL ( 0 | 0 | 0 | 4 | 8 | 0 | 0 | 64 | 128 )
int main( int argc, char* argv[] ) {
auto RECV_start = nanoCLK::now();
auto RECV_ret = nanoCLK::now();
auto RECV_last = nanoCLK::now();
auto TEST_start = nanoCLK::now();
try {
zmq::context_t context( ZMQ_IO_THREAD_POOL_SIZE ); printf ( "Connecting to server..." );
int major, minor, patch;
zmq::version( &major, &minor, &patch ); printf ( "Using ZeroMQ( %d.%d.%d )", major, minor, patch );
zmq::socket_t *s1 = new zmq::socket_t( context, ZMQ_SUB );
int zmqLinger = 0,
zmqAffinity = 0,
recvBuffer = 2 * 123456,
recvMaxSize = 9876,
recvHwMark = 123456,
recvRate = 80000 * 10,
recvTimeout = 3000,
recoverMSEC = 60 * 60
;
s1->setsockopt ( ZMQ_AFFINITY, &zmqAffinity, sizeof(int) );
s1->setsockopt ( ZMQ_LINGER, &zmqLinger, sizeof(int) );
s1->setsockopt ( ZMQ_MAXMSGSIZE, &recvMaxSize, sizeof(int) );
s1->setsockopt ( ZMQ_RCVBUF, &recvBuffer, sizeof(int) );
s1->setsockopt ( ZMQ_RCVHWM, &recvHwMark, sizeof(int) );
s1->setsockopt ( ZMQ_RCVTIMEO, &recvTimeout, sizeof(int) );
s1->setsockopt ( ZMQ_RATE, &recvRate, sizeof(int) );
s1->connect ( "pgm://local_IP;239.255.0.20:30001" );
s1->setsockopt ( ZMQ_SUBSCRIBE, NULL, 0 ); printf ( "done. \n" );
int seq = 0;
while( true ) {
zmq::message_t msgbuff; RECV_start = nanoCLK::now(); RECV_last = RECV_ret;
int ret = s1->recv( &msgbuff, 0 ); RECV_ret = nanoCLK::now();
if ( !ret ) printf ( "[T0+ %14d [ns]]: [SUB] did not receive any message within set timeout(%d). RC == %d LOOP_ovhd == %6d [ns] RECV_wait == %10d [ns]\n", std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), recvTimeout, ret, std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_last ).count(), std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count() );
else printf ( "[T0+ %14d [ns]]: [SUB] did now receive a message SEQ#(%6d.) DATA[%6d] B. RC == %d LOOP_ovhd == %6d [ns] RECV_wait == %10d [ns]\n", std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), ++seq, msgbuff.size(), ret, std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_last ).count(), std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count() );
}
}
catch( zmq::error_t &e ) { printf ( "[T0+ %14d [ns]]: [EXC.ZMQ] An error occurred: %s\nWill RET(1)", std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), e.what() );
return 1;
}
catch( std::exception &e ) { printf ( "[T0+ %14d [ns]]: [EXC.std] An error occurred: %s\nWill RET(1)", std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), e.what() );
return 1;
}
return 0;
}