Compare commits

...

4 Commits
v0.8 ... dev

Author SHA1 Message Date
mbandic
b04cf84eaa Small fix after remote test 2024-09-13 10:44:03 +00:00
marcelb
e23e1f8cea Protects against time loop calls both externally and internally 2024-09-12 18:36:21 +02:00
mbandic
27074f9269 Add on error callback 2024-09-12 13:42:33 +00:00
mbandic
777ee53355 Fix recycling connections 2024-09-12 13:22:17 +00:00
5 changed files with 232 additions and 192 deletions

View File

@ -59,6 +59,8 @@
"stop_token": "cpp", "stop_token": "cpp",
"streambuf": "cpp", "streambuf": "cpp",
"cinttypes": "cpp", "cinttypes": "cpp",
"typeinfo": "cpp" "typeinfo": "cpp",
"any": "cpp",
"variant": "cpp"
} }
} }

View File

@ -12,8 +12,8 @@ A small framework for basic MySQL database operations via MySQL/Connector++
- Native C++ containers: vector, tuple - Native C++ containers: vector, tuple
- Response object - Response object
- Thread safe - Thread safe
- Exceptions - Exceptions and log error callback
- Can use external periodic maintenance for connection management - Can use external time loop for connection management
## Installation ## Installation
@ -85,13 +85,16 @@ using namespace marcelb::asynco;
/** /**
* Init * Init
*/ */
MySQL mydb("tcp://192.168.2.10:3306", "user_nm", "passss", "my_db", 5, periodical_engine::external); MySQL mydb("tcp://192.168.2.10:3306", "user_nm", "passss", "my_db", 5, time_loop_type::external);
periodic mysql_maintenance ( [&mydb] () { periodic mysql_maintenance ( [&mydb] () {
cout << "IZVRŠAVA SE ENGINE" << endl;
mydb.periodic_maintenance(); mydb.periodic_maintenance();
}, MYSQL_PERIODIC_INTERNAL_TIME); }, MYSQL_PERIODIC_INTERNAL_TIME);
mydb.set_on_error( [](const string& error) {
cout << error << endl; // print or log
});
/** /**
* You can call multiple queries asynchronously * You can call multiple queries asynchronously
*/ */

View File

