Work on connection pool support in mySQL

queue
mbandic 12 months ago
parent 54b1513d7b
commit 2ea748699e
  1. 13
      .vscode/settings.json
  2. 46
      lib/mysql.hpp
  3. 218
      src/mysql.cpp
  4. 2
      test/compile.sh
  5. 96
      test/test.cpp
  6. BIN
      test/test.o

@ -4,6 +4,17 @@
"vector": "cpp", "vector": "cpp",
"deque": "cpp", "deque": "cpp",
"ostream": "cpp", "ostream": "cpp",
"future": "cpp" "future": "cpp",
"optional": "cpp",
"array": "cpp",
"string_view": "cpp",
"initializer_list": "cpp",
"utility": "cpp",
"mutex": "cpp",
"iostream": "cpp",
"*.tcc": "cpp",
"fstream": "cpp",
"new": "cpp",
"thread": "cpp"
} }
} }

@ -51,6 +51,8 @@ class sqlQA {
sqlQA& set(const string _column_value_pairs); sqlQA& set(const string _column_value_pairs);
sqlQA& deleteFrom(const string _table); sqlQA& deleteFrom(const string _table);
void print(bool withDetail = false);
// answer methods // answer methods
private: private:
@ -62,49 +64,29 @@ class mySQL {
mutex io; mutex io;
MySQL_Driver *drv; MySQL_Driver *drv;
// Connection *con; // Connection *con;
vector<Connection*> con; // vector<Connection*> con;
vector<pair<mutex*, Connection*>> con;
string path, username, password, db; string path, username, password, db;
bool isPersistent; bool isPersistent;
uint numOfCon; uint numOfCon;
uint reconTrys = 3; uint reconTrys = 3;
mySQL(const string _path, const string _username, const string _password, const string _db, const bool _isPersistent = false, const uint _numOfCon = 1); mySQL(const string _path, const string _username, const string _password, const string _db, const bool _isPersistent = false, const uint _numOfCon = 1);
bool open(const string _db = "", const int con_idx = -1); bool open(const string _db = "");
bool connect(const int con_idx = -1); bool connect();
bool disconnect(); bool disconnect();
void reconnectTrys(const uint _trys); void reconnectTrys(const uint _trys);
void exec(sqlQA &sql_qa); void exec(sqlQA &sql_qa);
void getColumns(const string _table, vector<string> &_columns); void getColumns(const string _table, vector<string> &_columns, Connection *ptr_con); // privatno
~mySQL();
};
// class mySQLPool {
// public:
// struct Drop {
// mySQL* instance;
// bool used = false;
// };
// struct Swimmer {
// thread instance;
// bool used = false;
// };
// mutex io; // ove će biti privatne sigurno
// uint maxpools = 0; bool open_one(Connection *ptr_con);
// vector<struct Drop> droplets; bool connect_one(Connection *ptr_con);
// bool fixServer = false; bool disconnect_one(Connection *ptr_con);
// bool fixScheme = false; pair<mutex*, Connection*> findFreeCon();
// vector<struct Swimmer> swimmers;
// mySQLPool(const uint _maxpools); ~mySQL();
// mySQLPool(const uint _maxpools, const string _path, const string _username, const string _password, const string _db); };
// void exec(sqlQA &sql_qa, const string _db = "");
// };
#endif #endif

@ -59,6 +59,21 @@ sqlQA& sqlQA::deleteFrom(const string _table) {
return *this; return *this;
} }
void sqlQA::print(bool withDetail) {
// istražit da se prikaže u tabeli
for (auto i : result) {
for (auto j: i.second) {
cout << i.first << " : " << j << endl;
}
}
if (withDetail) {
cout << "Is executed: " << (executed ? "true" : "false") << endl;
cout << "Update catch: " << updateCatch << endl;
cout << "Num of rows: " << num_rows << endl;
cout << "Num of columns: " << num_columns << endl;
}
}
void sqlQA::parse_columns(const string _columns) { void sqlQA::parse_columns(const string _columns) {
istringstream iss(_columns); istringstream iss(_columns);
@ -82,6 +97,8 @@ mySQL::mySQL(const string _path, const string _username, const string _password,
isPersistent = _numOfCon > 1 ? true : _isPersistent; isPersistent = _numOfCon > 1 ? true : _isPersistent;
numOfCon = _numOfCon; numOfCon = _numOfCon;
cout << "Num of con: " << numOfCon << endl;
drv = get_mysql_driver_instance(); drv = get_mysql_driver_instance();
if (isPersistent) { if (isPersistent) {
@ -98,56 +115,58 @@ mySQL::mySQL(const string _path, const string _username, const string _password,
} }
bool mySQL::open(const string _db, const int con_idx) { bool mySQL::open(const string _db) {
io.lock(); io.lock();
db = _db.empty() ? db : _db; db = _db.empty() ? db : _db;
bool status = true; // ako true greška je bool status = true; // ako true greška je
for (uint i=0; i<con.size(); i++) { for (uint i=0; i<con.size(); i++) {
con_idx != -1 ? i = con_idx : i;
try { try {
con[i]->setSchema(db); con[i].second->setSchema(db);
status = false; status = false;
io.unlock();
} }
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; cout << error.what() << endl;
io.unlock();
} }
con_idx != -1 ? i = con.size() : i;
} }
io.unlock();
return status; return status;
} }
bool mySQL::connect(const int con_idx) { bool mySQL::connect() {
io.lock(); io.lock();
uint trys = 0; // uint trys = 0;
bool status = true; // bool status = true;
bool Gstatus = true;
for (uint i=0; i<numOfCon; i++) { for (uint i=0; i<numOfCon; i++) {
con_idx != -1 ? i = con_idx : i; uint trys = 0;
while (reconTrys == 0 ? status : (trys < reconTrys && status)) { bool status = true;
cout << "Init connection " << i << endl;
while (reconTrys == unlimited ? status : (trys < reconTrys && status)) {
cout << "Try connect " << trys << endl;
try { try {
if (con_idx == -1) { con.push_back(make_pair(new mutex, drv->connect(path, username, password)));
con.push_back(drv->connect(path, username, password)); // con[i].first = new mutex;
} // con[i].second = drv->connect(path, username, password);
else {
con[i] = drv->connect(path, username, password);
}
status = false; status = false;
io.unlock(); Gstatus *= status;
} }
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; cout << error.what() << endl;
usleep(reconnectSleep); usleep(reconnectSleep);
reconTrys == 0 ? trys : trys++; reconTrys == unlimited ? trys : trys++;
io.unlock();
} }
} }
} }
return status; cout << "Num of pairs " << con.size() << endl;
io.unlock();
// return status;
return Gstatus;
} }
bool mySQL::disconnect() { bool mySQL::disconnect() {
@ -155,22 +174,75 @@ bool mySQL::disconnect() {
bool status = true; bool status = true;
for (uint i=0; i<con.size(); i++) { for (uint i=0; i<con.size(); i++) {
if (con[i]->isValid() && !con[i]->isClosed()) { if (con[i].second->isValid() && !con[i].second->isClosed()) {
try { try {
con[i]->close(); con[i].second->close();
status = false; status = false;
io.unlock();
} }
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; cout << error.what() << endl;
status = true; status = true;
}
}
else {
status = false; // već je zatvorena
}
}
io.unlock(); io.unlock();
return status;
}
bool mySQL::disconnect_one(Connection *ptr_con) {
bool status = true;
if (ptr_con->isValid() && !ptr_con->isClosed()) {
try {
ptr_con->close();
status = false;
}
catch (const SQLException &error) {
cout << error.what() << endl;
status = true;
} }
} }
else { else {
status = false; // već je zatvorena status = false; // već je zatvorena
} }
return status;
}
bool mySQL::connect_one(Connection *ptr_con) {
uint trys = 0;
bool status = true;
while (reconTrys == unlimited ? status : (trys < reconTrys && status)) {
try {
ptr_con = drv->connect(path, username, password);
status = false;
}
catch (const SQLException &error) {
cout << error.what() << endl;
usleep(reconnectSleep);
reconTrys == unlimited ? trys : trys++;
}
}
return status;
}
bool mySQL::open_one(Connection *ptr_con) {
bool status = true; // ako true greška je
try {
ptr_con->setSchema(db);
status = false;
}
catch (const SQLException &error) {
cout << error.what() << endl;
} }
return status; return status;
@ -188,31 +260,28 @@ void mySQL::reconnectTrys(const uint _trys) {
void mySQL::exec(sqlQA &sql_qa) { void mySQL::exec(sqlQA &sql_qa) {
for (uint i=0; i<con.size(); i++) { pair<mutex*, Connection*> workCon = findFreeCon();
if (!isPersistent || !con[i]->isValid() || con[i]->isClosed()) {
if (connect(i)) { if (!isPersistent || !workCon.second->isValid() || workCon.second->isClosed()) {
if (connect_one(workCon.second)) {
throw string("[ERROR] Unable to connect database "); throw string("[ERROR] Unable to connect database ");
} }
if (open(i)) { if (open_one(workCon.second)) {
throw string("[ERROR] Unable to open database " + db); throw string("[ERROR] Unable to open database " + db);
} }
} }
}
// find free connection
io.lock();
/**/ /**/
try { try {
vector<string> columns = sql_qa.columns; vector<string> columns = sql_qa.columns;
if (columns.empty() && !sql_qa.table.empty()) { if (columns.empty() && !sql_qa.table.empty()) {
getColumns(sql_qa.table, columns); getColumns(sql_qa.table, columns, workCon.second);
} }
Statement *stmt; Statement *stmt;
stmt = con->createStatement(); stmt = workCon.second->createStatement();
if (sql_qa.isSelect) { if (sql_qa.isSelect) {
ResultSet *res = stmt->executeQuery(sql_qa.cmd); ResultSet *res = stmt->executeQuery(sql_qa.cmd);
@ -225,6 +294,7 @@ void mySQL::exec(sqlQA &sql_qa) {
} }
} }
res->close();
delete res; delete res;
sql_qa.num_columns = columns.size(); sql_qa.num_columns = columns.size();
sql_qa.num_rows = num_raw_columns/columns.size(); sql_qa.num_rows = num_raw_columns/columns.size();
@ -239,32 +309,33 @@ void mySQL::exec(sqlQA &sql_qa) {
sql_qa.executed = stmt->execute(sql_qa.cmd); sql_qa.executed = stmt->execute(sql_qa.cmd);
} }
stmt->close();
delete stmt; delete stmt;
io.unlock();
} }
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; cout << error.what() << endl;
sql_qa.executed = false; sql_qa.executed = false;
io.unlock();
} }
catch (const string error) { catch (const string error) {
workCon.first->unlock();
throw error; throw error;
io.unlock();
} }
/**/ /**/
if (!isPersistent) { if (!isPersistent) {
if(disconnect()) { if(disconnect_one(workCon.second)) {
throw string("[ERROR] Unable to close database "); throw string("[ERROR] Unable to close database ");
} }
} }
workCon.first->unlock();
} }
void mySQL::getColumns(const string _table, vector<string> &_columns) { void mySQL::getColumns(const string _table, vector<string> &_columns, Connection *ptr_con) {
Statement *stmt; Statement *stmt;
stmt = con->createStatement(); stmt = ptr_con->createStatement();
ResultSet *columnsRes = stmt->executeQuery("SHOW COLUMNS from " + _table); ResultSet *columnsRes = stmt->executeQuery("SHOW COLUMNS from " + _table);
@ -272,59 +343,32 @@ void mySQL::getColumns(const string _table, vector<string> &_columns) {
_columns.push_back(columnsRes->getString("Field")); _columns.push_back(columnsRes->getString("Field"));
} }
columnsRes->close();
stmt->close();
delete columnsRes; delete columnsRes;
delete stmt; delete stmt;
} }
pair<mutex*, Connection*> mySQL::findFreeCon() {
io.lock();
while (true) {
cout << "Tražim konekciju " << endl;
for (uint i=0; i<con.size(); i++) {
cout << "Pokušavam s " << i << " konekciju" << endl;
if (con[i].first->try_lock()) {
cout << "Koristim " << i << " konekciju" << endl;
io.unlock();
return con[i];
}
// else {
// usleep(1000);
// }
}
}
}
mySQL::~mySQL() { mySQL::~mySQL() {
if(disconnect()) { if(disconnect()) {
throw string("[ERROR] Unable to close database "); throw string("[ERROR] Unable to close database ");
} }
} }
// mySQLPool::mySQLPool(const uint _maxpools) {
// maxpools = _maxpools;
// fixServer = false;
// fixScheme = true;
// }
// mySQLPool::mySQLPool(const uint _maxpools, const string _path, const string _username, const string _password, const string _db) {
// maxpools = _maxpools;
// fixServer = true;
// fixScheme = !_db.empty();
// for (uint i=0; i<maxpools; i++) {
// mySQL tmpmysql(_path, _username, _password, _db);
// // struct Drop tmpdrop(new mySQL(_path, _username, _password, _db), false);
// droplets.push_back({mySQL(_path, _username, _password, _db, true), false});
// }
// }
// void mySQLPool::exec(sqlQA &sql_qa, const string _db) {
// if (!fixScheme && _db.empty()) {
// throw string("[ERROR] Database is not selected! ");
// }
// for (uint i=0; i<droplets.size(); i++) {
// if (droplets[i].used == false) {
// droplets[i].used = true;
// bool isRunned = false;
// while (!isRunned) {
// for (uint i=0; i<swimmers.size(); i++) {
// if (swimmers[i].used == false) {
// swimmers[i].used = true;
// swimmers[i].instance = thread([&]() {
// droplets[i].instance->exec(sql_qa);
// });
// swimmers[i].instance.join();
// swimmers[i].used = false;
// isRunned = true;
// }
// }
// }
// droplets[i].used = false;
// }
// }
// }

