Compare commits

..

4 Commits

  1. 4
      .vscode/settings.json
  2. 55
      README.md
  3. 111
      lib/mysql.hpp
  4. 137
      src/mysql.cpp
  5. 3
      test/compile.sh
  6. 90
      test/test.cpp

@ -61,6 +61,8 @@
"cinttypes": "cpp", "cinttypes": "cpp",
"typeinfo": "cpp", "typeinfo": "cpp",
"any": "cpp", "any": "cpp",
"variant": "cpp" "variant": "cpp",
"*.ipp": "cpp",
"bitset": "cpp"
} }
} }

@ -13,7 +13,7 @@ A small framework for basic MySQL database operations via MySQL/Connector++
- Response object - Response object
- Thread safe - Thread safe
- Exceptions and log error callback - Exceptions and log error callback
- Can use external time loop for connection management - Support my Asynco wrapper around Boost ASIO and support threads
## Installation ## Installation
@ -22,6 +22,11 @@ First install dependency MySQL/Connector++
``` ```
sudo apt install libmysqlcppconn-dev sudo apt install libmysqlcppconn-dev
``` ```
If you are going to use with an Asynco wrapper, download the archive from the profile and install the Boost dependencies
```
sudo apt install libboost-all-dev
```
Just download the latest release and unzip it into your project. You can turn it on with: Just download the latest release and unzip it into your project. You can turn it on with:
@ -29,7 +34,15 @@ Just download the latest release and unzip it into your project. You can turn it
#include "mysql/lib/mysql.hpp" #include "mysql/lib/mysql.hpp"
using namespace marcelb; using namespace marcelb;
``` ```
Compiling
```
# use with my Asynco lib
g++ -DMYSQL_USE_ASYNCO test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread
# or use without asnyco (in multithread)
g++ test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread
```
## Usage ## Usage
### Internal engine ### Internal engine
@ -46,6 +59,14 @@ using namespace marcelb::mysql;
*/ */
MySQL mydb("tcp://192.168.2.10:3306", "user_nm", "passss", "my_db", 5); MySQL mydb("tcp://192.168.2.10:3306", "user_nm", "passss", "my_db", 5);
mydb.on_error = [](const string& error) {
cout << error << endl;
};
mydb.on_connect = []() {
cout << "Init all pool connection done" << endl;
};
/** /**
* Use ------------------ * Use ------------------
*/ | | */ | |
@ -72,34 +93,16 @@ try { | | | |
} }
``` ```
### External engine ### Run async with Asynco
As I developed quite a few wrappers that have some internal thread, I realized that it was inefficient and made it possible to call the necessary functions periodically outside (one thread per whole application or timer (ASIO), or my asynco wrapper). As I developed quite a few wrappers that have some internal thread, I realized that it was inefficient and made it possible to call the necessary functions periodically outside (one thread per whole application or timer (ASIO), or my asynco wrapper).
```c++ ```c++
#include "../lib/mysql.hpp"
using namespace marcelb::mysql;
#include "../../asynco/lib/timers.hpp"
using namespace marcelb::asynco;
/**
* Init
*/
MySQL mydb("tcp://192.168.2.10:3306", "user_nm", "passss", "my_db", 5, time_loop_type::external);
periodic mysql_maintenance ( [&mydb] () {
mydb.periodic_maintenance();
}, MYSQL_PERIODIC_INTERNAL_TIME);
mydb.set_on_error( [](const string& error) {
cout << error << endl; // print or log
});
/** /**
* You can call multiple queries asynchronously * You can call multiple queries asynchronously
*/ */
auto a1 = atask ( [&mydb] () { auto a1 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;"); auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
for (auto row : response) { for (auto row : response) {
@ -110,7 +113,7 @@ auto a1 = atask ( [&mydb] () {
} }
}); });
auto a2 = atask ( [&mydb] () { auto a2 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;"); auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
for (auto row : response) { for (auto row : response) {
@ -121,7 +124,7 @@ auto a2 = atask ( [&mydb] () {
} }
}); });
auto a3 = atask ( [&mydb] () { auto a3 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;"); auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
for (auto row : response) { for (auto row : response) {
@ -132,9 +135,9 @@ auto a3 = atask ( [&mydb] () {
} }
}); });
wait(a1); await(a1);
wait(a2); await(a2);
wait(a3); await(a3);
``` ```
## License ## License

