Pool connections support, tested

queue v0.3_pool
marcelb 1 year ago
parent 2ea748699e
commit 6905bfe745
  1. 3
      .vscode/settings.json
  2. 22
      lib/mysql.hpp
  3. 153
      src/mysql.cpp
  4. 85
      test/test.cpp
  5. BIN
      test/test.o

@ -15,6 +15,7 @@
"*.tcc": "cpp", "*.tcc": "cpp",
"fstream": "cpp", "fstream": "cpp",
"new": "cpp", "new": "cpp",
"thread": "cpp" "thread": "cpp",
"chrono": "cpp"
} }
} }

@ -18,7 +18,7 @@
#include <cppconn/resultset.h> #include <cppconn/resultset.h>
#define unlimited 0 #define unlimited 0
#define reconnectSleep 100000 // in us #define reconnectSleep 10000 // in us
using namespace std; using namespace std;
using namespace sql; using namespace sql;
@ -60,32 +60,30 @@ class sqlQA {
}; };
class mySQL { class mySQL {
public: private:
mutex io; mutex io;
MySQL_Driver *drv; MySQL_Driver *drv;
// Connection *con;
// vector<Connection*> con;
vector<pair<mutex*, Connection*>> con; vector<pair<mutex*, Connection*>> con;
string path, username, password, db; string path, username, password, db;
bool isPersistent; bool isPersistent;
uint numOfCon; uint numOfCon;
uint reconTrys = 3; uint reconTrys = 3;
void getColumns(const string _table, vector<string> &_columns, Connection *ptr_con); // privatno
bool open_one(const uint idx);
bool connect_one(const uint idx);
bool disconnect_one(const uint idx);
uint findFreeCon();
public:
mySQL(const string _path, const string _username, const string _password, const string _db, const bool _isPersistent = false, const uint _numOfCon = 1); mySQL(const string _path, const string _username, const string _password, const string _db, const bool _isPersistent = false, const uint _numOfCon = 1);
bool open(const string _db = ""); bool open(const string _db = "");
bool connect(); bool connect();
bool disconnect(); bool disconnect();
void reconnectTrys(const uint _trys); void reconnectTrys(const uint _trys);
void exec(sqlQA &sql_qa); void exec(sqlQA &sql_qa);
void getColumns(const string _table, vector<string> &_columns, Connection *ptr_con); // privatno
// ove će biti privatne sigurno
bool open_one(Connection *ptr_con);
bool connect_one(Connection *ptr_con);
bool disconnect_one(Connection *ptr_con);
pair<mutex*, Connection*> findFreeCon();
~mySQL(); ~mySQL();
}; };

