From e4fbabd530b7cdeb90b8a1bec95c43f6da129e0a Mon Sep 17 00:00:00 2001 From: marcelb Date: Fri, 29 Mar 2024 11:39:58 +0100 Subject: [PATCH] Switch atask, events, timers to asio --- README.md | 25 ++++- lib/asynco.hpp | 52 +++++++++- lib/event.hpp | 5 +- lib/filesystem.hpp | 7 +- lib/rotor.hpp | 237 --------------------------------------------- lib/runner.hpp | 136 -------------------------- lib/timers.hpp | 223 ++++++++++++++++++++++++++++++++++++++++++ test/test.cpp | 76 +++++++-------- 8 files changed, 333 insertions(+), 428 deletions(-) delete mode 100644 lib/rotor.hpp delete mode 100644 lib/runner.hpp create mode 100644 lib/timers.hpp diff --git a/README.md b/README.md index 5b0d11d..610e36b 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ A C++ library for event-driven asynchronous multi-threaded programming. - Event loops - Multiple parallel execution loops - Asynchronous file IO +- Based on ASIO (Boost Asio) ## Installation Just download the latest release and unzip it into your project. @@ -24,14 +25,17 @@ Just download the latest release and unzip it into your project. #include "asynco/lib/asynco.hpp" // atask(), wait() #include "asynco/lib/event.hpp" // event -#include "asynco/lib/rotor.hpp" // interval, timeout -#include "asynco/lib/runner.hpp" // for own loop +#include "asynco/lib/timers.hpp" // interval, timeout #include "asynco/lib/filesystem.hpp" // for async read and write files using namespace marcelb; using namespace asynco; using namespace events; +// At the end of the main function, always set +_asynco_engine.run(); +return 0; + ``` ## Usage @@ -45,7 +49,13 @@ interval inter1 ([]() { }, 1000); // stop interval -inter1.clear(); +inter1.stop(); + +// how many times it has expired +int t = inter1.ticks(); + +// is it stopped +bool stoped = inter1.stoped(); // start timeout timeout time1 ( [] () { @@ -53,7 +63,14 @@ timeout time1 ( [] () { }, 10000); // stop timeout -time1.clear(); +time1.stop(); + +// is it expired +int t = time1.expired(); + +// is it stopped +bool stoped = time1.stoped(); + ``` Make functions asynchronous diff --git a/lib/asynco.hpp b/lib/asynco.hpp index afd99c6..9104f27 100644 --- a/lib/asynco.hpp +++ b/lib/asynco.hpp @@ -1,21 +1,67 @@ #ifndef _ASYNCO_ #define _ASYNCO_ -#include "runner.hpp" +#include +#include using namespace std; namespace marcelb { namespace asynco { +#define HW_CONCURRENCY_MINIMAL 4 + + +/** + * Internal anonymous class for initializing the ASIO context and thread pool + * !!! It is anonymous to protect against use in the initialization of other objects of the same type !!! +*/ +class { + public: + boost::asio::io_context io_context; + + void run() { + for (auto& runner : runners) { + runner.join(); + } + } + + private: + + unique_ptr work { [&] () { + return new boost::asio::io_service::work(io_context); + } ()}; + + vector runners { [&] () { + vector _runs; + unsigned int num_of_runners; + #ifdef NUM_OF_RUNNERS + num_of_runners = NUM_OF_RUNNERS; + #else + num_of_runners = thread::hardware_concurrency(); + if (num_of_runners < HW_CONCURRENCY_MINIMAL) { + num_of_runners = HW_CONCURRENCY_MINIMAL; + } + #endif + + for (int i=0; i auto atask(F&& f, Args&&... args) -> future::type> { using return_type = typename result_of::type; - - future res = _asyncon.put_task(bind(forward(f), forward(args)...)); + future res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward(f), forward(args)...))); return res; } diff --git a/lib/event.hpp b/lib/event.hpp index cb70873..a7593be 100644 --- a/lib/event.hpp +++ b/lib/event.hpp @@ -1,15 +1,14 @@ #ifndef _EVENT_ #define _EVENT_ -#include #include #include #include #include -#include "runner.hpp" using namespace std; +#include "asynco.hpp" namespace marcelb { namespace asynco { namespace events { @@ -43,7 +42,7 @@ class event { if (it_eve != events.end()) { for (uint i =0; isecond.size(); i++) { auto callback = bind(it_eve->second[i], forward(args)...); - _asyncon.put_task(callback); + atask(callback); } } } diff --git a/lib/filesystem.hpp b/lib/filesystem.hpp index bd9faeb..18dbb0a 100644 --- a/lib/filesystem.hpp +++ b/lib/filesystem.hpp @@ -3,13 +3,12 @@ #include "asynco.hpp" +using namespace marcelb; +using namespace asynco; #include -#include - using namespace std; -using namespace marcelb; -using namespace asynco; + namespace marcelb { namespace asynco { diff --git a/lib/rotor.hpp b/lib/rotor.hpp deleted file mode 100644 index c832df7..0000000 --- a/lib/rotor.hpp +++ /dev/null @@ -1,237 +0,0 @@ -#ifndef _ROTOR_ -#define _ROTOT_ - -#include "runner.hpp" -#include "chrono" -#include - -#include "iostream" - -using namespace std; -using namespace marcelb; -using namespace asynco; - -namespace marcelb { -namespace asynco { - -/** - * Get the time in ms from the epoch -*/ - -int64_t rtime_ms() { - return chrono::duration_cast(chrono::system_clock::now() - .time_since_epoch()) - .count(); -} - -int64_t rtime_us() { - return chrono::duration_cast(chrono::system_clock::now() - .time_since_epoch()) - .count(); -} - -namespace { - -/** - * Intern class for timer async loop -*/ -class timer_core { - public: - mutex hangon; - condition_variable cv; - function callback; - int64_t time; - int64_t next; - bool repeat; - bool stop; - - /** - * Timer constructor, receives a callback function and time - */ - timer_core( function _callback, int64_t _time, bool _repeat): - callback(_callback), time(_time*1000), repeat(_repeat), stop(false) { - next = rtime_us() + time; - } - - /** - * Stop timer - */ - void clear() { - // lock_guard hang(hangon); - stop = true; - cv.notify_one(); - } - - /** - * Destruktor of timer, call stop - */ - ~timer_core() { - clear(); - } -}; - -/** - * Event loop for time events -*/ - -class rotor { - vector> tcores; - mutex te_m; - bool rotating = true; - int64_t sampling; - - condition_variable te_cv; - - /** - * Loop method, started by the constructor in a separate runner - * It checks the events on the stack and sends the expired ones to the runner - */ - void loop() { - while (rotating) { - vector>::iterator next_tc; - shared_ptr next_ptr; - - { - unique_lock te_l(te_m); - te_cv.wait(te_l, [this]{ return !tcores.empty() || rotating; }); - if (!rotating) { - break; - } - - next_tc = min_element( tcores.begin(), tcores.end(), - [](shared_ptr a, shared_ptr b ) { - return a->next < b->next; - } - ); - - next_ptr = *next_tc; - } - - unique_lock next_l(next_ptr->hangon); - next_ptr->cv.wait_for(next_l, chrono::microseconds(next_ptr->next - rtime_us()), [&next_ptr] () { - return next_ptr->stop; - }); - - if (next_ptr->stop) { - remove(next_tc); - } else { - _asyncon.put_task(next_ptr->callback); - if (next_ptr->repeat) { - next_ptr->next += next_ptr->time; - } - else { - remove(next_tc); - } - } - - } - } - - /** - * The method deletes a non-repeating or stopped event from the stack - */ - void remove(vector>::iterator it) { - lock_guard lock(te_m); - tcores.erase(it); - // te_cv.notify_one(); - } - - public: - - /** - * Constructor for the rotor, starts the given loop by occupying one runner - */ - rotor() { - _asyncon.put_task( [&] () { - loop(); - }); - }; - - /** - * Adds a time event to the stack - */ - void insert(shared_ptr tcore) { - lock_guard lock(te_m); - tcores.push_back(tcore); - te_cv.notify_one(); - }; - - /** - * Returns the number of active events - */ - int active() { - return tcores.size(); - } - - /** - * Stops all active events and stops the rotor - */ - ~rotor() { - for (int i=0; iclear(); - } - rotating = false; - } - -}; - -/** - * It is intended that there is only one global declaration -*/ -static rotor _rotor; -} - -/** - * Core class for pure async timer functions -*/ - -class _timer_intern { - shared_ptr tcore; - public: - - _timer_intern(function _callback, int64_t _time, bool repeat) { - tcore = make_shared(_callback, _time, repeat); - _rotor.insert(tcore); - } - - /** - * Stop interval - */ - void clear() { - tcore->clear(); - } -}; - -/** - * Class interval for periodic execution of the callback in time in ms -*/ -class interval : public _timer_intern { - public: - - /** - * The constructor receives a callback function and an interval time - */ - interval( function _callback, int64_t _time): - _timer_intern(_callback, _time, true) { - } -}; - -/** - * Class interval for delayed callback execution in ms -*/ -class timeout : public _timer_intern { - public: - - /** - * The constructor receives a callback function and a delay time - */ - timeout( function _callback, int64_t delay): - _timer_intern(_callback, delay, false) { - } - -}; - -} -} - -#endif diff --git a/lib/runner.hpp b/lib/runner.hpp deleted file mode 100644 index 387bd65..0000000 --- a/lib/runner.hpp +++ /dev/null @@ -1,136 +0,0 @@ -#ifndef _RUNNER_ -#define _RUNNER_ - -#include -#include -#include -#include -#include -#include -#include - -using namespace std; - -namespace marcelb { -namespace asynco { - -#define HW_CONCURRENCY_MINIMAL 4 - -/** - * The runner class implements multithread, task stack and event loop for asynchronous execution of tasks -*/ -class runner { - private: - vector runners; - queue> tasks; - mutex q_io; - condition_variable cv; - bool stop; - - public: - - /** - * The constructor starts as many threads as the system has cores, - * and runs an event loop inside each one. - * Each event loop waits for tasks from the stack and executes them. - */ - runner(unsigned int _num_of_runners = 0) : stop(false) { - unsigned int num_of_runners = _num_of_runners; - - if (num_of_runners == 0) { - #ifdef NUM_OF_RUNNERS - num_of_runners = NUM_OF_RUNNERS; - #else - num_of_runners = thread::hardware_concurrency(); - if (num_of_runners < HW_CONCURRENCY_MINIMAL) { - num_of_runners = HW_CONCURRENCY_MINIMAL; - } - #endif - } - - for (size_t i = 0; i < num_of_runners; ++i) { - runners.emplace_back( thread([&] { - while (!stop) { - function task; - { - unique_lock lock(q_io); - cv.wait(lock, [this] { return stop || !tasks.empty(); }); - // if (stop && tasks.empty()) - if (stop) - return; - task = move(tasks.front()); - tasks.pop(); - } - task(); - } - })); - } - } - - - - /** - * With the method, we send the callback function and its arguments to the task stack - */ - template - auto put_task(F&& f, Args&&... args) - -> future::type> { - using return_type = typename result_of::type; - - auto task = make_shared>(bind(forward(f), forward(args)...)); - future res = task->get_future(); - { - unique_lock lock(q_io); - - if (stop) { - throw runtime_error("Pool is stoped!"); - } - - tasks.emplace([task]() { (*task)(); }); - } - - cv.notify_one(); - return res; - } - - /** - * Returns the number of tasks the runner has to perform - */ - unsigned int count_tasks() { - return tasks.size(); - } - - /** - * Returns the number of threads used by the runner - */ - unsigned int count_threads() { - return runners.size(); - } - - /** - * The destructor stops all loops and stops all threads - */ - ~runner() { - { - unique_lock lock(q_io); - stop = true; - } - cv.notify_all(); - for (thread& runner : runners) { - runner.join(); - } - runners.clear(); - } - -}; - - -/** - * Internal global library variable -*/ -static runner _asyncon; - -} -} - -#endif \ No newline at end of file diff --git a/lib/timers.hpp b/lib/timers.hpp new file mode 100644 index 0000000..b4befdd --- /dev/null +++ b/lib/timers.hpp @@ -0,0 +1,223 @@ +#ifndef _ROTOR_ +#define _ROTOT_ + +#include "asynco.hpp" +#include + +using namespace std; +using namespace marcelb; +using namespace asynco; + +namespace marcelb { +namespace asynco { + +/** + * Get the time in ms from the epoch +*/ + +int64_t rtime_ms() { + return chrono::duration_cast(chrono::system_clock::now() + .time_since_epoch()) + .count(); +} + +/** + * Get the time in us from the epoch +*/ + +int64_t rtime_us() { + return chrono::duration_cast(chrono::system_clock::now() + .time_since_epoch()) + .count(); +} + +/** + * Core timer class for construct time async functions +*/ +class timer { + boost::asio::steady_timer st; + bool _stop = false; + bool repeate; + function callback; + uint64_t time; + uint64_t _ticks = 0; + + /** + * A method to assign a callback wrapper and a reinitialization algorithm + */ + void init() { + st.async_wait( [this] (const boost::system::error_code&) { + if (!_stop) { + callback(); + if (repeate) { + st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time)); + init(); + } + _ticks++; + } + }); + } + + public: + + /** + * The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer + */ + timer (function _callback, uint64_t _time, bool _repeate) : + st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)), + _stop(false), + repeate(_repeate), + callback(_callback), + time(_time) { + + init(); + } + + /** + * Stop timer + * The stop flag is set and timer remove it from the queue + */ + void stop() { + _stop = true; + st.cancel(); + } + + /** + * Run callback now + * Forces the callback function to run independently of the timer + */ + void now() { + st.cancel(); + } + + /** + * Get the number of times the timer callback was runned + */ + uint64_t ticks() { + return _ticks; + } + + /** + * The logic status of the timer stop state + */ + bool stoped() { + return _stop; + } + + /** + * The destructor stops the timer + */ + ~timer() { + stop(); + } +}; + +/** + * Class interval for periodic execution of the callback in time in ms +*/ +class interval { + shared_ptr _timer; + + public: + /** + * Constructor initializes a shared pointer of type timer + */ + interval(function callback, uint64_t time) : + _timer(make_shared (callback, time, true)) { + } + + /** + * Stop interval + * The stop flag is set and interval remove it from the queue + */ + void stop() { + _timer->stop(); + } + + /** + * Run callback now + * Forces the callback function to run independently of the interval + */ + void now() { + _timer->now(); + } + + /** + * Get the number of times the interval callback was runned + */ + uint64_t ticks() { + return _timer->ticks(); + } + + /** + * The logic status of the interval stop state + */ + bool stoped() { + return _timer->stoped(); + } + + /** + * The destructor stops the interval + */ + ~interval() { + stop(); + } +}; + +/** + * Class timeout for delayed callback execution in ms +*/ +class timeout { + shared_ptr _timer; + + public: + /** + * Constructor initializes a shared pointer of type timer + */ + timeout(function callback, uint64_t time) : + _timer(make_shared (callback, time, false)) { + } + + /** + * Stop timeout + * The stop flag is set and timeout remove it from the queue + */ + void stop() { + _timer->stop(); + } + + /** + * Run callback now + * Forces the callback function to run independently of the timeout + */ + void now() { + _timer->now(); + } + + /** + * Get the number of times the timeout callback was runned + */ + bool expired() { + return bool(_timer->ticks()); + } + + /** + * The logic status of the timeout stop state + */ + bool stoped() { + return _timer->stoped(); + } + + /** + * The destructor stops the timeout + */ + ~timeout() { + stop(); + } + +}; + +} +} + +#endif diff --git a/test/test.cpp b/test/test.cpp index 381e50b..d3589cc 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -2,20 +2,20 @@ #include "../lib/asynco.hpp" #include "../lib/event.hpp" -// #include "../lib/rotor.hpp" #include "../lib/filesystem.hpp" #include "../lib/timers.hpp" +using namespace marcelb::asynco; +using namespace events; + #include #include #include using namespace std; -using namespace marcelb::asynco; -using namespace events; -using namespace asynco; using namespace this_thread; + void sleep_to (int _time) { promise _promise; timeout t( [&]() { @@ -65,19 +65,9 @@ int main () { // --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- - /** - * Init interval and timeout; clear interval and timeout - */ - - - // vector intervals; - - // for(int i=0; i<1000; i++) { - // intervals.push_back(interval( [i, &start]() { - // cout << "interval " << i << " end: " << rtime_ms() - start << endl; - // }, (i%5 +1)*1000)); - // } - + // /** + // * Init interval and timeout; clear interval and timeout + // */ // interval inter1 ([&]() { // cout << "interval prvi " << rtime_ms() - start << endl; @@ -89,11 +79,12 @@ int main () { // interval inter3 ([&]() { // cout << "interval treći " << rtime_ms() - start << endl; - // }, 3000); + // }, 1000); // interval inter4 ([&]() { - // cout << "interval cetvrti " << rtime_ms() - start << endl; - // }, 1000); + // // cout << "interval cetvrti " << rtime_ms() - start << endl; + // cout << "Ticks " << inter3.ticks() << endl; + // }, 500); // interval inter5 ([&]() { // cout << "interval peti " << rtime_ms() - start << endl; @@ -105,7 +96,7 @@ int main () { // timeout time1 ( [&] () { // cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; - // // inter1.stop(); + // inter1.stop(); // cout << "inter1.stop " << endl; // inter2.stop(); // cout << "inter2.stop " << endl; @@ -113,15 +104,26 @@ int main () { // timeout time2 ([&] () { - // // cout << "Close interval 3 " << rtime_ms() - start << endl; + // cout << "Close interval 3 " << rtime_ms() - start << endl; // inter3.stop(); + // cout << "Stoped " << inter3.stoped() << endl; // // time1.stop(); - // }, 2000); + // }, 5000); - // interval ( [] () { - // cout << "BROJ TAJMERA " << _intern_asynco_timer_globals.timers.size() << endl; - // }, 5000); + // if (time2.expired()) { + // cout << "isteko " << endl; + // } else { + // cout << "nije isteko " << endl; + // } + + // // sleep(6); + + // if (time2.expired()) { + // cout << "isteko " << endl; + // } else { + // cout << "nije isteko " << endl; + // } // // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- @@ -214,7 +216,7 @@ int main () { // }); // }); - // // --------------- EVENTS ------------------- + // // // --------------- EVENTS ------------------- // /** // * initialization of typed events @@ -290,8 +292,8 @@ int main () { // } - string data_; - auto start_read = rtime_us(); + // string data_; + // auto start_read = rtime_us(); // fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) { // if (error) { @@ -304,19 +306,11 @@ int main () { // } // }); - fs::read2("test1.txt", [&data_, &start_read] (string data, exception* error) { - if (error) { - cout << "Error " << error->what() << endl; - } else { - // cout << "Data " << endl << data << endl; - // data_ = data; - // cout << "Data_" << data_ << endl; - cout << "read " << rtime_us() - start_read << endl; - } - }); - cout << "Sleep" << endl; - sleep(100000); // only for testing + // ---------------------------------------------------------------------------------------------------- + + cout << "Run" << endl; + _asynco_engine.run(); return 0; }