Is MySQL Asynchronous?

I'm basically facing a blocking problem. I wrote a server in C++ on top of Boost.Asio using 8 threads, since the machine has 8 logical cores.

My problem is that a thread may block for 0.2 to 1.5 seconds on a MySQL query, and I honestly don't know how to get around this: MySQL Connector/C++ does not support asynchronous queries, and I don't know how to design the server "correctly" so that multiple threads execute the queries.

So I'm asking for opinions on what to do in this case. Create 100 threads for asynchronous SQL queries? Could I get an opinion from experts on this?

+4
2 answers

Well, the proper solution would be to extend Asio and write a mysql_service implementation to integrate it. I was almost going to find out how to do that right away, but I wanted to start with an "emulation".

The idea is to have

  • your business processes using an io_service (as you already do)
  • a database facade interface that dispatches asynchronous requests to another queue (a second io_service) and posts the completion handler back onto the business process's io_service

The one subtle tweak needed here: you must keep the io_service on the business-process side from shutting down as soon as its job queue is empty, since it may still be waiting for a response from the database layer.
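To make that keep-alive requirement concrete, here is a minimal standard-library sketch (no Boost). mini_loop is a hypothetical stand-in for io_service, and work_started/work_finished are made-up names playing the role that io_service::work plays in the code further down. It only illustrates the idea and is not how Asio is implemented.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical stand-in for io_service: run() returns only when the queue is
// empty AND no outstanding work is registered.
class mini_loop {
public:
    void post(std::function<void()> task) {
        { std::lock_guard<std::mutex> lk(mx_); queue_.push_back(std::move(task)); }
        cv_.notify_one();
    }
    void work_started() { std::lock_guard<std::mutex> lk(mx_); ++outstanding_; }
    void work_finished() {
        { std::lock_guard<std::mutex> lk(mx_); --outstanding_; }
        cv_.notify_all();
    }
    void run() {
        std::unique_lock<std::mutex> lk(mx_);
        for (;;) {
            // Sleep while there is nothing to do but work is still outstanding.
            cv_.wait(lk, [this] { return !queue_.empty() || outstanding_ == 0; });
            if (queue_.empty()) return; // no tasks and no outstanding work
            auto task = std::move(queue_.front());
            queue_.pop_front();
            lk.unlock();
            task(); // run the handler without holding the lock
            lk.lock();
        }
    }
private:
    std::mutex mx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> queue_;
    int outstanding_ = 0;
};

// Returns true if a completion that arrives late is still delivered by run().
bool late_completion_is_delivered() {
    mini_loop loop;
    bool delivered = false;
    loop.work_started(); // plays the role of io_service::work
    std::thread db([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // fake query
        loop.post([&] { delivered = true; loop.work_finished(); });
    });
    loop.run(); // the queue is empty at first, yet run() keeps waiting
    db.join();
    return delivered;
}
```

Without the work_started() call, run() would see an empty queue and return immediately, and the completion posted 50 ms later would never be executed.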

So, simulating this in a quick demonstration:

namespace database
{
    // data types
    struct sql_statement { std::string dml; };
    struct sql_response { std::string echo_dml; }; // TODO cover response codes, resultset data etc.

I hope you forgive my gross simplifications: /

struct service
{
    service(unsigned max_concurrent_requests = 10)
        : work(io_service::work(service_)),
        latency(mt19937(), uniform_int<int>(200, 1500)) // random 0.2 ~ 1.5s
    {
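        for (unsigned i = 0; i < max_concurrent_requests; ++i)
            svc_threads.create_thread(boost::bind(&io_service::run, &service_));
    }

    ~service()
    {
        work.reset();           // let io_service::run() return once pending queries are done
        svc_threads.join_all(); // join the workers before service_ is destroyed
    }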

    friend struct connection;

private:
    void async_query(io_service& external, sql_statement query, boost::function<void(sql_response response)> completion_handler)
    {
        service_.post(bind(&service::do_async_query, this, ref(external), std::move(query), completion_handler));
    }

    void do_async_query(io_service& external, sql_statement q, boost::function<void(sql_response response)> completion_handler)
    {
        this_thread::sleep_for(chrono::milliseconds(latency())); // simulate the latency of a db-roundtrip

        external.post(bind(completion_handler, sql_response { q.dml }));
    }

    io_service service_;
    thread_group svc_threads; // note the order of declaration
    optional<io_service::work> work;

    // for random delay
    random::variate_generator<mt19937, uniform_int<int> > latency;
};

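As you can see, the database service (which owns its own io_service) ping-pongs work between the two io_service instances (async_query/do_async_query). The stub implementation just sleeps for 0.2 to 1.5 seconds :)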
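The hand-off performed by async_query/do_async_query can also be sketched with the standard library alone. Everything here is an assumption made for illustration: task_queue is a hypothetical stand-in for an io_service, and blocking_query is a stub standing in for a blocking driver call.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Minimal thread-safe task queue (hypothetical stand-in for an io_service).
class task_queue {
public:
    void post(std::function<void()> t) {
        { std::lock_guard<std::mutex> lk(mx_); tasks_.push_back(std::move(t)); }
        cv_.notify_one();
    }
    // Blocks until a task is available, then runs it on the calling thread.
    void run_one() {
        std::function<void()> t;
        {
            std::unique_lock<std::mutex> lk(mx_);
            cv_.wait(lk, [this] { return !tasks_.empty(); });
            t = std::move(tasks_.front());
            tasks_.pop_front();
        }
        t();
    }
private:
    std::mutex mx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
};

// Stub for a blocking driver call (assumption: a real one would block in the client library).
std::string blocking_query(const std::string& sql) {
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    return "ok: " + sql;
}

// Runs `sql` on a database worker, then hands the result back to `caller`.
void async_query(task_queue& db, task_queue& caller, std::string sql,
                 std::function<void(std::string)> handler) {
    db.post([&caller, sql, handler] {
        std::string result = blocking_query(sql);            // blocks a db thread only
        caller.post([handler, result] { handler(result); }); // post the completion back
    });
}

std::vector<std::string> run_two_queries() {
    task_queue app, db;
    std::vector<std::thread> workers;
    for (int i = 0; i < 2; ++i)
        workers.emplace_back([&db] { db.run_one(); }); // each worker serves one query
    std::vector<std::string> results;
    auto collect = [&results](std::string r) { results.push_back(std::move(r)); };
    async_query(db, app, "select 1", collect);
    async_query(db, app, "select 2", collect);
    app.run_one(); // handlers run on the calling thread, one per completion
    app.run_one();
    for (auto& w : workers) w.join();
    return results;
}
```

The calling thread never sleeps inside blocking_query; it only waits for completions, so the order of the two results depends on which worker finishes first.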

Next comes the "facade":

struct connection
{
    connection(int connection_id, io_service& external, service& svc)
        : connection_id(connection_id),
          external_(external), 
          db_service_(svc)
    { }

    void async_query(sql_statement query, boost::function<void(sql_response response)> completion_handler)
    {
        db_service_.async_query(external_, std::move(query), completion_handler);
    }
  private:
    int connection_id;
    io_service& external_;
    service& db_service_;
};

} // namespace database

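connection is only a thin facade: it remembers which io_service the completion handlers should be posted to and forwards each query to the database service.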

Now, a sample business process built on Asio:

namespace domain
{
    struct business_process : id_generator
    {
        business_process(io_service& app_service, database::service& db_service_) 
            : id(generate_id()), phase(0), 
            in_progress(io_service::work(app_service)),
            db(id, app_service, db_service_)
        { 
            app_service.post([=] { start_select(); });
        }

    private:
        int id, phase;
        optional<io_service::work> in_progress;

        database::connection db;

        void start_select() {
            db.async_query({ "select * from tasks where completed = false" }, [=] (database::sql_response r) { handle_db_response(r); });
        }

        void handle_db_response(database::sql_response r) {
            if (phase++ < 4)
            {
                if ((id + phase) % 3 == 0) // vary the behaviour slightly
                {
                    db.async_query({ "insert into tasks (text, completed) values ('hello', false)" }, [=] (database::sql_response r) { handle_db_response(r); });
                } else
                {
                    db.async_query({ "update tasks set text = 'update' where id = 123" }, [=] (database::sql_response r) { handle_db_response(r); });
                }
            } else
            {
                in_progress.reset();
                lock_guard<mutex> lk(console_mx);
                std::cout << "business_process " << id << " has completed its work\n";
            }
        }
    };

}

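Each business process holds an io_service::work on the application's io_service while it is running. It issues its queries through db and, once the last response has been handled, releases the work object (see in_progress.reset()) so that app.run() can return.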

Finally, main starts 10 business processes:

int main()
{
    io_service app;
    database::service db;

    ptr_vector<domain::business_process> bps;
    for (int i = 0; i < 10; ++i)
    {
        bps.push_back(new domain::business_process(app, db));
    }

    app.run();
}

If the business processes should run on multiple threads as well, the single app.run() call can be replaced with:

thread_group g;
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
    g.create_thread(boost::bind(&io_service::run, &app));
g.join_all();
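One detail to watch in a loop like this: hardware_concurrency() is allowed to return 0 when the core count cannot be determined, in which case no thread would be started at all. A minimal sketch of a guard, using std::thread instead of Boost (worker_count and run_on_all_workers are hypothetical helper names, and the lambda body merely stands in for app.run()):

```cpp
#include <algorithm>
#include <atomic>
#include <thread>
#include <vector>

// Never returns 0, even if the core count cannot be determined.
unsigned worker_count() {
    return std::max(1u, std::thread::hardware_concurrency());
}

// Starts one thread per worker and returns how many actually ran.
unsigned run_on_all_workers() {
    std::atomic<unsigned> started{0};
    std::vector<std::thread> pool;
    for (unsigned i = 0; i < worker_count(); ++i)
        pool.emplace_back([&started] { ++started; }); // stand-in for app.run()
    for (auto& t : pool) t.join();
    return started.load();
}
```

Once several threads run the same io_service, handlers may execute concurrently, which is why the business process above takes console_mx before writing to std::cout.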

Live On Coliru

+6

MySQL, .

  • NumberOfThreads == NumberOfCores , , .

  • , , , - .

  • NumberOfThreads = n * NumberOfCores, "n" , . "n" . - 3 .

0

Source: https://habr.com/ru/post/1538185/