@ -60,19 +60,24 @@ sqlQA& sqlQA::deleteFrom(const string _table) {
} }
void sqlQA::print(bool withDetail) { void sqlQA::print(bool withDetail) {
// istražit da se prikaže u tabeli cout << "============================================" << endl;
for (auto i : result) { for (auto i : result) {
for (auto j: i.second) { for (auto j: i.second) {
cout << i.first << " : " << j << endl; cout << i.first << " : " << j << endl;
} }
cout << "--------------------------------------------" << endl;
} }
if (withDetail) { if (withDetail) {
cout << "-----------------DETAILS--------------------" << endl;
cout << "Is executed: " << (executed ? "true" : "false") << endl; cout << "Is executed: " << (executed ? "true" : "false") << endl;
cout << "Update catch: " << updateCatch << endl; cout << "Update catch: " << updateCatch << endl;
cout << "Num of rows: " << num_rows << endl; cout << "Num of rows: " << num_rows << endl;
cout << "Num of columns: " << num_columns << endl; cout << "Num of columns: " << num_columns << endl;
} }
cout << "============================================" << endl;
} }
void sqlQA::parse_columns(const string _columns) { void sqlQA::parse_columns(const string _columns) {
@ -95,9 +100,7 @@ mySQL::mySQL(const string _path, const string _username, const string _password,
password = _password; password = _password;
db = _db; db = _db;
isPersistent = _numOfCon > 1 ? true : _isPersistent; isPersistent = _numOfCon > 1 ? true : _isPersistent;
numOfCon = _numOfCon; numOfCon = _numOfCon > 0 ? _numOfCon : 1;
cout << "Num of con: " << numOfCon << endl;
drv = get_mysql_driver_instance(); drv = get_mysql_driver_instance();
@ -115,44 +118,31 @@ mySQL::mySQL(const string _path, const string _username, const string _password,
} }
bool mySQL::open(const string _db) { bool mySQL::connect() {
io.lock(); io.lock();
db = _db.empty() ? db : _db; bool status = true;
bool status = true; // ako true greška je
for (uint i=0; i<con.size(); i++) {
try {
con[i].second->setSchema(db);
status = false;
}
catch (const SQLException &error) {
cout << error.what() << endl;
}
for (uint i=0; i<numOfCon; i++) {
status = connect_one(i);
} }
io.unlock(); io.unlock();
return status; return status;
} }
bool mySQL::connect() { bool mySQL::connect_one(const uint idx) {
io.lock();
// uint trys = 0;
// bool status = true;
bool Gstatus = true;
for (uint i=0; i<numOfCon; i++) {
uint trys = 0; uint trys = 0;
bool status = true; bool status = true;
cout << "Init connection " << i << endl;
while (reconTrys == unlimited ? status : (trys < reconTrys && status)) { while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) {
cout << "Try connect " << trys << endl;
try { try {
if (con.size() < numOfCon) {
con.push_back(make_pair(new mutex, drv->connect(path, username, password))); con.push_back(make_pair(new mutex, drv->connect(path, username, password)));
// con[i].first = new mutex; }
// con[i].second = drv->connect(path, username, password); else {
status = false; con[idx].second = drv->connect(path, username, password);
Gstatus *= status; }
status = !con[idx].second->isValid() && con[idx].second->isClosed() || con[idx].second->isClosed();
} }
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; cout << error.what() << endl;
@ -160,13 +150,8 @@ bool mySQL::connect() {
reconTrys == unlimited ? trys : trys++; reconTrys == unlimited ? trys : trys++;
} }
} }
}
cout << "Num of pairs " << con.size() << endl;
io.unlock(); return status;
// return status;
return Gstatus;
} }
bool mySQL::disconnect() { bool mySQL::disconnect() {
@ -174,33 +159,20 @@ bool mySQL::disconnect() {
bool status = true; bool status = true;
for (uint i=0; i<con.size(); i++) { for (uint i=0; i<con.size(); i++) {
if (con[i].second->isValid() && !con[i].second->isClosed()) { status = disconnect_one(i) ;
try {
con[i].second->close();
status = false;
}
catch (const SQLException &error) {
cout << error.what() << endl;
status = true;
}
}
else {
status = false; // već je zatvorena
}
} }
io.unlock(); io.unlock();
return status; return status;
} }
bool mySQL::disconnect_one(Connection *ptr_con) { bool mySQL::disconnect_one(const uint idx) {
bool status = true; bool status = !con[idx].second->isClosed();
if (ptr_con->isValid() && !ptr_con->isClosed()) { if (status) {
try { try {
ptr_con->close(); con[idx].second->close();
status = false; status = !con[idx].second->isClosed();
} }
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; cout << error.what() << endl;
@ -215,34 +187,39 @@ bool mySQL::disconnect_one(Connection *ptr_con) {
return status; return status;
} }
bool mySQL::connect_one(Connection *ptr_con) {
uint trys = 0;
bool status = true;
while (reconTrys == unlimited ? status : (trys < reconTrys && status)) { bool mySQL::open(const string _db) {
try { io.lock();
ptr_con = drv->connect(path, username, password); db = _db.empty() ? db : _db;
status = false; bool status = true; // ako true greška je
}
catch (const SQLException &error) { for (uint i=0; i<con.size(); i++) {
cout << error.what() << endl; status = open_one(i);
usleep(reconnectSleep);
reconTrys == unlimited ? trys : trys++;
}
} }
io.unlock();
return status; return status;
} }
bool mySQL::open_one(Connection *ptr_con) { bool mySQL::open_one(const uint idx) {
bool status = true; // ako true greška je bool status = true; // ako true greška je
uint trys = 0;
while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) {
try { try {
ptr_con->setSchema(db); if (con[idx].second->isValid()) {
con[idx].second->setSchema(db);
status = false; status = false;
} }
else {
break;
}
}
catch (const SQLException &error) { catch (const SQLException &error) {
cout << error.what() << endl; cout << error.what() << endl;
usleep(reconnectSleep);
reconTrys == unlimited ? trys : trys++;
}
} }
return status; return status;
@ -259,29 +236,27 @@ void mySQL::reconnectTrys(const uint _trys) {
} }
void mySQL::exec(sqlQA &sql_qa) { void mySQL::exec(sqlQA &sql_qa) {
const uint idx = findFreeCon();
pair<mutex*, Connection*> workCon = findFreeCon(); if (!isPersistent || !con[idx].second->isValid() || con[idx].second->isClosed()) {
if (connect_one(idx)) {
if (!isPersistent || !workCon.second->isValid() || workCon.second->isClosed()) {
if (connect_one(workCon.second)) {
throw string("[ERROR] Unable to connect database "); throw string("[ERROR] Unable to connect database ");
} }
if (open_one(workCon.second)) { if (open_one(idx)) {
throw string("[ERROR] Unable to open database " + db); throw string("[ERROR] Unable to open database " + db);
} }
} }
/**/
try { try {
vector<string> columns = sql_qa.columns; vector<string> columns = sql_qa.columns;
if (columns.empty() && !sql_qa.table.empty()) { if (columns.empty() && !sql_qa.table.empty()) {
getColumns(sql_qa.table, columns, workCon.second); getColumns(sql_qa.table, columns, con[idx].second);
} }
Statement *stmt; Statement *stmt;
stmt = workCon.second->createStatement(); stmt = con[idx].second->createStatement();
if (sql_qa.isSelect) { if (sql_qa.isSelect) {
ResultSet *res = stmt->executeQuery(sql_qa.cmd); ResultSet *res = stmt->executeQuery(sql_qa.cmd);
@ -317,20 +292,17 @@ void mySQL::exec(sqlQA &sql_qa) {
sql_qa.executed = false; sql_qa.executed = false;
} }
catch (const string error) { catch (const string error) {
workCon.first->unlock(); con[idx].first->unlock();
throw error; throw error;
} }
/**/
if (!isPersistent) { if (!isPersistent) {
if(disconnect_one(workCon.second)) { if(disconnect_one(idx)) {
throw string("[ERROR] Unable to close database "); throw string("[ERROR] Unable to close database ");
} }
} }
workCon.first->unlock(); con[idx].first->unlock();
} }
void mySQL::getColumns(const string _table, vector<string> &_columns, Connection *ptr_con) { void mySQL::getColumns(const string _table, vector<string> &_columns, Connection *ptr_con) {
@ -349,20 +321,13 @@ void mySQL::getColumns(const string _table, vector<string> &_columns, Connection
delete stmt; delete stmt;
} }
pair<mutex*, Connection*> mySQL::findFreeCon() { uint mySQL::findFreeCon() {
io.lock(); lock_guard<mutex> master(io);
while (true) { while (true) {
cout << "Tražim konekciju " << endl;
for (uint i=0; i<con.size(); i++) { for (uint i=0; i<con.size(); i++) {
cout << "Pokušavam s " << i << " konekciju" << endl;
if (con[i].first->try_lock()) { if (con[i].first->try_lock()) {
cout << "Koristim " << i << " konekciju" << endl; return i;
io.unlock();
return con[i];
} }
// else {
// usleep(1000);
// }
} }
} }
} }

@ -9,49 +9,11 @@ using namespace chrono;
int main() { int main() {
try { try {
mySQL mydb("tcp://92.240.56.92:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", true, 2); mySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", true, 3);
auto start = high_resolution_clock::now(); auto start = high_resolution_clock::now();
// thread t1([&](){ thread t1([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("records").where("enabled = 1");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// thread t2([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("zones");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// thread t3([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// t1.join();
// t2.join();
// t3.join();
// one by one
try { try {
sqlQA test_qa; sqlQA test_qa;
test_qa.select().from("records").where("enabled = 1"); test_qa.select().from("records").where("enabled = 1");
@ -60,16 +22,20 @@ int main() {
} catch (const string err) { } catch (const string err) {
cout << err << endl; cout << err << endl;
} }
});
thread t2([&](){
try { try {
sqlQA test_qa; sqlQA test_qa;
test_qa.select().from("users"); test_qa.select().from("zones");
mydb.exec(test_qa); mydb.exec(test_qa);
test_qa.print(true); test_qa.print(true);
} catch (const string err) { } catch (const string err) {
cout << err << endl; cout << err << endl;
} }
});
thread t3([&](){
try { try {
sqlQA test_qa; sqlQA test_qa;
test_qa.select().from("users"); test_qa.select().from("users");
@ -78,11 +44,46 @@ int main() {
} catch (const string err) { } catch (const string err) {
cout << err << endl; cout << err << endl;
} }
});
t1.join();
t2.join();
t3.join();
// one by one
// try {
// sqlQA test_qa;
// test_qa.select().from("records").where("enabled = 1");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
auto end = high_resolution_clock::now(); auto end = high_resolution_clock::now();
auto duration = duration_cast<microseconds>(end - start); auto duration = duration_cast<microseconds>(end - start);
cout << "-------------Izvršilo se za: " << (double)(duration.count() / 1000.0) << endl; cout << "-------------Izvršilo se za: " << (double)(duration.count() / 1000.0) << " ms"<< endl;
} catch (const SQLException error) { } catch (const SQLException error) {

Binary file not shown.
Loading…
Cancel
Save