Compare commits

..

No commits in common. 'dev' and 'dev-asio-engine' have entirely different histories.

  1. 3
      .gitignore
  2. 4
      .vscode/settings.json
  3. 164
      README.md
  4. 76
      lib/asynco.hpp
  5. 19
      lib/define.hpp
  6. 71
      lib/engine.hpp
  7. 65
      lib/event.hpp
  8. 8
      lib/filesystem.hpp
  9. 156
      lib/timers.hpp
  10. 92
      lib/trigger.hpp
  11. 7
      src/engine.cpp
  12. 145
      src/timers.cpp
  13. 1
      test/compile.sh
  14. 216
      test/test.cpp

3
.gitignore vendored

@ -1,3 +1,2 @@
test/test test/test
test/*.txt test/*.txt
example

@ -69,8 +69,6 @@
"cinttypes": "cpp", "cinttypes": "cpp",
"typeindex": "cpp", "typeindex": "cpp",
"typeinfo": "cpp", "typeinfo": "cpp",
"variant": "cpp", "variant": "cpp"
"coroutine": "cpp",
"source_location": "cpp"
} }
} }

@ -3,12 +3,6 @@
A C++ library for event-driven asynchronous multi-threaded programming. A C++ library for event-driven asynchronous multi-threaded programming.
## Motivation
The original concept was to create an interface capable of asynchronously calling any function. It has since evolved into a library that incorporates a thread pool, each with its own event loop, event-driven programming, and functions inherently designed for asynchronous operation (including periodic and delayed functions).
The asynchronous filesystem is provided solely to guide users on how to wrap any time- or IO-intensive function for asynchronous execution.
## Features ## Features
- Object oriented - Object oriented
@ -16,8 +10,8 @@ The asynchronous filesystem is provided solely to guide users on how to wrap any
- Header only - Header only
- Asynchronous programming - Asynchronous programming
- Multithread - Multithread
- Asynchronous timer functions: periodic, delayed (like setInterval and setTimeout from JS) - Asynchronous timer functions: interval, timeout
- Typed events (on, tick, off) (like EventEmitter from JS: on, emit, etc) - Typed events (on, emit, off)
- Event loops - Event loops
- Multiple parallel execution loops - Multiple parallel execution loops
- Asynchronous file IO - Asynchronous file IO
@ -27,16 +21,16 @@ The asynchronous filesystem is provided solely to guide users on how to wrap any
Just download the latest release and unzip it into your project. Just download the latest release and unzip it into your project.
```c++ ```c++
#define NUM_OF_RUNNERS 8 // To change the number of threads used by asynco, without this it runs according to the number of cores #define NUM_OF_RUNNERS 8 // To change the number of threads used by atask, without this it runs according to the number of cores
#include "asynco/lib/asynco.hpp" // async_ (), await_() #include "asynco/lib/asynco.hpp" // atask(), wait()
#include "asynco/lib/triggers.hpp" // trigger (event emitter) #include "asynco/lib/event.hpp" // event
#include "asynco/lib/timers.hpp" // periodic, delayed (like setInterval and setTimeout from JS) #include "asynco/lib/timers.hpp" // interval, timeout
#include "asynco/lib/filesystem.hpp" // for async read and write files #include "asynco/lib/filesystem.hpp" // for async read and write files
using namespace marcelb; using namespace marcelb;
using namespace asynco; using namespace asynco;
using namespace triggers; using namespace events;
// At the end of the main function, always set // At the end of the main function, always set
_asynco_engine.run(); _asynco_engine.run();
@ -49,12 +43,12 @@ return 0;
Time asynchronous functions Time asynchronous functions
```c++ ```c++
// start periodic // start interval
periodic inter1 ([]() { interval inter1 ([]() {
cout << "Interval 1" << endl; cout << "Interval 1" << endl;
}, 1000); }, 1000);
// stop periodic // stop interval
inter1.stop(); inter1.stop();
// how many times it has expired // how many times it has expired
@ -63,12 +57,12 @@ int t = inter1.ticks();
// is it stopped // is it stopped
bool stoped = inter1.stoped(); bool stoped = inter1.stoped();
// start delayed // start timeout
delayed time1 ( [] () { timeout time1 ( [] () {
cout << "Timeout 1 " << endl; cout << "Timeout 1 " << endl;
}, 10000); }, 10000);
// stop delayed // stop timeout
time1.stop(); time1.stop();
// is it expired // is it expired
@ -77,28 +71,6 @@ int t = time1.expired();
// is it stopped // is it stopped
bool stoped = time1.stoped(); bool stoped = time1.stoped();
// If you don't want to save in a variable, but you want to start a timer, use these functions
// And you can also save them, they are only of the shared pointer type
auto d = Delayed( [](){
cout << "Delayed" << endl;
}, 2000);
auto p = Periodic( [](){
cout << "Periodic" << endl;
}, 700);
Periodic( [&] (){
cout << "Delayed expire " << d->expired() << endl;
cout << "Periodic ticks " << p->ticks() << endl;
cout << "Delayed stoped " << d->stoped() << endl;
cout << "Periodic stoped " << p->stoped() << endl;
}, 1000);
Delayed( [&](){
p->stop();
}, 10000);
``` ```
Make functions asynchronous Make functions asynchronous
@ -107,9 +79,9 @@ Make functions asynchronous
* Run an lambda function asynchronously * Run an lambda function asynchronously
*/ */
async_ ( []() { atask( []() {
sleep_for(2s); // only for simulating long duration function sleep_for(2s); // only for simulating long duration function
cout << "nonsync " << endl; cout << "atask" << endl;
return 5; return 5;
}); });
@ -122,7 +94,7 @@ void notLambdaFunction() {
cout << "Call to not lambda function" << endl; cout << "Call to not lambda function" << endl;
} }
async_ (notLambdaFunction); atask (notLambdaFunction);
/** /**
* Run class method * Run class method
@ -136,95 +108,41 @@ class clm {
}; };
clm classes; clm classes;
async_ ( [&classes] () { atask( [&classes] () {
classes.classMethode(); classes.classMethode();
}); });
/** /**
* await_ after runned as async * Wait after runned as async
*/ */
auto a = async_ ( []() { auto a = atask( []() {
sleep_for(2s); // only for simulating long duration function sleep_for(2s); // only for simulating long duration function
cout << "nonsync " << endl; cout << "atask" << endl;
return 5; return 5;
}); });
cout << await_(a) << endl; cout << wait(a) << endl;
/** /**
* await_ async function call and use i cout * Wait async function call and use i cout
*/ */
cout << await_(async_ ( [] () { cout << wait(atask( [] () {
sleep_for(chrono::seconds(1)); // only for simulating long duration function sleep_for(chrono::seconds(1)); // only for simulating long duration function
cout << "await_ end" << endl; cout << "wait end" << endl;
return 4; return 4;
})) << endl; })) << endl;
/**
* Await all
**/
auto a = async_ ( []() {
cout << "A" << endl;
return 3;
});
auto b = async_ ( []() {
cout << "B" << endl;
throw runtime_error("Test exception");
return;
});
auto c = async_ ( []() {
cout << "C" << endl;
return "Hello";
});
int a_;
string c_;
auto await_all = [&] () {
a_ = await_(a);
await_(b);
c_ = await_(c);
};
try {
await_all();
cout << "a_ " << a_ << " c_ " << c_ << endl;
} catch (const exception& exc) {
cout << exc.what() << endl;
}
// // same type
vector<future<void>> fut_vec;
for (int i=0; i<5; i++) {
fut_vec.push_back(
async_ ( [i]() {
cout << "Async_ " << i << endl;
})
);
}
auto await_all = [&] () {
for (int i=0; i<fut_vec.size(); i++) {
await_ (fut_vec[i]);
}
};
/** /**
* Sleep with delayed sleep implement * Sleep with timeout sleep implement
*/ */
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
delayed t( [&]() { timeout t( [&]() {
_promise.set_value(); _promise.set_value();
}, _time); }, _time);
@ -239,7 +157,7 @@ sleep_to(3000);
void promise_reject (int _time) { void promise_reject (int _time) {
promise<void> _promise; promise<void> _promise;
delayed t( [&]() { timeout t( [&]() {
try { try {
// simulate except // simulate except
throw runtime_error("Error simulation"); throw runtime_error("Error simulation");
@ -265,9 +183,9 @@ Events
* initialization of typed events * initialization of typed events
*/ */
trigger<int, int> ev2int; event<int, int> ev2int;
trigger<int, string> evintString; event<int, string> evintString;
trigger<> evoid; event<> evoid;
ev2int.on("sum", [](int a, int b) { ev2int.on("sum", [](int a, int b) {
cout << "Sum " << a+b << endl; cout << "Sum " << a+b << endl;
@ -295,32 +213,32 @@ sleep(1);
* Emit * Emit
*/ */
ev2int.tick("sum", 5, 8); ev2int.emit("sum", 5, 8);
sleep(1); sleep(1);
evintString.tick("substract", 3, to_string(2)); evintString.emit("substract", 3, to_string(2));
sleep(1); sleep(1);
evoid.tick("void"); evoid.emit("void");
// Turn off the event listener // Turn off the event listener
evoid.off("void"); evoid.off("void");
evoid.tick("void"); // nothing is happening evoid.emit("void"); // nothing is happening
``` ```
Extend own class whit events Extend own class whit events
```c++ ```c++
class myOwnClass : public trigger<int> { class myOwnClass : public event<int> {
public: public:
myOwnClass() : trigger() {}; myOwnClass() : event() {};
}; };
myOwnClass myclass; myOwnClass myclass;
delayed t( [&] { timeout t( [&] {
myclass.tick("constructed", 1); myclass.emit("constructed", 1);
}, 200); }, 200);
myclass.on("constructed", [] (int i) { myclass.on("constructed", [] (int i) {
@ -355,7 +273,7 @@ fs::write("test1.txt", "Hello world", [] (exception* error) {
auto future_data = fs::read("test.txt"); auto future_data = fs::read("test.txt");
try { try {
string data = await_(future_data); string data = wait(future_data);
} catch (exception& err) { } catch (exception& err) {
cout << err.what() << endl; cout << err.what() << endl;
} }
@ -363,7 +281,7 @@ try {
auto future_status = fs::write("test.txt", "Hello world"); auto future_status = fs::write("test.txt", "Hello world");
try { try {
await_(future_status); wait(future_status);
} catch (exception& err) { } catch (exception& err) {
cout << err.what() << endl; cout << err.what() << endl;
} }

@ -1,7 +1,7 @@
#ifndef _ASYNCO_ #ifndef _ASYNCO_
#define _ASYNCO_ #define _ASYNCO_
#include "engine.hpp" #include <boost/asio.hpp>
#include <iostream> #include <iostream>
using namespace std; using namespace std;
@ -9,11 +9,57 @@ using namespace std;
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {
#define HW_CONCURRENCY_MINIMAL 4
/**
* Internal anonymous class for initializing the ASIO context and thread pool
* !!! It is anonymous to protect against use in the initialization of other objects of the same type !!!
*/
class {
public:
boost::asio::io_context io_context;
void run() {
for (auto& runner : runners) {
runner.join();
}
}
private:
unique_ptr<boost::asio::io_service::work> work { [&] () {
return new boost::asio::io_service::work(io_context);
} ()};
vector<thread> runners { [&] () {
vector<thread> _runs;
unsigned int num_of_runners;
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
for (int i=0; i<num_of_runners; i++) {
_runs.push_back(thread ( [this] () {
io_context.run();
}));
}
return _runs;
} ()};
} _asynco_engine;
/** /**
* Run the function asynchronously * Run the function asynchronously
*/ */
template<class F, class... Args> template<class F, class... Args>
auto async_(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> { auto atask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type; using return_type = typename result_of<F(Args...)>::type;
future<return_type> res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward<F>(f), forward<Args>(args)...))); future<return_type> res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward<F>(f), forward<Args>(args)...)));
return res; return res;
@ -23,7 +69,7 @@ auto async_(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::typ
* Block until the asynchronous call completes * Block until the asynchronous call completes
*/ */
template<typename T> template<typename T>
T await_(future<T>& r) { T wait(future<T>& r) {
return r.get(); return r.get();
} }
@ -31,29 +77,7 @@ T await_(future<T>& r) {
* Block until the asynchronous call completes * Block until the asynchronous call completes
*/ */
template<typename T> template<typename T>
T await_(future<T>&& r) { T wait(future<T>&& r) {
return move(r).get();
}
/**
* Block until the asynchronous call completes or time expired
*/
template<typename T>
T await_(future<T>& r, uint64_t time) {
if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
throw runtime_error("Asynchronous execution timed out");
}
return r.get();
}
/**
* Block until the asynchronous call completes or time expired
*/
template<typename T>
T await_(future<T>&& r, uint64_t time) {
if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
throw runtime_error("Asynchronous execution timed out");
}
return move(r).get(); return move(r).get();
} }

@ -1,19 +0,0 @@
#ifndef _ASYNCO_DEFINE_
#define _ASYNCO_DEFINE_
namespace marcelb {
namespace asynco {
/**
* Alternative names of functions - mostly for the sake of more beautiful coloring of the code
*/
#define async_ marcelb::asynco::async_
#define await_ marcelb::asynco::await_
}
}
#endif

@ -1,71 +0,0 @@
#ifndef _ASYNCO_ENGINE_
#define _ASYNCO_ENGINE_
#include <vector>
#include <memory>
using namespace std;
#include <boost/asio.hpp>
namespace marcelb {
namespace asynco {
#define HW_CONCURRENCY_MINIMAL 4
/**
* Internal anonymous class for initializing the ASIO context and thread pool
* !!! It is anonymous to protect against use in the initialization of other objects of the same type !!!
*/
class Engine {
public:
boost::asio::io_context io_context;
void run() {
for (auto& runner : runners) {
runner.join();
}
}
private:
unique_ptr<boost::asio::io_service::work> work { [&] () {
return new boost::asio::io_service::work(io_context);
} ()};
vector<thread> runners { [&] () {
vector<thread> _runs;
unsigned int num_of_runners;
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
for (int i=0; i<num_of_runners; i++) {
_runs.push_back(thread ( [this] () {
io_context.run();
}));
}
return _runs;
} ()};
};
extern Engine _asynco_engine;
}
}
#endif

@ -0,0 +1,65 @@
#ifndef _EVENT_
#define _EVENT_
#include <map>
#include <vector>
#include <string>
#include <functional>
using namespace std;
#include "asynco.hpp"
namespace marcelb {
namespace asynco {
namespace events {
/**
* Event class, for event-driven programming.
* These events are typed according to the arguments of the callback function
*/
template<typename... T>
class event {
private:
mutex m_eve;
unordered_map<string, vector<function<void(T...)>>> events;
public:
/**
* Defines event by key, and callback function
*/
void on(const string& key, function<void(T...)> callback) {
lock_guard _off(m_eve);
events[key].push_back(callback);
}
/**
* It emits an event and sends a callback function saved according to the key with the passed parameters
*/
template<typename... Args>
void emit(const string& key, Args... args) {
auto it_eve = events.find(key);
if (it_eve != events.end()) {
for (uint i =0; i<it_eve->second.size(); i++) {
auto callback = bind(it_eve->second[i], forward<Args>(args)...);
atask(callback);
}
}
}
/**
* Remove an event listener from an event
*/
void off(const string& key) {
lock_guard _off(m_eve);
events.erase(key);
}
};
}
}
}
#endif

@ -19,7 +19,7 @@ namespace fs {
*/ */
template<typename Callback> template<typename Callback>
void read(string path, Callback&& callback) { void read(string path, Callback&& callback) {
asynco::async_( [&path, callback] () { atask( [&path, callback] () {
string content; string content;
try { try {
string line; string line;
@ -48,7 +48,7 @@ void read(string path, Callback&& callback) {
* Asynchronous file reading * Asynchronous file reading
*/ */
future<string> read(string path) { future<string> read(string path) {
return asynco::async_( [&path] () { return atask( [&path] () {
string content; string content;
string line; string line;
ifstream file (path); ifstream file (path);
@ -72,7 +72,7 @@ future<string> read(string path) {
*/ */
template<typename Callback> template<typename Callback>
void write(string path, string content, Callback&& callback) { void write(string path, string content, Callback&& callback) {
asynco::async_( [&path, &content, callback] () { atask( [&path, &content, callback] () {
try { try {
ofstream file (path); ofstream file (path);
if (file.is_open()) { if (file.is_open()) {
@ -95,7 +95,7 @@ void write(string path, string content, Callback&& callback) {
* Asynchronous file writing with callback after write complete * Asynchronous file writing with callback after write complete
*/ */
future<void> write(string path, string content) { future<void> write(string path, string content) {
return asynco::async_( [&path, &content] () { return atask( [&path, &content] () {
ofstream file (path); ofstream file (path);
if (file.is_open()) { if (file.is_open()) {
file << content; file << content;

@ -1,10 +1,12 @@
#ifndef _ASYNCO_TIMERS_ #ifndef _ROTOR_
#define _ASYNCO_TIMERS_ #define _ROTOT_
#include "asynco.hpp"
#include <chrono> #include <chrono>
using namespace std;
#include "asynco.hpp" using namespace std;
using namespace marcelb;
using namespace asynco;
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {
@ -13,13 +15,21 @@ namespace asynco {
* Get the time in ms from the epoch * Get the time in ms from the epoch
*/ */
int64_t rtime_ms(); int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
/** /**
* Get the time in us from the epoch * Get the time in us from the epoch
*/ */
int64_t rtime_us(); int64_t rtime_us() {
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
/** /**
* Core timer class for construct time async functions * Core timer class for construct time async functions
@ -35,126 +45,178 @@ class timer {
/** /**
* A method to assign a callback wrapper and a reinitialization algorithm * A method to assign a callback wrapper and a reinitialization algorithm
*/ */
void init(); void init() {
st.async_wait( [this] (const boost::system::error_code&) {
if (!_stop) {
callback();
if (repeate) {
st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time));
init();
}
_ticks++;
}
});
}
public: public:
/** /**
* The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer * The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer
*/ */
timer (function<void()> _callback, uint64_t _time, bool _repeate); timer (function<void()> _callback, uint64_t _time, bool _repeate) :
st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)),
_stop(false),
repeate(_repeate),
callback(_callback),
time(_time) {
init();
}
/** /**
* Stop timer * Stop timer
* The stop flag is set and timer remove it from the queue * The stop flag is set and timer remove it from the queue
*/ */
void stop(); void stop() {
_stop = true;
st.cancel();
}
/** /**
* Run callback now * Run callback now
* Forces the callback function to run independently of the timer * Forces the callback function to run independently of the timer
*/ */
void now(); void now() {
st.cancel();
}
/** /**
* Get the number of times the timer callback was runned * Get the number of times the timer callback was runned
*/ */
uint64_t ticks(); uint64_t ticks() {
return _ticks;
}
/** /**
* The logic status of the timer stop state * The logic status of the timer stop state
*/ */
bool stoped(); bool stoped() {
return _stop;
}
/** /**
* The destructor stops the timer * The destructor stops the timer
*/ */
~timer(); ~timer() {
stop();
}
}; };
/** /**
* Class periodic for periodic execution of the callback in time in ms * Class interval for periodic execution of the callback in time in ms
*/ */
class periodic { class interval {
shared_ptr<timer> _timer; shared_ptr<timer> _timer;
public: public:
/** /**
* Constructor initializes a shared pointer of type timer * Constructor initializes a shared pointer of type timer
*/ */
periodic(function<void()> callback, uint64_t time); interval(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, true)) {
}
/** /**
* Stop periodic * Stop interval
* The stop flag is set and periodic remove it from the queue * The stop flag is set and interval remove it from the queue
*/ */
void stop(); void stop() {
_timer->stop();
}
/** /**
* Run callback now * Run callback now
* Forces the callback function to run independently of the periodic * Forces the callback function to run independently of the interval
*/ */
void now(); void now() {
_timer->now();
}
/** /**
* Get the number of times the periodic callback was runned * Get the number of times the interval callback was runned
*/ */
uint64_t ticks(); uint64_t ticks() {
return _timer->ticks();
}
/** /**
* The logic status of the periodic stop state * The logic status of the interval stop state
*/ */
bool stoped(); bool stoped() {
return _timer->stoped();
}
/** /**
* The destructor stops the periodic * The destructor stops the interval
*/ */
~periodic(); ~interval() {
stop();
}
}; };
/** /**
* Class delayed for delayed callback execution in ms * Class timeout for delayed callback execution in ms
*/ */
class delayed { class timeout {
shared_ptr<timer> _timer; shared_ptr<timer> _timer;
public: public:
/** /**
* Constructor initializes a shared pointer of type timer * Constructor initializes a shared pointer of type timer
*/ */
delayed(function<void()> callback, uint64_t time); timeout(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, false)) {
}
/** /**
* Stop delayed * Stop timeout
* The stop flag is set and delayed remove it from the queue * The stop flag is set and timeout remove it from the queue
*/ */
void stop(); void stop() {
_timer->stop();
}
/** /**
* Run callback now * Run callback now
* Forces the callback function to run independently of the delayed * Forces the callback function to run independently of the timeout
*/ */
void now(); void now() {
_timer->now();
}
/** /**
* Get is the delayed callback runned * Get the number of times the timeout callback was runned
*/ */
bool expired(); bool expired() {
return bool(_timer->ticks());
}
/** /**
* The logic status of the delayed stop state * The logic status of the timeout stop state
*/ */
bool stoped(); bool stoped() {
return _timer->stoped();
}
/** /**
* The destructor stops the delayed * The destructor stops the timeout
*/ */
~delayed(); ~timeout() {
stop();
}
}; };
shared_ptr<periodic> Periodic(function<void()> callback, uint64_t time);
shared_ptr<delayed> Delayed(function<void()> callback, uint64_t time);
} }
} }

@ -1,92 +0,0 @@
#ifndef _ASYNCO_TRIGGER_
#define _ASYNCO_TRIGGER_
#include <map>
#include <vector>
#include <string>
#include <functional>
using namespace std;
#include "engine.hpp"
namespace marcelb {
namespace asynco {
namespace triggers {
/**
* trigger class, for event-driven programming.
* These events are typed according to the arguments of the callback function
*/
template<typename... T>
class trigger {
private:
mutex m_eve;
unordered_map<string, vector<function<void(T...)>>> triggers;
public:
/**
* Defines event by key, and callback function
*/
void on(const string& key, function<void(T...)> callback) {
lock_guard _off(m_eve);
triggers[key].push_back(callback);
}
/**
* It emits an event and sends a callback function saved according to the key with the passed parameters
*/
template<typename... Args>
void tick(const string& key, Args... args) {
auto it_eve = triggers.find(key);
if (it_eve != triggers.end()) {
for (uint i =0; i<it_eve->second.size(); i++) {
auto callback = bind(it_eve->second[i], forward<Args>(args)...);
asynco::async_(callback);
}
}
}
/**
* Remove an trigger listener from an event
*/
void off(const string& key) {
lock_guard _off(m_eve);
triggers.erase(key);
}
/**
* Remove all trigger listener
*/
void off() {
lock_guard _off(m_eve);
triggers.clear();
}
/**
* Get num of listeners by an trigger key
*/
unsigned int listeners(const string& key) {
return triggers[key].size();
}
/**
* Get num of all listeners
*/
unsigned int listeners() {
unsigned int listeners = 0;
for (auto& ev : triggers) {
listeners += ev.second.size();
}
return listeners;
}
};
}
}
}
#endif

@ -1,7 +0,0 @@
#include "../lib/engine.hpp"
namespace marcelb::asynco {
Engine _asynco_engine;
};

@ -1,145 +0,0 @@
#include "../lib/timers.hpp"
namespace marcelb::asynco {
int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
int64_t rtime_us() {
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
void timer::init() {
st.async_wait( [this] (const boost::system::error_code&) {
if (!_stop) {
callback();
if (repeate) {
st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time));
init();
}
_ticks++;
}
});
}
timer::timer (function<void()> _callback, uint64_t _time, bool _repeate) :
st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)),
_stop(false),
repeate(_repeate),
callback(_callback),
time(_time) {
init();
}
void timer::stop() {
_stop = true;
st.cancel();
}
void timer::now() {
st.cancel();
}
uint64_t timer::ticks() {
return _ticks;
}
bool timer::stoped() {
return _stop;
}
timer::~timer() {
stop();
}
periodic::periodic(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, true)) {
}
void periodic::stop() {
_timer->stop();
}
void periodic::now() {
_timer->now();
}
uint64_t periodic::ticks() {
return _timer->ticks();
}
bool periodic::stoped() {
return _timer->stoped();
}
periodic::~periodic() {
stop();
}
delayed::delayed(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, false)) {
}
void delayed::stop() {
_timer->stop();
}
void delayed::now() {
_timer->now();
}
bool delayed::expired() {
return bool(_timer->ticks());
}
bool delayed::stoped() {
return _timer->stoped();
}
delayed::~delayed() {
stop();
}
mutex p_io, d_io;
vector<shared_ptr<periodic>> periodic_calls_container;
vector<shared_ptr<delayed>> delayed_calls_container;
shared_ptr<periodic> Periodic(function<void()> callback, uint64_t time) {
shared_ptr<periodic> periodic_ptr(make_shared<periodic>(callback, time));
async_ ( [&, periodic_ptr](){
lock_guard<mutex> lock(p_io);
periodic_calls_container.push_back(periodic_ptr);
for (uint32_t i=0; i<periodic_calls_container.size(); i++) {
if (periodic_calls_container[i]->stoped()) {
periodic_calls_container.erase(periodic_calls_container.begin()+i);
i--;
}
}
});
return periodic_ptr;
}
shared_ptr<delayed> Delayed(function<void()> callback, uint64_t time) {
shared_ptr<delayed> delayed_ptr(make_shared<delayed>(callback, time));
async_ ( [&, delayed_ptr](){
lock_guard<mutex> lock(p_io);
delayed_calls_container.push_back(delayed_ptr);
for (uint32_t i=0; i<delayed_calls_container.size(); i++) {
if (delayed_calls_container[i]->stoped() || delayed_calls_container[i]->expired()) {
delayed_calls_container.erase(delayed_calls_container.begin()+i);
i--;
}
}
});
return delayed_ptr;
}
};

@ -1 +0,0 @@
g++ test.cpp ../src/* -o test

@ -1,28 +1,24 @@
// // #define NUM_OF_RUNNERS 2 // #define NUM_OF_RUNNERS 2
#include "../lib/asynco.hpp" #include "../lib/asynco.hpp"
#include "../lib/trigger.hpp" #include "../lib/event.hpp"
#include "../lib/filesystem.hpp" #include "../lib/filesystem.hpp"
#include "../lib/timers.hpp" #include "../lib/timers.hpp"
#include "../lib/define.hpp"
using namespace marcelb::asynco; using namespace marcelb::asynco;
using namespace triggers; using namespace events;
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <thread> #include <thread>
#include <future>
#include <vector>
using namespace std; using namespace std;
using namespace this_thread; using namespace this_thread;
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
delayed t( [&]() { timeout t( [&]() {
_promise.set_value(); _promise.set_value();
}, _time); }, _time);
@ -31,7 +27,7 @@ void sleep_to (int _time) {
void promise_reject (int _time) { void promise_reject (int _time) {
promise<void> _promise; promise<void> _promise;
delayed t( [&]() { timeout t( [&]() {
try { try {
// simulate except // simulate except
throw runtime_error("Error simulation"); throw runtime_error("Error simulation");
@ -57,9 +53,9 @@ class clm {
// ------------------ EXTEND OWN CLASS WITH EVENTS ------------------- // ------------------ EXTEND OWN CLASS WITH EVENTS -------------------
class myOwnClass : public trigger<int> { class myOwnClass : public event<int> {
public: public:
myOwnClass() : trigger() {}; myOwnClass() : event() {};
}; };
@ -69,37 +65,37 @@ int main () {
// --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- // --------------- TIME ASYNCHRONOUS FUNCTIONS --------------
/** // /**
* Init periodic and delayed; clear periodic and delayed // * Init interval and timeout; clear interval and timeout
*/ // */
// periodic inter1 ([&]() { // interval inter1 ([&]() {
// cout << "periodic prvi " << rtime_ms() - start << endl; // cout << "interval prvi " << rtime_ms() - start << endl;
// }, 1000); // }, 1000);
// periodic inter2 ([&]() { // interval inter2 ([&]() {
// cout << "periodic drugi " << rtime_ms() - start << endl; // cout << "interval drugi " << rtime_ms() - start << endl;
// }, 2000); // }, 2000);
// periodic inter3 ([&]() { // interval inter3 ([&]() {
// cout << "periodic treći " << rtime_ms() - start << endl; // cout << "interval treći " << rtime_ms() - start << endl;
// }, 1000); // }, 1000);
// periodic inter4 ([&]() { // interval inter4 ([&]() {
// // cout << "periodic cetvrti " << rtime_ms() - start << endl; // // cout << "interval cetvrti " << rtime_ms() - start << endl;
// cout << "Ticks " << inter3.ticks() << endl; // cout << "Ticks " << inter3.ticks() << endl;
// }, 500); // }, 500);
// periodic inter5 ([&]() { // interval inter5 ([&]() {
// cout << "periodic peti " << rtime_ms() - start << endl; // cout << "interval peti " << rtime_ms() - start << endl;
// }, 2000); // }, 2000);
// periodic inter6 ([&]() { // interval inter6 ([&]() {
// cout << "periodic sesti " << rtime_ms() - start << endl; // cout << "interval sesti " << rtime_ms() - start << endl;
// }, 3000); // }, 3000);
// delayed time1 ( [&] () { // timeout time1 ( [&] () {
// cout << "Close periodic 1 i 2 " << rtime_ms() - start << endl; // cout << "Close interval 1 i 2 " << rtime_ms() - start << endl;
// inter1.stop(); // inter1.stop();
// cout << "inter1.stop " << endl; // cout << "inter1.stop " << endl;
// inter2.stop(); // inter2.stop();
@ -107,8 +103,8 @@ int main () {
// }, 8000); // }, 8000);
// delayed time2 ([&] () { // timeout time2 ([&] () {
// cout << "Close periodic 3 " << rtime_ms() - start << endl; // cout << "Close interval 3 " << rtime_ms() - start << endl;
// inter3.stop(); // inter3.stop();
// cout << "Stoped " << inter3.stoped() << endl; // cout << "Stoped " << inter3.stoped() << endl;
// // time1.stop(); // // time1.stop();
@ -129,34 +125,15 @@ int main () {
// cout << "nije isteko " << endl; // cout << "nije isteko " << endl;
// } // }
// auto d = Delayed( [](){ // // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
// cout << "Delayed" << endl;
// }, 2000);
// auto p = Periodic( [](){
// cout << "Periodic" << endl;
// }, 700);
// Periodic( [&] (){ // /**
// cout << "Delayed expire " << d->expired() << endl; // * Run an function asyncronic
// cout << "Periodic ticks " << p->ticks() << endl; // */
// cout << "Delayed stoped " << d->stoped() << endl;
// cout << "Periodic stoped " << p->stoped() << endl;
// }, 1000);
// Delayed( [&](){
// p->stop();
// }, 10000);
// // // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
// // /**
// // * Run an function asyncronic
// // */
// async_ ( []() { // atask( []() {
// sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
// cout << "asynco 1" << endl; // cout << "atask 1" << endl;
// return 5; // return 5;
// }); // });
@ -164,56 +141,51 @@ int main () {
// * Call not lambda function // * Call not lambda function
// */ // */
// async_ (notLambdaFunction); // atask (notLambdaFunction);
// await_ ( // wait (
// async_ ( // atask (
// notLambdaFunction // notLambdaFunction
// ) // )
// ); // );
// /**
// // async(launch::async, [] () { // * Call class method
// // cout << "Another thread in async style!" << endl; // */
// // });
// // /**
// // * Call class method
// // */
// clm classes; // clm classes;
// async_ ( [&classes] () { // atask( [&classes] () {
// classes.classMethode(); // classes.classMethode();
// }); // });
// sleep(5); // sleep(5);
// // /** // /**
// // * await_ after runned as async // * Wait after runned as async
// // */ // */
// auto aa = async_ ( []() { // auto a = atask( []() {
// sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
// cout << "async_ 2" << endl; // cout << "atask 2" << endl;
// return 5; // return 5;
// }); // });
// cout << await_(aa) << endl; // cout << wait(a) << endl;
// cout << "print after async_ 2" << endl; // cout << "print after atask 2" << endl;
// /** // /**
// * await_ async function call and use i cout // * Wait async function call and use i cout
// */ // */
// cout << await_(async_ ( [] () { // cout << wait(atask( [] () {
// sleep_for(chrono::seconds(1)); // only for simulate log duration function // sleep_for(chrono::seconds(1)); // only for simulate log duration function
// cout << "await_ end" << endl; // cout << "wait end" << endl;
// return 4; // return 4;
// })) << endl; // })) << endl;
// /** // /**
// * Sleep with delayed sleep implement // * Sleep with timeout sleep implement
// */ // */
// sleep_to(3000); // sleep_to(3000);
@ -237,76 +209,22 @@ int main () {
// */ // */
// async_ ( [] { // atask( [] {
// cout << "idemo ..." << endl; // cout << "idemo ..." << endl;
// async_ ( [] { // atask( [] {
// cout << "ugdnježdena async funkcija " << endl; // cout << "ugdnježdena async funkcija " << endl;
// }); // });
// }); // });
// // // --------------- EVENTS -------------------
// // -------------------------- AWAIT ALL ----------------------------------
// auto a = async_ ( []() {
// cout << "A" << endl;
// return 3;
// });
// auto b = async_ ( []() {
// cout << "B" << endl;
// throw runtime_error("Test exception");
// return;
// });
// auto c = async_ ( []() {
// cout << "C" << endl;
// return "Hello";
// });
// int a_;
// string c_;
// auto await_all = [&] () {
// a_ = await_(a);
// await_(b);
// c_ = await_(c);
// };
// try {
// await_all();
// cout << "a_ " << a_ << " c_ " << c_ << endl;
// } catch (const exception& exc) {
// cout << exc.what() << endl;
// }
// // // same type
// vector<future<void>> fut_vec;
// for (int i=0; i<5; i++) {
// fut_vec.push_back(
// async_ ( [i]() {
// cout << "Async_ " << i << endl;
// })
// );
// }
// auto await_all2 = [&] () {
// for (int i=0; i<fut_vec.size(); i++) {
// await_ (fut_vec[i]);
// }
// };
// await_all2();
// // --------------- EVENTS -------------------
// /** // /**
// * initialization of typed events // * initialization of typed events
// */ // */
// trigger<int, int> ev2int; // event<int, int> ev2int;
// trigger<int, string> evintString; // event<int, string> evintString;
// trigger<> evoid; // event<> evoid;
// ev2int.on("sum", [](int a, int b) { // ev2int.on("sum", [](int a, int b) {
// cout << "Sum " << a+b << endl; // cout << "Sum " << a+b << endl;
@ -330,28 +248,22 @@ int main () {
// cout << "Void emited " << emited2 << endl; // cout << "Void emited " << emited2 << endl;
// }); // });
// evoid.tick("void"); // evoid.emit("void");
// sleep(1); // sleep(1);
// /** // /**
// * Emit // * Emit
// */ // */
// ev2int.tick("sum", 5, 8); // ev2int.emit("sum", 5, 8);
// sleep(1); // sleep(1);
// evintString.tick("substract", 3, to_string(2)); // evintString.emit("substract", 3, to_string(2));
// sleep(1); // sleep(1);
// evoid.off("void"); // evoid.off("void");
// evoid.tick("void"); // evoid.emit("void");
// cout << "Ukupno 2 int " << ev2int.listeners() << endl;
// cout << "Ukupno evintString " << evintString.listeners() << endl;
// cout << "Ukupno evoid " << evoid.listeners() << endl;
// cout << "Ukupno 2 int " << ev2int.listeners("sum") << endl;
// /** // /**
// * Own class // * Own class
@ -359,8 +271,8 @@ int main () {
// myOwnClass myclass; // myOwnClass myclass;
// delayed t( [&] { // timeout t( [&] {
// myclass.tick("constructed", 1); // myclass.emit("constructed", 1);
// }, 200); // }, 200);
// myclass.on("constructed", [] (int i) { // myclass.on("constructed", [] (int i) {
@ -373,7 +285,7 @@ int main () {
// try { // try {
// auto data = await_(status); // auto data = wait(status);
// cout << data; // cout << data;
// } catch (exception& err) { // } catch (exception& err) {
// cout << err.what() << endl; // cout << err.what() << endl;
@ -395,7 +307,7 @@ int main () {
// }); // });
// // ---------------------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------------------
cout << "Run" << endl; cout << "Run" << endl;
_asynco_engine.run(); _asynco_engine.run();

Loading…
Cancel
Save