How to design the proper release of a boost::asio socket or a wrapper thereof

I am making several attempts at writing my own simple asynchronous TCP server using boost::asio, after not having touched it for several years.

The latest example listing I can find is: http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html

The problem I have with this example is that (I feel) it cheats, and cheats big time, by making tcp_connection a shared_ptr so that it can avoid worrying about the lifetime management of each connection. (I think) they do this for brevity, since it is a small tutorial, but that solution does not hold up in the real world.

What if you want to send a message to each client on a timer, or something like that? A collection of client connections is going to be needed in any real-world non-trivial server.

I am worried about the lifetime management of each connection. I figure the natural thing would be to keep some collection of tcp_connection objects, or pointers to them, inside tcp_server: add to that collection from the OnConnect callback and remove from it in OnDisconnect.

Note that OnDisconnect would most likely be called from the actual Disconnect method, which in turn would be called from the OnReceive or OnSend callback in the case of an error.
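Roughly, the design I have in mind looks like this (a simplified sketch with hypothetical names, not working production code):

    // A simplified sketch of the design I have in mind (hypothetical names).
    #include <boost/asio.hpp>
    #include <memory>
    #include <set>

    class tcp_connection;

    class tcp_server {
    public:
        // called from the accept handler
        void OnConnect(std::shared_ptr<tcp_connection> conn)    { connections_.insert(conn); }
        // called from tcp_connection::Disconnect
        void OnDisconnect(std::shared_ptr<tcp_connection> conn) { connections_.erase(conn); }
    private:
        std::set<std::shared_ptr<tcp_connection>> connections_;
    };

    class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
    public:
        tcp_connection(boost::asio::io_service& io, tcp_server& server)
            : socket_(io), server_(server) {}

        void OnReceive(const boost::system::error_code& ec, std::size_t /*bytes*/) {
            if (ec) { Disconnect(); return; }  // error path leads to Disconnect...
            // ...otherwise process the data and start the next async read
        }

        void Disconnect() {
            socket_.close();
            // If the server's collection held the last reference, *this is destroyed
            // as soon as this statement completes -- while OnReceive is still on the
            // call stack below us.
            server_.OnDisconnect(shared_from_this());
        }

    private:
        boost::asio::ip::tcp::socket socket_;
        tcp_server& server_;
    };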

That is the problem.

Say we have a call stack that looks something like this:

    tcp_connection::~tcp_connection
    tcp_server::OnDisconnect
    tcp_connection::OnDisconnect
    tcp_connection::Disconnect
    tcp_connection::OnReceive

This is going to cause errors, because when the call stack unwinds we will be executing code in an object whose destructor has already been called... I think, right?

I imagine everyone doing server programming runs into this scenario in some fashion. What is the strategy for handling it?

I hope the explanation is good enough to follow. If not, let me know and I will create my own source listing, but it will be very large.


Edit: Linked

Memory management in asynchronous C++ code

IMO that is not an acceptable answer: it relies on cheating with shared_ptrs on the outstanding calls and nothing more, and it is not real-world. What if the server wanted to say hello to all clients every 5 minutes? A collection of some kind is needed. And what if you are calling io_service.run on multiple threads?
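To make the first point concrete, this is the sort of thing I mean (a hypothetical sketch; send_to_all is a member I am assuming the server would provide, and implementing it is exactly what requires a collection of connections):

    // Hypothetical sketch: a periodic "say hello to everyone" task.
    #include <boost/asio.hpp>

    template <typename Server>
    void schedule_broadcast(boost::asio::deadline_timer& timer, Server& server) {
        timer.expires_from_now(boost::posix_time::minutes(5));
        timer.async_wait([&timer, &server](const boost::system::error_code& ec) {
            if (!ec) {
                server.send_to_all("hello");       // needs to reach every live client
                schedule_broadcast(timer, server); // re-arm the timer
            }
        });
    }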

I also asked on the Boost mailing list: http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html

+5
4 answers

While others have answered similarly to the second half of this answer, it seems the most complete answer I could find came from asking the same question on the Boost mailing list.

http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html

I will summarize it here to assist those arriving here from a search in the future.

There are two options:

1) Close the socket in order to cancel any outstanding io, then post the disconnect logic as a callback on the io_service and let the server class be called back once the socket has been disconnected. It can then safely release the connection. As long as there was only one thread calling io_service::run, other asynchronous operations will already have been resolved by the time the callback is made. However, if there are multiple threads calling io_service::run, this is not safe.
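A minimal sketch of option 1 might look like this (illustrative names, not code from the mailing list; it assumes a single thread calls io_service::run()):

    #include <boost/asio.hpp>
    #include <functional>
    #include <memory>

    class connection : public std::enable_shared_from_this<connection> {
    public:
        connection(boost::asio::io_service& io,
                   std::function<void(std::shared_ptr<connection>)> on_disconnect)
            : io_(io), socket_(io), on_disconnect_(std::move(on_disconnect)) {}

        boost::asio::ip::tcp::socket& socket() { return socket_; }

        void disconnect() {
            boost::system::error_code ignored;
            socket_.close(ignored);            // cancels any outstanding async operations

            auto self = shared_from_this();
            io_.post([this, self] {
                // With a single io_service::run() thread, the handlers of the
                // cancelled operations have already run (with operation_aborted)
                // by the time this executes, so the server can now safely drop
                // its reference to the connection.
                on_disconnect_(self);
            });
        }

    private:
        boost::asio::io_service& io_;
        boost::asio::ip::tcp::socket socket_;
        std::function<void(std::shared_ptr<connection>)> on_disconnect_;
    };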

2) As others have pointed out in their answers, using a shared_ptr to manage the connection's lifetime, with the outstanding io operations keeping it alive, is viable. We can then use a collection of weak_ptrs to the connections in order to access them when we need to. The latter is the tidbit that was omitted from the other related posts and that had me confused.
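A condensed illustration of option 2 (illustrative names; the next answer below shows a fuller, real-world version):

    // The handlers own the connections through shared_ptr copies;
    // the server only keeps weak_ptrs to them.
    #include <boost/asio.hpp>
    #include <algorithm>
    #include <memory>
    #include <vector>

    class connection;  // kept alive by the shared_ptr copies captured in its handlers

    class connection_registry {
    public:
        void add(const std::shared_ptr<connection>& conn) {
            peers_.push_back(conn);                        // store a weak reference only
        }

        template <typename F>
        void for_each_live(F f) {
            for (auto& w : peers_)
                if (auto sp = w.lock())                    // still alive?
                    f(sp);
            // drop entries whose connection has already been destroyed
            peers_.erase(std::remove_if(peers_.begin(), peers_.end(),
                             [](const std::weak_ptr<connection>& w) { return w.expired(); }),
                         peers_.end());
        }

    private:
        std::vector<std::weak_ptr<connection>> peers_;
    };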

+3

Like I said, I don't see how using smart pointers is "cheating, and cheating big time". I also don't think your assessment that "they do this for brevity" holds water.


Here is a slightly redacted excerpt¹ from our code base that exemplifies how using shared_ptrs does not preclude tracking connections.

It shows just the server side of things:

  • a very simple connection object in connection.hpp; it uses enable_shared_from_this

  • a fixed-size connection_pool only (we also have dynamically resizing pools, hence the locking primitives). Note how we can perform actions on all active connections.

    So you can trivially write something like this to write to all clients, e.g. on a timer:

     _pool.for_each_active([] (auto const& conn) { send_message(conn, hello_world_packet); }); 
  • a sample listener that shows how it ties in with the connection_pool (including a method that closes all connections)

Code Listings

  • connection.hpp

      #pragma once

      #include "xxx/net/rpc/protocol.hpp"
      #include "log.hpp"
      #include "stats_filer.hpp"
      #include <memory>

      namespace xxx { namespace net { namespace rpc {

          struct connection : std::enable_shared_from_this<connection>, protected LogSource {
              typedef std::shared_ptr<connection> ptr;

            private:
              friend struct io;
              friend struct listener;

              boost::asio::io_service& _svc;
              protocol::socket         _socket;
              protocol::endpoint       _ep;
              protocol::endpoint       _peer;

            public:
              connection(boost::asio::io_service& svc, protocol::endpoint ep)
                  : LogSource("rpc::connection"), _svc(svc), _socket(svc), _ep(ep)
              {}

              void init() {
                  _socket.set_option(protocol::no_delay(true));
                  _peer = _socket.remote_endpoint();
                  g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_accepted");
                  debug() << "New connection from " << _peer;
              }

              protocol::endpoint endpoint() const { return _ep;     }
              protocol::endpoint peer() const     { return _peer;   }
              protocol::socket&  socket()         { return _socket; } // TODO encapsulation

              int  handle()      { return _socket.native_handle(); }
              bool valid() const { return _socket.is_open(); }

              void cancel() {
                  _svc.post([this] { _socket.cancel(); });
              }

              using shutdown_type = boost::asio::ip::tcp::socket::shutdown_type;
              void shutdown(shutdown_type what = shutdown_type::shutdown_both) {
                  _svc.post([=] { _socket.shutdown(what); });
              }

              ~connection() {
                  g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_disconnected");
              }
          };

      } } }
  • connection_pool.hpp

      #pragma once

      #include <mutex>
      #include "xxx/threads/null_mutex.hpp"
      #include "xxx/net/rpc/connection.hpp"
      #include "stats_filer.hpp"
      #include "log.hpp"

      namespace xxx { namespace net { namespace rpc {

          // not thread-safe by default, but pass eg std::mutex for `Mutex` if you need it
          template <typename Ptr = xxx::net::rpc::connection::ptr, typename Mutex = xxx::threads::null_mutex>
          struct basic_connection_pool : LogSource {
              using WeakPtr = std::weak_ptr<typename Ptr::element_type>;

              basic_connection_pool(size_t size, std::string name = "connection_pool")
                  : LogSource(std::move(name)), _pool(size)
              { }

              bool try_insert(Ptr const& conn) {
                  std::lock_guard<Mutex> lk(_mx);

                  auto slot = std::find_if(_pool.begin(), _pool.end(), std::mem_fn(&WeakPtr::expired));

                  if (slot == _pool.end()) {
                      g_stats_filer_p->inc_value("asio." + conn->endpoint().address().to_string() + ".connections_dropped");
                      error() << "dropping connection from " << conn->peer() << ": connection pool (" << _pool.size() << ") saturated";
                      return false;
                  }

                  *slot = conn;
                  return true;
              }

              template <typename F>
              void for_each_active(F action) {
                  auto locked = [=] {
                      using namespace std;
                      lock_guard<Mutex> lk(_mx);
                      vector<Ptr> locked(_pool.size());
                      transform(_pool.begin(), _pool.end(), locked.begin(), mem_fn(&WeakPtr::lock));
                      return locked;
                  }();

                  for (auto const& p : locked)
                      if (p) action(p);
              }

              constexpr static bool synchronizing() {
                  return not std::is_same<xxx::threads::null_mutex, Mutex>();
              }

            private:
              void dump_stats(LogSource::LogTx tx) const {
                  // lock is assumed!
                  size_t empty = 0, busy = 0, idle = 0;

                  for (auto& p : _pool) {
                      switch (p.use_count()) {
                          case 0:  empty++; break;
                          case 1:  idle++;  break;
                          default: busy++;  break;
                      }
                  }

                  tx << "usage empty:" << empty << " busy:" << busy << " idle:" << idle;
              }

              Mutex _mx;
              std::vector<WeakPtr> _pool;
          };

          // TODO FIXME use null_mutex once growing is no longer required AND if
          // en-pooling still only happens from the single IO thread (XXX-2535)
          using server_connection_pool = basic_connection_pool<xxx::net::rpc::connection::ptr, std::mutex>;

      } } }
  • listener.hpp

      #pragma once

      #include "xxx/threads/null_mutex.hpp"
      #include <mutex>
      #include "xxx/net/rpc/connection_pool.hpp"
      #include "xxx/net/rpc/io_operations.hpp"

      namespace xxx { namespace net { namespace rpc {

          struct listener : std::enable_shared_from_this<listener>, LogSource {
              typedef std::shared_ptr<listener> ptr;

              protocol::acceptor _acceptor;
              protocol::endpoint _ep;

              listener(boost::asio::io_service& svc, protocol::endpoint ep, server_connection_pool& pool)
                  : LogSource("rpc::listener"), _acceptor(svc), _ep(ep), _pool(pool)
              {
                  _acceptor.open(ep.protocol());

                  _acceptor.set_option(protocol::acceptor::reuse_address(true));
                  _acceptor.set_option(protocol::no_delay(true));
                  ::fcntl(_acceptor.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
                  _acceptor.bind(ep);

                  _acceptor.listen(32);
              }

              void accept_loop(std::function<void(connection::ptr conn)> on_accept) {
                  auto self = shared_from_this();
                  auto conn = std::make_shared<xxx::net::rpc::connection>(_acceptor.get_io_service(), _ep);

                  _acceptor.async_accept(conn->_socket, [this,self,conn,on_accept](boost::system::error_code ec) {
                      if (ec) {
                          auto tx = ec == boost::asio::error::operation_aborted? debug() : warn();
                          tx << "failed accept " << ec.message();
                      } else {
                          ::fcntl(conn->_socket.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?

                          if (_pool.try_insert(conn)) {
                              on_accept(conn);
                          }

                          self->accept_loop(on_accept);
                      }
                  });
              }

              void close() {
                  _acceptor.cancel();
                  _acceptor.close();

                  _acceptor.get_io_service().post([=] {
                      _pool.for_each_active([] (auto const& sp) {
                          sp->shutdown(connection::shutdown_type::shutdown_both);
                          sp->cancel();
                      });
                  });

                  debug() << "shutdown";
              }

              ~listener() {
              }

            private:
              server_connection_pool& _pool;
          };

      } } }

¹ download as gist https://gist.github.com/sehe/979af25b8ac4fd77e73cdf1da37ab4c2

+2

Connection lifetime is a fundamental issue with boost::asio. Speaking from experience, I can assure you that getting it wrong causes "undefined behaviour"...

The asio examples use shared_ptr to ensure that a connection is kept alive while it may still have outstanding handlers in the asio::io_service. Note that even in a single thread, an asio::io_service runs asynchronously to the application code; see CppCon 2016: Michael Caisse "Asynchronous IO with Boost.Asio" for an excellent description of the precise mechanism.
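In outline, the pattern from the examples looks something like this (a simplified sketch, not the tutorial code itself):

    // Each handler captures a shared_ptr copy (`self`), which keeps the session
    // alive for as long as an operation is outstanding.
    #include <boost/asio.hpp>
    #include <array>
    #include <memory>

    class session : public std::enable_shared_from_this<session> {
    public:
        explicit session(boost::asio::ip::tcp::socket socket) : socket_(std::move(socket)) {}

        void start() { do_read(); }

    private:
        void do_read() {
            auto self = shared_from_this();
            socket_.async_read_some(boost::asio::buffer(buffer_),
                [this, self](const boost::system::error_code& ec, std::size_t /*n*/) {
                    if (!ec)
                        do_read();  // re-arm; the new handler holds a fresh copy of `self`
                    // on error no new operation is started; once this handler returns,
                    // the last shared_ptr copy goes away and the session is destroyed
                });
        }

        boost::asio::ip::tcp::socket socket_;
        std::array<char, 4096> buffer_;
    };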

A shared_ptr enables the lifetime of a connection to be controlled by the number of shared_ptr instances referring to it. IMHO it is not "cheating, and cheating big time"; it is an elegant solution to a complicated problem.

However, I agree with you that just using shared_ptrs to control connection lifetimes is not a complete solution, since it can lead to resource leaks.

In my answer here: Boost async_* functions and shared_ptr, I proposed using a combination of shared_ptr and weak_ptr to manage connection lifetimes. An HTTP server using a combination of shared_ptr and weak_ptr can be found here: via-httplib.

The HTTP server is built upon an asynchronous TCP server which uses a collection of (shared_ptrs to) connections, created on connect and destroyed on disconnect, as you propose.
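A rough sketch of that combination (with illustrative names, not the via-httplib code): the server's collection owns the connections, while handlers capture only weak_ptrs, so outstanding operations never prolong the lifetime of a connection the server has already dropped.

    #include <boost/asio.hpp>
    #include <array>
    #include <memory>
    #include <set>

    class connection : public std::enable_shared_from_this<connection> {
    public:
        explicit connection(boost::asio::io_service& io) : socket_(io) {}

        void start() {
            std::weak_ptr<connection> weak = shared_from_this();
            socket_.async_read_some(boost::asio::buffer(buffer_),
                [weak](const boost::system::error_code& ec, std::size_t /*n*/) {
                    if (auto self = weak.lock()) {  // still owned by the server?
                        if (!ec) self->start();
                        // on error the server should be told to erase its shared_ptr,
                        // e.g. through a callback it installed on the connection
                    }
                });
        }

        boost::asio::ip::tcp::socket& socket() { return socket_; }

    private:
        boost::asio::ip::tcp::socket socket_;
        std::array<char, 4096> buffer_;
    };

    class tcp_server {
    public:
        void on_connect(const std::shared_ptr<connection>& conn)    { connections_.insert(conn); }
        void on_disconnect(const std::shared_ptr<connection>& conn) { connections_.erase(conn); }
    private:
        std::set<std::shared_ptr<connection>> connections_;  // the owning collection
    };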

+1

The way asio solves the "deletion while there are outstanding async methods" problem is to split each async-enabled object into three classes, e.g.:

  • Server
  • server_service
  • server_impl

There is one service per io_loop (see use_service<>). The service creates the impl for the server, which is now a handle class.

This separates the handle's lifetime from the implementation's lifetime.

Now, in the handle's destructor, a message can be sent (via the service) to the impl to cancel all outstanding IO.

The handle's destructor can wait for those io calls to be dequeued if necessary (for example, if the server's work is being delegated to a background io loop or thread).
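A very rough sketch of the idea (illustrative names; the per-io_service service object is folded away for brevity):

    // The handle only holds a shared_ptr to the impl, so destroying the handle
    // merely posts a cancellation instead of tearing the impl down while handlers
    // may still be running on it.
    #include <boost/asio.hpp>
    #include <memory>

    class server_impl : public std::enable_shared_from_this<server_impl> {
    public:
        explicit server_impl(boost::asio::io_service& io) : io_(io), acceptor_(io) {}

        void cancel_all() {
            boost::system::error_code ignored;
            acceptor_.close(ignored);
            // ...close/cancel all connection sockets here; their outstanding
            // handlers then complete with operation_aborted and release their
            // references to the impl
        }

        boost::asio::io_service& io() { return io_; }

    private:
        boost::asio::io_service& io_;
        boost::asio::ip::tcp::acceptor acceptor_;
    };

    // The handle class the application actually owns.
    class server {
    public:
        explicit server(boost::asio::io_service& io)
            : impl_(std::make_shared<server_impl>(io)) {}

        ~server() {
            auto impl = impl_;
            impl->io().post([impl] { impl->cancel_all(); });
            // the impl stays alive -- held by the posted handler and by any
            // outstanding operations -- until the io thread has drained them
        }

    private:
        std::shared_ptr<server_impl> impl_;
    };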

It has become a habit of mine to implement all io_service-related objects this way, and it makes coding with asio much easier.

0