@ -1 +1 @@
g++ test.cpp ../src/* -o test.o -lmysqlcppconn g++ test.cpp ../src/* -o test.o -lmysqlcppconn -lpthread

@ -1,53 +1,95 @@
#include <iostream> #include <iostream>
#include <thread>
#include <chrono>
#include "../lib/mysql.hpp" #include "../lib/mysql.hpp"
using namespace std; using namespace std;
using namespace chrono;
int main() { int main() {
mySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", true);
try { try {
mySQL mydb("tcp://92.240.56.92:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", true, 2);
sqlQA test_qa; auto start = high_resolution_clock::now();
// id,user_id,zone_id,domain,record_type,auth_key,last_update,enabled
// test_qa.select().from("records").where("enabled = 0").limit(2);
// mydb.exec(test_qa);
// for (auto i : test_qa.result) { // thread t1([&](){
// for (auto j: i.second) { // try {
// cout << i.first << " : " << j << endl; // sqlQA test_qa;
// } // test_qa.select().from("records").where("enabled = 1");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// } // }
// });
// thread t2([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("zones");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// test_qa.update("records").set("enabled = 1").where("domain = 'bitelex.test'"); // thread t3([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa); // mydb.exec(test_qa);
// if (test_qa.executed) { // test_qa.print(true);
// cout << "Num rows affect " << test_qa.updateCatch << endl; // } catch (const string err) {
// cout << err << endl;
// } // }
// });
// cout << "Num rows " << test_qa.num_rows << " num columns " << test_qa.num_columns << " executed " << test_qa.executed << endl;
// t1.join();
// t2.join();
// t3.join();
// test_qa.insertInTo("records", "id,user_id,zone_id,domain,record_type,auth_key,last_update,enabled").values("'5',2,2,'www.bitelex.test','AAAA','jebiga',NULL,1"); // one by one
test_qa.deleteFrom("records").where("record_type = AAAA"); try {
// test_qa.update("records").set("enabled = 0").where("record_type = 'AAAA'"); sqlQA test_qa;
cout << test_qa.cmd << endl; test_qa.select().from("records").where("enabled = 1");
mydb.exec(test_qa); mydb.exec(test_qa);
cout << "Num rows " << test_qa.num_rows << " num columns " << test_qa.num_columns << " catch " << test_qa.updateCatch << " executed " << test_qa.executed << endl; test_qa.print(true);
} catch (const string err) {
cout << err << endl;
}
try {
sqlQA test_qa;
test_qa.select().from("users");
mydb.exec(test_qa);
test_qa.print(true);
} catch (const string err) {
cout << err << endl;
}
try {
sqlQA test_qa;
test_qa.select().from("users");
mydb.exec(test_qa);
test_qa.print(true);
} catch (const string err) {
cout << err << endl;
} }
catch (const SQLException error) {
auto end = high_resolution_clock::now();
auto duration = duration_cast<microseconds>(end - start);
cout << "-------------Izvršilo se za: " << (double)(duration.count() / 1000.0) << endl;
} catch (const SQLException error) {
cout << error.what() << endl; cout << error.what() << endl;
} } catch (const string error) {
// catch (const string error) { cout << error << endl;
// cout << error << endl; } catch (...) {
// }
catch (...) {
cout << "Jebi ga" << endl; cout << "Jebi ga" << endl;
} }

Binary file not shown.
Loading…
Cancel
Save