@ -1,13 +1,14 @@
#ifndef _MYSQL_ #ifndef _MYSQL_
#define _MYSQL_ #define _MYSQL_
#include <deque> #include <queue>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <future> #include <future>
#include <string> #include <string>
#include <vector> #include <vector>
#include <tuple> #include <tuple>
#include "ctime"
#include <mysql_driver.h> #include <mysql_driver.h>
#include <mysql_connection.h> #include <mysql_connection.h>
@ -38,7 +39,7 @@ namespace mysql {
* external - expects periodic_maintenance() to be run periodically outside the library * external - expects periodic_maintenance() to be run periodically outside the library
* *
*/ */
enum class periodical_engine { enum class time_loop_type {
internal, internal,
external external
}; };
@ -66,7 +67,7 @@ inline int getValue<int>(ResultSet* res, int column) {
return res->getInt(column); return res->getInt(column);
} }
template<> template<>
inline uint getValue<uint>(ResultSet* res, int column) { inline uint32_t getValue<uint32_t>(ResultSet* res, int column) {
return res->getUInt(column); return res->getUInt(column);
} }
template<> template<>
@ -97,34 +98,36 @@ inline bool getValue<bool>(ResultSet* res, int column) {
class MySQL { class MySQL {
mutex io; mutex io;
condition_variable condition;
MySQL_Driver *drv; MySQL_Driver *drv;
deque<Connection*> con; queue<Connection*> connection_pool;
string path, username, password, db; string path, username, password, database;
uint available; uint32_t pool_size;
uint reconTrys = 3; bool run_tloop = true;
bool run_engin = true; future<void> tloop_future;
future<void> periodic_engin; time_loop_type tloop_type;
periodical_engine engine_type; time_t last_loop_time;
/**
* Open one database
*/
bool open_one(Connection* con_ptr);
/** /**
* Open one database server connection * Open one database server connection
*/ */
Connection* create_con(); Connection* create_connection();
/** /**
* Close one database connection * Close one database connection
*/ */
bool disconnect_one(Connection* con_ptr); bool disconnect_connection(Connection* connection);
/** /**
* Take an available database connection * Take an pool_size database connection
*/ */
Connection* shift_con(); Connection* occupy_connection();
/**
* Free an database connection
*/
void release_connection(Connection* connection);
/** /**
* Function parses a parameterized row * Function parses a parameterized row
@ -135,7 +138,27 @@ class MySQL {
return make_tuple(getValue<Types>(res, Is + 1)...); return make_tuple(getValue<Types>(res, Is + 1)...);
} }
/**
* Connect all connections to server
*/
void connect_pool();
/**
* Disconnect all connections to server
*/
void disconnect_pool();
/**
* Internal tloop periodic
*/
void _tloop();
public: public:
function<void(const string&)> on_error;
uint32_t connect_trys = 3;
/** /**
* MySQL constructor, * MySQL constructor,
@ -143,30 +166,19 @@ class MySQL {
* username, password, database name, * username, password, database name,
* and number of active connections (optional) * and number of active connections (optional)
*/ */
MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available = 1, const periodical_engine _engine_type = periodical_engine::internal); MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available = 1, const time_loop_type _engine_type = time_loop_type::internal);
/**
* Disconnect all connections to server
*/
bool disconnect();
/**
* Define the maximum number of attempts to
* reconnect to the server
*/
void reconnectTrys(const uint _trys);
/** /**
* Execute the SQL statement * Execute the SQL statement
*/ */
template<typename... Types> template<typename... Types>
MySQL_Res<Types...> exec(const string& sql_q) { MySQL_Res<Types...> exec(const string& sql_q) {
Connection* con_ptr = shift_con(); Connection* connection = occupy_connection();
MySQL_Res<Types...> result; MySQL_Res<Types...> result;
try { try {
Statement *stmt; Statement *stmt;
stmt = con_ptr->createStatement(); stmt = connection->createStatement();
result.have_result = stmt->execute(sql_q); result.have_result = stmt->execute(sql_q);
if (result.have_result) { if (result.have_result) {
@ -192,10 +204,12 @@ class MySQL {
stmt->close(); stmt->close();
delete stmt; delete stmt;
disconnect_one(con_ptr); release_connection(connection);
} catch (sql::SQLException& e) { } catch (sql::SQLException& e) {
throw runtime_error(e.what()); throw runtime_error(e.what());
// std::cerr << "SQLState: " << e.getSQLState() << std::endl;
// std::cerr << "Error code: " << e.getErrorCode() << std::endl;
} }
return result; return result;
@ -206,7 +220,7 @@ class MySQL {
* please call this function in it for proper operation at a certain time interval. * please call this function in it for proper operation at a certain time interval.
* You can use the default MYSQL_PERIODIC_INTERNAL_TIME * You can use the default MYSQL_PERIODIC_INTERNAL_TIME
*/ */
void periodic_maintenance(); void tloop();
/** /**
* Destruktor * Destruktor

View File

@ -1,76 +1,88 @@
#include "../lib/mysql.hpp" #include "../lib/mysql.hpp"
marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available, const periodical_engine _engine_type) { marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available, const time_loop_type _engine_type) {
path = _path; path = _path;
username = _username; username = _username;
password = _password; password = _password;
db = _db; database = _db;
available = _available > 0 ? _available : 1; pool_size = _available > 0 ? _available : 1;
engine_type = _engine_type; tloop_type = _engine_type;
drv = get_mysql_driver_instance(); drv = get_mysql_driver_instance();
connect_pool();
if (engine_type == periodical_engine::internal) { if (tloop_type == time_loop_type::internal) {
periodic_engin = async(launch::async, [&](){ tloop_future = async(launch::async, [&](){
while (run_engin) { while (run_tloop) {
usleep(MYSQL_PERIODIC_INTERNAL_TIME*1000); usleep(MYSQL_PERIODIC_INTERNAL_TIME*1000);
periodic_maintenance(); _tloop();
} }
return; return;
}); });
} }
// set on initialization to avoid the error
last_loop_time = time(nullptr);
} }
Connection* marcelb::mysql::MySQL::create_con() { Connection* marcelb::mysql::MySQL::create_connection() {
uint trys = 0; uint32_t trys = 0;
bool status = true; bool status = true;
Connection* new_con = NULL; Connection* new_con = NULL;
while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) { while (connect_trys == unlimited ? status : (trys <= connect_trys && status)) {
try { try {
Connection* con_can = drv->connect(path, username, password); Connection* con_can = drv->connect(path, username, password);
con_can->setSchema(database);
status = !con_can->isValid(); status = !con_can->isValid();
if (!status) { if (!status) {
new_con = con_can; new_con = con_can;
} }
else if (!con_can->isClosed()) { else if (!con_can->isClosed()) {
disconnect_one(con_can); disconnect_connection(con_can);
} }
} }
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; if (on_error) {
on_error(error.what() + string(", SQL state: ") + error.getSQLState() + string(", Error code: ") + to_string(error.getErrorCode()));
}
usleep(reconnectSleep); usleep(reconnectSleep);
reconTrys == unlimited ? trys : trys++; connect_trys == unlimited ? trys : trys++;
} }
} }
return new_con; return new_con;
} }
bool marcelb::mysql::MySQL::disconnect() { void marcelb::mysql::MySQL::connect_pool() {
io.lock(); lock_guard<mutex> lock(io);
bool status = true; for (uint32_t i=0; i<pool_size; i++) {
Connection* connection = create_connection();
for (uint i=0; i<con.size(); i++) { connection_pool.push(connection);
status = disconnect_one(con[i]) ; }
} }
io.unlock(); void marcelb::mysql::MySQL::disconnect_pool() {
return status; lock_guard<mutex> lock(io);
for (uint32_t i=0; i<connection_pool.size(); i++) {
Connection* connection = connection_pool.front();
connection_pool.pop();
disconnect_connection(connection) ;
}
} }
bool marcelb::mysql::MySQL::disconnect_one(Connection* con_ptr) { bool marcelb::mysql::MySQL::disconnect_connection(Connection* connection) {
bool status = !con_ptr->isClosed(); bool status = !connection->isClosed();
if (status) { if (status) {
try { try {
con_ptr->close(); connection->close();
status = !con_ptr->isClosed(); status = !connection->isClosed();
} }
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; if (on_error) {
on_error(error.what() + string(", SQL state: ") + error.getSQLState() + string(", Error code: ") + to_string(error.getErrorCode()));
}
status = true; status = true;
} }
} }
@ -79,97 +91,77 @@ bool marcelb::mysql::MySQL::disconnect_one(Connection* con_ptr) {
status = false; // već je zatvorena status = false; // već je zatvorena
} }
delete con_ptr; delete connection;
return status; return status;
} }
bool marcelb::mysql::MySQL::open_one(Connection* con_ptr) { void marcelb::mysql::MySQL::_tloop() {
bool status = true; // ako true greška je if (!run_tloop) {
uint trys = 0; return;
}
while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) { lock_guard<mutex> lock(io);
for (size_t i=0; i<connection_pool.size(); i++) {
try { try {
if (con_ptr->isValid()) { Connection *conn = connection_pool.front();
con_ptr->setSchema(db); connection_pool.pop();
status = false; if (conn->isValid()) {
connection_pool.push(conn);
} else {
if (!conn->isClosed()){
conn->close();
} }
else { Connection *n_conn = create_connection();
break; release_connection(n_conn);
} }
} catch (const SQLException &error) {
if (on_error) {
on_error(error.what() + string(", SQL state: ") + error.getSQLState() + string(", Error code: ") + to_string(error.getErrorCode()));
} }
catch (const SQLException &error) {
cout << error.what() << endl;
usleep(reconnectSleep);
reconTrys == unlimited ? trys : trys++;
} }
} }
return status; last_loop_time = time(nullptr);
} }
/** Connection* marcelb::mysql::MySQL::occupy_connection() {
* Broj pokušaja usljed povezivanja s bazom od 1 do unlimited; if (last_loop_time + (MYSQL_PERIODIC_INTERNAL_TIME*3/1000) < time(nullptr)) {
*/ if (on_error) {
on_error("The time loop is not executing properly");
void marcelb::mysql::MySQL::reconnectTrys(const uint _trys) { }
io.lock(); }
reconTrys = _trys; unique_lock<mutex> lock(io);
io.unlock(); while (connection_pool.empty()) {
condition.wait(lock);
}
Connection *connection = connection_pool.front();
connection_pool.pop();
return connection;
} }
Connection* marcelb::mysql::MySQL::shift_con() { void marcelb::mysql::MySQL::release_connection(Connection* connection) {
while (true) { lock_guard<std::mutex> lock(io);
while(con.size()) { connection_pool.push(connection);
io.lock(); condition.notify_one();
Connection* con_ptr = con[0];
con.pop_front();
if (con_ptr->isValid()) {
io.unlock();
return con_ptr;
}
io.unlock();
}
usleep(1000);
}
} }
marcelb::mysql::MySQL::~MySQL() { marcelb::mysql::MySQL::~MySQL() {
if (engine_type == periodical_engine::internal) { if (tloop_type == time_loop_type::internal) {
run_engin = false; run_tloop = false;
periodic_engin.get(); tloop_future.get();
} else { } else {
run_tloop = false;
} }
disconnect(); disconnect_pool();
} }
void marcelb::mysql::MySQL::periodic_maintenance() { void marcelb::mysql::MySQL::tloop() {
while (available>con.size() && run_engin) { if (tloop_type == time_loop_type::internal) {
try { if (on_error) {
Connection* new_con_ptr = create_con(); on_error("Can't start external call tloop, internal is active!");
if (!db.empty()) {
if (open_one(new_con_ptr)) {
throw string("[ERROR] Unable to open database " + db);
}
}
io.lock();
con.push_back(new_con_ptr);
io.unlock();
} catch (const SQLException except) {
cout << except.what() << endl;
} catch (const string except) {
cout << except << endl;
}
}
for (int i=0; i<con.size() && run_engin; i++) {
if (!con[i]->isValid()) {
io.lock();
con.erase(con.begin()+i);
io.unlock();
i--;
} }
return;
} }
_tloop();
} }

View File

@ -13,18 +13,25 @@ using namespace marcelb::asynco;
int main() { int main() {
try { try {
MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 10, periodical_engine::external); // MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5, time_loop_type::internal);
MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5, time_loop_type::external);
// MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5); // MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5);
periodic mysql_maintenance ( [&mydb] () { mydb.on_error = [](const string& error) {
cout << "IZVRŠAVA SE ENGINE" << endl; cout << error << endl;
mydb.periodic_maintenance(); };
periodic mysql_tloop ( [&mydb] () {
cout << "loop---------------------------" << endl;
mydb.tloop();
}, MYSQL_PERIODIC_INTERNAL_TIME); }, MYSQL_PERIODIC_INTERNAL_TIME);
while (true) {
sleep(5); sleep(5);
auto start = high_resolution_clock::now(); auto start = high_resolution_clock::now();
auto a1 = atask ( [&mydb] () { auto a1 = nonsync ( [&mydb] () {
try { try {
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;"); auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -38,12 +45,12 @@ int main() {
cout << column_name << endl; cout << column_name << endl;
} }
} catch (const string err) { } catch (const SQLException error) {
cout << err << endl; cout << error.what() << endl;
} }
}); });
auto a2 = atask ( [&mydb] () { auto a2 = nonsync ( [&mydb] () {
try { try {
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;"); auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -57,12 +64,12 @@ int main() {
cout << column_name << endl; cout << column_name << endl;
} }
} catch (const string err) { } catch (const SQLException error) {
cout << err << endl; cout << error.what() << endl;
} }
}); });
auto a3 = atask ( [&mydb] () { auto a3 = nonsync ( [&mydb] () {
try { try {
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;"); auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -76,61 +83,83 @@ int main() {
cout << column_name << endl; cout << column_name << endl;
} }
} catch (const string err) { } catch (const SQLException error) {
cout << err << endl; cout << error.what() << endl;
}
});
auto a4 = nonsync ( [&mydb] () {
try {
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl;
cout << response.rows << " " << response.columns << endl;
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
for (auto column_name : response.columns_name) {
cout << column_name << endl;
}
} catch (const SQLException error) {
cout << error.what() << endl;
}
});
auto a5 = nonsync ( [&mydb] () {
try {
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
cout << response.affected << " " << response.have_result << endl;
cout << response.rows << " " << response.columns << endl;
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
for (auto column_name : response.columns_name) {
cout << column_name << endl;
}
} catch (const SQLException error) {
cout << error.what() << endl;
}
});
auto a6 = nonsync ( [&mydb] () {
try {
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl;
cout << response.rows << " " << response.columns << endl;
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
for (auto column_name : response.columns_name) {
cout << column_name << endl;
}
} catch (const SQLException error) {
cout << error.what() << endl;
} }
}); });
wait(a1); wait(a1);
wait(a2); wait(a2);
wait(a3); wait(a3);
wait(a4);
wait(a5);
wait(a6);
// one by one
// try {
// auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
// // auto response = mydb.exec<int,string>("UPDATE records SET enabled = 1;");
// cout << response.affected << " " << response.have_result << endl;
// cout << response.rows << " " << response.columns << endl;
// for (auto row : response) {
// cout << get<0>(row) << " " << get<1>(row) << endl;
// }
// for (auto column_name : response.columns_name) {
// cout << column_name << endl;
// }
// } catch (const string err) {
// cout << err << endl;
// }
// auto a1 = atask ( [&mydb] () {
// try {
// auto response = mydb.exec<string,string>("SELECT username,email FROM records WHERE enabled = 1;");
// cout << response.affected << " " << response.have_result << endl;
// cout << response.rows << " " << response.columns << endl;
// for (auto row : response) {
// cout << get<0>(row) << " " << get<1>(row) << endl;
// }
// for (auto column_name : response.columns_name) {
// cout << column_name << endl;
// }
// } catch (const string err) {
// cout << err << endl;
// }
// });
// wait(a1);
auto end = high_resolution_clock::now(); auto end = high_resolution_clock::now();
auto duration = duration_cast<microseconds>(end - start); auto duration = duration_cast<microseconds>(end - start);
cout << "-------------Izvršilo se za: " << (double)(duration.count() / 1000.0) << " ms"<< endl; cout << "-------------Izvršilo se za: " << (double)(duration.count() / 1000.0) << " ms"<< endl;
sleep(100);
}
sleep(100);
} catch (const SQLException error) { } catch (const SQLException error) {
cout << error.what() << endl; cout << error.what() << endl;