Refactor preprocessor directives, wait for reference and const objetct, multiple listeners, fix nameless timer functions, disable increase runners

dev-own-engine
mbandic 8 months ago
parent b123e9305d
commit 23fdd03dfe
  1. 4
      .vscode/settings.json
  2. 45
      README.md
  3. 17
      lib/asynco.hpp
  4. 30
      lib/event.hpp
  5. 121
      lib/rotor.hpp
  6. 66
      lib/runner.hpp
  7. 85
      test/test.cpp

@ -25,6 +25,8 @@
"future": "cpp", "future": "cpp",
"*.ipp": "cpp", "*.ipp": "cpp",
"bitset": "cpp", "bitset": "cpp",
"algorithm": "cpp" "algorithm": "cpp",
"string": "cpp",
"string_view": "cpp"
} }
} }

@ -11,24 +11,22 @@ A C++ library for event-driven asynchronous multi-threaded programming.
- Asynchronous programming - Asynchronous programming
- Multithread - Multithread
- Asynchronous timer functions: interval, timeout - Asynchronous timer functions: interval, timeout
- Typed events (on, emit) - Typed events (on, emit, off)
- Event loops - Event loops
- Parallel execution loops - Multiple parallel execution loops
## Installation ## Installation
Just download the latest release and unzip it into your project. Just download the latest release and unzip it into your project.
```c++ ```c++
#define NUM_OF_RUNNERS 2 // To change the number of threads used by asynco
#include "asynco/lib/asynco.hpp" // asynco(), wait() #include "asynco/lib/asynco.hpp" // asynco(), wait()
#include "asynco/lib/event.hpp" // event #include "asynco/lib/event.hpp" // event
#include "asynco/lib/rotor.hpp" // interval, timeout #include "asynco/lib/rotor.hpp" // interval, timeout
#include "asynco/lib/runner.hpp" // on_async #include "asynco/lib/runner.hpp" // for own loop
using namespace marcelb; using namespace marcelb;
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
``` ```
## Usage ## Usage
@ -56,22 +54,7 @@ Make functions asynchronous
```c++ ```c++
/** /**
* Put task directly and get returned value - it is not recommended to use it * Run an lambda function asynchronously
*/
auto res1 = on_async.put_task( [] () {
cout << "Not except " <<endl;
throw string ("Is except!");
});
try {
res1.get();
} catch (const string except) {
cout << except << endl;
}
/**
* Run an lambda function asyncronic
*/ */
asynco( []() { asynco( []() {
@ -119,7 +102,7 @@ auto a = asynco( []() {
return 5; return 5;
}); });
cout << wait(move(a)) << endl; cout << wait(a) << endl;
/** /**
* Wait async function call and use i cout * Wait async function call and use i cout
@ -194,6 +177,14 @@ evoid.on("void", []() {
cout << "Void emited" << endl; cout << "Void emited" << endl;
}); });
// multiple listeners
string emited2 = "2";
evoid.on("void", [&]() {
cout << "Void emited " << emited2 << endl;
});
sleep(1); sleep(1);
/** /**
@ -207,6 +198,12 @@ evintString.emit("substract", 3, to_string(2));
sleep(1); sleep(1);
evoid.emit("void"); evoid.emit("void");
// Turn off the event listener
evoid.off("void");
evoid.emit("void"); // nothing is happening
``` ```
Extend own class whit events Extend own class whit events

@ -7,11 +7,6 @@ using namespace std;
namespace marcelb { namespace marcelb {
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
/** /**
* Run the function asynchronously * Run the function asynchronously
*/ */
@ -19,7 +14,7 @@ template<class F, class... Args>
auto asynco(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> { auto asynco(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type; using return_type = typename result_of<F(Args...)>::type;
future<return_type> res = on_async.put_task(bind(forward<F>(f), forward<Args>(args)...)); future<return_type> res = _asyncon.put_task(bind(forward<F>(f), forward<Args>(args)...));
return res; return res;
} }
@ -27,10 +22,18 @@ auto asynco(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::typ
* Block until the asynchronous call completes * Block until the asynchronous call completes
*/ */
template<typename T> template<typename T>
T wait(future<T> r) { T wait(future<T>& r) {
return r.get(); return r.get();
} }
/**
* Block until the asynchronous call completes
*/
template<typename T>
T wait(future<T>&& r) {
return move(r).get();
}
} }
#endif #endif

@ -3,6 +3,7 @@
#include <iostream> #include <iostream>
#include <map> #include <map>
#include <vector>
#include <string> #include <string>
#include <functional> #include <functional>
#include "runner.hpp" #include "runner.hpp"
@ -11,11 +12,6 @@ using namespace std;
namespace marcelb { namespace marcelb {
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
/** /**
* Event class, for event-driven programming. * Event class, for event-driven programming.
* These events are typed according to the arguments of the callback function * These events are typed according to the arguments of the callback function
@ -23,7 +19,8 @@ runner on_async;
template<typename... T> template<typename... T>
class event { class event {
private: private:
unordered_map<string, function<void(T...)>> events; mutex m_eve;
unordered_map<string, vector<function<void(T...)>>> events;
public: public:
@ -31,7 +28,8 @@ class event {
* Defines event by key, and callback function * Defines event by key, and callback function
*/ */
void on(const string& key, function<void(T...)> callback) { void on(const string& key, function<void(T...)> callback) {
events[key] = callback; lock_guard _off(m_eve);
events[key].push_back(callback);
} }
/** /**
@ -39,13 +37,23 @@ class event {
*/ */
template<typename... Args> template<typename... Args>
void emit(const string& key, Args... args) { void emit(const string& key, Args... args) {
auto it = events.find(key); auto it_eve = events.find(key);
if (it != events.end()) { if (it_eve != events.end()) {
auto callback = bind(it->second, forward<Args>(args)...); for (uint i =0; i<it_eve->second.size(); i++) {
on_async.put_task(callback); auto callback = bind(it_eve->second[i], forward<Args>(args)...);
_asyncon.put_task(callback);
}
} }
} }
/**
* Remove an event listener from an event
*/
void off(const string& key) {
lock_guard _off(m_eve);
events.erase(key);
}
}; };

@ -3,17 +3,13 @@
#include "runner.hpp" #include "runner.hpp"
#include "chrono" #include "chrono"
#include <memory>
#include "iostream" #include "iostream"
using namespace std; using namespace std;
using namespace marcelb; using namespace marcelb;
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
namespace marcelb { namespace marcelb {
/** /**
@ -27,15 +23,38 @@ int64_t rtime_ms() {
} }
/** /**
* Structure for time events * Intern class for timer async loop
*/ */
class timer_core {
struct time_event { public:
mutex hangon;
function<void()> callback; function<void()> callback;
int64_t init; int64_t init;
int64_t time; int64_t time;
bool repeat; bool repeat;
bool stop; bool stop;
/**
* Timer constructor, receives a callback function and time
*/
timer_core( function<void()> _callback, int64_t _time, bool _repeat):
callback(_callback), init(rtime_ms()), time(_time), repeat(_repeat), stop(false) {
}
/**
* Stop timer
*/
void clear() {
lock_guard<mutex> hang(hangon);
stop = true;
}
/**
* Destruktor of timer, call stop
*/
~timer_core() {
clear();
}
}; };
/** /**
@ -43,7 +62,7 @@ struct time_event {
*/ */
class rotor { class rotor {
vector<struct time_event *> tevents; vector<shared_ptr<timer_core>> tcores;
mutex te_m; mutex te_m;
bool rotating = true; bool rotating = true;
int64_t sampling; int64_t sampling;
@ -54,17 +73,17 @@ class rotor {
*/ */
void loop() { void loop() {
while (rotating) { while (rotating) {
for (int i=0; i<tevents.size(); i++) { for (int i=0; i<tcores.size(); i++) {
if (tevents[i]->stop) { if (tcores[i]->stop) {
remove(i); remove(i);
i--; i--;
} }
else if (expired(tevents[i])) { else if (expired(tcores[i])) {
on_async.put_task(tevents[i]->callback); _asyncon.put_task(tcores[i]->callback);
if (tevents[i]->repeat) { if (tcores[i]->repeat) {
tevents[i]->init = rtime_ms(); tcores[i]->init = rtime_ms();
} }
else { else {
remove(i); remove(i);
@ -79,8 +98,8 @@ class rotor {
/** /**
* The method checks whether the time event has expired * The method checks whether the time event has expired
*/ */
bool expired(struct time_event *tevent) { bool expired(shared_ptr<timer_core> tcore) {
return rtime_ms() - tevent->init >= tevent->time; return rtime_ms() - tcore->init >= tcore->time;
} }
/** /**
@ -88,7 +107,7 @@ class rotor {
*/ */
void remove(const int& position) { void remove(const int& position) {
lock_guard<mutex> lock(te_m); lock_guard<mutex> lock(te_m);
tevents.erase(tevents.begin()+position); tcores.erase(tcores.begin()+position);
update_sampling(); update_sampling();
} }
@ -96,17 +115,17 @@ class rotor {
* Updates the idle time of the loop, according to twice the frequency of available events * Updates the idle time of the loop, according to twice the frequency of available events
*/ */
void update_sampling() { void update_sampling() {
if (tevents.empty()) { if (tcores.empty()) {
sampling = 100; sampling = 100;
return; return;
} }
sampling = tevents[0]->time; sampling = tcores[0]->time;
for (int i=0; i<tevents.size(); i++) { for (int i=0; i<tcores.size(); i++) {
if (sampling > tevents[i]->time) { if (sampling > tcores[i]->time) {
sampling = tevents[i]->time; sampling = tcores[i]->time;
} }
} }
sampling /= tevents.size()*2; sampling /= tcores.size()*2;
} }
public: public:
@ -115,7 +134,7 @@ class rotor {
* Constructor for the rotor, starts the given loop by occupying one runner * Constructor for the rotor, starts the given loop by occupying one runner
*/ */
rotor() { rotor() {
on_async.put_task( [&] () { _asyncon.put_task( [&] () {
loop(); loop();
}); });
}; };
@ -123,9 +142,9 @@ class rotor {
/** /**
* Adds a time event to the stack * Adds a time event to the stack
*/ */
void insert(struct time_event *tevent) { void insert(shared_ptr<timer_core> tcore) {
lock_guard<mutex> lock(te_m); lock_guard<mutex> lock(te_m);
tevents.push_back(tevent); tcores.push_back(tcore);
update_sampling(); update_sampling();
}; };
@ -133,15 +152,15 @@ class rotor {
* Returns the number of active events * Returns the number of active events
*/ */
int active() { int active() {
return tevents.size(); return tcores.size();
} }
/** /**
* Stops all active events and stops the rotor * Stops all active events and stops the rotor
*/ */
~rotor() { ~rotor() {
for (int i=0; i<tevents.size(); i++) { for (int i=0; i<tcores.size(); i++) {
tevents[i]->stop = true; tcores[i]->clear();
} }
rotating = false; rotating = false;
} }
@ -151,33 +170,32 @@ class rotor {
/** /**
* It is intended that there is only one global declaration * It is intended that there is only one global declaration
*/ */
rotor _rotor; static rotor _rotor;
/** /**
* A class for all timer functions * Core class for pure async timer functions
*/ */
class timer_core {
public:
struct time_event t_event;
/** class _timer_intern {
* Timer constructor, receives a callback function and time shared_ptr<timer_core> tcore;
*/ public:
timer_core( function<void()> _callback, int64_t _time):
t_event({ _callback, rtime_ms(), _time, false, false }) {
_timer_intern(function<void()> _callback, int64_t _time, bool repeat) {
tcore = make_shared<timer_core>(_callback, _time, repeat);
_rotor.insert(tcore);
} }
/** /**
* Stop timer * Stop interval
*/ */
void clear() { void clear() {
t_event.stop = true; tcore->clear();
} }
/** /**
* Destruktor of timer, call stop * Destruktor of timer, call stop
*/ */
~timer_core() { ~_timer_intern() {
clear(); clear();
} }
}; };
@ -185,31 +203,30 @@ class timer_core {
/** /**
* Class interval for periodic execution of the callback in time in ms * Class interval for periodic execution of the callback in time in ms
*/ */
class interval : public timer_core { class interval : public _timer_intern {
public: public:
/** /**
* The constructor receives a callback function and an interval time * The constructor receives a callback function and an interval time
*/ */
interval( function<void()> _callback, int64_t _time): timer_core(_callback, _time) { interval( function<void()> _callback, int64_t _time):
t_event.repeat = true; _timer_intern(_callback, _time, true) {
_rotor.insert(&t_event);
} }
}; };
/** /**
* Class interval for delayed callback execution in ms * Class interval for delayed callback execution in ms
*/ */
class timeout : public timer_core { class timeout : public _timer_intern {
public: public:
/** /**
* The constructor receives a callback function and a delay time * The constructor receives a callback function and a delay time
*/ */
timeout( function<void()> _callback, int64_t delay): timer_core(_callback, delay) { timeout( function<void()> _callback, int64_t delay):
t_event.repeat = false; _timer_intern(_callback, delay, false) {
_rotor.insert(&t_event);
} }
}; };
} }

@ -13,9 +13,7 @@ using namespace std;
namespace marcelb { namespace marcelb {
#ifdef ON_RUNNER #define HW_CONCURRENCY_MINIMAL 4
extern runner on_async;
#endif
/** /**
* The runner class implements multithread, task stack and event loop for asynchronous execution of tasks * The runner class implements multithread, task stack and event loop for asynchronous execution of tasks
@ -27,12 +25,29 @@ class runner {
mutex q_io; mutex q_io;
condition_variable cv; condition_variable cv;
bool stop; bool stop;
public:
/** /**
* Increase number of runners * The constructor starts as many threads as the system has cores,
* and runs an event loop inside each one.
* Each event loop waits for tasks from the stack and executes them.
*/ */
void increase_runners(unsigned int increase) { runner(unsigned int _num_of_runners = 0) : stop(false) {
for (size_t i = 0; i < increase; ++i) { unsigned int num_of_runners = _num_of_runners;
if (num_of_runners == 0) {
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
}
for (size_t i = 0; i < num_of_runners; ++i) {
runners.emplace_back( thread([&] { runners.emplace_back( thread([&] {
while (!stop) { while (!stop) {
function<void()> task; function<void()> task;
@ -51,21 +66,6 @@ class runner {
} }
} }
public:
/**
* The constructor starts as many threads as the system has cores,
* and runs an event loop inside each one.
* Each event loop waits for tasks from the stack and executes them.
*/
runner(size_t pool_size = thread::hardware_concurrency()) : stop(false) {
if (pool_size < 4) {
pool_size = 4;
}
increase_runners(pool_size);
// start_all_runners(pool_size);
}
/** /**
@ -92,22 +92,6 @@ class runner {
return res; return res;
} }
/**
* Change the number of runners
*/
void change_runners (unsigned int num_of_runners) {
if (num_of_runners == 0 || num_of_runners > 64) {
throw runtime_error("Not allowed runners size");
}
int difference = num_of_runners - count_threads();
if (difference < 0) { // reduce
throw runtime_error("Is not allowed to reduce runners");
} else if (difference > 0) { // increase
increase_runners(difference);
}
}
/** /**
* Returns the number of tasks the runner has to perform * Returns the number of tasks the runner has to perform
*/ */
@ -139,6 +123,12 @@ class runner {
}; };
/**
* Internal global library variable
*/
static runner _asyncon;
} }
#endif #endif

@ -1,5 +1,5 @@
#define NUM_OF_RUNNERS 2
#include "../lib/runner.hpp"
#include "../lib/asynco.hpp" #include "../lib/asynco.hpp"
#include "../lib/event.hpp" #include "../lib/event.hpp"
#include "../lib/rotor.hpp" #include "../lib/rotor.hpp"
@ -11,12 +11,6 @@ using namespace std;
using namespace marcelb; using namespace marcelb;
using namespace this_thread; using namespace this_thread;
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { timeout t( [&]() {
@ -61,7 +55,6 @@ class myOwnClass : public event<int> {
int main () { int main () {
on_async.change_runners(64);
auto start = rtime_ms(); auto start = rtime_ms();
@ -71,22 +64,6 @@ int main () {
* Init interval and timeout; clear interval and timeout * Init interval and timeout; clear interval and timeout
*/ */
// ovo ne radi
// vector<interval> interv;
// vector<timeout> tmout;
// for (int i=0; i< 20; i++) {
// interv.push_back( interval( [i] () {
// cout << "interval " << i << endl;
// }, 1000));
// tmout.push_back( timeout( [i] () {
// cout << "timeout " << i << endl;
// }, 1000*i));
// }
// ovo valja popravit
// interval( [] () { // interval( [] () {
// cout << "interval " << endl; // cout << "interval " << endl;
// }, 1000); // }, 1000);
@ -106,8 +83,11 @@ int main () {
// timeout time1 ( [&] () { // timeout time1 ( [&] () {
// cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; // cout << "Close interval 1 i 2 " << rtime_ms() - start << endl;
// inter1.clear(); // inter1.clear();
// // cout << "inter1.stop " << inter1.stop << endl;
// inter2.clear(); // inter2.clear();
// }, 10000); // // cout << "inter2.stop " << inter2.stop << endl;
// }, 5000);
// timeout time2 ([&] () { // timeout time2 ([&] () {
// cout << "Close interval 3 " << rtime_ms() - start << endl; // cout << "Close interval 3 " << rtime_ms() - start << endl;
@ -117,28 +97,13 @@ int main () {
// // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
// /**
// * Put task directly and get returned value - it is not recommended to use it
// */
// auto res1 = on_async.put_task( [] () {
// cout << "Jebiga " <<endl;
// throw string ("jebiga!!");
// });
// try {
// res1.get();
// } catch (const string except) {
// cout << except << endl;
// }
// /** // /**
// * Run an function asyncronic // * Run an function asyncronic
// */ // */
// asynco( []() { // asynco( []() {
// sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
// cout << "asynco" << endl; // cout << "asynco 1" << endl;
// return 5; // return 5;
// }); // });
@ -148,6 +113,13 @@ int main () {
// asynco (notLambdaFunction); // asynco (notLambdaFunction);
// wait (
// asynco (
// notLambdaFunction
// )
// );
// /** // /**
// * Call class method // * Call class method
// */ // */
@ -157,10 +129,7 @@ int main () {
// classes.classMethode(); // classes.classMethode();
// }); // });
// // sleep(5); // sleep(5);
// /** // /**
// * Wait after runned as async // * Wait after runned as async
@ -168,11 +137,12 @@ int main () {
// auto a = asynco( []() { // auto a = asynco( []() {
// sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
// cout << "asynco" << endl; // cout << "asynco 2" << endl;
// return 5; // return 5;
// }); // });
// cout << wait(move(a)) << endl; // cout << wait(a) << endl;
// cout << "print after asynco 2" << endl;
// /** // /**
// * Wait async function call and use i cout // * Wait async function call and use i cout
@ -191,9 +161,9 @@ int main () {
// sleep_to(3000); // sleep_to(3000);
// cout << "sleep_to " << rtime_ms() - start << endl; // cout << "sleep_to " << rtime_ms() - start << endl;
// /** /**
// * Catch promise reject * Catch promise reject
// */ */
// try { // try {
// promise_reject(3000); // promise_reject(3000);
@ -230,6 +200,10 @@ int main () {
// cout << "Sum " << a+b << endl; // cout << "Sum " << a+b << endl;
// }); // });
// ev2int.on("sum", [](int a, int b) {
// cout << "Sum done" << endl;
// });
// evintString.on("substract", [](int a, string b) { // evintString.on("substract", [](int a, string b) {
// cout << "Substract " << a-stoi(b) << endl; // cout << "Substract " << a-stoi(b) << endl;
// }); // });
@ -238,18 +212,27 @@ int main () {
// cout << "Void emited" << endl; // cout << "Void emited" << endl;
// }); // });
// // sleep(1); // string emited2 = "2";
// evoid.on("void", [&]() {
// cout << "Void emited " << emited2 << endl;
// });
// evoid.emit("void");
// sleep(1);
// /** // /**
// * Emit // * Emit
// */ // */
// ev2int.emit("sum", 5, 8); // ev2int.emit("sum", 5, 8);
// sleep(1); // sleep(1);
// evintString.emit("substract", 3, to_string(2)); // evintString.emit("substract", 3, to_string(2));
// sleep(1); // sleep(1);
// evoid.off("void");
// evoid.emit("void"); // evoid.emit("void");
// /** // /**

Loading…
Cancel
Save