Rename, add listeners, off all, edit readme

trigger 0.3
marcelb 7 months ago
parent e91c7f646d
commit 22dbfe89f1
  1. 36
      README.md
  2. 27
      lib/event.hpp
  3. 44
      lib/timers.hpp
  4. 362
      test/test.cpp

@ -3,6 +3,12 @@
A C++ library for event-driven asynchronous multi-threaded programming. A C++ library for event-driven asynchronous multi-threaded programming.
## Motivation
The original concept was to create an interface capable of asynchronously calling any function. It has since evolved into a library that incorporates a thread pool, each with its own event loop, event-driven programming, and functions inherently designed for asynchronous operation (including periodic and delayed functions).
The asynchronous filesystem is provided solely to guide users on how to wrap any time- or IO-intensive function for asynchronous execution.
## Features ## Features
- Object oriented - Object oriented
@ -10,7 +16,7 @@ A C++ library for event-driven asynchronous multi-threaded programming.
- Header only - Header only
- Asynchronous programming - Asynchronous programming
- Multithread - Multithread
- Asynchronous timer functions: interval, timeout - Asynchronous timer functions: periodic, delayed (like setInterval and setTimeout from JS)
- Typed events (on, emit, off) - Typed events (on, emit, off)
- Event loops - Event loops
- Multiple parallel execution loops - Multiple parallel execution loops
@ -23,10 +29,10 @@ Just download the latest release and unzip it into your project.
```c++ ```c++
#define NUM_OF_RUNNERS 8 // To change the number of threads used by atask, without this it runs according to the number of cores #define NUM_OF_RUNNERS 8 // To change the number of threads used by atask, without this it runs according to the number of cores
#include "asynco/lib/asynco.hpp" // atask(), wait() #include "asynco/lib/asynco.hpp" // atask(), wait()
#include "asynco/lib/event.hpp" // event #include "asynco/lib/event.hpp" // event
#include "asynco/lib/timers.hpp" // interval, timeout #include "asynco/lib/timers.hpp" // periodic, delayed (like setInterval and setTimeout from JS)
#include "asynco/lib/filesystem.hpp" // for async read and write files #include "asynco/lib/filesystem.hpp" // for async read and write files
using namespace marcelb; using namespace marcelb;
using namespace asynco; using namespace asynco;
@ -43,12 +49,12 @@ return 0;
Time asynchronous functions Time asynchronous functions
```c++ ```c++
// start interval // start periodic
interval inter1 ([]() { periodic inter1 ([]() {
cout << "Interval 1" << endl; cout << "Interval 1" << endl;
}, 1000); }, 1000);
// stop interval // stop periodic
inter1.stop(); inter1.stop();
// how many times it has expired // how many times it has expired
@ -57,12 +63,12 @@ int t = inter1.ticks();
// is it stopped // is it stopped
bool stoped = inter1.stoped(); bool stoped = inter1.stoped();
// start timeout // start delayed
timeout time1 ( [] () { delayed time1 ( [] () {
cout << "Timeout 1 " << endl; cout << "Timeout 1 " << endl;
}, 10000); }, 10000);
// stop timeout // stop delayed
time1.stop(); time1.stop();
// is it expired // is it expired
@ -137,12 +143,12 @@ cout << wait(atask( [] () {
})) << endl; })) << endl;
/** /**
* Sleep with timeout sleep implement * Sleep with delayed sleep implement
*/ */
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { delayed t( [&]() {
_promise.set_value(); _promise.set_value();
}, _time); }, _time);
@ -157,7 +163,7 @@ sleep_to(3000);
void promise_reject (int _time) { void promise_reject (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { delayed t( [&]() {
try { try {
// simulate except // simulate except
throw runtime_error("Error simulation"); throw runtime_error("Error simulation");
@ -237,7 +243,7 @@ class myOwnClass : public event<int> {
myOwnClass myclass; myOwnClass myclass;
timeout t( [&] { delayed t( [&] {
myclass.emit("constructed", 1); myclass.emit("constructed", 1);
}, 200); }, 200);

@ -55,6 +55,33 @@ class event {
events.erase(key); events.erase(key);
} }
/**
* Remove all event listener
*/
void off() {
lock_guard _off(m_eve);
events.clear();
}
/**
* Get num of listeners by an event key
*/
unsigned int listeners(const string& key) {
return events[key].size();
}
/**
* Get num of all listeners
*/
unsigned int listeners() {
unsigned int listeners = 0;
for (auto& ev : events) {
listeners += ev.second.size();
}
return listeners;
}
}; };

@ -1,5 +1,5 @@
#ifndef _ROTOR_ #ifndef _TIMERS_
#define _ROTOT_ #define _TIMERS_
#include "asynco.hpp" #include "asynco.hpp"
#include <chrono> #include <chrono>
@ -113,22 +113,22 @@ class timer {
}; };
/** /**
* Class interval for periodic execution of the callback in time in ms * Class periodic for periodic execution of the callback in time in ms
*/ */
class interval { class periodic {
shared_ptr<timer> _timer; shared_ptr<timer> _timer;
public: public:
/** /**
* Constructor initializes a shared pointer of type timer * Constructor initializes a shared pointer of type timer
*/ */
interval(function<void()> callback, uint64_t time) : periodic(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, true)) { _timer(make_shared<timer> (callback, time, true)) {
} }
/** /**
* Stop interval * Stop periodic
* The stop flag is set and interval remove it from the queue * The stop flag is set and periodic remove it from the queue
*/ */
void stop() { void stop() {
_timer->stop(); _timer->stop();
@ -136,51 +136,51 @@ class interval {
/** /**
* Run callback now * Run callback now
* Forces the callback function to run independently of the interval * Forces the callback function to run independently of the periodic
*/ */
void now() { void now() {
_timer->now(); _timer->now();
} }
/** /**
* Get the number of times the interval callback was runned * Get the number of times the periodic callback was runned
*/ */
uint64_t ticks() { uint64_t ticks() {
return _timer->ticks(); return _timer->ticks();
} }
/** /**
* The logic status of the interval stop state * The logic status of the periodic stop state
*/ */
bool stoped() { bool stoped() {
return _timer->stoped(); return _timer->stoped();
} }
/** /**
* The destructor stops the interval * The destructor stops the periodic
*/ */
~interval() { ~periodic() {
stop(); stop();
} }
}; };
/** /**
* Class timeout for delayed callback execution in ms * Class delayed for delayed callback execution in ms
*/ */
class timeout { class delayed {
shared_ptr<timer> _timer; shared_ptr<timer> _timer;
public: public:
/** /**
* Constructor initializes a shared pointer of type timer * Constructor initializes a shared pointer of type timer
*/ */
timeout(function<void()> callback, uint64_t time) : delayed(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, false)) { _timer(make_shared<timer> (callback, time, false)) {
} }
/** /**
* Stop timeout * Stop delayed
* The stop flag is set and timeout remove it from the queue * The stop flag is set and delayed remove it from the queue
*/ */
void stop() { void stop() {
_timer->stop(); _timer->stop();
@ -188,30 +188,30 @@ class timeout {
/** /**
* Run callback now * Run callback now
* Forces the callback function to run independently of the timeout * Forces the callback function to run independently of the delayed
*/ */
void now() { void now() {
_timer->now(); _timer->now();
} }
/** /**
* Get the number of times the timeout callback was runned * Get is the delayed callback runned
*/ */
bool expired() { bool expired() {
return bool(_timer->ticks()); return bool(_timer->ticks());
} }
/** /**
* The logic status of the timeout stop state * The logic status of the delayed stop state
*/ */
bool stoped() { bool stoped() {
return _timer->stoped(); return _timer->stoped();
} }
/** /**
* The destructor stops the timeout * The destructor stops the delayed
*/ */
~timeout() { ~delayed() {
stop(); stop();
} }

@ -18,7 +18,7 @@ using namespace this_thread;
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { delayed t( [&]() {
_promise.set_value(); _promise.set_value();
}, _time); }, _time);
@ -27,7 +27,7 @@ void sleep_to (int _time) {
void promise_reject (int _time) { void promise_reject (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { delayed t( [&]() {
try { try {
// simulate except // simulate except
throw runtime_error("Error simulation"); throw runtime_error("Error simulation");
@ -65,249 +65,255 @@ int main () {
// --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- // --------------- TIME ASYNCHRONOUS FUNCTIONS --------------
/** // /**
* Init interval and timeout; clear interval and timeout // * Init periodic and delayed; clear periodic and delayed
*/ // */
interval inter1 ([&]() { // periodic inter1 ([&]() {
cout << "interval prvi " << rtime_ms() - start << endl; // cout << "periodic prvi " << rtime_ms() - start << endl;
}, 1000); // }, 1000);
interval inter2 ([&]() { // periodic inter2 ([&]() {
cout << "interval drugi " << rtime_ms() - start << endl; // cout << "periodic drugi " << rtime_ms() - start << endl;
}, 2000); // }, 2000);
interval inter3 ([&]() { // periodic inter3 ([&]() {
cout << "interval treći " << rtime_ms() - start << endl; // cout << "periodic treći " << rtime_ms() - start << endl;
}, 1000); // }, 1000);
interval inter4 ([&]() { // periodic inter4 ([&]() {
// cout << "interval cetvrti " << rtime_ms() - start << endl; // // cout << "periodic cetvrti " << rtime_ms() - start << endl;
cout << "Ticks " << inter3.ticks() << endl; // cout << "Ticks " << inter3.ticks() << endl;
}, 500); // }, 500);
interval inter5 ([&]() { // periodic inter5 ([&]() {
cout << "interval peti " << rtime_ms() - start << endl; // cout << "periodic peti " << rtime_ms() - start << endl;
}, 2000); // }, 2000);
interval inter6 ([&]() { // periodic inter6 ([&]() {
cout << "interval sesti " << rtime_ms() - start << endl; // cout << "periodic sesti " << rtime_ms() - start << endl;
}, 3000); // }, 3000);
timeout time1 ( [&] () { // delayed time1 ( [&] () {
cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; // cout << "Close periodic 1 i 2 " << rtime_ms() - start << endl;
inter1.stop(); // inter1.stop();
cout << "inter1.stop " << endl; // cout << "inter1.stop " << endl;
inter2.stop(); // inter2.stop();
cout << "inter2.stop " << endl; // cout << "inter2.stop " << endl;
}, 8000); // }, 8000);
timeout time2 ([&] () { // delayed time2 ([&] () {
cout << "Close interval 3 " << rtime_ms() - start << endl; // cout << "Close periodic 3 " << rtime_ms() - start << endl;
inter3.stop(); // inter3.stop();
cout << "Stoped " << inter3.stoped() << endl; // cout << "Stoped " << inter3.stoped() << endl;
// time1.stop(); // // time1.stop();
}, 5000); // }, 5000);
if (time2.expired()) { // if (time2.expired()) {
cout << "isteko " << endl; // cout << "isteko " << endl;
} else { // } else {
cout << "nije isteko " << endl; // cout << "nije isteko " << endl;
} // }
// sleep(6); // // sleep(6);
if (time2.expired()) { // if (time2.expired()) {
cout << "isteko " << endl; // cout << "isteko " << endl;
} else { // } else {
cout << "nije isteko " << endl; // cout << "nije isteko " << endl;
} // }
// // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- // // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
/** // /**
* Run an function asyncronic // * Run an function asyncronic
*/ // */
atask( []() { // atask( []() {
sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
cout << "atask 1" << endl; // cout << "atask 1" << endl;
return 5; // return 5;
}); // });
/** // /**
* Call not lambda function // * Call not lambda function
*/ // */
atask (notLambdaFunction); // atask (notLambdaFunction);
wait ( // wait (
atask ( // atask (
notLambdaFunction // notLambdaFunction
) // )
); // );
/** // /**
* Call class method // * Call class method
*/ // */
clm classes; // clm classes;
atask( [&classes] () { // atask( [&classes] () {
classes.classMethode(); // classes.classMethode();
}); // });
sleep(5); // sleep(5);
/** // /**
* Wait after runned as async // * Wait after runned as async
*/ // */
auto a = atask( []() { // auto a = atask( []() {
sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
cout << "atask 2" << endl; // cout << "atask 2" << endl;
return 5; // return 5;
}); // });
cout << wait(a) << endl; // cout << wait(a) << endl;
cout << "print after atask 2" << endl; // cout << "print after atask 2" << endl;
/** // /**
* Wait async function call and use i cout // * Wait async function call and use i cout
*/ // */
cout << wait(atask( [] () { // cout << wait(atask( [] () {
sleep_for(chrono::seconds(1)); // only for simulate log duration function // sleep_for(chrono::seconds(1)); // only for simulate log duration function
cout << "wait end" << endl; // cout << "wait end" << endl;
return 4; // return 4;
})) << endl; // })) << endl;
/** // /**
* Sleep with timeout sleep implement // * Sleep with delayed sleep implement
*/ // */
sleep_to(3000); // sleep_to(3000);
cout << "sleep_to " << rtime_ms() - start << endl; // cout << "sleep_to " << rtime_ms() - start << endl;
/** // /**
* Catch promise reject // * Catch promise reject
*/ // */
try { // try {
promise_reject(3000); // promise_reject(3000);
} catch (runtime_error err) { // } catch (runtime_error err) {
cout<< err.what() << endl; // cout<< err.what() << endl;
} // }
cout << "promise_reject " << rtime_ms() - start << endl; // cout << "promise_reject " << rtime_ms() - start << endl;
/** // /**
* Nested asynchronous invocation // * Nested asynchronous invocation
*/ // */
atask( [] { // atask( [] {
cout << "idemo ..." << endl; // cout << "idemo ..." << endl;
atask( [] { // atask( [] {
cout << "ugdnježdena async funkcija " << endl; // cout << "ugdnježdena async funkcija " << endl;
}); // });
}); // });
// // --------------- EVENTS ------------------- // // --------------- EVENTS -------------------
/** // /**
* initialization of typed events // * initialization of typed events
*/ // */
event<int, int> ev2int; // event<int, int> ev2int;
event<int, string> evintString; // event<int, string> evintString;
event<> evoid; // event<> evoid;
ev2int.on("sum", [](int a, int b) { // ev2int.on("sum", [](int a, int b) {
cout << "Sum " << a+b << endl; // cout << "Sum " << a+b << endl;
}); // });
ev2int.on("sum", [](int a, int b) { // ev2int.on("sum", [](int a, int b) {
cout << "Sum done" << endl; // cout << "Sum done" << endl;
}); // });
evintString.on("substract", [](int a, string b) { // evintString.on("substract", [](int a, string b) {
cout << "Substract " << a-stoi(b) << endl; // cout << "Substract " << a-stoi(b) << endl;
}); // });
evoid.on("void", []() { // evoid.on("void", []() {
cout << "Void emited" << endl; // cout << "Void emited" << endl;
}); // });
string emited2 = "2"; // string emited2 = "2";
evoid.on("void", [&]() { // evoid.on("void", [&]() {
cout << "Void emited " << emited2 << endl; // cout << "Void emited " << emited2 << endl;
}); // });
evoid.emit("void"); // evoid.emit("void");
sleep(1); // sleep(1);
/** // /**
* Emit // * Emit
*/ // */
ev2int.emit("sum", 5, 8); // ev2int.emit("sum", 5, 8);
sleep(1); // sleep(1);
evintString.emit("substract", 3, to_string(2)); // evintString.emit("substract", 3, to_string(2));
sleep(1); // sleep(1);
evoid.off("void"); // evoid.off("void");
evoid.emit("void"); // evoid.emit("void");
/**
* Own class
*/
myOwnClass myclass; // cout << "Ukupno 2 int " << ev2int.listeners() << endl;
// cout << "Ukupno evintString " << evintString.listeners() << endl;
// cout << "Ukupno evoid " << evoid.listeners() << endl;
// cout << "Ukupno 2 int " << ev2int.listeners("sum") << endl;
timeout t( [&] { // /**
myclass.emit("constructed", 1); // * Own class
}, 200); // */
myclass.on("constructed", [] (int i) { // myOwnClass myclass;
cout << "Constructed " << i << endl;
});
// delayed t( [&] {
// myclass.emit("constructed", 1);
// }, 200);
// myclass.on("constructed", [] (int i) {
// cout << "Constructed " << i << endl;
// });
auto status = fs::read("test1.txt");
try { // auto status = fs::read("test1.txt");
auto data = wait(status);
cout << data;
} catch (exception& err) {
cout << err.what() << endl;
}
string data_; // try {
auto start_read = rtime_us(); // auto data = wait(status);
// cout << data;
// } catch (exception& err) {
// cout << err.what() << endl;
// }
fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) {
if (error) { // string data_;
cout << "Error " << error->what() << endl; // auto start_read = rtime_us();
} else {
// cout << "Data " << endl << data << endl; // fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) {
// data_ = data; // if (error) {
// cout << "Data_" << data_ << endl; // cout << "Error " << error->what() << endl;
cout << "read " << rtime_us() - start_read << endl; // } else {
} // // cout << "Data " << endl << data << endl;
}); // // data_ = data;
// // cout << "Data_" << data_ << endl;
// cout << "read " << rtime_us() - start_read << endl;
// }
// });
// ---------------------------------------------------------------------------------------------------- // // ----------------------------------------------------------------------------------------------------
cout << "Run" << endl; cout << "Run" << endl;
_asynco_engine.run(); _asynco_engine.run();

Loading…
Cancel
Save