Compare commits

...

9 Commits

Author SHA1 Message Date
marcelb
87088bdaa7 Clear 2 2024-01-07 22:15:10 +01:00
marcelb
8f2dc7d1be Clear 2024-01-07 22:14:41 +01:00
marcelb
d4d0c3155d Try fix connection disconnect by server 2023-12-12 19:26:11 +01:00
marcelb
81f67c1420 Add protect if connection is not valid 2023-12-10 13:56:32 +01:00
marcelb
4c4fdcc938 Deque implementation done, dev tested 2023-11-21 19:46:11 +01:00
marcelb
774151e534 Fix connection close, and implement bot for reconnect 2023-11-17 20:13:08 +01:00
marcelb
6905bfe745 Pool connections support, tested 2023-09-26 19:34:38 +02:00
2ea748699e Work on connection pool support in mySQL 2023-09-26 09:40:35 +00:00
marcelb
54b1513d7b Pool class, and work on mySQL class pool support 2023-09-25 20:44:25 +02:00
7 changed files with 321 additions and 120 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
example
test/test.o
test/test.o
test/*.test

15
.vscode/settings.json vendored
View File

@ -3,6 +3,19 @@
"string": "cpp",
"vector": "cpp",
"deque": "cpp",
"ostream": "cpp"
"ostream": "cpp",
"future": "cpp",
"optional": "cpp",
"array": "cpp",
"string_view": "cpp",
"initializer_list": "cpp",
"utility": "cpp",
"mutex": "cpp",
"iostream": "cpp",
"*.tcc": "cpp",
"fstream": "cpp",
"new": "cpp",
"thread": "cpp",
"chrono": "cpp"
}
}

View File

@ -4,9 +4,12 @@
#include <string>
#include <sstream>
#include <vector>
#include <deque>
#include <map>
#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <mysql_driver.h>
#include <mysql_connection.h>
@ -16,7 +19,8 @@
#include <cppconn/prepared_statement.h>
#include <cppconn/resultset.h>
#define CONNECT_TRY_LIMIT 3
#define unlimited 0
#define reconnectSleep 10000 // in us
using namespace std;
using namespace sql;
@ -49,6 +53,8 @@ class sqlQA {
sqlQA& set(const string _column_value_pairs);
sqlQA& deleteFrom(const string _table);
void print(bool withDetail = false);
// answer methods
private:
@ -56,20 +62,30 @@ class sqlQA {
};
class mySQL {
public:
private:
mutex io;
MySQL_Driver *drv;
Connection *con;
deque<Connection*> con;
string path, username, password, db;
bool isPersistent;
uint available;
uint reconTrys = 3;
bool runBot = true;
future<void> bot;
mySQL(const string _path, const string _username, const string _password, const string _db, bool _isPersistent = false);
bool open(const string _db = "");
bool connect();
void getColumns(const string _table, vector<string> &_columns, Connection *ptr_con); // privatno
bool open_one(Connection* con_ptr);
Connection* create_con();
bool disconnect_one(Connection* con_ptr);
Connection* shift_con();
public:
mySQL(const string _path, const string _username, const string _password, const string _db, const uint _available = 1);
bool disconnect();
void reconnectTrys(const uint _trys);
void exec(sqlQA &sql_qa);
void getColumns(const string _table, vector<string> &_columns);
~mySQL();
};

View File

@ -59,6 +59,26 @@ sqlQA& sqlQA::deleteFrom(const string _table) {
return *this;
}
void sqlQA::print(bool withDetail) {
cout << "============================================" << endl;
for (auto i : result) {
for (auto j: i.second) {
cout << i.first << " : " << j << endl;
}
cout << "--------------------------------------------" << endl;
}
if (withDetail) {
cout << "-----------------DETAILS--------------------" << endl;
cout << "Is executed: " << (executed ? "true" : "false") << endl;
cout << "Update catch: " << updateCatch << endl;
cout << "Num of rows: " << num_rows << endl;
cout << "Num of columns: " << num_columns << endl;
}
cout << "============================================" << endl;
}
void sqlQA::parse_columns(const string _columns) {
istringstream iss(_columns);
@ -74,113 +94,158 @@ void sqlQA::parse_columns(const string _columns) {
}
}
mySQL::mySQL(const string _path, const string _username, const string _password, const string _db, bool _isPersistent) {
mySQL::mySQL(const string _path, const string _username, const string _password, const string _db, const uint _available) {
path = _path;
username = _username;
password = _password;
db = _db;
isPersistent = _isPersistent;
available = _available > 0 ? _available : 1;
if (isPersistent) {
if (connect()) {
throw string("[ERROR] Unable to connect database ");
}
drv = get_mysql_driver_instance();
if (!db.empty()) {
if (open()) {
throw string("[ERROR] Unable to open database " + db);
bot = async(launch::async, [&](){
while (runBot) {
sleep(1);
while (available>con.size() && runBot) {
try {
Connection* new_con_ptr = create_con();
if (!db.empty()) {
if (open_one(new_con_ptr)) {
throw string("[ERROR] Unable to open database " + db);
}
}
io.lock();
con.push_back(new_con_ptr);
io.unlock();
} catch (const SQLException except) {
cout << except.what() << endl;
} catch (const string except) {
cout << except << endl;
}
}
for (int i=0; i<con.size() && runBot; i++) {
if (!con[i]->isValid()) {
io.lock();
con.erase(con.begin()+i);
io.unlock();
i--;
}
}
}
}
return;
});
}
bool mySQL::open(const string _db) {
io.lock();
db = _db.empty() ? db : _db;
bool status = true;
try {
con->setSchema(db);
status = false;
io.unlock();
}
catch (const SQLException &error) {
cout << error.what() << endl;
io.unlock();
}
return status;
}
bool mySQL::connect() {
io.lock();
Connection* mySQL::create_con() {
uint trys = 0;
bool status = true;
Connection* new_con = NULL;
while (trys < CONNECT_TRY_LIMIT && status) {
while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) {
try {
drv = get_mysql_driver_instance();
con = drv->connect(path, username, password);
status = false;
io.unlock();
Connection* con_can = drv->connect(path, username, password);
status = !con_can->isValid();
if (!status) {
new_con = con_can;
}
else if (!con_can->isClosed()) {
disconnect_one(con_can);
}
}
catch (const SQLException &error) {
cout << error.what() << endl;
usleep(10000*trys++);
io.unlock();
}
usleep(reconnectSleep);
reconTrys == unlimited ? trys : trys++;
}
}
return status;
return new_con;
}
bool mySQL::disconnect() {
io.lock();
bool status = true;
if (con->isValid() && !con->isClosed()) {
for (uint i=0; i<con.size(); i++) {
status = disconnect_one(con[i]) ;
}
io.unlock();
return status;
}
bool mySQL::disconnect_one(Connection* con_ptr) {
bool status = !con_ptr->isClosed();
if (status) {
try {
con->close();
status = false;
io.unlock();
con_ptr->close();
status = !con_ptr->isClosed();
}
catch (const SQLException &error) {
cout << error.what() << endl;
status = true;
io.unlock();
}
}
else {
status = false;
status = false; // već je zatvorena
}
delete con_ptr;
return status;
}
bool mySQL::open_one(Connection* con_ptr) {
bool status = true; // ako true greška je
uint trys = 0;
while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) {
try {
if (con_ptr->isValid()) {
con_ptr->setSchema(db);
status = false;
}
else {
break;
}
}
catch (const SQLException &error) {
cout << error.what() << endl;
usleep(reconnectSleep);
reconTrys == unlimited ? trys : trys++;
}
}
return status;
}
void mySQL::exec(sqlQA &sql_qa) {
if (!isPersistent || !con->isValid() || con->isClosed()) {
if (connect()) {
throw string("[ERROR] Unable to connect database ");
}
if (open()) {
throw string("[ERROR] Unable to open database " + db);
}
}
/**
* Broj pokušaja usljed povezivanja s bazom od 1 do unlimited;
*/
void mySQL::reconnectTrys(const uint _trys) {
io.lock();
/**/
reconTrys = _trys;
io.unlock();
}
void mySQL::exec(sqlQA &sql_qa) {
Connection* con_ptr = shift_con();
try {
vector<string> columns = sql_qa.columns;
if (columns.empty() && !sql_qa.table.empty()) {
getColumns(sql_qa.table, columns);
getColumns(sql_qa.table, columns, con_ptr);
}
Statement *stmt;
stmt = con->createStatement();
stmt = con_ptr->createStatement();
if (sql_qa.isSelect) {
ResultSet *res = stmt->executeQuery(sql_qa.cmd);
@ -193,6 +258,7 @@ void mySQL::exec(sqlQA &sql_qa) {
}
}
res->close();
delete res;
sql_qa.num_columns = columns.size();
sql_qa.num_rows = num_raw_columns/columns.size();
@ -207,32 +273,23 @@ void mySQL::exec(sqlQA &sql_qa) {
sql_qa.executed = stmt->execute(sql_qa.cmd);
}
stmt->close();
delete stmt;
io.unlock();
disconnect_one(con_ptr);
}
catch (const SQLException &error) {
cout << error.what() << endl;
sql_qa.executed = false;
io.unlock();
}
catch (const string error) {
throw error;
io.unlock();
}
/**/
if (!isPersistent) {
if(disconnect()) {
throw string("[ERROR] Unable to close database ");
}
}
}
void mySQL::getColumns(const string _table, vector<string> &_columns) {
void mySQL::getColumns(const string _table, vector<string> &_columns, Connection *ptr_con) {
Statement *stmt;
stmt = con->createStatement();
stmt = ptr_con->createStatement();
ResultSet *columnsRes = stmt->executeQuery("SHOW COLUMNS from " + _table);
@ -240,12 +297,30 @@ void mySQL::getColumns(const string _table, vector<string> &_columns) {
_columns.push_back(columnsRes->getString("Field"));
}
columnsRes->close();
stmt->close();
delete columnsRes;
delete stmt;
}
mySQL::~mySQL() {
if(disconnect()) {
throw string("[ERROR] Unable to close database ");
Connection* mySQL::shift_con() {
while (true) {
while(con.size()) {
io.lock();
Connection* con_ptr = con[0];
con.pop_front();
if (con_ptr->isValid()) {
io.unlock();
return con_ptr;
}
io.unlock();
}
usleep(1000);
}
}
mySQL::~mySQL() {
runBot = false;
bot.get();
disconnect();
}

View File

@ -1 +1 @@
g++ test.cpp ../src/* -o test.o -lmysqlcppconn
g++ test.cpp ../src/* -o test.o -lmysqlcppconn -lpthread

View File

@ -1,56 +1,152 @@
#include <iostream>
#include <thread>
#include <chrono>
#include "../lib/mysql.hpp"
using namespace std;
using namespace chrono;
int main() {
mySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", true);
try {
// mySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 1);
mySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5);
sqlQA test_qa;
// id,user_id,zone_id,domain,record_type,auth_key,last_update,enabled
// test_qa.select().from("records").where("enabled = 0").limit(2);
// mydb.exec(test_qa);
// sleep(3600*10);
sleep(20);
// for (auto i : test_qa.result) {
// for (auto j: i.second) {
// cout << i.first << " : " << j << endl;
auto start = high_resolution_clock::now();
// thread t1([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("records").where("enabled = 1");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// }
// });
// // sleep(2);
// thread t2([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("zones");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// // sleep(3);
// thread t3([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// // sleep(1);
// thread t4([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("records").where("enabled = 1");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// thread t5([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("zones");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// thread t6([&](){
// try {
// sqlQA test_qa;
// test_qa.select().from("users");
// mydb.exec(test_qa);
// test_qa.print(true);
// } catch (const string err) {
// cout << err << endl;
// }
// });
// t1.join();
// t2.join();
// t3.join();
// t4.join();
// t5.join();
// t6.join();
// one by one
try {
sqlQA test_qa;
test_qa.select().from("records").where("enabled = 1");
mydb.exec(test_qa);
test_qa.print(true);
} catch (const string err) {
cout << err << endl;
}
sleep(20);
try {
sqlQA test_qa;
test_qa.select().from("users");
mydb.exec(test_qa);
test_qa.print(true);
} catch (const string err) {
cout << err << endl;
}
sleep(20);
try {
sqlQA test_qa;
test_qa.select("zone_id,record_type,enabled").from("records").where("domain = 'bitelex.test'");
mydb.exec(test_qa);
test_qa.print(true);
} catch (const string err) {
cout << err << endl;
}
// test_qa.update("records").set("enabled = 1").where("domain = 'bitelex.test'");
// mydb.exec(test_qa);
// if (test_qa.executed) {
// cout << "Num rows affect " << test_qa.updateCatch << endl;
// }
// cout << "Num rows " << test_qa.num_rows << " num columns " << test_qa.num_columns << " executed " << test_qa.executed << endl;
// test_qa.insertInTo("records", "id,user_id,zone_id,domain,record_type,auth_key,last_update,enabled").values("'5',2,2,'www.bitelex.test','AAAA','jebiga',NULL,1");
test_qa.deleteFrom("records").where("record_type = AAAA");
// test_qa.update("records").set("enabled = 0").where("record_type = 'AAAA'");
cout << test_qa.cmd << endl;
mydb.exec(test_qa);
cout << "Num rows " << test_qa.num_rows << " num columns " << test_qa.num_columns << " catch " << test_qa.updateCatch << " executed " << test_qa.executed << endl;
}
catch (const SQLException error) {
auto end = high_resolution_clock::now();
auto duration = duration_cast<microseconds>(end - start);
cout << "-------------Izvršilo se za: " << (double)(duration.count() / 1000.0) << " ms"<< endl;
// sleep(100);
} catch (const SQLException error) {
cout << error.what() << endl;
}
// catch (const string error) {
// cout << error << endl;
// }
catch (...) {
} catch (const string error) {
cout << error << endl;
} catch (...) {
cout << "Jebi ga" << endl;
}
sleep(600);
return 0;
}

Binary file not shown.