@ -8,7 +8,8 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <tuple> #include <tuple>
#include "ctime" #include <chrono>
using namespace std;
#include <mysql_driver.h> #include <mysql_driver.h>
#include <mysql_connection.h> #include <mysql_connection.h>
@ -17,32 +18,28 @@
#include <cppconn/statement.h> #include <cppconn/statement.h>
#include <cppconn/prepared_statement.h> #include <cppconn/prepared_statement.h>
#include <cppconn/resultset.h> #include <cppconn/resultset.h>
#define unlimited 0
#define reconnectSleep 10000 // in us
using namespace std;
using namespace sql; using namespace sql;
using namespace mysql; using namespace mysql;
#ifdef MYSQL_USE_ASYNCO
#include "../../asynco/lib/asynco.hpp"
#include "../../asynco/lib/timers.hpp"
using namespace marcelb::asynco;
#endif
#define unlimited 0
#define reconnectSleep 1000 // in us
namespace marcelb { namespace marcelb {
namespace mysql { namespace mysql {
/** /**
* *
*/ */
#define MYSQL_PERIODIC_INTERNAL_TIME 1000 #define MYSQL_PERIODIC_INTERNAL_TIME 5000
#define MYSQL_MAX_CONNECTION_INIT_SAME_TIME 10
/** #define MYSQL_DELAYED_CONNECT_TIME 1000
* An enumeration of how periodic functions will be run
* internal - run periodic_maintenance() i new thread
* external - expects periodic_maintenance() to be run periodically outside the library
*
*/
enum class time_loop_type {
internal,
external
};
/** /**
* A class for creating sql responses * A class for creating sql responses
@ -103,10 +100,16 @@ class MySQL {
queue<Connection*> connection_pool; queue<Connection*> connection_pool;
string path, username, password, database; string path, username, password, database;
uint32_t pool_size; uint32_t pool_size;
const uint32_t max_t = thread::hardware_concurrency();
uint32_t t = 1;
#ifdef MYSQL_USE_ASYNCO
periodic p_loop;
// delayed d_connect;
#else
future<void> tloop_future, connect_f;
bool run_tloop = true; bool run_tloop = true;
future<void> tloop_future;
time_loop_type tloop_type; #endif
time_t last_loop_time;
/** /**
* Open one database server connection * Open one database server connection
@ -153,10 +156,59 @@ class MySQL {
* Internal tloop periodic * Internal tloop periodic
*/ */
void _tloop(); void _tloop(uint32_t b, uint32_t e);
/**
*
*/
vector<pair<int, int>> divide_and_conquer(int n, int t) {
vector<pair<int, int>> pairs;
int part_size = n / t;
int residue = n % t; // Ostatak koji treba rasporediti
for (int i = 0; i < t; i++) {
int start = i * part_size + std::min(i, residue);
int end = start + part_size + (i < residue ? 1 : 0);
pairs.push_back(make_pair(start, end));
}
return pairs;
}
/**
* Automatically adjusts
*/
void auto_adjusts(const uint32_t duration) {
if (duration > (MYSQL_PERIODIC_INTERNAL_TIME*2)/3) {
if (t < max_t) {
t++; // povećaj broj asinkronih zadataka drugi put
}
else if (on_error) {
on_error("Periodic maintenance takes too long");
}
}
else if (duration < MYSQL_PERIODIC_INTERNAL_TIME/3 && t > 1) {
t--; // smanji nema potrebe
}
}
#ifndef MYSQL_USE_ASYNCO
/**
*
*/
uint64_t timestamp() {
return chrono::duration_cast<chrono::milliseconds>(
chrono::system_clock::now().time_since_epoch()
).count();
};
#endif
public: public:
function<void(const string&)> on_error; function<void(const string&)> on_error;
function<void()> on_connect;
uint32_t connect_trys = 3; uint32_t connect_trys = 3;
@ -166,7 +218,7 @@ public:
* username, password, database name, * username, password, database name,
* and number of active connections (optional) * and number of active connections (optional)
*/ */
MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available = 1, const time_loop_type _engine_type = time_loop_type::internal); MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available = 1);
/** /**
* Execute the SQL statement * Execute the SQL statement
@ -205,23 +257,14 @@ public:
stmt->close(); stmt->close();
delete stmt; delete stmt;
release_connection(connection); release_connection(connection);
} catch (sql::SQLException& e) { } catch (sql::SQLException& e) {
throw runtime_error(e.what()); throw runtime_error(e.what());
// std::cerr << "SQLState: " << e.getSQLState() << std::endl; // cerr << "SQLState: " << e.getSQLState() << endl;
// std::cerr << "Error code: " << e.getErrorCode() << std::endl; // cerr << "Error code: " << e.getErrorCode() << endl;
} }
return result; return result;
} }
/**
* If you are using an external periodic motor,
* please call this function in it for proper operation at a certain time interval.
* You can use the default MYSQL_PERIODIC_INTERNAL_TIME
*/
void tloop();
/** /**
* Destruktor * Destruktor
* close all connections * close all connections

@ -1,28 +1,69 @@
#include "../lib/mysql.hpp" #include "../lib/mysql.hpp"
marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available):
#ifdef MYSQL_USE_ASYNCO
p_loop(periodic( [&] () {
const uint64_t start = rtime_ms();
auto pairs = divide_and_conquer(connection_pool.size(), t);
marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available, const time_loop_type _engine_type) { vector<future<void>> fuve;
path = _path; for (auto& pair : pairs) {
username = _username; fuve.push_back(async_ ([&, pair](){
password = _password; _tloop(pair.first, pair.second);
database = _db; }));
pool_size = _available > 0 ? _available : 1; }
tloop_type = _engine_type;
drv = get_mysql_driver_instance(); for (auto& fu : fuve) {
connect_pool(); try {
await_(fu);
} catch (...) {
if(on_error) {
on_error("Error in maintenance periodic loop.");
}
}
}
auto_adjusts(rtime_ms() - start);
if (tloop_type == time_loop_type::internal) { }, MYSQL_PERIODIC_INTERNAL_TIME)),
tloop_future = async(launch::async, [&](){ #else
tloop_future (async(launch::async, [&](){
while (run_tloop) { while (run_tloop) {
usleep(MYSQL_PERIODIC_INTERNAL_TIME*1000); usleep(MYSQL_PERIODIC_INTERNAL_TIME*1000);
_tloop();
const uint64_t start = timestamp();
auto pairs = divide_and_conquer(connection_pool.size(), t);
vector<future<void>> fuve;
for (auto& pair : pairs) {
fuve.push_back(async (launch::async, [&, pair](){
_tloop(pair.first, pair.second);
}));
}
for (auto& fu : fuve) {
try {
fu.get();
} catch (...) {
if(on_error) {
on_error("Error in maintenance periodic loop.");
}
} }
return;
});
} }
// set on initialization to avoid the error
last_loop_time = time(nullptr); auto_adjusts(timestamp() - start);
}
return;
})),
#endif
path(_path),
username(_username),
password(_password),
database(_db),
pool_size(_available > 0 ? _available : 1) {
drv = get_mysql_driver_instance();
connect_pool();
} }
@ -30,7 +71,6 @@ Connection* marcelb::mysql::MySQL::create_connection() {
uint32_t trys = 0; uint32_t trys = 0;
bool status = true; bool status = true;
Connection* new_con = NULL; Connection* new_con = NULL;
while (connect_trys == unlimited ? status : (trys <= connect_trys && status)) { while (connect_trys == unlimited ? status : (trys <= connect_trys && status)) {
try { try {
Connection* con_can = drv->connect(path, username, password); Connection* con_can = drv->connect(path, username, password);
@ -56,11 +96,28 @@ Connection* marcelb::mysql::MySQL::create_connection() {
} }
void marcelb::mysql::MySQL::connect_pool() { void marcelb::mysql::MySQL::connect_pool() {
lock_guard<mutex> lock(io); auto connect_ = [&]() -> void {
const uint32_t tens = pool_size/MYSQL_MAX_CONNECTION_INIT_SAME_TIME;
for (uint32_t i=0; i<pool_size; i++) { for (uint32_t i=0; i<pool_size; i++) {
Connection* connection = create_connection(); Connection* connection = create_connection();
{
lock_guard<mutex> lock(io);
connection_pool.push(connection); connection_pool.push(connection);
} }
if ((i+1)%tens == 0) {
usleep(MYSQL_DELAYED_CONNECT_TIME*1000);
}
}
if (on_connect) {
on_connect();
}
};
#ifdef MYSQL_USE_ASYNCO
async_ (connect_);
#else
connect_f = async (launch::async, connect_);
#endif
} }
void marcelb::mysql::MySQL::disconnect_pool() { void marcelb::mysql::MySQL::disconnect_pool() {
@ -95,23 +152,20 @@ bool marcelb::mysql::MySQL::disconnect_connection(Connection* connection) {
return status; return status;
} }
void marcelb::mysql::MySQL::_tloop() { void marcelb::mysql::MySQL::_tloop(uint32_t b, uint32_t e) {
if (!run_tloop) { for (size_t i=b; i<connection_pool.size() && i<e; i++) {
return;
}
lock_guard<mutex> lock(io);
for (size_t i=0; i<connection_pool.size(); i++) {
try { try {
Connection *conn = connection_pool.front(); Connection *conn = nullptr; // provjeri ovdje svugdje možeš li koristiti release i ocupacy sada
connection_pool.pop(); conn = occupy_connection();
if (conn->isValid()) { if (conn->isValid()) {
connection_pool.push(conn); release_connection(conn);
} else { } else {
if (!conn->isClosed()){ if (!conn->isClosed()){
conn->close(); disconnect_connection(conn);
} }
Connection *n_conn = create_connection(); Connection *n_conn = create_connection();
release_connection(n_conn); release_connection(n_conn);
} }
} catch (const SQLException &error) { } catch (const SQLException &error) {
if (on_error) { if (on_error) {
@ -119,16 +173,9 @@ void marcelb::mysql::MySQL::_tloop() {
} }
} }
} }
last_loop_time = time(nullptr);
} }
Connection* marcelb::mysql::MySQL::occupy_connection() { Connection* marcelb::mysql::MySQL::occupy_connection() {
if (last_loop_time + (MYSQL_PERIODIC_INTERNAL_TIME*3/1000) < time(nullptr)) {
if (on_error) {
on_error("The time loop is not executing properly");
}
}
unique_lock<mutex> lock(io); unique_lock<mutex> lock(io);
while (connection_pool.empty()) { while (connection_pool.empty()) {
condition.wait(lock); condition.wait(lock);
@ -145,23 +192,11 @@ void marcelb::mysql::MySQL::release_connection(Connection* connection) {
} }
marcelb::mysql::MySQL::~MySQL() { marcelb::mysql::MySQL::~MySQL() {
if (tloop_type == time_loop_type::internal) { #ifdef MYSQL_USE_ASYNCO
p_loop.stop();
#else
run_tloop = false; run_tloop = false;
tloop_future.get(); tloop_future.get();
} else { #endif
run_tloop = false;
}
disconnect_pool(); disconnect_pool();
} }
void marcelb::mysql::MySQL::tloop() {
if (tloop_type == time_loop_type::internal) {
if (on_error) {
on_error("Can't start external call tloop, internal is active!");
}
return;
}
_tloop();
}

@ -1 +1,2 @@
g++ test.cpp ../src/* -o test.o -lmysqlcppconn -lpthread g++ -DMYSQL_USE_ASYNCO test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread
# g++ test.cpp ../src/* ../../asynco/src/* -o test.o -lmysqlcppconn -lpthread

@ -4,34 +4,78 @@
using namespace std; using namespace std;
using namespace chrono; using namespace chrono;
#include "../lib/mysql.hpp"
using namespace marcelb::mysql;
#include "../../asynco/lib/asynco.hpp" #include "../../asynco/lib/asynco.hpp"
#include "../../asynco/lib/timers.hpp" #include "../../asynco/lib/timers.hpp"
using namespace marcelb::asynco; using namespace marcelb::asynco;
#include "../lib/mysql.hpp"
using namespace marcelb::mysql;
int main() { int main() {
auto inis = rtime_ms();
try { try {
const int n = 10;
// MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5, time_loop_type::internal); // MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5, time_loop_type::internal);
MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5, time_loop_type::external); MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", n);
// MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5); // MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5);
cout << "--------------init: " << rtime_ms() - inis << endl;
mydb.on_error = [](const string& error) { mydb.on_error = [](const string& error) {
cout << error << endl; cout << error << endl;
}; };
periodic mysql_tloop ( [&mydb] () { mydb.on_connect = []() {
cout << "loop---------------------------" << endl; cout << "Init all pool connection done" << endl;
mydb.tloop(); };
}, MYSQL_PERIODIC_INTERNAL_TIME);
// periodic mysql_tloop ( [&mydb] () {
// auto l_start = rtime_ms();
// vector<future<void>> to_wait;
// for (int i=0, old_i=0; i<n; old_i=i) {
// i += 5;
// to_wait.push_back( async_ ([&, i, old_i](){
// try {
// auto start = rtime_ms();
// mydb.tloop(old_i, i);
// cout << "old " << old_i << " i " << i << endl;
// cout << "loop--------------------------- nema error, trajalo: " << rtime_ms() - start << endl;
// } catch (...) {
// cout << "Bude neki error u loopu" << endl;
// }
// }));
// }
// async_ ([&](){
// try {
// auto start = rtime_ms();
// mydb.tloop(4, 8);
// cout << "loop--------------------------- nema error, trajalo: " << rtime_ms() - start << endl;
// } catch (...) {
// cout << "Bude neki error u loopu" << endl;
// }
// });
// async_ ([&](){
// try {
// auto start = rtime_ms();
// mydb.tloop(8, 12);
// cout << "loop--------------------------- nema error, trajalo: " << rtime_ms() - start << endl;
// } catch (...) {
// cout << "Bude neki error u loopu" << endl;
// }
// });
// for (auto& tw : to_wait) {
// wait (tw);
// }
// cout << "all loop !!!!!!!!!!!!!!1, trajalo: " << rtime_ms() - l_start << endl;
// }, 5000);
while (true) { while (true) {
sleep(5); sleep(60);
auto start = high_resolution_clock::now(); auto start = high_resolution_clock::now();
auto a1 = nonsync ( [&mydb] () { auto a1 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;"); auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -50,7 +94,7 @@ while (true) {
} }
}); });
auto a2 = nonsync ( [&mydb] () { auto a2 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;"); auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -69,7 +113,7 @@ while (true) {
} }
}); });
auto a3 = nonsync ( [&mydb] () { auto a3 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;"); auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -88,7 +132,7 @@ while (true) {
} }
}); });
auto a4 = nonsync ( [&mydb] () { auto a4 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;"); auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -107,7 +151,7 @@ while (true) {
} }
}); });
auto a5 = nonsync ( [&mydb] () { auto a5 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;"); auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -126,7 +170,7 @@ while (true) {
} }
}); });
auto a6 = nonsync ( [&mydb] () { auto a6 = async_ ( [&mydb] () {
try { try {
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;"); auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
cout << response.affected << " " << response.have_result << endl; cout << response.affected << " " << response.have_result << endl;
@ -145,12 +189,12 @@ while (true) {
} }
}); });
wait(a1); await_(a1);
wait(a2); await_(a2);
wait(a3); await_(a3);
wait(a4); await_(a4);
wait(a5); await_(a5);
wait(a6); await_(a6);
auto end = high_resolution_clock::now(); auto end = high_resolution_clock::now();
@ -163,7 +207,9 @@ while (true) {
} catch (const SQLException error) { } catch (const SQLException error) {
cout << error.what() << endl; cout << error.what() << endl;
} catch (const string error) { } catch (const exception& error) {
cout << error.what() << endl;
} catch (const string& error) {
cout << error << endl; cout << error << endl;
} catch (...) { } catch (...) {
cout << "Jebi ga" << endl; cout << "Jebi ga" << endl;

Loading…
Cancel
Save