External periodic engine support

generic v0.8
marcelb 3 months ago
parent 0ae1c69782
commit cf969c0424
  1. 72
      README.md
  2. 15
      lib/core_orm.hpp
  3. 33
      lib/mysql.hpp
  4. 96
      src/mysql.cpp
  5. 210
      test/test.cpp

@ -13,6 +13,7 @@ A small framework for basic MySQL database operations via MySQL/Connector++
- Response object
- Thread safe
- Exceptions
- Can use external periodic maintenance for connection management
## Installation
@ -31,7 +32,15 @@ using namespace marcelb;
## Usage
### Internal engine
It internally initializes a single thread that periodically checks the states of the connection pool, adds new ones as needed, and cleans up inactive ones.
```c++
#include "../lib/mysql.hpp"
using namespace marcelb::mysql;
/**
* Init
*/
@ -62,6 +71,69 @@ try { | | | |
cout << err << endl;
}
```
### External engine
As I developed quite a few wrappers that have some internal thread, I realized that it was inefficient and made it possible to call the necessary functions periodically outside (one thread per whole application or timer (ASIO), or my asynco wrapper).
```c++
#include "../lib/mysql.hpp"
using namespace marcelb::mysql;
#include "../../asynco/lib/timers.hpp"
using namespace marcelb::asynco;
/**
* Init
*/
MySQL mydb("tcp://192.168.2.10:3306", "user_nm", "passss", "my_db", 5, periodical_engine::external);
periodic mysql_maintenance ( [&mydb] () {
cout << "IZVRŠAVA SE ENGINE" << endl;
mydb.periodic_maintenance();
}, MYSQL_PERIODIC_INTERNAL_TIME);
/**
* You can call multiple queries asynchronously
*/
auto a1 = atask ( [&mydb] () {
try {
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
} catch (const string err) {
cout << err << endl;
}
});
auto a2 = atask ( [&mydb] () {
try {
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
} catch (const string err) {
cout << err << endl;
}
});
auto a3 = atask ( [&mydb] () {
try {
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
} catch (const string err) {
cout << err << endl;
}
});
wait(a1);
wait(a2);
wait(a3);
```
## License
[APACHE 2.0](http://www.apache.org/licenses/LICENSE-2.0/)

@ -1,15 +0,0 @@
#ifndef _MYSQL_CORE_ORM_
#define _MYSQL_CORE_ORM_
using namespace std;
namespace marcelb {
/**
* Implementiraj klase za ORM koje će se ekstendati
*/
}
#endif

@ -25,6 +25,23 @@ using namespace sql;
using namespace mysql;
namespace marcelb {
namespace mysql {
/**
*
*/
#define MYSQL_PERIODIC_INTERNAL_TIME 1000
/**
* An enumeration of how periodic functions will be run
* internal - run periodic_maintenance() i new thread
* external - expects periodic_maintenance() to be run periodically outside the library
*
*/
enum class periodical_engine {
internal,
external
};
/**
* A class for creating sql responses
@ -42,7 +59,6 @@ class MySQL_Res : public vector<tuple<Types...>> {
/**
* Type conversion functions
*/
template<typename T>
T getValue(ResultSet* res, int column);
template<>
@ -86,8 +102,9 @@ class MySQL {
string path, username, password, db;
uint available;
uint reconTrys = 3;
bool runBot = true;
future<void> bot;
bool run_engin = true;
future<void> periodic_engin;
periodical_engine engine_type;
/**
* Open one database
@ -126,7 +143,7 @@ class MySQL {
* username, password, database name,
* and number of active connections (optional)
*/
MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available = 1);
MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available = 1, const periodical_engine _engine_type = periodical_engine::internal);
/**
* Disconnect all connections to server
@ -184,6 +201,13 @@ class MySQL {
return result;
}
/**
* If you are using an external periodic motor,
* please call this function in it for proper operation at a certain time interval.
* You can use the default MYSQL_PERIODIC_INTERNAL_TIME
*/
void periodic_maintenance();
/**
* Destruktor
* close all connections
@ -193,6 +217,7 @@ class MySQL {
};
}
}
#endif

@ -1,52 +1,29 @@
#include "../lib/mysql.hpp"
marcelb::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available) {
marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available, const periodical_engine _engine_type) {
path = _path;
username = _username;
password = _password;
db = _db;
available = _available > 0 ? _available : 1;
engine_type = _engine_type;
drv = get_mysql_driver_instance();
bot = async(launch::async, [&](){
while (runBot) {
sleep(1);
while (available>con.size() && runBot) {
try {
Connection* new_con_ptr = create_con();
if (!db.empty()) {
if (open_one(new_con_ptr)) {
throw string("[ERROR] Unable to open database " + db);
}
}
io.lock();
con.push_back(new_con_ptr);
io.unlock();
} catch (const SQLException except) {
cout << except.what() << endl;
} catch (const string except) {
cout << except << endl;
}
if (engine_type == periodical_engine::internal) {
periodic_engin = async(launch::async, [&](){
while (run_engin) {
usleep(MYSQL_PERIODIC_INTERNAL_TIME*1000);
periodic_maintenance();
}
for (int i=0; i<con.size() && runBot; i++) {
if (!con[i]->isValid()) {
io.lock();
con.erase(con.begin()+i);
io.unlock();
i--;
}
}
}
return;
});
return;
});
}
}
Connection* marcelb::MySQL::create_con() {
Connection* marcelb::mysql::MySQL::create_con() {
uint trys = 0;
bool status = true;
Connection* new_con = NULL;
@ -72,7 +49,7 @@ Connection* marcelb::MySQL::create_con() {
return new_con;
}
bool marcelb::MySQL::disconnect() {
bool marcelb::mysql::MySQL::disconnect() {
io.lock();
bool status = true;
@ -84,7 +61,7 @@ bool marcelb::MySQL::disconnect() {
return status;
}
bool marcelb::MySQL::disconnect_one(Connection* con_ptr) {
bool marcelb::mysql::MySQL::disconnect_one(Connection* con_ptr) {
bool status = !con_ptr->isClosed();
if (status) {
@ -106,7 +83,7 @@ bool marcelb::MySQL::disconnect_one(Connection* con_ptr) {
return status;
}
bool marcelb::MySQL::open_one(Connection* con_ptr) {
bool marcelb::mysql::MySQL::open_one(Connection* con_ptr) {
bool status = true; // ako true greška je
uint trys = 0;
@ -134,13 +111,13 @@ bool marcelb::MySQL::open_one(Connection* con_ptr) {
* Broj pokušaja usljed povezivanja s bazom od 1 do unlimited;
*/
void marcelb::MySQL::reconnectTrys(const uint _trys) {
void marcelb::mysql::MySQL::reconnectTrys(const uint _trys) {
io.lock();
reconTrys = _trys;
io.unlock();
}
Connection* marcelb::MySQL::shift_con() {
Connection* marcelb::mysql::MySQL::shift_con() {
while (true) {
while(con.size()) {
io.lock();
@ -156,8 +133,43 @@ Connection* marcelb::MySQL::shift_con() {
}
}
marcelb::MySQL::~MySQL() {
runBot = false;
bot.get();
marcelb::mysql::MySQL::~MySQL() {
if (engine_type == periodical_engine::internal) {
run_engin = false;
periodic_engin.get();
} else {
}
disconnect();
}
void marcelb::mysql::MySQL::periodic_maintenance() {
while (available>con.size() && run_engin) {
try {
Connection* new_con_ptr = create_con();
if (!db.empty()) {
if (open_one(new_con_ptr)) {
throw string("[ERROR] Unable to open database " + db);
}
}
io.lock();
con.push_back(new_con_ptr);
io.unlock();
} catch (const SQLException except) {
cout << except.what() << endl;
} catch (const string except) {
cout << except << endl;
}
}
for (int i=0; i<con.size() && run_engin; i++) {
if (!con[i]->isValid()) {
io.lock();
con.erase(con.begin()+i);
io.unlock();
i--;
}
}
}

@ -1,156 +1,135 @@
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;
using namespace chrono;
#include "../lib/mysql.hpp"
using namespace marcelb::mysql;
using namespace std;
using namespace chrono;
using namespace marcelb;
#include "../../asynco/lib/asynco.hpp"
#include "../../asynco/lib/timers.hpp"
using namespace marcelb::asynco;
int main() {
try {
// MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 1);
MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5);
sleep(2);
MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 10, periodical_engine::external);
// MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5);
periodic mysql_maintenance ( [&mydb] () {
cout << "IZVRŠAVA SE ENGINE" << endl;
mydb.periodic_maintenance();
}, MYSQL_PERIODIC_INTERNAL_TIME);
sleep(5);
auto start = high_resolution_clock::now();
auto a1 = atask ( [&mydb] () {
try {
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl;
cout << response.rows << " " << response.columns << endl;
// thread t1([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("records").where("enabled = 1");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
// // sleep(2);
for (auto column_name : response.columns_name) {
cout << column_name << endl;
}
// thread t2([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("zones");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// // sleep(3);
} catch (const string err) {
cout << err << endl;
}
});
// thread t3([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
auto a2 = atask ( [&mydb] () {
try {
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
cout << response.affected << " " << response.have_result << endl;
cout << response.rows << " " << response.columns << endl;
// // sleep(1);
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
// thread t4([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("records").where("enabled = 1");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
for (auto column_name : response.columns_name) {
cout << column_name << endl;
}
// thread t5([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("zones");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
} catch (const string err) {
cout << err << endl;
}
});
// thread t6([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
auto a3 = atask ( [&mydb] () {
try {
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl;
cout << response.rows << " " << response.columns << endl;
// t1.join();
// t2.join();
// t3.join();
// t4.join();
// t5.join();
// t6.join();
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
// one by one
try {
// sqlQA test_qa;
// test_qa.select("id,domain").from("records").where("enabled = 1");
// cout << test_qa.cmd << endl;
// mydb.exec(test_qa);
// test_qa.print(true);
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 0;");
// auto response = mydb.exec<int,string>("UPDATE records SET enabled = 1;");
cout << response.affected << " " << response.have_result << endl;
cout << response.rows << " " << response.columns << endl;
for (auto row : response) {
cout << get<0>(row) << " " << get<1>(row) << endl;
}
for (auto column_name : response.columns_name) {
cout << column_name << endl;
}
for (auto column_name : response.columns_name) {
cout << column_name << endl;
} catch (const string err) {
cout << err << endl;
}
});
} catch (const string err) {
cout << err << endl;
}
wait(a1);
wait(a2);
wait(a3);
// sleep(20);
// one by one
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
// // auto response = mydb.exec<int,string>("UPDATE records SET enabled = 1;");
// cout << response.affected << " " << response.have_result << endl;
// cout << response.rows << " " << response.columns << endl;
// sleep(20);
// for (auto row : response) {
// cout << get<0>(row) << " " << get<1>(row) << endl;
// }
// for (auto column_name : response.columns_name) {
// cout << column_name << endl;
// }
// try {
// sqlQA test_qa;
// test_qa.select("zone_id,record_type,enabled").from("records").where("domain = 'bitelex.test'");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// auto a1 = atask ( [&mydb] () {
// try {
// auto response = mydb.exec<string,string>("SELECT username,email FROM records WHERE enabled = 1;");
// cout << response.affected << " " << response.have_result << endl;
// cout << response.rows << " " << response.columns << endl;
// for (auto row : response) {
// cout << get<0>(row) << " " << get<1>(row) << endl;
// }
// for (auto column_name : response.columns_name) {
// cout << column_name << endl;
// }
// } catch (const string err) {
// cout << err << endl;
// }
// });
// wait(a1);
auto end = high_resolution_clock::now();
auto duration = duration_cast<microseconds>(end - start);
cout << "-------------Izvršilo se za: " << (double)(duration.count() / 1000.0) << " ms"<< endl;
// sleep(100);
sleep(100);
} catch (const SQLException error) {
@ -161,7 +140,8 @@ int main() {
cout << "Jebi ga" << endl;
}
sleep(600);
// sleep(600);
_asynco_engine.run();
return 0;
}
Loading…
Cancel
Save