Network Client Simulator Design

I am trying to write software in C++ that sends request bytes (formatted according to a standard **application-level** protocol, with the field values taken from a text file) over **UDP**.

The client must be able to send these requests at a very high rate, up to **2000 transactions per second**, and must also receive each response if it arrives within a specified timeout; if it does not arrive in time, the response is simply not collected.

I will use the Boost library for all the socket work, but I am not sure how to design the application for such a high request rate :(

I think I need a multi-threaded application (again using Boost). Am I right? Do I need to create a separate thread for each request? I think only one thread should wait for responses, because if many threads are waiting, how can we tell which request a given response belongs to?

Hope this question is clear. I just need help regarding design points and possible problems that I may run into.

+3
source share
2 answers

I am halfway through writing a network client of my own at the moment, so perhaps I can share some advice and some resources to look at. There are many people more experienced in this area, and hopefully they will chime in :)

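First of all, you are right to pick Boost: once you are used to how it fits together, boost::asio is a very good toolkit for writing network code. In essence, you create an io_service and call run, which executes until all work is complete, or run_one, which performs a single I/O action. On their own these are not very useful; the power comes when you either call run_one in a loop of your own: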

// The io_service that dispatches the completion handlers for the sockets using it.
boost::asio::io_service myIOService;

// Drive the event loop by hand, one handler at a time. The member function is
// spelled run_one(); io_service has no runOne().
while(true)
{
    myIOService.run_one();
}

Or call run on one (or more) threads:

// Start a thread whose entry point calls myIOService.run().
boost::thread t(boost::bind(&boost::asio::io_service::run, &myIOService));

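Note, however, that run returns as soon as there is no work left to do (so you can say goodbye to that thread). As I found out here on Stackoverflow, the trick is to make sure it always has something to do. The solution is boost::asio::io_service::work: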

// Per the Boost.Asio documentation, while a work object exists the owning
// io_service's run() does not return merely because no handlers are pending.
boost::asio::io_service::work myWork(myIOService);   // just some abstract "work"

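The line above ensures that the thread does not stop when nothing is happening. I think of it as a keep-alive :)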

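At some point you will want to create a socket and connect it somewhere. I wrote a general Socket class (and derived a text socket from it for buffered, line-based input). I also wanted an event-based system that works much like the one in C#. I have outlined it below: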

First, we need a generic way of passing arguments to event handlers, hence EventArgs:

eventArgs.h

 class EventArgs : boost::noncopyable
 {
 private:

 public:
  EventArgs();
  virtual ~EventArgs() = 0;
 }; // eo class EventArgs:

Next, we need an event class that listeners can subscribe to and unsubscribe from:

event.h

// STL
#include <functional>
#include <stack>

// Boost
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>

 // class Event
 class Event : boost::noncopyable
 {
 public:
  typedef std::function<void(const EventArgs&)> DelegateType;
  typedef boost::shared_ptr<DelegateType> DelegateDecl;

 private:
  boost::mutex m_Mutex;
  typedef std::set<DelegateDecl> DelegateSet;
  typedef std::stack<DelegateDecl> DelegateStack;
  typedef DelegateSet::const_iterator DelegateSet_cit;
  DelegateSet m_Delegates;
  DelegateStack m_ToRemove;

 public:
  Event()
  {
  }; // eo ctor


  Event(Event&& _rhs) : m_Delegates(std::move(_rhs.m_Delegates))
  {
  }; // eo mtor

  ~Event()
  {
  }; // eo dtor

  // static methods
  static DelegateDecl bindDelegate(DelegateType _f)
  {
   DelegateDecl ret(new DelegateType(_f));
   return ret;
  }; // eo bindDelegate

  // methods
  void raise(const EventArgs& _args)
  {
   boost::mutex::scoped_lock lock(m_Mutex);

   // get rid of any we have to remove
   while(m_ToRemove.size())
   {
    m_Delegates.erase(m_Delegates.find(m_ToRemove.top()));
    m_ToRemove.pop();
   };

   if(m_Delegates.size())
   std::for_each(m_Delegates.begin(),
        m_Delegates.end(),
        [&_args](const DelegateDecl& _decl) { (*_decl)(_args); });
  }; // eo raise

  DelegateDecl addListener(DelegateDecl _decl)
  {
   boost::mutex::scoped_lock lock(m_Mutex);
   m_Delegates.insert(_decl);
   return _decl;
  }; // eo addListener

  DelegateDecl addListener(DelegateType _f)
  {
   DelegateDecl ret(bindDelegate(_f));
   return addListener(ret);
  }; // eo addListener


  void removeListener(const DelegateDecl _decl)
  {
   boost::mutex::scoped_lock lock(m_Mutex);
   DelegateSet_cit cit(m_Delegates.find(_decl));
   if(cit != m_Delegates.end())
    m_ToRemove.push(_decl);
  }; // eo removeListener

  // operators

  // Only use operator += if you don't which to manually detach using removeListener
  Event& operator += (DelegateType _f)
  {
   addListener(_f);
   return *this;
  }; // eo op +=

 }; // eo class Event

Then it was time to create the socket class. Here is the header:

socket.h

(Note: ByteVector is a typedef for std::vector<unsigned char>.)

#pragma once

#include "event.h"

// boost
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/buffer.hpp>
  // class Socket
  class MORSE_API Socket : boost::noncopyable
  {
  protected:
   typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;

  private:
   ByteVector      m_Buffer;   // will be used to read in

   SocketPtr        m_SocketPtr;
   boost::asio::ip::tcp::endpoint      m_RemoteEndPoint;
   bool         m_bConnected;

   // reader
   void _handleConnect(const boost::system::error_code& _errorCode, boost::asio::ip::tcp::resolver_iterator _rit);
   void _handleRead(const boost::system::error_code& _errorCode, std::size_t read);
  protected:

   SocketPtr socket() { return m_SocketPtr; };
  public:
   Socket(ByteVector_sz _bufSize = 512);
   virtual ~Socket();

   // properties
   bool isConnected() const { return m_bConnected; };
   const boost::asio::ip::tcp::endpoint& remoteEndPoint() const {return m_RemoteEndPoint; };

   // methods
   void connect(boost::asio::ip::tcp::resolver_iterator _rit);
   void connect(const String& _host, const Port _port);
   void close();

   // Events
   Event onRead;
   Event onResolve;
   Event onConnect;
   Event onClose;
  }; // eo class Socket

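A few notes on the above. You will notice the events, including one for DNS resolution. Each event is raised with an object derived from EventArgs that describes what happened; the implementation below uses several such EventArgs subclasses, whose definitions are not shown here.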

socket.cpp

#include "socket.h"


// boost
#include <boost/asio/placeholders.hpp>

namespace morse
{
 namespace net
 {
  // ctor
  // Sizes the receive buffer to _bufSize elements and starts out disconnected.
  Socket::Socket(ByteVector_sz _bufSize /* = 512 */)
   : m_Buffer(_bufSize)
   , m_bConnected(false)
  {
  } // eo ctor

  // dtor
  // Nothing to release explicitly; the members clean up after themselves.
  Socket::~Socket() {} // eo dtor


  // _handleRead
  // Completion handler for async_read_some: publishes the received bytes via
  // onRead and queues the next read, or closes the socket on error.
  void Socket::_handleRead(const boost::system::error_code& _errorCode,
            std::size_t _read)
  {
   // Any read error tears the connection down.
   if(_errorCode)
   {
    close();
    return;
   }

   // Zero bytes without an error: no event is raised and no further read is queued.
   if(_read == 0)
    return;

   onRead.raise(SocketReadEventArgs(*this, m_Buffer, _read));

   // read again
   m_SocketPtr->async_read_some(
    boost::asio::buffer(m_Buffer),
    boost::bind(&Socket::_handleRead, this, _1, _2));
  } // eo _handleRead


  // _handleConnect
  // Completion handler for async_connect. On success it records the remote
  // endpoint and starts reading; on failure it tries the next resolved
  // endpoint, and only when none is left does it report the failure.
  void Socket::_handleConnect(const boost::system::error_code& _errorCode,
         boost::asio::ip::tcp::resolver_iterator _rit)
  {
   const bool succeeded = !_errorCode;
   m_bConnected = succeeded;

   if(succeeded)
   {
    m_RemoteEndPoint = *_rit;
    m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
   }
   else
   {
    const boost::asio::ip::tcp::resolver::iterator endOfList;
    if(++_rit != endOfList)
    {
     // More candidates remain: retry quietly, without raising onConnect yet.
     m_SocketPtr->close();
     m_SocketPtr->async_connect(*_rit, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
     return;
    }
    // otherwise fall through and raise complete failure
   }

   onConnect.raise(SocketConnectEventArgs(*this, _errorCode));
  } // eo _handleConnect


  // connect
  // Starts an asynchronous connect to the endpoint _rit refers to, on a newly
  // created socket; _handleConnect receives the outcome together with _rit.
  void Socket::connect(boost::asio::ip::tcp::resolver_iterator _rit)
  {
   typedef boost::asio::ip::tcp tcp;

   const tcp::endpoint target(*_rit);
   m_SocketPtr.reset(new tcp::socket(Root::instance().ioService()));
   m_SocketPtr->async_connect(
    target,
    boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
  }


  // Resolves _host:_port asynchronously, raises onResolve with the result and,
  // if resolution succeeded, connects through the iterator overload.
  void Socket::connect(const String& _host, Port _port)
  {
   // Callback run when the asynchronous resolution finishes
   auto onResolved = [this](const boost::system::error_code& _errorCode,
           boost::asio::ip::tcp::resolver_iterator _epIt)
   {
    const bool resolved = !_errorCode;

    // raise event (empty host name when resolution failed)
    onResolve.raise(SocketResolveEventArgs(*this, resolved ? _epIt->host_name() : String(""), _errorCode));

    // perform connect with the resolved endpoints
    if(resolved)
     this->connect(_epIt);
   };

   // Resolve the host, calling back to the lambda above
   Root::instance().resolveHost(_host, _port, onResolved);

  } // eo connect


  void Socket::close()
  {
   if(m_bConnected)
   {
    onClose.raise(SocketCloseEventArgs(*this));
    m_SocketPtr->close();
    m_bConnected = false;
   };
  } // eo close

As for the DNS lookup, the line Root::instance().resolveHost(_host, _port, anonResolve); calls the following function, which resolves the host name asynchronously:

  // resolve a host asynchronously
  // Builds a resolver query from _host and the textual form of _port, and
  // hands it to m_Resolver.async_resolve together with _handler.
  // (The unused local endpoint the original declared here has been removed.)
  template<typename ResolveHandler>
  void resolveHost(const String& _host, Port _port, ResolveHandler _handler)
  {
   boost::asio::ip::tcp::resolver::query query(_host, boost::lexical_cast<std::string>(_port));
   m_Resolver.async_resolve(query, _handler);
  } // eo resolveHost

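Finally, I needed a text-based socket that raises an event every time a complete line has been received (and can then be processed). It builds on the Socket class above and adds another Event, onLine, which is raised once for each full line read. Part of its implementation: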

// boost
#include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/placeholders.hpp>

namespace morse
{
 namespace net
 {
  // Line terminator that the TextSocket constructor copies into m_EOL.
  String TextSocket::m_DefaultEOL("\r\n");

  // ctor
  // Starts with the default line terminator and subscribes readHandler to the
  // onRead event so that incoming bytes are split into lines.
  TextSocket::TextSocket()
   : m_EOL(m_DefaultEOL)
  {
   onRead += [this](const EventArgs& _readArgs) { readHandler(_readArgs); };
  } // eo ctor


  // dtor
  // Nothing to release explicitly.
  TextSocket::~TextSocket() {} // eo dtor


  // readHandler
  void TextSocket::readHandler(const EventArgs& _args)
  {
   auto& args(static_cast<const SocketReadEventArgs&>(_args));
   m_LineBuffer.append(args.buffer().begin(), args.buffer().begin() + args.bytesRead());
   String::size_type pos;
   while((pos = m_LineBuffer.find(eol())) != String::npos)
   {
    onLine.raise(SocketLineEventArgs(*this, m_LineBuffer.substr(0, pos)));
    m_LineBuffer = m_LineBuffer.substr(pos + eol().length());
   };
  }; // eo readHandler


  // writeHandler
  // Completion handler for async_write: removes the line that has just been
  // sent and starts writing the next queued line, if there is one. On error
  // the pending lines are discarded and the socket is closed.
  void TextSocket::writeHandler(const boost::system::error_code& _errorCode, std::size_t _written)
  {
   if(_errorCode)
   {
    // Discard what could not be sent. If stale entries stayed queued,
    // _sendLine's "size() == 1" check would never be true again and no
    // later line would ever be written.
    m_Queue.clear();
    close();
    return;
   }

   m_Queue.pop_front();
   if(m_Queue.empty()) // nothing more to do
    return;

   const String& next = m_Queue.front();
   boost::asio::async_write(*socket().get(), boost::asio::buffer(next, next.length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
  } // eo writeHandler

  // Queues _line for sending by posting _sendLine to the io_service, so the
  // write queue is touched from a handler rather than from the caller.
  void TextSocket::sendLine(String _line)
  {
   Root::instance().ioService().post([this, _line]() { _sendLine(_line); });
  } // eo sendLine


  // _sendLine
  // Appends the line terminator to _line, queues it and, if no write was
  // outstanding, starts one; writeHandler works through the rest of the queue.
  void TextSocket::_sendLine(String _line)
  {
   // An empty queue beforehand means there is no write in flight to chain from.
   const bool writerIdle = m_Queue.empty();

   m_Queue.push_back(_line + m_EOL);

   if(writerIdle)
   {
    const String& head = m_Queue.front();
    boost::asio::async_write(*socket().get(), boost::asio::buffer(head, head.length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
   }
  } // eo sendLine

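One thing to note about the class above is that sendLine goes through the io_service's post (boost::asio post) instead of writing directly. Posting hands the work to the io_service, so the write is started from whichever thread is running the io_service rather than from the caller's thread, and the write queue is only touched from there.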

, , , , . , , , . , " ,

+2

, "" . , , .

, , , , .

boost:: asio , boost:: asio:: io_service:: run. Io_service UDP , , , . io_services .

2000 .

, boost:: asio:: io_service:: run , .

Asio, : - , , . , , , , . .

+2

Source: https://habr.com/ru/post/1787122/


All Articles