diff --git a/.gitignore b/.gitignore index e69de29..b23ba77 100644 --- a/.gitignore +++ b/.gitignore @@ -0,0 +1 @@ +test/test \ No newline at end of file diff --git a/README.md b/README.md index e69de29..f16ce8c 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,216 @@ + +# Asynco + +A C++ library for event-driven asynchronous multi-threaded programming. + +## Features + +- Object oriented +- Small and easy to integrate +- Header only +- Asynchronous launch functions +- Multithread parallel execution of tasks +- Timer functions: interval, timeout +- Events (on, emit) +- Event loop + +## Installation + +Just download the latest release and unzip it into your project. + +```c++ +#include "asynco/lib/asynco.hpp" // asynco(), wait() +#include "asynco/lib/event.hpp" // event +#include "asynco/lib/rotor.hpp" // interval, timeout +#include "asynco/lib/runner.hpp" // on_async +using namespace marcelb; + +#ifndef ON_RUNNER +#define ON_RUNNER +runner on_async; +#endif +``` + +## Usage + +Time asynchronous functions + +```c++ +// start interval +interval inter1 ([&]() { + cout << "Interval 1" << endl; +}, 1000); + +// stop interval +inter1.clear(); + +// start timeout +timeout time1 ( [&] () { + cout << "Timeout 1 " << endl; +}, 10000); + +// stop timeout +time1.clear(); +``` +Make functions asynchronous + +```c++ +/** +* Put task directly and get returned value - it is not recommended to use it +*/ + +auto res1 = on_async.put_task( [] () { + cout << "Jebiga " < _promise; + timeout t( [&]() { + _promise.set_value(); + }, _time); + + return _promise.get_future().get(); +} + +sleep_to(3000); + +/** +* Catch promise reject +*/ + +void promise_reject (int _time) { + promise _promise; + timeout t( [&]() { + try { + // simulate except + throw runtime_error("Error simulation"); + _promise.set_value(); + } catch (...) { + _promise.set_exception(current_exception()); + } + }, _time); + + return _promise.get_future().get(); +} + +try { + promise_reject(3000); +} catch (runtime_error err) { + cout<< err.what() << endl; +} +``` +Events + +```c++ +/** +* initialization of typed events +*/ + +event ev2int; +event evintString; +event<> evoid; + +ev2int.on("sum", [](int a, int b) { + cout << "Sum " << a+b << endl; +}); + +evintString.on("substract", [](int a, string b) { + cout << "Substract " << a-stoi(b) << endl; +}); + +evoid.on("void", []() { + cout << "Void emited" << endl; +}); + +sleep(1); + +/** +* Emit +*/ + +ev2int.emit("sum", 5, 8); + +sleep(1); +evintString.emit("substract", 3, to_string(2)); + +sleep(1); +evoid.emit("void"); +``` +Extend own class whit events + +```c++ +class myOwnClass : public event { + public: + myOwnClass() : event() {}; +}; + +myOwnClass myclass; + +timeout t( [&] { + myclass.emit("constructed", 1); +}, 200); + +myclass.on("constructed", [] (int i) { + cout << "Constructed " << i << endl; +}); + +``` +## License + +[APACHE 2.0](http://www.apache.org/licenses/LICENSE-2.0/) + + +## Support & Feedback + +For support and any feedback, contact the address: marcelb96@yahoo.com. + +## Contributing + +Contributions are always welcome! + +Feel free to fork and start working with or without a later pull request. Or contact for suggest and request an option. + diff --git a/example/asynco.hpp b/example/asynco.hpp deleted file mode 100644 index 7803997..0000000 --- a/example/asynco.hpp +++ /dev/null @@ -1,164 +0,0 @@ -#ifndef _ASYNCO_ -#define _ASYNCO_ - -#include -#include -#include -#include - -using namespace std; - -namespace marcelb { - -class interval; -class timeout; - -class loop_core { - vector intervals; - vector timeouts; - time_t sampling; - mutex i_m, t_m; - future bot; - - loop_core() { - bot = async(launch::async, [this] () { - loop(); - }); - // on_async.put_task( [this] () { - // loop(); - // }); - } - - void run(interval& _interval) { - lock_guard lock(i_m); - intervals.push_back(_interval); - update_sampling(); - } - - void run(timeout& _timeout) { - lock_guard lock(t_m); - timeouts.push_back(_timeout); - update_sampling(); - } - - void loop() { - while (true) { - for (auto& _interval : intervals) { - int64_t now = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); - if (now - _interval.execute >= _interval._duration) { - _interval.callback(); - _interval.execute = now; - } - } - - for (int i=0; i(chrono::system_clock::now().time_since_epoch()).count(); - if (now - timeouts[i]._construct >= timeouts[i]._duration) { - auto& _timeout = timeouts[i]; - { - lock_guard lock(t_m); - timeouts.erase(timeouts.begin() + i); - } - _timeout.callback(); - } - } - this_thread::sleep_for(chrono::milliseconds(sampling)); - } - } - - void update_sampling() { - sampling = 0; - for (auto& _interval : intervals) { - sampling += _interval._duration; - } - for (auto& _timeout : timeouts) { - sampling += _timeout._duration; - } - sampling /= (intervals.size() + timeouts.size())*2; - } - - -}; - -loop_core co_loop; - - - - -class interval { - public: - bool run = true; - function callback; - const time_t _duration; - time_t execute = 0; - - // public: - interval(function func, const time_t duration): callback(func), _duration(duration) { - #ifndef ON_ASYNC - throw string("Not on_async defined!"); - #endif - - auto task = [&] () { - while (run) { - // this_thread::sleep_for(chrono::milliseconds(_duration)); - // if (run) { - callback(); - // } - } - }; - - // on_async.put_task(task); - co_loop.run(this*); - } - - void clear() { - run = false; - } - - ~interval() { - clear(); - } -}; - -class timeout { - public: - bool run = true; - function callback; - const time_t _duration; - int64_t _construct = - chrono::duration_cast(chrono::system_clock::now().time_since_epoch()) - .count(); - - // public: - timeout(function f, const time_t duration): callback(f), _duration(duration) { - #ifndef ON_ASYNC - throw string("Not on_async defined!"); - #endif - - auto task = [&] () { - // int64_t _start = - // chrono::duration_cast(chrono::system_clock::now().time_since_epoch()) - // .count(); - // this_thread::sleep_for(chrono::milliseconds(_duration - (_start - _construct))); - if (run) { - callback(); - } - }; - - // on_async.put_task(task); - co_loop.run(this*); - } - - void clear() { - run = false; - } - - ~timeout() { - clear(); - } -}; - - -} - -#endif \ No newline at end of file diff --git a/lib/asynco.hpp b/lib/asynco.hpp index 1c67552..8fc3f32 100644 --- a/lib/asynco.hpp +++ b/lib/asynco.hpp @@ -1,162 +1,20 @@ #ifndef _ASYNCO_ #define _ASYNCO_ -#include "loop.hpp" +#include "runner.hpp" using namespace std; namespace marcelb { -#ifndef ON_ASYNC -#define ON_ASYNC -AsyncLoop on_async; +#ifndef ON_RUNNER +#define ON_RUNNER +runner on_async; #endif -class interval; -class timeout; - -class loop_core { - static vector intervals; - static vector timeouts; - time_t sampling; - mutex i_m, t_m; - - loop_core() { - on_async.put_task( [this] () { - loop(); - }); - } - - void run(interval& _interval) { - lock_guard lock(i_m); - intervals.push_back(_interval); - update_sampling(); - } - - void run(timeout& _timeout) { - lock_guard lock(t_m); - timeouts.push_back(_timeout); - update_sampling(); - } - - void loop() { - while (true) { - for (auto& _interval : intervals) { - int64_t now = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); - if (now - _interval.execute >= _interval._duration) { - _interval.callback(); - _interval.execute = now; - } - } - - for (int i=0; i(chrono::system_clock::now().time_since_epoch()).count(); - if (now - timeouts[i]._construct >= timeouts[i]._duration) { - auto& _timeout = timeouts[i]; - { - lock_guard lock(t_m); - timeouts.erase(timeouts.begin() + i); - } - _timeout.callback(); - } - } - sleep_for(chrono::milliseconds(sampling)); - } - } - - void update_sampling() { - sampling = 0; - for (auto& _interval : intervals) { - sampling += _interval._duration; - } - for (auto& _timeout : timeouts) { - sampling += _timeout._duration; - } - sampling /= (intervals.size() + timeouts.size())*2; - } - - -}; - -loop_core co_loop; - - - - -class interval { - public: - bool run = true; - function callback; - const time_t _duration; - time_t execute = 0; - - // public: - interval(function func, const time_t duration): callback(func), _duration(duration) { - #ifndef ON_ASYNC - throw string("Not on_async defined!"); - #endif - - auto task = [&] () { - while (run) { - // this_thread::sleep_for(chrono::milliseconds(_duration)); - // if (run) { - callback(); - // } - } - }; - - // on_async.put_task(task); - co_loop.run(this*); - } - - void clear() { - run = false; - } - - ~interval() { - clear(); - } -}; - -class timeout { - public: - bool run = true; - function callback; - const time_t _duration; - int64_t _construct = - chrono::duration_cast(chrono::system_clock::now().time_since_epoch()) - .count(); - - // public: - timeout(function f, const time_t duration): callback(f), _duration(duration) { - #ifndef ON_ASYNC - throw string("Not on_async defined!"); - #endif - - auto task = [&] () { - // int64_t _start = - // chrono::duration_cast(chrono::system_clock::now().time_since_epoch()) - // .count(); - // this_thread::sleep_for(chrono::milliseconds(_duration - (_start - _construct))); - if (run) { - callback(); - } - }; - - // on_async.put_task(task); - co_loop.run(this*); - } - - void clear() { - run = false; - } - - ~timeout() { - clear(); - } -}; - - +/** + * Run the function asynchronously +*/ template auto asynco(F&& f, Args&&... args) -> future::type> { using return_type = typename result_of::type; @@ -165,6 +23,9 @@ auto asynco(F&& f, Args&&... args) -> future::typ return res; } +/** + * Block until the asynchronous call completes +*/ template T wait(future r) { return r.get(); diff --git a/lib/event.hpp b/lib/event.hpp index 3c774c2..c4e0a81 100644 --- a/lib/event.hpp +++ b/lib/event.hpp @@ -5,28 +5,38 @@ #include #include #include -#include "loop.hpp" +#include "runner.hpp" using namespace std; namespace marcelb { -#ifndef ON_ASYNC -#define ON_ASYNC -AsyncLoop on_async; +#ifndef ON_RUNNER +#define ON_RUNNER +runner on_async; #endif - +/** + * Event class, for event-driven programming. + * These events are typed according to the arguments of the callback function +*/ template class event { private: unordered_map> events; public: + + /** + * Defines event by key, and callback function + */ void on(const string& key, function callback) { events[key] = callback; } + /** + * It emits an event and sends a callback function saved according to the key with the passed parameters + */ template void emit(const string& key, Args... args) { auto it = events.find(key); diff --git a/lib/rotor.hpp b/lib/rotor.hpp new file mode 100644 index 0000000..6f0f105 --- /dev/null +++ b/lib/rotor.hpp @@ -0,0 +1,212 @@ +#ifndef _ROTOR_ +#define _ROTOT_ + +#include "runner.hpp" +#include "chrono" + +#include "iostream" + +using namespace std; +using namespace marcelb; + +#ifndef ON_RUNNER +#define ON_RUNNER +runner on_async; +#endif + +namespace marcelb { + +/** + * Get the time in ms from the epoch +*/ + +int64_t rtime_ms() { + return chrono::duration_cast(chrono::system_clock::now() + .time_since_epoch()) + .count(); +} + +/** + * Structure for time events +*/ + +struct time_event { + function callback; + int64_t init; + int64_t time; + bool repeat; + bool stop; +}; + +/** + * Event loop for time events +*/ + +class rotor { + vector tevents; + mutex te_m; + bool rotating = true; + int64_t sampling; + + /** + * Loop method, started by the constructor in a separate runner + * It checks the events on the stack and sends the expired ones to the runner + */ + void loop() { + while (rotating) { + for (int i=0; istop) { + remove(i); + i--; + } + + else if (expired(tevents[i])) { + on_async.put_task(tevents[i]->callback); + if (tevents[i]->repeat) { + tevents[i]->init = rtime_ms(); + } + else { + remove(i); + i--; + } + } + } + this_thread::sleep_for(chrono::milliseconds(sampling)); + } + } + + /** + * The method checks whether the time event has expired + */ + bool expired(struct time_event *tevent) { + return rtime_ms() - tevent->init >= tevent->time; + } + + /** + * The method deletes a non-repeating or stopped event from the stack + */ + void remove(const int& position) { + lock_guard lock(te_m); + tevents.erase(tevents.begin()+position); + } + + /** + * Updates the idle time of the loop, according to twice the frequency of available events + */ + void update_sampling() { + sampling = tevents[0]->time; + for (int i=0; i tevents[i]->time) { + sampling = tevents[i]->time; + } + } + sampling /= tevents.size()*2; + } + + public: + + /** + * Constructor for the rotor, starts the given loop by occupying one runner + */ + rotor() { + on_async.put_task( [&] () { + loop(); + }); + }; + + /** + * Adds a time event to the stack + */ + void insert(struct time_event *tevent) { + lock_guard lock(te_m); + tevents.push_back(tevent); + update_sampling(); + }; + + /** + * Returns the number of active events + */ + int active() { + return tevents.size(); + } + + /** + * Stops all active events and stops the rotor + */ + ~rotor() { + for (int i=0; istop = true; + } + rotating = false; + } + +}; + +/** + * It is intended that there is only one global declaration +*/ +rotor _rotor; + +/** + * A class for all timer functions +*/ +class timer_core { + public: + struct time_event t_event; + + /** + * Timer constructor, receives a callback function and time + */ + timer_core( function _callback, int64_t _time): + t_event({ _callback, rtime_ms(), _time, false, false }) { + + } + /** + * Stop timer + */ + void clear() { + t_event.stop = true; + } + + /** + * Destruktor of timer, call stop + */ + ~timer_core() { + clear(); + } +}; + +/** + * Class interval for periodic execution of the callback in time in ms +*/ +class interval : public timer_core { + public: + + /** + * The constructor receives a callback function and an interval time + */ + interval( function _callback, int64_t _time): timer_core(_callback, _time) { + t_event.repeat = true; + _rotor.insert(&t_event); + } +}; + +/** + * Class interval for delayed callback execution in ms +*/ +class timeout : public timer_core { + public: + + /** + * The constructor receives a callback function and a delay time + */ + timeout( function _callback, int64_t delay): timer_core(_callback, delay) { + t_event.repeat = false; + _rotor.insert(&t_event); + } +}; + +} + +#endif diff --git a/lib/loop.hpp b/lib/runner.hpp similarity index 62% rename from lib/loop.hpp rename to lib/runner.hpp index 3a051cc..987a16c 100644 --- a/lib/loop.hpp +++ b/lib/runner.hpp @@ -1,5 +1,5 @@ -#ifndef _LOOP_ -#define _LOOP_ +#ifndef _RUNNER_ +#define _RUNNER_ #include #include @@ -13,22 +13,31 @@ using namespace std; namespace marcelb { -#ifdef ON_ASYNC -extern AsyncLoop on_async; +#ifdef ON_RUNNER +extern runner on_async; #endif -class AsyncLoop { +/** + * The runner class implements multithread, task stack and event loop for asynchronous execution of tasks +*/ +class runner { private: - vector workers; + vector runners; queue> tasks; mutex q_io; condition_variable cv; bool stop; public: - AsyncLoop(size_t pool_size = thread::hardware_concurrency()) : stop(false) { + + /** + * The constructor starts as many threads as the system has cores, + * and runs an event loop inside each one. + * Each event loop waits for tasks from the stack and executes them. + */ + runner(size_t pool_size = thread::hardware_concurrency()) : stop(false) { for (size_t i = 0; i < pool_size; ++i) { - workers.emplace_back([this] { + runners.emplace_back( thread([this] { while (true) { function task; { @@ -41,10 +50,13 @@ class AsyncLoop { } task(); } - }); + })); } } + /** + * With the method, we send the callback function and its arguments to the task stack + */ template auto put_task(F&& f, Args&&... args) -> future::type> { @@ -66,22 +78,31 @@ class AsyncLoop { return res; } + /** + * Returns the number of tasks the runner has to perform + */ unsigned int count_tasks() { return tasks.size(); } + /** + * Returns the number of threads used by the runner + */ unsigned int count_threads() { - return workers.size(); + return runners.size(); } - ~AsyncLoop() { + /** + * The destructor stops all loops and stops all threads + */ + ~runner() { { unique_lock lock(q_io); stop = true; } cv.notify_all(); - for (thread& worker : workers) { - worker.join(); + for (thread& runner : runners) { + runner.join(); } } diff --git a/test/test b/test/test deleted file mode 100755 index ff1ea53..0000000 Binary files a/test/test and /dev/null differ diff --git a/test/test.cpp b/test/test.cpp index f48ca4e..55212df 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -1,8 +1,9 @@ -// #include "../lib/loop.hpp" -// #include "../lib/asynco.hpp" -// #include "../example/asynco.hpp" +#include "../lib/runner.hpp" +#include "../lib/asynco.hpp" #include "../lib/event.hpp" +#include "../lib/rotor.hpp" + #include #include @@ -10,49 +11,85 @@ using namespace std; using namespace marcelb; using namespace this_thread; -#ifndef ON_ASYNC -#define ON_ASYNC -AsyncLoop on_async(8); +#ifndef ON_RUNNER +#define ON_RUNNER +runner on_async; #endif + +void sleep_to (int _time) { + promise _promise; + timeout t( [&]() { + _promise.set_value(); + }, _time); + + return _promise.get_future().get(); +} + +void promise_reject (int _time) { + promise _promise; + timeout t( [&]() { + try { + // simulate except + throw runtime_error("Error simulation"); + _promise.set_value(); + } catch (...) { + _promise.set_exception(current_exception()); + } + }, _time); + + return _promise.get_future().get(); +} + +// ------------------ EXTEND OWN CLASS WITH EVENTS ------------------- + +class myOwnClass : public event { + public: + myOwnClass() : event() {}; +}; + + int main () { - // auto start = chrono::high_resolution_clock::now(); + auto start = rtime_ms(); + + // --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- + + /** + * Init interval and timeout; clear interval and timeout + */ // interval inter1 ([&]() { - // cout << "interval prvi " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // cout << "interval prvi " << rtime_ms() - start << endl; // }, 1000); // interval inter2 ([&]() { - // cout << "interval drugi " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // cout << "interval drugi " << rtime_ms() - start << endl; // }, 2000); // interval inter3 ([&]() { - // cout << "interval treći " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // cout << "interval treći " << rtime_ms() - start << endl; // }, 3000); // timeout time1 ( [&] () { - // cout << "Close interval 1 i 2 " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; // inter1.clear(); // inter2.clear(); // }, 10000); // timeout time2 ([&] () { - // cout << "Close interval 3 " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // cout << "Close interval 3 " << rtime_ms() - start << endl; // inter3.clear(); + // time1.clear(); // }, 2000); - // cout << "zadataka: " << on_async.count_tasks() << " niti: " << on_async.count_threads() << endl; + // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- + /** + * Put task directly and get returned value - it is not recommended to use it + */ - // for (int i = 0; i < 8; ++i) { - // pool.put_task( [&] (int id) { - // this_thread::sleep_for(chrono::seconds(1)); - // cout << a*i << endl; - // }, i); - // } - - // auto res1 = pool.put_task( [] () { + // auto res1 = on_async.put_task( [] () { // cout << "Jebiga " < ev2int; // event evintString; + // event<> evoid; // ev2int.on("sum", [](int a, int b) { // cout << "Sum " << a+b << endl; @@ -99,15 +183,39 @@ int main () { // cout << "Substract " << a-stoi(b) << endl; // }); - // sleep(5); + // evoid.on("void", []() { + // cout << "Void emited" << endl; + // }); + + // sleep(1); + + /** + * Emit + */ // ev2int.emit("sum", 5, 8); - // sleep(2); + // sleep(1); // evintString.emit("substract", 3, to_string(2)); + // sleep(1); + // evoid.emit("void"); + + /** + * Own class + */ + + // myOwnClass myclass; + + // timeout t( [&] { + // myclass.emit("constructed", 1); + // }, 200); + + // myclass.on("constructed", [] (int i) { + // cout << "Constructed " << i << endl; + // }); - sleep(1000); + sleep(10000); // only for testing return 0; }