diff --git a/README.md b/README.md index 5b0d11d..610e36b 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ A C++ library for event-driven asynchronous multi-threaded programming. - Event loops - Multiple parallel execution loops - Asynchronous file IO +- Based on ASIO (Boost Asio) ## Installation Just download the latest release and unzip it into your project. @@ -24,14 +25,17 @@ Just download the latest release and unzip it into your project. #include "asynco/lib/asynco.hpp" // atask(), wait() #include "asynco/lib/event.hpp" // event -#include "asynco/lib/rotor.hpp" // interval, timeout -#include "asynco/lib/runner.hpp" // for own loop +#include "asynco/lib/timers.hpp" // interval, timeout #include "asynco/lib/filesystem.hpp" // for async read and write files using namespace marcelb; using namespace asynco; using namespace events; +// At the end of the main function, always set +_asynco_engine.run(); +return 0; + ``` ## Usage @@ -45,7 +49,13 @@ interval inter1 ([]() { }, 1000); // stop interval -inter1.clear(); +inter1.stop(); + +// how many times it has expired +int t = inter1.ticks(); + +// is it stopped +bool stoped = inter1.stoped(); // start timeout timeout time1 ( [] () { @@ -53,7 +63,14 @@ timeout time1 ( [] () { }, 10000); // stop timeout -time1.clear(); +time1.stop(); + +// is it expired +int t = time1.expired(); + +// is it stopped +bool stoped = time1.stoped(); + ``` Make functions asynchronous diff --git a/lib/asynco.hpp b/lib/asynco.hpp index afd99c6..9104f27 100644 --- a/lib/asynco.hpp +++ b/lib/asynco.hpp @@ -1,21 +1,67 @@ #ifndef _ASYNCO_ #define _ASYNCO_ -#include "runner.hpp" +#include +#include using namespace std; namespace marcelb { namespace asynco { +#define HW_CONCURRENCY_MINIMAL 4 + + +/** + * Internal anonymous class for initializing the ASIO context and thread pool + * !!! It is anonymous to protect against use in the initialization of other objects of the same type !!! +*/ +class { + public: + boost::asio::io_context io_context; + + void run() { + for (auto& runner : runners) { + runner.join(); + } + } + + private: + + unique_ptr work { [&] () { + return new boost::asio::io_service::work(io_context); + } ()}; + + vector runners { [&] () { + vector _runs; + unsigned int num_of_runners; + #ifdef NUM_OF_RUNNERS + num_of_runners = NUM_OF_RUNNERS; + #else + num_of_runners = thread::hardware_concurrency(); + if (num_of_runners < HW_CONCURRENCY_MINIMAL) { + num_of_runners = HW_CONCURRENCY_MINIMAL; + } + #endif + + for (int i=0; i auto atask(F&& f, Args&&... args) -> future::type> { using return_type = typename result_of::type; - - future res = _asyncon.put_task(bind(forward(f), forward(args)...)); + future res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward(f), forward(args)...))); return res; } diff --git a/lib/event.hpp b/lib/event.hpp index cb70873..a7593be 100644 --- a/lib/event.hpp +++ b/lib/event.hpp @@ -1,15 +1,14 @@ #ifndef _EVENT_ #define _EVENT_ -#include #include #include #include #include -#include "runner.hpp" using namespace std; +#include "asynco.hpp" namespace marcelb { namespace asynco { namespace events { @@ -43,7 +42,7 @@ class event { if (it_eve != events.end()) { for (uint i =0; isecond.size(); i++) { auto callback = bind(it_eve->second[i], forward(args)...); - _asyncon.put_task(callback); + atask(callback); } } } diff --git a/lib/filesystem.hpp b/lib/filesystem.hpp index bd9faeb..18dbb0a 100644 --- a/lib/filesystem.hpp +++ b/lib/filesystem.hpp @@ -3,13 +3,12 @@ #include "asynco.hpp" +using namespace marcelb; +using namespace asynco; #include -#include - using namespace std; -using namespace marcelb; -using namespace asynco; + namespace marcelb { namespace asynco { diff --git a/lib/runner.hpp b/lib/runner.hpp deleted file mode 100644 index 387bd65..0000000 --- a/lib/runner.hpp +++ /dev/null @@ -1,136 +0,0 @@ -#ifndef _RUNNER_ -#define _RUNNER_ - -#include -#include -#include -#include -#include -#include -#include - -using namespace std; - -namespace marcelb { -namespace asynco { - -#define HW_CONCURRENCY_MINIMAL 4 - -/** - * The runner class implements multithread, task stack and event loop for asynchronous execution of tasks -*/ -class runner { - private: - vector runners; - queue> tasks; - mutex q_io; - condition_variable cv; - bool stop; - - public: - - /** - * The constructor starts as many threads as the system has cores, - * and runs an event loop inside each one. - * Each event loop waits for tasks from the stack and executes them. - */ - runner(unsigned int _num_of_runners = 0) : stop(false) { - unsigned int num_of_runners = _num_of_runners; - - if (num_of_runners == 0) { - #ifdef NUM_OF_RUNNERS - num_of_runners = NUM_OF_RUNNERS; - #else - num_of_runners = thread::hardware_concurrency(); - if (num_of_runners < HW_CONCURRENCY_MINIMAL) { - num_of_runners = HW_CONCURRENCY_MINIMAL; - } - #endif - } - - for (size_t i = 0; i < num_of_runners; ++i) { - runners.emplace_back( thread([&] { - while (!stop) { - function task; - { - unique_lock lock(q_io); - cv.wait(lock, [this] { return stop || !tasks.empty(); }); - // if (stop && tasks.empty()) - if (stop) - return; - task = move(tasks.front()); - tasks.pop(); - } - task(); - } - })); - } - } - - - - /** - * With the method, we send the callback function and its arguments to the task stack - */ - template - auto put_task(F&& f, Args&&... args) - -> future::type> { - using return_type = typename result_of::type; - - auto task = make_shared>(bind(forward(f), forward(args)...)); - future res = task->get_future(); - { - unique_lock lock(q_io); - - if (stop) { - throw runtime_error("Pool is stoped!"); - } - - tasks.emplace([task]() { (*task)(); }); - } - - cv.notify_one(); - return res; - } - - /** - * Returns the number of tasks the runner has to perform - */ - unsigned int count_tasks() { - return tasks.size(); - } - - /** - * Returns the number of threads used by the runner - */ - unsigned int count_threads() { - return runners.size(); - } - - /** - * The destructor stops all loops and stops all threads - */ - ~runner() { - { - unique_lock lock(q_io); - stop = true; - } - cv.notify_all(); - for (thread& runner : runners) { - runner.join(); - } - runners.clear(); - } - -}; - - -/** - * Internal global library variable -*/ -static runner _asyncon; - -} -} - -#endif \ No newline at end of file diff --git a/lib/timers.hpp b/lib/timers.hpp new file mode 100644 index 0000000..b4befdd --- /dev/null +++ b/lib/timers.hpp @@ -0,0 +1,223 @@ +#ifndef _ROTOR_ +#define _ROTOT_ + +#include "asynco.hpp" +#include + +using namespace std; +using namespace marcelb; +using namespace asynco; + +namespace marcelb { +namespace asynco { + +/** + * Get the time in ms from the epoch +*/ + +int64_t rtime_ms() { + return chrono::duration_cast(chrono::system_clock::now() + .time_since_epoch()) + .count(); +} + +/** + * Get the time in us from the epoch +*/ + +int64_t rtime_us() { + return chrono::duration_cast(chrono::system_clock::now() + .time_since_epoch()) + .count(); +} + +/** + * Core timer class for construct time async functions +*/ +class timer { + boost::asio::steady_timer st; + bool _stop = false; + bool repeate; + function callback; + uint64_t time; + uint64_t _ticks = 0; + + /** + * A method to assign a callback wrapper and a reinitialization algorithm + */ + void init() { + st.async_wait( [this] (const boost::system::error_code&) { + if (!_stop) { + callback(); + if (repeate) { + st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time)); + init(); + } + _ticks++; + } + }); + } + + public: + + /** + * The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer + */ + timer (function _callback, uint64_t _time, bool _repeate) : + st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)), + _stop(false), + repeate(_repeate), + callback(_callback), + time(_time) { + + init(); + } + + /** + * Stop timer + * The stop flag is set and timer remove it from the queue + */ + void stop() { + _stop = true; + st.cancel(); + } + + /** + * Run callback now + * Forces the callback function to run independently of the timer + */ + void now() { + st.cancel(); + } + + /** + * Get the number of times the timer callback was runned + */ + uint64_t ticks() { + return _ticks; + } + + /** + * The logic status of the timer stop state + */ + bool stoped() { + return _stop; + } + + /** + * The destructor stops the timer + */ + ~timer() { + stop(); + } +}; + +/** + * Class interval for periodic execution of the callback in time in ms +*/ +class interval { + shared_ptr _timer; + + public: + /** + * Constructor initializes a shared pointer of type timer + */ + interval(function callback, uint64_t time) : + _timer(make_shared (callback, time, true)) { + } + + /** + * Stop interval + * The stop flag is set and interval remove it from the queue + */ + void stop() { + _timer->stop(); + } + + /** + * Run callback now + * Forces the callback function to run independently of the interval + */ + void now() { + _timer->now(); + } + + /** + * Get the number of times the interval callback was runned + */ + uint64_t ticks() { + return _timer->ticks(); + } + + /** + * The logic status of the interval stop state + */ + bool stoped() { + return _timer->stoped(); + } + + /** + * The destructor stops the interval + */ + ~interval() { + stop(); + } +}; + +/** + * Class timeout for delayed callback execution in ms +*/ +class timeout { + shared_ptr _timer; + + public: + /** + * Constructor initializes a shared pointer of type timer + */ + timeout(function callback, uint64_t time) : + _timer(make_shared (callback, time, false)) { + } + + /** + * Stop timeout + * The stop flag is set and timeout remove it from the queue + */ + void stop() { + _timer->stop(); + } + + /** + * Run callback now + * Forces the callback function to run independently of the timeout + */ + void now() { + _timer->now(); + } + + /** + * Get the number of times the timeout callback was runned + */ + bool expired() { + return bool(_timer->ticks()); + } + + /** + * The logic status of the timeout stop state + */ + bool stoped() { + return _timer->stoped(); + } + + /** + * The destructor stops the timeout + */ + ~timeout() { + stop(); + } + +}; + +} +} + +#endif diff --git a/test/test.cpp b/test/test.cpp index a1a427b..ed6c04c 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -1,19 +1,21 @@ -// #define NUM_OF_RUNNERS 2 +// // #define NUM_OF_RUNNERS 2 #include "../lib/asynco.hpp" #include "../lib/event.hpp" -#include "../lib/rotor.hpp" #include "../lib/filesystem.hpp" +#include "../lib/timers.hpp" + +using namespace marcelb::asynco; +using namespace events; #include #include +#include using namespace std; -using namespace marcelb::asynco; -using namespace events; -using namespace asynco; using namespace this_thread; + void sleep_to (int _time) { promise _promise; timeout t( [&]() { @@ -63,38 +65,9 @@ int main () { // --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- - /** - * Init interval and timeout; clear interval and timeout - */ - - vector intervals; - - for(int i=0; i<10; i++) { - intervals.push_back(interval( [i, &start]() { - cout << "interval " << i << " end: " << rtime_ms() - start << endl; - }, (i%5 +1)*1000)); - } - - // interval( [&] () { - // cout << "interval 1: " << rtime_ms() - start << endl; - // }, 50); - - // interval( [&] () { - // cout << "interval 1: " << rtime_ms() - start << endl; - // }, 100); - - // interval( [&] () { - // cout << "interval 2: " << rtime_ms() - start << endl; - // }, 200); - - // interval( [&] () { - // cout << "interval 3: " << rtime_ms() - start << endl; - // }, 300); - - - // interval( [&] () { - // cout << "interval 4: " << rtime_ms() - start << endl; - // }, 400); + // /** + // * Init interval and timeout; clear interval and timeout + // */ // interval inter1 ([&]() { // cout << "interval prvi " << rtime_ms() - start << endl; @@ -106,11 +79,12 @@ int main () { // interval inter3 ([&]() { // cout << "interval treći " << rtime_ms() - start << endl; - // }, 3000); + // }, 1000); // interval inter4 ([&]() { - // cout << "interval cetvrti " << rtime_ms() - start << endl; - // }, 1000); + // // cout << "interval cetvrti " << rtime_ms() - start << endl; + // cout << "Ticks " << inter3.ticks() << endl; + // }, 500); // interval inter5 ([&]() { // cout << "interval peti " << rtime_ms() - start << endl; @@ -122,18 +96,34 @@ int main () { // timeout time1 ( [&] () { // cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; - // // inter1.clear(); - // // cout << "inter1.stop " << inter1.stop << endl; - // // inter2.clear(); - // // cout << "inter2.stop " << inter2.stop << endl; - // }, 5000); + // inter1.stop(); + // cout << "inter1.stop " << endl; + // inter2.stop(); + // cout << "inter2.stop " << endl; + // }, 8000); // timeout time2 ([&] () { // cout << "Close interval 3 " << rtime_ms() - start << endl; - // // inter3.clear(); - // time1.clear(); - // }, 2000); + // inter3.stop(); + // cout << "Stoped " << inter3.stoped() << endl; + // // time1.stop(); + // }, 5000); + + + // if (time2.expired()) { + // cout << "isteko " << endl; + // } else { + // cout << "nije isteko " << endl; + // } + + // // sleep(6); + + // if (time2.expired()) { + // cout << "isteko " << endl; + // } else { + // cout << "nije isteko " << endl; + // } // // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- @@ -226,7 +216,7 @@ int main () { // }); // }); - // // --------------- EVENTS ------------------- + // // // --------------- EVENTS ------------------- // /** // * initialization of typed events @@ -301,8 +291,26 @@ int main () { // cout << err.what() << endl; // } - cout << "Sleep" << endl; - sleep(100000); // only for testing + + // string data_; + // auto start_read = rtime_us(); + + // fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) { + // if (error) { + // cout << "Error " << error->what() << endl; + // } else { + // // cout << "Data " << endl << data << endl; + // // data_ = data; + // // cout << "Data_" << data_ << endl; + // cout << "read " << rtime_us() - start_read << endl; + // } + // }); + + + // ---------------------------------------------------------------------------------------------------- + + cout << "Run" << endl; + _asynco_engine.run(); return 0; }