From 777ee533555d525b8c220e1c478e5edd8a61a897 Mon Sep 17 00:00:00 2001 From: mbandic Date: Thu, 12 Sep 2024 13:22:17 +0000 Subject: [PATCH] Fix recycling connections --- README.md | 4 ++ lib/mysql.hpp | 57 ++++++++++-------- src/mysql.cpp | 158 +++++++++++++++++++++++--------------------------- test/test.cpp | 118 ++++++++++++++++++++++--------------- 4 files changed, 180 insertions(+), 157 deletions(-) diff --git a/README.md b/README.md index bbb3101..540d2e6 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,10 @@ wait(a2); wait(a3); ``` +## To do + +- On error, implement virtual/friend function + ## License [APACHE 2.0](http://www.apache.org/licenses/LICENSE-2.0/) diff --git a/lib/mysql.hpp b/lib/mysql.hpp index d7f2ce2..5e21ae8 100644 --- a/lib/mysql.hpp +++ b/lib/mysql.hpp @@ -1,7 +1,7 @@ #ifndef _MYSQL_ #define _MYSQL_ -#include +#include #include #include #include @@ -66,7 +66,7 @@ inline int getValue(ResultSet* res, int column) { return res->getInt(column); } template<> -inline uint getValue(ResultSet* res, int column) { +inline uint32_t getValue(ResultSet* res, int column) { return res->getUInt(column); } template<> @@ -97,34 +97,36 @@ inline bool getValue(ResultSet* res, int column) { class MySQL { mutex io; + condition_variable condition; MySQL_Driver *drv; - deque con; - string path, username, password, db; - uint available; - uint reconTrys = 3; + queue connection_pool; + string path, username, password, database; + uint32_t pool_size; + uint32_t connect_trys = 3; bool run_engin = true; future periodic_engin; periodical_engine engine_type; /** - * Open one database + * Open one database server connection */ - bool open_one(Connection* con_ptr); + Connection* create_connection(); /** - * Open one database server connection + * Close one database connection */ - Connection* create_con(); + bool disconnect_connection(Connection* connection); /** - * Close one database connection + * Take an pool_size database connection */ - bool disconnect_one(Connection* con_ptr); + Connection* occupy_connection(); /** - * Take an available database connection + * Free an database connection */ - Connection* shift_con(); + + void release_connection(Connection* connection); /** * Function parses a parameterized row @@ -135,6 +137,17 @@ class MySQL { return make_tuple(getValue(res, Is + 1)...); } + /** + * Connect all connections to server + */ + + void connect_pool(); + + /** + * Disconnect all connections to server + */ + void disconnect_pool(); + public: /** @@ -143,30 +156,26 @@ class MySQL { * username, password, database name, * and number of active connections (optional) */ - MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available = 1, const periodical_engine _engine_type = periodical_engine::internal); + MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available = 1, const periodical_engine _engine_type = periodical_engine::internal); - /** - * Disconnect all connections to server - */ - bool disconnect(); /** * Define the maximum number of attempts to * reconnect to the server */ - void reconnectTrys(const uint _trys); + void set_connect_trys(const uint32_t _trys); /** * Execute the SQL statement */ template MySQL_Res exec(const string& sql_q) { - Connection* con_ptr = shift_con(); + Connection* connection = occupy_connection(); MySQL_Res result; try { Statement *stmt; - stmt = con_ptr->createStatement(); + stmt = connection->createStatement(); result.have_result = stmt->execute(sql_q); if (result.have_result) { @@ -192,10 +201,12 @@ class MySQL { stmt->close(); delete stmt; - disconnect_one(con_ptr); + release_connection(connection); } catch (sql::SQLException& e) { throw runtime_error(e.what()); + // std::cerr << "SQLState: " << e.getSQLState() << std::endl; + // std::cerr << "Error code: " << e.getErrorCode() << std::endl; } return result; diff --git a/src/mysql.cpp b/src/mysql.cpp index b82359a..fcf724b 100644 --- a/src/mysql.cpp +++ b/src/mysql.cpp @@ -1,15 +1,16 @@ #include "../lib/mysql.hpp" -marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available, const periodical_engine _engine_type) { +marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available, const periodical_engine _engine_type) { path = _path; username = _username; password = _password; - db = _db; - available = _available > 0 ? _available : 1; + database = _db; + pool_size = _available > 0 ? _available : 1; engine_type = _engine_type; drv = get_mysql_driver_instance(); + connect_pool(); if (engine_type == periodical_engine::internal) { periodic_engin = async(launch::async, [&](){ @@ -23,51 +24,58 @@ marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const s } -Connection* marcelb::mysql::MySQL::create_con() { - uint trys = 0; +Connection* marcelb::mysql::MySQL::create_connection() { + uint32_t trys = 0; bool status = true; Connection* new_con = NULL; - while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) { + while (connect_trys == unlimited ? status : (trys <= connect_trys && status)) { try { Connection* con_can = drv->connect(path, username, password); + con_can->setSchema(database); status = !con_can->isValid(); if (!status) { new_con = con_can; } else if (!con_can->isClosed()) { - disconnect_one(con_can); + disconnect_connection(con_can); } } catch (const SQLException &error) { cout << error.what() << endl; + // on_error -- ako se ikad impelementira pozovi ga ovdje! usleep(reconnectSleep); - reconTrys == unlimited ? trys : trys++; + connect_trys == unlimited ? trys : trys++; } } return new_con; } -bool marcelb::mysql::MySQL::disconnect() { - io.lock(); - bool status = true; - - for (uint i=0; i lock(io); + for (uint32_t i=0; i lock(io); + for (uint32_t i=0; iisClosed(); +bool marcelb::mysql::MySQL::disconnect_connection(Connection* connection) { + bool status = !connection->isClosed(); if (status) { try { - con_ptr->close(); - status = !con_ptr->isClosed(); + connection->close(); + status = !connection->isClosed(); } catch (const SQLException &error) { cout << error.what() << endl; @@ -79,58 +87,48 @@ bool marcelb::mysql::MySQL::disconnect_one(Connection* con_ptr) { status = false; // već je zatvorena } - delete con_ptr; + delete connection; return status; } -bool marcelb::mysql::MySQL::open_one(Connection* con_ptr) { - bool status = true; // ako true greška je - uint trys = 0; - - while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) { - try { - if (con_ptr->isValid()) { - con_ptr->setSchema(db); - status = false; - } - else { - break; - } - } - catch (const SQLException &error) { - cout << error.what() << endl; - usleep(reconnectSleep); - reconTrys == unlimited ? trys : trys++; - } - } - - return status; -} /** * Broj pokušaja usljed povezivanja s bazom od 1 do unlimited; */ -void marcelb::mysql::MySQL::reconnectTrys(const uint _trys) { - io.lock(); - reconTrys = _trys; - io.unlock(); +void marcelb::mysql::MySQL::set_connect_trys(const uint32_t _trys) { + lock_guard lock(io); + connect_trys = _trys; } -Connection* marcelb::mysql::MySQL::shift_con() { - while (true) { - while(con.size()) { - io.lock(); - Connection* con_ptr = con[0]; - con.pop_front(); - if (con_ptr->isValid()) { - io.unlock(); - return con_ptr; - } - io.unlock(); - } - usleep(1000); +Connection* marcelb::mysql::MySQL::occupy_connection() { + unique_lock lock(io); + while (connection_pool.empty()) { + condition.wait(lock); } + Connection *connection = connection_pool.front(); + connection_pool.pop(); + return connection; + + // while (true) { + // while(connection_pool.size()) { + // io.lock(); + // Connection* connection = connection_pool.front(); + // connection_pool.pop(); + // if (connection->isValid()) { + // io.unlock(); + // return connection; + // } + // io.unlock(); + // } + // usleep(1000); + // } +} + +void marcelb::mysql::MySQL::release_connection(Connection* connection) { + lock_guard lock(io); + connection_pool.push(connection); + condition.notify_one(); } marcelb::mysql::MySQL::~MySQL() { @@ -138,38 +136,26 @@ marcelb::mysql::MySQL::~MySQL() { run_engin = false; periodic_engin.get(); } else { - + // ne bi bilo loše ubiti periodic nekako?! + // iako disconnecta može periodic connect napraviti!!! + run_engin = false; } - disconnect(); + disconnect_pool(); } void marcelb::mysql::MySQL::periodic_maintenance() { - while (available>con.size() && run_engin) { - try { - Connection* new_con_ptr = create_con(); - if (!db.empty()) { - if (open_one(new_con_ptr)) { - throw string("[ERROR] Unable to open database " + db); - } + for (size_t i = 0; i < pool_size && run_engin; i++) { + Connection *conn = occupy_connection(); + if (conn->isValid()) { + release_connection(conn); + } else { + if (!conn->isClosed()){ + conn->close(); } - io.lock(); - con.push_back(new_con_ptr); - io.unlock(); - } catch (const SQLException except) { - cout << except.what() << endl; - } catch (const string except) { - cout << except << endl; - } - } - - for (int i=0; iisValid()) { - io.lock(); - con.erase(con.begin()+i); - io.unlock(); - i--; + Connection *n_conn = create_connection(); + release_connection(n_conn); } } } \ No newline at end of file diff --git a/test/test.cpp b/test/test.cpp index 85d0dcf..13466f7 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -13,18 +13,76 @@ using namespace marcelb::asynco; int main() { try { - MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 10, periodical_engine::external); - // MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5); + // MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 10, periodical_engine::external); + // MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 10, periodical_engine::external); + MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5); - periodic mysql_maintenance ( [&mydb] () { - cout << "IZVRŠAVA SE ENGINE" << endl; - mydb.periodic_maintenance(); - }, MYSQL_PERIODIC_INTERNAL_TIME); + // periodic mysql_maintenance ( [&mydb] () { + // mydb.periodic_maintenance(); + // }, MYSQL_PERIODIC_INTERNAL_TIME); sleep(5); + auto start = high_resolution_clock::now(); - auto a1 = atask ( [&mydb] () { + auto a1 = nonsync ( [&mydb] () { + try { + auto response = mydb.exec("SELECT id,domain FROM records WHERE enabled = 1;"); + cout << response.affected << " " << response.have_result << endl; + cout << response.rows << " " << response.columns << endl; + + for (auto row : response) { + cout << get<0>(row) << " " << get<1>(row) << endl; + } + + for (auto column_name : response.columns_name) { + cout << column_name << endl; + } + + } catch (const string err) { + cout << err << endl; + } + }); + + auto a2 = nonsync ( [&mydb] () { + try { + auto response = mydb.exec("SELECT zonename,auth_key FROM zones;"); + cout << response.affected << " " << response.have_result << endl; + cout << response.rows << " " << response.columns << endl; + + for (auto row : response) { + cout << get<0>(row) << " " << get<1>(row) << endl; + } + + for (auto column_name : response.columns_name) { + cout << column_name << endl; + } + + } catch (const string err) { + cout << err << endl; + } + }); + + auto a3 = nonsync ( [&mydb] () { + try { + auto response = mydb.exec("SELECT username,email FROM users WHERE enabled = 1;"); + cout << response.affected << " " << response.have_result << endl; + cout << response.rows << " " << response.columns << endl; + + for (auto row : response) { + cout << get<0>(row) << " " << get<1>(row) << endl; + } + + for (auto column_name : response.columns_name) { + cout << column_name << endl; + } + + } catch (const string err) { + cout << err << endl; + } + }); + + auto a4 = nonsync ( [&mydb] () { try { auto response = mydb.exec("SELECT id,domain FROM records WHERE enabled = 1;"); cout << response.affected << " " << response.have_result << endl; @@ -43,7 +101,7 @@ int main() { } }); - auto a2 = atask ( [&mydb] () { + auto a5 = nonsync ( [&mydb] () { try { auto response = mydb.exec("SELECT zonename,auth_key FROM zones;"); cout << response.affected << " " << response.have_result << endl; @@ -62,7 +120,7 @@ int main() { } }); - auto a3 = atask ( [&mydb] () { + auto a6 = nonsync ( [&mydb] () { try { auto response = mydb.exec("SELECT username,email FROM users WHERE enabled = 1;"); cout << response.affected << " " << response.have_result << endl; @@ -84,46 +142,10 @@ int main() { wait(a1); wait(a2); wait(a3); + wait(a4); + wait(a5); + wait(a6); -// one by one - // try { - // auto response = mydb.exec("SELECT id,domain FROM records WHERE enabled = 1;"); - // // auto response = mydb.exec("UPDATE records SET enabled = 1;"); - // cout << response.affected << " " << response.have_result << endl; - // cout << response.rows << " " << response.columns << endl; - - // for (auto row : response) { - // cout << get<0>(row) << " " << get<1>(row) << endl; - // } - - // for (auto column_name : response.columns_name) { - // cout << column_name << endl; - // } - - // } catch (const string err) { - // cout << err << endl; - // } - - // auto a1 = atask ( [&mydb] () { - // try { - // auto response = mydb.exec("SELECT username,email FROM records WHERE enabled = 1;"); - // cout << response.affected << " " << response.have_result << endl; - // cout << response.rows << " " << response.columns << endl; - - // for (auto row : response) { - // cout << get<0>(row) << " " << get<1>(row) << endl; - // } - - // for (auto column_name : response.columns_name) { - // cout << column_name << endl; - // } - - // } catch (const string err) { - // cout << err << endl; - // } - // }); - - // wait(a1); auto end = high_resolution_clock::now(); auto duration = duration_cast(end - start);