#ifndef _MYSQL_ #define _MYSQL_ #include #include #include #include #include #include #include #include using namespace std; #include #include #include #include #include #include #include using namespace sql; using namespace mysql; #ifdef MYSQL_USE_ASYNCO #include "../../asynco/lib/asynco.hpp" #include "../../asynco/lib/timers.hpp" using namespace marcelb::asynco; #endif #define unlimited 0 #define reconnectSleep 1000 // in us namespace marcelb { namespace mysql { /** * */ #define MYSQL_PERIODIC_INTERNAL_TIME 5000 #define MYSQL_MAX_CONNECTION_INIT_SAME_TIME 10 #define MYSQL_DELAYED_CONNECT_TIME 1000 /** * A class for creating sql responses */ template class MySQL_Res : public vector> { public: bool have_result = false; uint16_t affected = 0; uint16_t rows = 0; uint16_t columns = sizeof...(Types); vector columns_name; }; /** * Type conversion functions */ template T getValue(ResultSet* res, int column); template<> inline int getValue(ResultSet* res, int column) { return res->getInt(column); } template<> inline uint32_t getValue(ResultSet* res, int column) { return res->getUInt(column); } template<> inline int64_t getValue(ResultSet* res, int column) { return res->getInt64(column); } template<> inline uint64_t getValue(ResultSet* res, int column) { return res->getUInt64(column); } template<> inline float getValue(ResultSet* res, int column) { return res->getDouble(column); } template<> inline double getValue(ResultSet* res, int column) { return res->getDouble(column); } template<> inline string getValue(ResultSet* res, int column) { return res->getString(column); } template<> inline bool getValue(ResultSet* res, int column) { return res->getBoolean(column); } // implementiraj neku c++ kompatibilnu pretvorbu za timestampe, time, date datetime itd. class MySQL { mutex io; condition_variable condition; MySQL_Driver *drv; queue connection_pool; string path, username, password, database; uint32_t pool_size; const uint32_t max_t = thread::hardware_concurrency(); uint32_t t = 1; #ifdef MYSQL_USE_ASYNCO periodic p_loop; // delayed d_connect; #else future tloop_future, connect_f; bool run_tloop = true; #endif /** * Open one database server connection */ Connection* create_connection(); /** * Close one database connection */ bool disconnect_connection(Connection* connection); /** * Take an pool_size database connection */ Connection* occupy_connection(); /** * Free an database connection */ void release_connection(Connection* connection); /** * Function parses a parameterized row */ template static tuple getRow(sql::ResultSet* res, index_sequence) { return make_tuple(getValue(res, Is + 1)...); } /** * Connect all connections to server */ void connect_pool(); /** * Disconnect all connections to server */ void disconnect_pool(); /** * Internal tloop periodic */ void _tloop(uint32_t b, uint32_t e); /** * */ vector> divide_and_conquer(int n, int t) { vector> pairs; int part_size = n / t; int residue = n % t; // Ostatak koji treba rasporediti for (int i = 0; i < t; i++) { int start = i * part_size + std::min(i, residue); int end = start + part_size + (i < residue ? 1 : 0); pairs.push_back(make_pair(start, end)); } return pairs; } /** * Automatically adjusts */ void auto_adjusts(const uint32_t duration) { if (duration > (MYSQL_PERIODIC_INTERNAL_TIME*2)/3) { if (t < max_t) { t++; // povećaj broj asinkronih zadataka drugi put } else if (on_error) { on_error("Periodic maintenance takes too long"); } } else if (duration < MYSQL_PERIODIC_INTERNAL_TIME/3 && t > 1) { t--; // smanji nema potrebe } } #ifndef MYSQL_USE_ASYNCO /** * */ uint64_t timestamp() { return chrono::duration_cast( chrono::system_clock::now().time_since_epoch() ).count(); }; #endif public: function on_error; function on_connect; uint32_t connect_trys = 3; /** * MySQL constructor, * receive the path to the mysql server, * username, password, database name, * and number of active connections (optional) */ MySQL(const string _path, const string _username, const string _password, const string _db, const uint32_t _available = 1); /** * Execute the SQL statement */ template MySQL_Res exec(const string& sql_q) { Connection* connection = occupy_connection(); MySQL_Res result; try { Statement *stmt; stmt = connection->createStatement(); result.have_result = stmt->execute(sql_q); if (result.have_result) { ResultSet* res = stmt->getResultSet(); result.rows = res->rowsCount(); ResultSetMetaData *metaData = res->getMetaData(); int columnCount = metaData->getColumnCount(); for (int i = 1; i <= columnCount; ++i) { result.columns_name.push_back(metaData->getColumnName(i)); } while (res->next()) { result.push_back(MySQL::getRow(res, make_index_sequence{})); } res->close(); delete res; } else { result.affected = stmt->getUpdateCount(); } stmt->close(); delete stmt; release_connection(connection); } catch (sql::SQLException& e) { throw runtime_error(e.what()); // cerr << "SQLState: " << e.getSQLState() << endl; // cerr << "Error code: " << e.getErrorCode() << endl; } return result; } /** * Destruktor * close all connections */ ~MySQL(); }; } } #endif