I am halfway through my network client at the moment, so maybe I can share some tips and some resources to watch. There are many people more experienced in this area, and hopefully they will chime in :)
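First, the event loop. For asynchronous networking, boost::asio is a good choice. To drive it, call either run or runOne on the io_service (the actual Boost.Asio member is spelled run_one). If you already have a main loop, call runOne from it like this: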
// Drive the io_service from an application main loop, one handler per pass.
// The Boost.Asio member function is spelled run_one(); io_service has no
// member named runOne, so the original spelling does not compile.
boost::asio::io_service myIOService;
while(true)
{
    // NOTE(review): per the Boost.Asio docs run_one() blocks until a handler
    // has been dispatched or the io_service is stopped — confirm that blocking
    // here is acceptable for the surrounding loop.
    myIOService.run_one();
}
Or call run once (on a separate thread):
// Start a thread whose entry point is io_service::run invoked on myIOService.
boost::thread t(boost::bind(&boost::asio::io_service::run, &myIOService));
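However, run returns as soon as there is no work left (and the thread ends with it). As I learned on Stackoverflow, the trick is to make sure there is always something to do. The answer is boost::asio::io_service::work: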
// Create an io_service::work object tied to myIOService. Per the Boost.Asio
// documentation, run() does not return for lack of work while such an object exists.
boost::asio::io_service::work myWork(myIOService);
, , . :)
- -. Socket ( ). , #. , :
First, we need a way to pass arguments to the handlers, hence EventArgs:
eventArgs.h
class EventArgs : boost::noncopyable
{
private:
public:
EventArgs();
virtual ~EventArgs() = 0;
};
Next, an event class that listeners can subscribe to/unsubscribe from:
event.h
#include <algorithm>
#include <functional>
#include <set>
#include <stack>
#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
class Event : boost::noncopyable
{
public:
typedef std::function<void(const EventArgs&)> DelegateType;
typedef boost::shared_ptr<DelegateType> DelegateDecl;
private:
boost::mutex m_Mutex;
typedef std::set<DelegateDecl> DelegateSet;
typedef std::stack<DelegateDecl> DelegateStack;
typedef DelegateSet::const_iterator DelegateSet_cit;
DelegateSet m_Delegates;
DelegateStack m_ToRemove;
public:
Event()
{
};
Event(Event&& _rhs) : m_Delegates(std::move(_rhs.m_Delegates))
{
};
~Event()
{
};
static DelegateDecl bindDelegate(DelegateType _f)
{
DelegateDecl ret(new DelegateType(_f));
return ret;
};
void raise(const EventArgs& _args)
{
boost::mutex::scoped_lock lock(m_Mutex);
while(m_ToRemove.size())
{
m_Delegates.erase(m_Delegates.find(m_ToRemove.top()));
m_ToRemove.pop();
};
if(m_Delegates.size())
std::for_each(m_Delegates.begin(),
m_Delegates.end(),
[&_args](const DelegateDecl& _decl) { (*_decl)(_args); });
};
DelegateDecl addListener(DelegateDecl _decl)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Delegates.insert(_decl);
return _decl;
};
DelegateDecl addListener(DelegateType _f)
{
DelegateDecl ret(bindDelegate(_f));
return addListener(ret);
};
void removeListener(const DelegateDecl _decl)
{
boost::mutex::scoped_lock lock(m_Mutex);
DelegateSet_cit cit(m_Delegates.find(_decl));
if(cit != m_Delegates.end())
m_ToRemove.push(_decl);
};
Event& operator += (DelegateType _f)
{
addListener(_f);
return *this;
};
};
. :
socket.h
(Note: ByteVector is a typedef for std::vector<unsigned char>.)
#pragma once
#include "event.h"
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/buffer.hpp>
class MORSE_API Socket : boost::noncopyable
{
protected:
typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
private:
ByteVector m_Buffer;
SocketPtr m_SocketPtr;
boost::asio::ip::tcp::endpoint m_RemoteEndPoint;
bool m_bConnected;
void _handleConnect(const boost::system::error_code& _errorCode, boost::asio::ip::tcp::resolver_iterator _rit);
void _handleRead(const boost::system::error_code& _errorCode, std::size_t read);
protected:
SocketPtr socket() { return m_SocketPtr; };
public:
Socket(ByteVector_sz _bufSize = 512);
virtual ~Socket();
bool isConnected() const { return m_bConnected; };
const boost::asio::ip::tcp::endpoint& remoteEndPoint() const {return m_RemoteEndPoint; };
void connect(boost::asio::ip::tcp::resolver_iterator _rit);
void connect(const String& _host, const Port _port);
void close();
Event onRead;
Event onResolve;
Event onConnect;
Event onClose;
};
, . , DNS. . EventArg -, . EventArg, .
socket.cpp
#include "socket.h"
#include <boost/asio/placeholders.hpp>
namespace morse
{
namespace net
{
/// Creates a disconnected socket whose receive buffer holds _bufSize bytes.
Socket::Socket(ByteVector_sz _bufSize)
    : m_Buffer(_bufSize)
    , m_bConnected(false)
{
}
/// Nothing to release by hand; the members clean up after themselves.
Socket::~Socket() = default;
/// Completion handler for async_read_some.
/// On error the socket is closed. On a non-empty read, onRead is raised with
/// the buffer and byte count and the next read is started. A successful read
/// of zero bytes does nothing and does not start another read.
void Socket::_handleRead(const boost::system::error_code& _errorCode,
                         std::size_t _read)
{
    if(_errorCode)
    {
        close();
        return;
    }
    if(_read == 0)
        return;

    onRead.raise(SocketReadEventArgs(*this, m_Buffer, _read));
    m_SocketPtr->async_read_some(
        boost::asio::buffer(m_Buffer),
        boost::bind(&Socket::_handleRead, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}
/// Completion handler for async_connect.
/// Success: records the remote endpoint, starts reading, raises onConnect.
/// Failure with further endpoints available: closes the socket and tries the
/// next endpoint without raising anything.
/// Failure on the last endpoint: raises onConnect carrying the error.
void Socket::_handleConnect(const boost::system::error_code& _errorCode,
                            boost::asio::ip::tcp::resolver_iterator _rit)
{
    m_bConnected = !_errorCode;

    if(_errorCode)
    {
        if(++_rit != boost::asio::ip::tcp::resolver::iterator())
        {
            // Another candidate is left: retry silently with it.
            m_SocketPtr->close();
            m_SocketPtr->async_connect(*_rit, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
            return;
        }
    }
    else
    {
        m_RemoteEndPoint = *_rit;
        m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
    }

    onConnect.raise(SocketConnectEventArgs(*this, _errorCode));
}
/// Replaces the current asio socket with a fresh one on the Root io_service
/// and starts an asynchronous connect to the endpoint _rit refers to.
/// _rit is dereferenced unconditionally, so it must not be the end iterator.
void Socket::connect(boost::asio::ip::tcp::resolver_iterator _rit)
{
    const boost::asio::ip::tcp::endpoint target(*_rit);
    m_SocketPtr = SocketPtr(new boost::asio::ip::tcp::socket(Root::instance().ioService()));
    m_SocketPtr->async_connect(
        target,
        boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
}
/// Asks Root to resolve _host/_port. When resolution finishes, onResolve is
/// raised (with the resolved host name, or an empty string on error) and, on
/// success, the connect to the resolved endpoints is started.
void Socket::connect(const String& _host, Port _port)
{
    auto onResolved = [this](const boost::system::error_code& _errorCode,
                             boost::asio::ip::tcp::resolver_iterator _epIt)
    {
        if(_errorCode)
        {
            onResolve.raise(SocketResolveEventArgs(*this, String(""), _errorCode));
            return;
        }
        onResolve.raise(SocketResolveEventArgs(*this, _epIt->host_name(), _errorCode));
        this->connect(_epIt);
    };
    Root::instance().resolveHost(_host, _port, onResolved);
}
/// Closes the connection if one is open; otherwise does nothing.
/// onClose is raised before the asio socket is closed, and the connected flag
/// is cleared last.
void Socket::close()
{
    if(!m_bConnected)
        return;

    onClose.raise(SocketCloseEventArgs(*this));
    m_SocketPtr->close();
    m_bConnected = false;
}
As for DNS, the line Root::instance().resolveHost(_host, _port, anonResolve); calls the following function, which performs the DNS lookup asynchronously:
/// Starts an asynchronous resolve of _host and _port on m_Resolver.
/// @param _host    host name passed to the resolver query.
/// @param _port    port, converted to its decimal text form for the query.
/// @param _handler completion handler passed straight to async_resolve.
template<typename ResolveHandler>
void resolveHost(const String& _host, Port _port, ResolveHandler _handler)
{
    // The unused local endpoint of the original was dropped; it was never read.
    boost::asio::ip::tcp::resolver::query query(_host, boost::lexical_cast<std::string>(_port));
    m_Resolver.async_resolve(query, _handler);
}
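Finally, a text-based socket that raises an event every time a complete line has been received. The header is omitted here. It adds one more Event, onLine, and is implemented like this: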
#include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/placeholders.hpp>
namespace morse
{
namespace net
{
// Definition of the static default line terminator (CR LF); the TextSocket
// constructor copies it into m_EOL.
String TextSocket::m_DefaultEOL("\r\n");
/// Starts with the default line terminator and subscribes readHandler to
/// onRead so that incoming bytes are split into lines.
TextSocket::TextSocket()
    : m_EOL(m_DefaultEOL)
{
    onRead += [this](const EventArgs& _args) { readHandler(_args); };
}
/// Nothing to release by hand.
TextSocket::~TextSocket() = default;
void TextSocket::readHandler(const EventArgs& _args)
{
auto& args(static_cast<const SocketReadEventArgs&>(_args));
m_LineBuffer.append(args.buffer().begin(), args.buffer().begin() + args.bytesRead());
String::size_type pos;
while((pos = m_LineBuffer.find(eol())) != String::npos)
{
onLine.raise(SocketLineEventArgs(*this, m_LineBuffer.substr(0, pos)));
m_LineBuffer = m_LineBuffer.substr(pos + eol().length());
};
};
/// Completion handler for async_write.
/// Success: removes the line that was just sent and starts writing the next
/// queued line, if there is one.
/// Failure: discards the queued lines and closes the socket.
void TextSocket::writeHandler(const boost::system::error_code& _errorCode, std::size_t _written)
{
    if(_errorCode)
    {
        // _sendLine only starts a write when the queue size becomes exactly 1.
        // Leaving the failed line queued would keep the size above 1 for every
        // later call, so nothing would ever be written again.
        m_Queue.clear();
        close();
        return;
    }

    m_Queue.pop_front();
    if(!m_Queue.empty())
        boost::asio::async_write(*socket(),
                                 boost::asio::buffer(m_Queue.front(), m_Queue.front().length()),
                                 boost::bind(&TextSocket::writeHandler, this, _1, _2));
}
/// Queues _line for sending by posting _sendLine to the Root io_service
/// instead of touching the write queue directly.
void TextSocket::sendLine(String _line)
{
    Root::instance().ioService().post([this, _line]() { _sendLine(_line); });
}
/// Appends the line terminator to _line and queues it. A write is started
/// only when the queue was empty beforehand; otherwise writeHandler picks the
/// line up once the lines ahead of it have been sent.
void TextSocket::_sendLine(String _line)
{
    const bool queueWasEmpty = m_Queue.empty();

    _line.append(m_EOL);
    m_Queue.push_back(_line);

    if(!queueWasEmpty)
        return;

    boost::asio::async_write(*socket(),
                             boost::asio::buffer(m_Queue.front(), m_Queue.front().length()),
                             boost::bind(&TextSocket::writeHandler, this, _1, _2));
}
, ... boost::asio::post . , ASIO - , , . .
, , , , . , , , . , " ,