Fix recycling connections
This commit is contained in:
parent
cf969c0424
commit
777ee53355
@ -134,6 +134,10 @@ wait(a2);
|
||||
wait(a3);
|
||||
```
|
||||
|
||||
## To do
|
||||
|
||||
- On error, implement virtual/friend function
|
||||
|
||||
## License
|
||||
|
||||
[APACHE 2.0](http://www.apache.org/licenses/LICENSE-2.0/)
|
||||
|
@ -1,7 +1,7 @@
|
||||
#ifndef _MYSQL_
|
||||
#define _MYSQL_
|
||||
|
||||
#include <deque>
|
||||
#include <queue>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <future>
|
||||
@ -66,7 +66,7 @@ inline int getValue<int>(ResultSet* res, int column) {
|
||||
return res->getInt(column);
|
||||
}
|
||||
template<>
|
||||
inline uint getValue<uint>(ResultSet* res, int column) {
|
||||
inline uint32_t getValue<uint32_t>(ResultSet* res, int column) {
|
||||
return res->getUInt(column);
|
||||
}
|
||||
template<>
|
||||
@ -97,34 +97,36 @@ inline bool getValue<bool>(ResultSet* res, int column) {
|
||||
|
||||
class MySQL {
|
||||
mutex io;
|
||||
condition_variable condition;
|
||||
MySQL_Driver *drv;
|
||||
deque<Connection*> con;
|
||||
string path, username, password, db;
|
||||
uint available;
|
||||
uint reconTrys = 3;
|
||||
queue<Connection*> connection_pool;
|
||||
string path, username, password, database;
|
||||
uint32_t pool_size;
|
||||
uint32_t connect_trys = 3;
|
||||
bool run_engin = true;
|
||||
future<void> periodic_engin;
|
||||
periodical_engine engine_type;
|
||||
|
||||
/**
|
||||
* Open one database
|
||||
*/
|
||||
bool open_one(Connection* con_ptr);
|
||||
|
||||
/**
|
||||
* Open one database server connection
|
||||
*/
|
||||
Connection* create_con();
|
||||
Connection* create_connection();
|
||||
|
||||
/**
|
||||
* Close one database connection
|
||||
*/
|
||||
bool disconnect_one(Connection* con_ptr);
|
||||
bool disconnect_connection(Connection* connection);
|
||||
|
||||
/**
|
||||
* Take an available database connection
|
||||
* Take an pool_size database connection
|
||||
*/
|
||||
Connection* shift_con();
|
||||
Connection* occupy_connection();
|
||||
|
||||
/**
|
||||
* Free an database connection
|
||||
*/
|
||||
|
||||
void release_connection(Connection* connection);
|
||||
|
||||
/**
|
||||
* Function parses a parameterized row
|
||||
@ -135,6 +137,17 @@ class MySQL {
|
||||
return make_tuple(getValue<Types>(res, Is + 1)...);
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect all connections to server
|
||||
*/
|
||||
|
||||
void connect_pool();
|
||||
|
||||
/**
|
||||
* Disconnect all connections to server
|
||||
*/
|
||||
void disconnect_pool();
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
@ -143,30 +156,26 @@ class MySQL {
|
||||
* username, password, database name,
|
||||
* and number of active connections (optional)
|
||||
*/
|
||||
MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available = 1, const periodical_engine _engine_type = periodical_engine::internal);
|
||||
MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available = 1, const periodical_engine _engine_type = periodical_engine::internal);
|
||||
|
||||
/**
|
||||
* Disconnect all connections to server
|
||||
*/
|
||||
bool disconnect();
|
||||
|
||||
/**
|
||||
* Define the maximum number of attempts to
|
||||
* reconnect to the server
|
||||
*/
|
||||
void reconnectTrys(const uint _trys);
|
||||
void set_connect_trys(const uint32_t _trys);
|
||||
|
||||
/**
|
||||
* Execute the SQL statement
|
||||
*/
|
||||
template<typename... Types>
|
||||
MySQL_Res<Types...> exec(const string& sql_q) {
|
||||
Connection* con_ptr = shift_con();
|
||||
Connection* connection = occupy_connection();
|
||||
MySQL_Res<Types...> result;
|
||||
|
||||
try {
|
||||
Statement *stmt;
|
||||
stmt = con_ptr->createStatement();
|
||||
stmt = connection->createStatement();
|
||||
result.have_result = stmt->execute(sql_q);
|
||||
|
||||
if (result.have_result) {
|
||||
@ -192,10 +201,12 @@ class MySQL {
|
||||
|
||||
stmt->close();
|
||||
delete stmt;
|
||||
disconnect_one(con_ptr);
|
||||
release_connection(connection);
|
||||
|
||||
} catch (sql::SQLException& e) {
|
||||
throw runtime_error(e.what());
|
||||
// std::cerr << "SQLState: " << e.getSQLState() << std::endl;
|
||||
// std::cerr << "Error code: " << e.getErrorCode() << std::endl;
|
||||
}
|
||||
|
||||
return result;
|
||||
|
160
src/mysql.cpp
160
src/mysql.cpp
@ -1,15 +1,16 @@
|
||||
#include "../lib/mysql.hpp"
|
||||
|
||||
|
||||
marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint _available, const periodical_engine _engine_type) {
|
||||
marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available, const periodical_engine _engine_type) {
|
||||
path = _path;
|
||||
username = _username;
|
||||
password = _password;
|
||||
db = _db;
|
||||
available = _available > 0 ? _available : 1;
|
||||
database = _db;
|
||||
pool_size = _available > 0 ? _available : 1;
|
||||
engine_type = _engine_type;
|
||||
|
||||
drv = get_mysql_driver_instance();
|
||||
connect_pool();
|
||||
|
||||
if (engine_type == periodical_engine::internal) {
|
||||
periodic_engin = async(launch::async, [&](){
|
||||
@ -23,51 +24,58 @@ marcelb::mysql::MySQL::MySQL(const string _path, const string _username, const s
|
||||
}
|
||||
|
||||
|
||||
Connection* marcelb::mysql::MySQL::create_con() {
|
||||
uint trys = 0;
|
||||
Connection* marcelb::mysql::MySQL::create_connection() {
|
||||
uint32_t trys = 0;
|
||||
bool status = true;
|
||||
Connection* new_con = NULL;
|
||||
|
||||
while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) {
|
||||
while (connect_trys == unlimited ? status : (trys <= connect_trys && status)) {
|
||||
try {
|
||||
Connection* con_can = drv->connect(path, username, password);
|
||||
con_can->setSchema(database);
|
||||
status = !con_can->isValid();
|
||||
if (!status) {
|
||||
new_con = con_can;
|
||||
}
|
||||
else if (!con_can->isClosed()) {
|
||||
disconnect_one(con_can);
|
||||
disconnect_connection(con_can);
|
||||
}
|
||||
}
|
||||
catch (const SQLException &error) {
|
||||
cout << error.what() << endl;
|
||||
// on_error -- ako se ikad impelementira pozovi ga ovdje!
|
||||
usleep(reconnectSleep);
|
||||
reconTrys == unlimited ? trys : trys++;
|
||||
connect_trys == unlimited ? trys : trys++;
|
||||
}
|
||||
}
|
||||
|
||||
return new_con;
|
||||
}
|
||||
|
||||
bool marcelb::mysql::MySQL::disconnect() {
|
||||
io.lock();
|
||||
bool status = true;
|
||||
|
||||
for (uint i=0; i<con.size(); i++) {
|
||||
status = disconnect_one(con[i]) ;
|
||||
void marcelb::mysql::MySQL::connect_pool() {
|
||||
lock_guard<mutex> lock(io);
|
||||
for (uint32_t i=0; i<pool_size; i++) {
|
||||
Connection* connection = create_connection();
|
||||
connection_pool.push(connection);
|
||||
}
|
||||
|
||||
io.unlock();
|
||||
return status;
|
||||
}
|
||||
|
||||
bool marcelb::mysql::MySQL::disconnect_one(Connection* con_ptr) {
|
||||
bool status = !con_ptr->isClosed();
|
||||
void marcelb::mysql::MySQL::disconnect_pool() {
|
||||
lock_guard<mutex> lock(io);
|
||||
for (uint32_t i=0; i<connection_pool.size(); i++) {
|
||||
Connection* connection = connection_pool.front();
|
||||
connection_pool.pop();
|
||||
disconnect_connection(connection) ;
|
||||
}
|
||||
}
|
||||
|
||||
bool marcelb::mysql::MySQL::disconnect_connection(Connection* connection) {
|
||||
bool status = !connection->isClosed();
|
||||
|
||||
if (status) {
|
||||
try {
|
||||
con_ptr->close();
|
||||
status = !con_ptr->isClosed();
|
||||
connection->close();
|
||||
status = !connection->isClosed();
|
||||
}
|
||||
catch (const SQLException &error) {
|
||||
cout << error.what() << endl;
|
||||
@ -79,58 +87,48 @@ bool marcelb::mysql::MySQL::disconnect_one(Connection* con_ptr) {
|
||||
status = false; // već je zatvorena
|
||||
}
|
||||
|
||||
delete con_ptr;
|
||||
delete connection;
|
||||
return status;
|
||||
}
|
||||
|
||||
bool marcelb::mysql::MySQL::open_one(Connection* con_ptr) {
|
||||
bool status = true; // ako true greška je
|
||||
uint trys = 0;
|
||||
|
||||
while (reconTrys == unlimited ? status : (trys <= reconTrys && status)) {
|
||||
try {
|
||||
if (con_ptr->isValid()) {
|
||||
con_ptr->setSchema(db);
|
||||
status = false;
|
||||
}
|
||||
else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (const SQLException &error) {
|
||||
cout << error.what() << endl;
|
||||
usleep(reconnectSleep);
|
||||
reconTrys == unlimited ? trys : trys++;
|
||||
}
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broj pokušaja usljed povezivanja s bazom od 1 do unlimited;
|
||||
*/
|
||||
|
||||
void marcelb::mysql::MySQL::reconnectTrys(const uint _trys) {
|
||||
io.lock();
|
||||
reconTrys = _trys;
|
||||
io.unlock();
|
||||
void marcelb::mysql::MySQL::set_connect_trys(const uint32_t _trys) {
|
||||
lock_guard<mutex> lock(io);
|
||||
connect_trys = _trys;
|
||||
}
|
||||
|
||||
Connection* marcelb::mysql::MySQL::shift_con() {
|
||||
while (true) {
|
||||
while(con.size()) {
|
||||
io.lock();
|
||||
Connection* con_ptr = con[0];
|
||||
con.pop_front();
|
||||
if (con_ptr->isValid()) {
|
||||
io.unlock();
|
||||
return con_ptr;
|
||||
}
|
||||
io.unlock();
|
||||
}
|
||||
usleep(1000);
|
||||
Connection* marcelb::mysql::MySQL::occupy_connection() {
|
||||
unique_lock<mutex> lock(io);
|
||||
while (connection_pool.empty()) {
|
||||
condition.wait(lock);
|
||||
}
|
||||
Connection *connection = connection_pool.front();
|
||||
connection_pool.pop();
|
||||
return connection;
|
||||
|
||||
// while (true) {
|
||||
// while(connection_pool.size()) {
|
||||
// io.lock();
|
||||
// Connection* connection = connection_pool.front();
|
||||
// connection_pool.pop();
|
||||
// if (connection->isValid()) {
|
||||
// io.unlock();
|
||||
// return connection;
|
||||
// }
|
||||
// io.unlock();
|
||||
// }
|
||||
// usleep(1000);
|
||||
// }
|
||||
}
|
||||
|
||||
void marcelb::mysql::MySQL::release_connection(Connection* connection) {
|
||||
lock_guard<std::mutex> lock(io);
|
||||
connection_pool.push(connection);
|
||||
condition.notify_one();
|
||||
}
|
||||
|
||||
marcelb::mysql::MySQL::~MySQL() {
|
||||
@ -138,38 +136,26 @@ marcelb::mysql::MySQL::~MySQL() {
|
||||
run_engin = false;
|
||||
periodic_engin.get();
|
||||
} else {
|
||||
|
||||
// ne bi bilo loše ubiti periodic nekako?!
|
||||
// iako disconnecta može periodic connect napraviti!!!
|
||||
run_engin = false;
|
||||
}
|
||||
|
||||
disconnect();
|
||||
disconnect_pool();
|
||||
}
|
||||
|
||||
|
||||
void marcelb::mysql::MySQL::periodic_maintenance() {
|
||||
while (available>con.size() && run_engin) {
|
||||
try {
|
||||
Connection* new_con_ptr = create_con();
|
||||
if (!db.empty()) {
|
||||
if (open_one(new_con_ptr)) {
|
||||
throw string("[ERROR] Unable to open database " + db);
|
||||
}
|
||||
for (size_t i = 0; i < pool_size && run_engin; i++) {
|
||||
Connection *conn = occupy_connection();
|
||||
if (conn->isValid()) {
|
||||
release_connection(conn);
|
||||
} else {
|
||||
if (!conn->isClosed()){
|
||||
conn->close();
|
||||
}
|
||||
io.lock();
|
||||
con.push_back(new_con_ptr);
|
||||
io.unlock();
|
||||
} catch (const SQLException except) {
|
||||
cout << except.what() << endl;
|
||||
} catch (const string except) {
|
||||
cout << except << endl;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i=0; i<con.size() && run_engin; i++) {
|
||||
if (!con[i]->isValid()) {
|
||||
io.lock();
|
||||
con.erase(con.begin()+i);
|
||||
io.unlock();
|
||||
i--;
|
||||
Connection *n_conn = create_connection();
|
||||
release_connection(n_conn);
|
||||
}
|
||||
}
|
||||
}
|
118
test/test.cpp
118
test/test.cpp
@ -13,18 +13,19 @@ using namespace marcelb::asynco;
|
||||
|
||||
int main() {
|
||||
try {
|
||||
MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 10, periodical_engine::external);
|
||||
// MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5);
|
||||
// MySQL mydb("tcp://192.168.2.10:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 10, periodical_engine::external);
|
||||
// MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 10, periodical_engine::external);
|
||||
MySQL mydb("tcp://bitelex.ddns.net:3306", "dinio", "H€r5elfInd1aH@nds", "dinio", 5);
|
||||
|
||||
periodic mysql_maintenance ( [&mydb] () {
|
||||
cout << "IZVRŠAVA SE ENGINE" << endl;
|
||||
mydb.periodic_maintenance();
|
||||
}, MYSQL_PERIODIC_INTERNAL_TIME);
|
||||
// periodic mysql_maintenance ( [&mydb] () {
|
||||
// mydb.periodic_maintenance();
|
||||
// }, MYSQL_PERIODIC_INTERNAL_TIME);
|
||||
|
||||
sleep(5);
|
||||
|
||||
auto start = high_resolution_clock::now();
|
||||
|
||||
auto a1 = atask ( [&mydb] () {
|
||||
auto a1 = nonsync ( [&mydb] () {
|
||||
try {
|
||||
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
|
||||
cout << response.affected << " " << response.have_result << endl;
|
||||
@ -43,7 +44,7 @@ int main() {
|
||||
}
|
||||
});
|
||||
|
||||
auto a2 = atask ( [&mydb] () {
|
||||
auto a2 = nonsync ( [&mydb] () {
|
||||
try {
|
||||
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
|
||||
cout << response.affected << " " << response.have_result << endl;
|
||||
@ -62,7 +63,64 @@ int main() {
|
||||
}
|
||||
});
|
||||
|
||||
auto a3 = atask ( [&mydb] () {
|
||||
auto a3 = nonsync ( [&mydb] () {
|
||||
try {
|
||||
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
|
||||
cout << response.affected << " " << response.have_result << endl;
|
||||
cout << response.rows << " " << response.columns << endl;
|
||||
|
||||
for (auto row : response) {
|
||||
cout << get<0>(row) << " " << get<1>(row) << endl;
|
||||
}
|
||||
|
||||
for (auto column_name : response.columns_name) {
|
||||
cout << column_name << endl;
|
||||
}
|
||||
|
||||
} catch (const string err) {
|
||||
cout << err << endl;
|
||||
}
|
||||
});
|
||||
|
||||
auto a4 = nonsync ( [&mydb] () {
|
||||
try {
|
||||
auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
|
||||
cout << response.affected << " " << response.have_result << endl;
|
||||
cout << response.rows << " " << response.columns << endl;
|
||||
|
||||
for (auto row : response) {
|
||||
cout << get<0>(row) << " " << get<1>(row) << endl;
|
||||
}
|
||||
|
||||
for (auto column_name : response.columns_name) {
|
||||
cout << column_name << endl;
|
||||
}
|
||||
|
||||
} catch (const string err) {
|
||||
cout << err << endl;
|
||||
}
|
||||
});
|
||||
|
||||
auto a5 = nonsync ( [&mydb] () {
|
||||
try {
|
||||
auto response = mydb.exec<string,string>("SELECT zonename,auth_key FROM zones;");
|
||||
cout << response.affected << " " << response.have_result << endl;
|
||||
cout << response.rows << " " << response.columns << endl;
|
||||
|
||||
for (auto row : response) {
|
||||
cout << get<0>(row) << " " << get<1>(row) << endl;
|
||||
}
|
||||
|
||||
for (auto column_name : response.columns_name) {
|
||||
cout << column_name << endl;
|
||||
}
|
||||
|
||||
} catch (const string err) {
|
||||
cout << err << endl;
|
||||
}
|
||||
});
|
||||
|
||||
auto a6 = nonsync ( [&mydb] () {
|
||||
try {
|
||||
auto response = mydb.exec<string,string>("SELECT username,email FROM users WHERE enabled = 1;");
|
||||
cout << response.affected << " " << response.have_result << endl;
|
||||
@ -84,46 +142,10 @@ int main() {
|
||||
wait(a1);
|
||||
wait(a2);
|
||||
wait(a3);
|
||||
wait(a4);
|
||||
wait(a5);
|
||||
wait(a6);
|
||||
|
||||
// one by one
|
||||
// try {
|
||||
// auto response = mydb.exec<int,string>("SELECT id,domain FROM records WHERE enabled = 1;");
|
||||
// // auto response = mydb.exec<int,string>("UPDATE records SET enabled = 1;");
|
||||
// cout << response.affected << " " << response.have_result << endl;
|
||||
// cout << response.rows << " " << response.columns << endl;
|
||||
|
||||
// for (auto row : response) {
|
||||
// cout << get<0>(row) << " " << get<1>(row) << endl;
|
||||
// }
|
||||
|
||||
// for (auto column_name : response.columns_name) {
|
||||
// cout << column_name << endl;
|
||||
// }
|
||||
|
||||
// } catch (const string err) {
|
||||
// cout << err << endl;
|
||||
// }
|
||||
|
||||
// auto a1 = atask ( [&mydb] () {
|
||||
// try {
|
||||
// auto response = mydb.exec<string,string>("SELECT username,email FROM records WHERE enabled = 1;");
|
||||
// cout << response.affected << " " << response.have_result << endl;
|
||||
// cout << response.rows << " " << response.columns << endl;
|
||||
|
||||
// for (auto row : response) {
|
||||
// cout << get<0>(row) << " " << get<1>(row) << endl;
|
||||
// }
|
||||
|
||||
// for (auto column_name : response.columns_name) {
|
||||
// cout << column_name << endl;
|
||||
// }
|
||||
|
||||
// } catch (const string err) {
|
||||
// cout << err << endl;
|
||||
// }
|
||||
// });
|
||||
|
||||
// wait(a1);
|
||||
|
||||
auto end = high_resolution_clock::now();
|
||||
auto duration = duration_cast<microseconds>(end - start);
|
||||
|
Loading…
x
Reference in New Issue
Block a user