From c910c78633e95d0540d085116f803f403bc13679 Mon Sep 17 00:00:00 2001 From: mbandic Date: Fri, 27 Sep 2024 10:57:45 +0000 Subject: [PATCH] Integrate asynco in library, optimize maintenance on both engine --- .vscode/settings.json | 4 +- README.md | 55 ++++++++++--------- lib/mysql.hpp | 66 +++++++++++++++++++--- src/mysql.cpp | 125 +++++++++++++++++++++++++----------------- test/compile.sh | 4 +- test/test.cpp | 12 +++- 6 files changed, 176 insertions(+), 90 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 20cccac..ce9e297 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -61,6 +61,8 @@ "cinttypes": "cpp", "typeinfo": "cpp", "any": "cpp", - "variant": "cpp" + "variant": "cpp", + "*.ipp": "cpp", + "bitset": "cpp" } } \ No newline at end of file diff --git a/README.md b/README.md index fb6d407..73f5b1e 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ A small framework for basic MySQL database operations via MySQL/Connector++ - Response object - Thread safe - Exceptions and log error callback -- Can use external time loop for connection management +- Support my Asynco wrapper around Boost ASIO and support threads ## Installation @@ -22,6 +22,11 @@ First install dependency MySQL/Connector++ ``` sudo apt install libmysqlcppconn-dev ``` +If you are going to use with an Asynco wrapper, download the archive from the profile and install the Boost dependencies + +``` +sudo apt install libboost-all-dev +``` Just download the latest release and unzip it into your project. You can turn it on with: @@ -29,7 +34,15 @@ Just download the latest release and unzip it into your project. You can turn it #include "mysql/lib/mysql.hpp" using namespace marcelb; ``` +Compiling + +``` +# use with my Asynco lib +g++ -DMYSQL_USE_ASYNCO test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread +# or use without asnyco (in multithread) +g++ test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread +``` ## Usage ### Internal engine @@ -46,6 +59,14 @@ using namespace marcelb::mysql; */ MySQL mydb("tcp://192.168.2.10:3306", "user_nm", "passss", "my_db", 5); +mydb.on_error = [](const string& error) { + cout << error << endl; +}; + +mydb.on_connect = []() { + cout << "Init all pool connection done" << endl; +}; + /** * Use ------------------ */ | | @@ -72,34 +93,16 @@ try { | | | | } ``` -### External engine +### Run async with Asynco As I developed quite a few wrappers that have some internal thread, I realized that it was inefficient and made it possible to call the necessary functions periodically outside (one thread per whole application or timer (ASIO), or my asynco wrapper). ```c++ -#include "../lib/mysql.hpp" -using namespace marcelb::mysql; - -#include "../../asynco/lib/timers.hpp" -using namespace marcelb::asynco; -/** -* Init -*/ -MySQL mydb("tcp://192.168.2.10:3306", "user_nm", "passss", "my_db", 5, time_loop_type::external); - -periodic mysql_maintenance ( [&mydb] () { - mydb.periodic_maintenance(); -}, MYSQL_PERIODIC_INTERNAL_TIME); - -mydb.set_on_error( [](const string& error) { - cout << error << endl; // print or log -}); - /** * You can call multiple queries asynchronously */ -auto a1 = atask ( [&mydb] () { +auto a1 = async_ ( [&mydb] () { try { auto response = mydb.exec("SELECT id,domain FROM records WHERE enabled = 1;"); for (auto row : response) { @@ -110,7 +113,7 @@ auto a1 = atask ( [&mydb] () { } }); -auto a2 = atask ( [&mydb] () { +auto a2 = async_ ( [&mydb] () { try { auto response = mydb.exec("SELECT zonename,auth_key FROM zones;"); for (auto row : response) { @@ -121,7 +124,7 @@ auto a2 = atask ( [&mydb] () { } }); -auto a3 = atask ( [&mydb] () { +auto a3 = async_ ( [&mydb] () { try { auto response = mydb.exec("SELECT username,email FROM users WHERE enabled = 1;"); for (auto row : response) { @@ -132,9 +135,9 @@ auto a3 = atask ( [&mydb] () { } }); -wait(a1); -wait(a2); -wait(a3); +await(a1); +await(a2); +await(a3); ``` ## License diff --git a/lib/mysql.hpp b/lib/mysql.hpp index 0b9dd82..7139231 100644 --- a/lib/mysql.hpp +++ b/lib/mysql.hpp @@ -8,7 +8,6 @@ #include #include #include -#include #include using namespace std; @@ -39,6 +38,8 @@ namespace mysql { * */ #define MYSQL_PERIODIC_INTERNAL_TIME 5000 +#define MYSQL_MAX_CONNECTION_INIT_SAME_TIME 10 +#define MYSQL_DELAYED_CONNECT_TIME 1000 /** * A class for creating sql responses @@ -99,13 +100,16 @@ class MySQL { queue connection_pool; string path, username, password, database; uint32_t pool_size; - bool run_tloop = true; + const uint32_t max_t = thread::hardware_concurrency(); + uint32_t t = 1; #ifdef MYSQL_USE_ASYNCO periodic p_loop; + // delayed d_connect; #else - future tloop_future; + future tloop_future, connect_f; + bool run_tloop = true; + #endif - time_t last_loop_time; /** * Open one database server connection @@ -154,8 +158,57 @@ class MySQL { void _tloop(uint32_t b, uint32_t e); + /** + * + */ + + vector> divide_and_conquer(int n, int t) { + vector> pairs; + int part_size = n / t; + int residue = n % t; // Ostatak koji treba rasporediti + + for (int i = 0; i < t; i++) { + int start = i * part_size + std::min(i, residue); + int end = start + part_size + (i < residue ? 1 : 0); + pairs.push_back(make_pair(start, end)); + } + return pairs; + } + + /** + * Automatically adjusts + */ + + void auto_adjusts(const uint32_t duration) { + if (duration > (MYSQL_PERIODIC_INTERNAL_TIME*2)/3) { + if (t < max_t) { + t++; // povećaj broj asinkronih zadataka drugi put + } + + else if (on_error) { + on_error("Periodic maintenance takes too long"); + } + } + + else if (duration < MYSQL_PERIODIC_INTERNAL_TIME/3 && t > 1) { + t--; // smanji nema potrebe + } + } + +#ifndef MYSQL_USE_ASYNCO + /** + * + */ + uint64_t timestamp() { + return chrono::duration_cast( + chrono::system_clock::now().time_since_epoch() + ).count(); + }; +#endif + public: function on_error; + function on_connect; uint32_t connect_trys = 3; @@ -206,13 +259,12 @@ public: release_connection(connection); } catch (sql::SQLException& e) { throw runtime_error(e.what()); - // std::cerr << "SQLState: " << e.getSQLState() << std::endl; - // std::cerr << "Error code: " << e.getErrorCode() << std::endl; + // cerr << "SQLState: " << e.getSQLState() << endl; + // cerr << "Error code: " << e.getErrorCode() << endl; } return result; } - /** * Destruktor * close all connections diff --git a/src/mysql.cpp b/src/mysql.cpp index 60c6758..f57f3a4 100644 --- a/src/mysql.cpp +++ b/src/mysql.cpp @@ -1,26 +1,57 @@ #include "../lib/mysql.hpp" marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available): -#ifdef MYSQL_USE_ASYNCO +#ifdef MYSQL_USE_ASYNCO p_loop(periodic( [&] () { - cout << "U asynco" << endl; - try { - auto start = rtime_ms(); - _tloop(0, connection_pool.size()); - cout << "loop--------------------------- nema error, trajalo: " << rtime_ms() - start << endl; - } catch (...) { - // cout << "Bude neki error u loopu" << endl; - if(on_error) { - on_error("Bude neki error u loopu"); + const uint64_t start = rtime_ms(); + auto pairs = divide_and_conquer(connection_pool.size(), t); + + vector> fuve; + for (auto& pair : pairs) { + fuve.push_back(async_ ([&, pair](){ + _tloop(pair.first, pair.second); + })); + } + + for (auto& fu : fuve) { + try { + await_(fu); + } catch (...) { + if(on_error) { + on_error("Error in maintenance periodic loop."); + } } } + + auto_adjusts(rtime_ms() - start); + }, MYSQL_PERIODIC_INTERNAL_TIME)), #else tloop_future (async(launch::async, [&](){ while (run_tloop) { - cout << "U STD async" << endl; usleep(MYSQL_PERIODIC_INTERNAL_TIME*1000); - _tloop(0, connection_pool.size()); + + const uint64_t start = timestamp(); + auto pairs = divide_and_conquer(connection_pool.size(), t); + + vector> fuve; + for (auto& pair : pairs) { + fuve.push_back(async (launch::async, [&, pair](){ + _tloop(pair.first, pair.second); + })); + } + + for (auto& fu : fuve) { + try { + fu.get(); + } catch (...) { + if(on_error) { + on_error("Error in maintenance periodic loop."); + } + } + } + + auto_adjusts(timestamp() - start); } return; })), @@ -33,9 +64,6 @@ marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const s drv = get_mysql_driver_instance(); connect_pool(); - - // set on initialization to avoid the error - last_loop_time = time(nullptr); } @@ -68,11 +96,28 @@ Connection* marcelb::mysql::MySQL::create_connection() { } void marcelb::mysql::MySQL::connect_pool() { - lock_guard lock(io); - for (uint32_t i=0; i void { + const uint32_t tens = pool_size/MYSQL_MAX_CONNECTION_INIT_SAME_TIME; + for (uint32_t i=0; i lock(io); + connection_pool.push(connection); + } + if ((i+1)%tens == 0) { + usleep(MYSQL_DELAYED_CONNECT_TIME*1000); + } + } + if (on_connect) { + on_connect(); + } + }; + +#ifdef MYSQL_USE_ASYNCO + async_ (connect_); +#else + connect_f = async (launch::async, connect_); +#endif } void marcelb::mysql::MySQL::disconnect_pool() { @@ -108,35 +153,18 @@ bool marcelb::mysql::MySQL::disconnect_connection(Connection* connection) { } void marcelb::mysql::MySQL::_tloop(uint32_t b, uint32_t e) { - if (!run_tloop) { - return; - } for (size_t i=b; i lock(io); - conn = connection_pool.front(); - connection_pool.pop(); - } + Connection *conn = nullptr; // provjeri ovdje svugdje možeš li koristiti release i ocupacy sada + conn = occupy_connection(); if (conn->isValid()) { - cout << "Validno----" << endl; - connection_pool.push(conn); - condition.notify_one(); + release_connection(conn); } else { - cout << "Nije validno----" << endl; if (!conn->isClosed()){ - cout << "Zatvori----" << endl; - - conn->close(); + disconnect_connection(conn); } Connection *n_conn = create_connection(); - { - cout << "Otvori----" << endl; - lock_guard lock(io); - connection_pool.push(n_conn); - condition.notify_one(); - } + release_connection(n_conn); } } catch (const SQLException &error) { @@ -145,16 +173,9 @@ void marcelb::mysql::MySQL::_tloop(uint32_t b, uint32_t e) { } } } - - last_loop_time = time(nullptr); } Connection* marcelb::mysql::MySQL::occupy_connection() { - if (last_loop_time + (MYSQL_PERIODIC_INTERNAL_TIME*3/1000) < time(nullptr)) { - if (on_error) { - on_error("The time loop is not executing properly"); - } - } unique_lock lock(io); while (connection_pool.empty()) { condition.wait(lock); @@ -171,9 +192,11 @@ void marcelb::mysql::MySQL::release_connection(Connection* connection) { } marcelb::mysql::MySQL::~MySQL() { +#ifdef MYSQL_USE_ASYNCO + p_loop.stop(); +#else run_tloop = false; -#ifndef MYSQL_USE_ASYNCO - tloop_future.get(); + tloop_future.get(); #endif disconnect_pool(); } \ No newline at end of file diff --git a/test/compile.sh b/test/compile.sh index acb252a..61b0a5c 100644 --- a/test/compile.sh +++ b/test/compile.sh @@ -1,2 +1,2 @@ -# g++ -DMYSQL_USE_ASYNCO test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread -g++ test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread \ No newline at end of file +g++ -DMYSQL_USE_ASYNCO test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread +# g++ test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread \ No newline at end of file diff --git a/test/test.cpp b/test/test.cpp index 6fa79a5..5ad1b72 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -15,17 +15,21 @@ using namespace marcelb::mysql; int main() { auto inis = rtime_ms(); try { - const int n = 5; + const int n = 10; // MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5, time_loop_type::internal); MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", n); // MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5); - cout << "init: " << rtime_ms() - inis << endl; + cout << "--------------init: " << rtime_ms() - inis << endl; mydb.on_error = [](const string& error) { cout << error << endl; }; + mydb.on_connect = []() { + cout << "Init all pool connection done" << endl; + }; + // periodic mysql_tloop ( [&mydb] () { // auto l_start = rtime_ms(); // vector> to_wait; @@ -203,7 +207,9 @@ while (true) { } catch (const SQLException error) { cout << error.what() << endl; - } catch (const string error) { + } catch (const exception& error) { + cout << error.what() << endl; + } catch (const string& error) { cout << error << endl; } catch (...) { cout << "Jebi ga" << endl;