From 23fdd03dfe7750685a98d88d5076eae7109b343d Mon Sep 17 00:00:00 2001 From: mbandic Date: Fri, 22 Mar 2024 13:15:42 +0000 Subject: [PATCH] Refactor preprocessor directives, wait for reference and const objetct, multiple listeners, fix nameless timer functions, disable increase runners --- .vscode/settings.json | 4 +- README.md | 45 ++++++++-------- lib/asynco.hpp | 17 +++--- lib/event.hpp | 30 +++++++---- lib/rotor.hpp | 121 ++++++++++++++++++++++++------------------ lib/runner.hpp | 66 ++++++++++------------- test/test.cpp | 85 ++++++++++++----------------- 7 files changed, 184 insertions(+), 184 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 7d75b1d..3eeb5bb 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -25,6 +25,8 @@ "future": "cpp", "*.ipp": "cpp", "bitset": "cpp", - "algorithm": "cpp" + "algorithm": "cpp", + "string": "cpp", + "string_view": "cpp" } } \ No newline at end of file diff --git a/README.md b/README.md index 5d69ce2..c0f529b 100644 --- a/README.md +++ b/README.md @@ -11,24 +11,22 @@ A C++ library for event-driven asynchronous multi-threaded programming. - Asynchronous programming - Multithread - Asynchronous timer functions: interval, timeout -- Typed events (on, emit) +- Typed events (on, emit, off) - Event loops -- Parallel execution loops +- Multiple parallel execution loops ## Installation Just download the latest release and unzip it into your project. ```c++ +#define NUM_OF_RUNNERS 2 // To change the number of threads used by asynco + #include "asynco/lib/asynco.hpp" // asynco(), wait() #include "asynco/lib/event.hpp" // event #include "asynco/lib/rotor.hpp" // interval, timeout -#include "asynco/lib/runner.hpp" // on_async +#include "asynco/lib/runner.hpp" // for own loop using namespace marcelb; -#ifndef ON_RUNNER -#define ON_RUNNER -runner on_async; -#endif ``` ## Usage @@ -56,22 +54,7 @@ Make functions asynchronous ```c++ /** -* Put task directly and get returned value - it is not recommended to use it -*/ - -auto res1 = on_async.put_task( [] () { - cout << "Not except " < auto asynco(F&& f, Args&&... args) -> future::type> { using return_type = typename result_of::type; - future res = on_async.put_task(bind(forward(f), forward(args)...)); + future res = _asyncon.put_task(bind(forward(f), forward(args)...)); return res; } @@ -27,10 +22,18 @@ auto asynco(F&& f, Args&&... args) -> future::typ * Block until the asynchronous call completes */ template -T wait(future r) { +T wait(future& r) { return r.get(); } +/** + * Block until the asynchronous call completes +*/ +template +T wait(future&& r) { + return move(r).get(); +} + } #endif \ No newline at end of file diff --git a/lib/event.hpp b/lib/event.hpp index c4e0a81..3d24174 100644 --- a/lib/event.hpp +++ b/lib/event.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include "runner.hpp" @@ -11,11 +12,6 @@ using namespace std; namespace marcelb { -#ifndef ON_RUNNER -#define ON_RUNNER -runner on_async; -#endif - /** * Event class, for event-driven programming. * These events are typed according to the arguments of the callback function @@ -23,7 +19,8 @@ runner on_async; template class event { private: - unordered_map> events; + mutex m_eve; + unordered_map>> events; public: @@ -31,7 +28,8 @@ class event { * Defines event by key, and callback function */ void on(const string& key, function callback) { - events[key] = callback; + lock_guard _off(m_eve); + events[key].push_back(callback); } /** @@ -39,13 +37,23 @@ class event { */ template void emit(const string& key, Args... args) { - auto it = events.find(key); - if (it != events.end()) { - auto callback = bind(it->second, forward(args)...); - on_async.put_task(callback); + auto it_eve = events.find(key); + if (it_eve != events.end()) { + for (uint i =0; isecond.size(); i++) { + auto callback = bind(it_eve->second[i], forward(args)...); + _asyncon.put_task(callback); + } } } + /** + * Remove an event listener from an event + */ + void off(const string& key) { + lock_guard _off(m_eve); + events.erase(key); + } + }; diff --git a/lib/rotor.hpp b/lib/rotor.hpp index 4daad39..b92c2d5 100644 --- a/lib/rotor.hpp +++ b/lib/rotor.hpp @@ -3,17 +3,13 @@ #include "runner.hpp" #include "chrono" +#include #include "iostream" using namespace std; using namespace marcelb; -#ifndef ON_RUNNER -#define ON_RUNNER -runner on_async; -#endif - namespace marcelb { /** @@ -27,15 +23,38 @@ int64_t rtime_ms() { } /** - * Structure for time events + * Intern class for timer async loop */ - -struct time_event { +class timer_core { + public: + mutex hangon; function callback; int64_t init; int64_t time; bool repeat; bool stop; + + /** + * Timer constructor, receives a callback function and time + */ + timer_core( function _callback, int64_t _time, bool _repeat): + callback(_callback), init(rtime_ms()), time(_time), repeat(_repeat), stop(false) { + } + + /** + * Stop timer + */ + void clear() { + lock_guard hang(hangon); + stop = true; + } + + /** + * Destruktor of timer, call stop + */ + ~timer_core() { + clear(); + } }; /** @@ -43,7 +62,7 @@ struct time_event { */ class rotor { - vector tevents; + vector> tcores; mutex te_m; bool rotating = true; int64_t sampling; @@ -54,17 +73,17 @@ class rotor { */ void loop() { while (rotating) { - for (int i=0; istop) { + if (tcores[i]->stop) { remove(i); i--; } - else if (expired(tevents[i])) { - on_async.put_task(tevents[i]->callback); - if (tevents[i]->repeat) { - tevents[i]->init = rtime_ms(); + else if (expired(tcores[i])) { + _asyncon.put_task(tcores[i]->callback); + if (tcores[i]->repeat) { + tcores[i]->init = rtime_ms(); } else { remove(i); @@ -79,8 +98,8 @@ class rotor { /** * The method checks whether the time event has expired */ - bool expired(struct time_event *tevent) { - return rtime_ms() - tevent->init >= tevent->time; + bool expired(shared_ptr tcore) { + return rtime_ms() - tcore->init >= tcore->time; } /** @@ -88,7 +107,7 @@ class rotor { */ void remove(const int& position) { lock_guard lock(te_m); - tevents.erase(tevents.begin()+position); + tcores.erase(tcores.begin()+position); update_sampling(); } @@ -96,17 +115,17 @@ class rotor { * Updates the idle time of the loop, according to twice the frequency of available events */ void update_sampling() { - if (tevents.empty()) { + if (tcores.empty()) { sampling = 100; return; } - sampling = tevents[0]->time; - for (int i=0; i tevents[i]->time) { - sampling = tevents[i]->time; + sampling = tcores[0]->time; + for (int i=0; i tcores[i]->time) { + sampling = tcores[i]->time; } } - sampling /= tevents.size()*2; + sampling /= tcores.size()*2; } public: @@ -115,7 +134,7 @@ class rotor { * Constructor for the rotor, starts the given loop by occupying one runner */ rotor() { - on_async.put_task( [&] () { + _asyncon.put_task( [&] () { loop(); }); }; @@ -123,9 +142,9 @@ class rotor { /** * Adds a time event to the stack */ - void insert(struct time_event *tevent) { + void insert(shared_ptr tcore) { lock_guard lock(te_m); - tevents.push_back(tevent); + tcores.push_back(tcore); update_sampling(); }; @@ -133,15 +152,15 @@ class rotor { * Returns the number of active events */ int active() { - return tevents.size(); + return tcores.size(); } /** * Stops all active events and stops the rotor */ ~rotor() { - for (int i=0; istop = true; + for (int i=0; iclear(); } rotating = false; } @@ -151,33 +170,32 @@ class rotor { /** * It is intended that there is only one global declaration */ -rotor _rotor; +static rotor _rotor; /** - * A class for all timer functions + * Core class for pure async timer functions */ -class timer_core { - public: - struct time_event t_event; - /** - * Timer constructor, receives a callback function and time - */ - timer_core( function _callback, int64_t _time): - t_event({ _callback, rtime_ms(), _time, false, false }) { +class _timer_intern { + shared_ptr tcore; + public: + _timer_intern(function _callback, int64_t _time, bool repeat) { + tcore = make_shared(_callback, _time, repeat); + _rotor.insert(tcore); } + /** - * Stop timer + * Stop interval */ void clear() { - t_event.stop = true; + tcore->clear(); } /** * Destruktor of timer, call stop */ - ~timer_core() { + ~_timer_intern() { clear(); } }; @@ -185,31 +203,30 @@ class timer_core { /** * Class interval for periodic execution of the callback in time in ms */ -class interval : public timer_core { +class interval : public _timer_intern { public: - + /** * The constructor receives a callback function and an interval time */ - interval( function _callback, int64_t _time): timer_core(_callback, _time) { - t_event.repeat = true; - _rotor.insert(&t_event); + interval( function _callback, int64_t _time): + _timer_intern(_callback, _time, true) { } }; /** * Class interval for delayed callback execution in ms */ -class timeout : public timer_core { +class timeout : public _timer_intern { public: /** * The constructor receives a callback function and a delay time */ - timeout( function _callback, int64_t delay): timer_core(_callback, delay) { - t_event.repeat = false; - _rotor.insert(&t_event); + timeout( function _callback, int64_t delay): + _timer_intern(_callback, delay, false) { } + }; } diff --git a/lib/runner.hpp b/lib/runner.hpp index b56ffd9..678723c 100644 --- a/lib/runner.hpp +++ b/lib/runner.hpp @@ -13,9 +13,7 @@ using namespace std; namespace marcelb { -#ifdef ON_RUNNER -extern runner on_async; -#endif +#define HW_CONCURRENCY_MINIMAL 4 /** * The runner class implements multithread, task stack and event loop for asynchronous execution of tasks @@ -27,12 +25,29 @@ class runner { mutex q_io; condition_variable cv; bool stop; - + + public: + /** - * Increase number of runners + * The constructor starts as many threads as the system has cores, + * and runs an event loop inside each one. + * Each event loop waits for tasks from the stack and executes them. */ - void increase_runners(unsigned int increase) { - for (size_t i = 0; i < increase; ++i) { + runner(unsigned int _num_of_runners = 0) : stop(false) { + unsigned int num_of_runners = _num_of_runners; + + if (num_of_runners == 0) { + #ifdef NUM_OF_RUNNERS + num_of_runners = NUM_OF_RUNNERS; + #else + num_of_runners = thread::hardware_concurrency(); + if (num_of_runners < HW_CONCURRENCY_MINIMAL) { + num_of_runners = HW_CONCURRENCY_MINIMAL; + } + #endif + } + + for (size_t i = 0; i < num_of_runners; ++i) { runners.emplace_back( thread([&] { while (!stop) { function task; @@ -51,21 +66,6 @@ class runner { } } - public: - - /** - * The constructor starts as many threads as the system has cores, - * and runs an event loop inside each one. - * Each event loop waits for tasks from the stack and executes them. - */ - runner(size_t pool_size = thread::hardware_concurrency()) : stop(false) { - if (pool_size < 4) { - pool_size = 4; - } - increase_runners(pool_size); - // start_all_runners(pool_size); - } - /** @@ -92,22 +92,6 @@ class runner { return res; } - /** - * Change the number of runners - */ - void change_runners (unsigned int num_of_runners) { - if (num_of_runners == 0 || num_of_runners > 64) { - throw runtime_error("Not allowed runners size"); - } - - int difference = num_of_runners - count_threads(); - if (difference < 0) { // reduce - throw runtime_error("Is not allowed to reduce runners"); - } else if (difference > 0) { // increase - increase_runners(difference); - } - } - /** * Returns the number of tasks the runner has to perform */ @@ -139,6 +123,12 @@ class runner { }; + +/** + * Internal global library variable +*/ +static runner _asyncon; + } #endif \ No newline at end of file diff --git a/test/test.cpp b/test/test.cpp index 083c8a8..a035dd9 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -1,5 +1,5 @@ +#define NUM_OF_RUNNERS 2 -#include "../lib/runner.hpp" #include "../lib/asynco.hpp" #include "../lib/event.hpp" #include "../lib/rotor.hpp" @@ -11,12 +11,6 @@ using namespace std; using namespace marcelb; using namespace this_thread; -#ifndef ON_RUNNER -#define ON_RUNNER -runner on_async; -#endif - - void sleep_to (int _time) { promise _promise; timeout t( [&]() { @@ -61,7 +55,6 @@ class myOwnClass : public event { int main () { - on_async.change_runners(64); auto start = rtime_ms(); @@ -71,22 +64,6 @@ int main () { * Init interval and timeout; clear interval and timeout */ - // ovo ne radi - - // vector interv; - // vector tmout; - - // for (int i=0; i< 20; i++) { - // interv.push_back( interval( [i] () { - // cout << "interval " << i << endl; - // }, 1000)); - // tmout.push_back( timeout( [i] () { - // cout << "timeout " << i << endl; - // }, 1000*i)); - // } - - // ovo valja popravit - // interval( [] () { // cout << "interval " << endl; // }, 1000); @@ -106,8 +83,11 @@ int main () { // timeout time1 ( [&] () { // cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; // inter1.clear(); + // // cout << "inter1.stop " << inter1.stop << endl; // inter2.clear(); - // }, 10000); + // // cout << "inter2.stop " << inter2.stop << endl; + + // }, 5000); // timeout time2 ([&] () { // cout << "Close interval 3 " << rtime_ms() - start << endl; @@ -117,28 +97,13 @@ int main () { // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- - // /** - // * Put task directly and get returned value - it is not recommended to use it - // */ - - // auto res1 = on_async.put_task( [] () { - // cout << "Jebiga " <