Compare commits

..

18 Commits

  1. 3
      .gitignore
  2. 4
      .vscode/settings.json
  3. 187
      README.md
  4. 34
      lib/asynco.hpp
  5. 19
      lib/define.hpp
  6. 71
      lib/engine.hpp
  7. 66
      lib/event.hpp
  8. 15
      lib/filesystem.hpp
  9. 239
      lib/rotor.hpp
  10. 136
      lib/runner.hpp
  11. 161
      lib/timers.hpp
  12. 92
      lib/trigger.hpp
  13. 7
      src/engine.cpp
  14. 145
      src/timers.cpp
  15. 1
      test/compile.sh
  16. 296
      test/test.cpp

3
.gitignore vendored

@ -1,2 +1,3 @@
test/test test/test
test/*.txt test/*.txt
example

@ -69,6 +69,8 @@
"cinttypes": "cpp", "cinttypes": "cpp",
"typeindex": "cpp", "typeindex": "cpp",
"typeinfo": "cpp", "typeinfo": "cpp",
"variant": "cpp" "variant": "cpp",
"coroutine": "cpp",
"source_location": "cpp"
} }
} }

@ -3,6 +3,12 @@
A C++ library for event-driven asynchronous multi-threaded programming. A C++ library for event-driven asynchronous multi-threaded programming.
## Motivation
The original concept was to create an interface capable of asynchronously calling any function. It has since evolved into a library that incorporates a thread pool, each with its own event loop, event-driven programming, and functions inherently designed for asynchronous operation (including periodic and delayed functions).
The asynchronous filesystem is provided solely to guide users on how to wrap any time- or IO-intensive function for asynchronous execution.
## Features ## Features
- Object oriented - Object oriented
@ -10,27 +16,31 @@ A C++ library for event-driven asynchronous multi-threaded programming.
- Header only - Header only
- Asynchronous programming - Asynchronous programming
- Multithread - Multithread
- Asynchronous timer functions: interval, timeout - Asynchronous timer functions: periodic, delayed (like setInterval and setTimeout from JS)
- Typed events (on, emit, off) - Typed events (on, tick, off) (like EventEmitter from JS: on, emit, etc)
- Event loops - Event loops
- Multiple parallel execution loops - Multiple parallel execution loops
- Asynchronous file IO - Asynchronous file IO
- Based on ASIO (Boost Asio)
## Installation ## Installation
Just download the latest release and unzip it into your project. Just download the latest release and unzip it into your project.
```c++ ```c++
#define NUM_OF_RUNNERS 8 // To change the number of threads used by atask, without this it runs according to the number of cores #define NUM_OF_RUNNERS 8 // To change the number of threads used by asynco, without this it runs according to the number of cores
#include "asynco/lib/asynco.hpp" // atask(), wait() #include "asynco/lib/asynco.hpp" // async_ (), await_()
#include "asynco/lib/event.hpp" // event #include "asynco/lib/triggers.hpp" // trigger (event emitter)
#include "asynco/lib/rotor.hpp" // interval, timeout #include "asynco/lib/timers.hpp" // periodic, delayed (like setInterval and setTimeout from JS)
#include "asynco/lib/runner.hpp" // for own loop #include "asynco/lib/filesystem.hpp" // for async read and write files
#include "asynco/lib/filesystem.hpp" // for async read and write files
using namespace marcelb; using namespace marcelb;
using namespace asynco; using namespace asynco;
using namespace events; using namespace triggers;
// At the end of the main function, always set
_asynco_engine.run();
return 0;
``` ```
@ -39,21 +49,56 @@ using namespace events;
Time asynchronous functions Time asynchronous functions
```c++ ```c++
// start interval // start periodic
interval inter1 ([]() { periodic inter1 ([]() {
cout << "Interval 1" << endl; cout << "Interval 1" << endl;
}, 1000); }, 1000);
// stop interval // stop periodic
inter1.clear(); inter1.stop();
// how many times it has expired
int t = inter1.ticks();
// start timeout // is it stopped
timeout time1 ( [] () { bool stoped = inter1.stoped();
// start delayed
delayed time1 ( [] () {
cout << "Timeout 1 " << endl; cout << "Timeout 1 " << endl;
}, 10000); }, 10000);
// stop timeout // stop delayed
time1.clear(); time1.stop();
// is it expired
int t = time1.expired();
// is it stopped
bool stoped = time1.stoped();
// If you don't want to save in a variable, but you want to start a timer, use these functions
// And you can also save them, they are only of the shared pointer type
auto d = Delayed( [](){
cout << "Delayed" << endl;
}, 2000);
auto p = Periodic( [](){
cout << "Periodic" << endl;
}, 700);
Periodic( [&] (){
cout << "Delayed expire " << d->expired() << endl;
cout << "Periodic ticks " << p->ticks() << endl;
cout << "Delayed stoped " << d->stoped() << endl;
cout << "Periodic stoped " << p->stoped() << endl;
}, 1000);
Delayed( [&](){
p->stop();
}, 10000);
``` ```
Make functions asynchronous Make functions asynchronous
@ -62,9 +107,9 @@ Make functions asynchronous
* Run an lambda function asynchronously * Run an lambda function asynchronously
*/ */
atask( []() { async_ ( []() {
sleep_for(2s); // only for simulating long duration function sleep_for(2s); // only for simulating long duration function
cout << "atask" << endl; cout << "nonsync " << endl;
return 5; return 5;
}); });
@ -77,7 +122,7 @@ void notLambdaFunction() {
cout << "Call to not lambda function" << endl; cout << "Call to not lambda function" << endl;
} }
atask (notLambdaFunction); async_ (notLambdaFunction);
/** /**
* Run class method * Run class method
@ -91,41 +136,95 @@ class clm {
}; };
clm classes; clm classes;
atask( [&classes] () { async_ ( [&classes] () {
classes.classMethode(); classes.classMethode();
}); });
/** /**
* Wait after runned as async * await_ after runned as async
*/ */
auto a = atask( []() { auto a = async_ ( []() {
sleep_for(2s); // only for simulating long duration function sleep_for(2s); // only for simulating long duration function
cout << "atask" << endl; cout << "nonsync " << endl;
return 5; return 5;
}); });
cout << wait(a) << endl; cout << await_(a) << endl;
/** /**
* Wait async function call and use i cout * await_ async function call and use i cout
*/ */
cout << wait(atask( [] () { cout << await_(async_ ( [] () {
sleep_for(chrono::seconds(1)); // only for simulating long duration function sleep_for(chrono::seconds(1)); // only for simulating long duration function
cout << "wait end" << endl; cout << "await_ end" << endl;
return 4; return 4;
})) << endl; })) << endl;
/**
* Await all
**/
auto a = async_ ( []() {
cout << "A" << endl;
return 3;
});
auto b = async_ ( []() {
cout << "B" << endl;
throw runtime_error("Test exception");
return;
});
auto c = async_ ( []() {
cout << "C" << endl;
return "Hello";
});
int a_;
string c_;
auto await_all = [&] () {
a_ = await_(a);
await_(b);
c_ = await_(c);
};
try {
await_all();
cout << "a_ " << a_ << " c_ " << c_ << endl;
} catch (const exception& exc) {
cout << exc.what() << endl;
}
// // same type
vector<future<void>> fut_vec;
for (int i=0; i<5; i++) {
fut_vec.push_back(
async_ ( [i]() {
cout << "Async_ " << i << endl;
})
);
}
auto await_all = [&] () {
for (int i=0; i<fut_vec.size(); i++) {
await_ (fut_vec[i]);
}
};
/** /**
* Sleep with timeout sleep implement * Sleep with delayed sleep implement
*/ */
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { delayed t( [&]() {
_promise.set_value(); _promise.set_value();
}, _time); }, _time);
@ -140,7 +239,7 @@ sleep_to(3000);
void promise_reject (int _time) { void promise_reject (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { delayed t( [&]() {
try { try {
// simulate except // simulate except
throw runtime_error("Error simulation"); throw runtime_error("Error simulation");
@ -166,9 +265,9 @@ Events
* initialization of typed events * initialization of typed events
*/ */
event<int, int> ev2int; trigger<int, int> ev2int;
event<int, string> evintString; trigger<int, string> evintString;
event<> evoid; trigger<> evoid;
ev2int.on("sum", [](int a, int b) { ev2int.on("sum", [](int a, int b) {
cout << "Sum " << a+b << endl; cout << "Sum " << a+b << endl;
@ -196,32 +295,32 @@ sleep(1);
* Emit * Emit
*/ */
ev2int.emit("sum", 5, 8); ev2int.tick("sum", 5, 8);
sleep(1); sleep(1);
evintString.emit("substract", 3, to_string(2)); evintString.tick("substract", 3, to_string(2));
sleep(1); sleep(1);
evoid.emit("void"); evoid.tick("void");
// Turn off the event listener // Turn off the event listener
evoid.off("void"); evoid.off("void");
evoid.emit("void"); // nothing is happening evoid.tick("void"); // nothing is happening
``` ```
Extend own class whit events Extend own class whit events
```c++ ```c++
class myOwnClass : public event<int> { class myOwnClass : public trigger<int> {
public: public:
myOwnClass() : event() {}; myOwnClass() : trigger() {};
}; };
myOwnClass myclass; myOwnClass myclass;
timeout t( [&] { delayed t( [&] {
myclass.emit("constructed", 1); myclass.tick("constructed", 1);
}, 200); }, 200);
myclass.on("constructed", [] (int i) { myclass.on("constructed", [] (int i) {
@ -256,7 +355,7 @@ fs::write("test1.txt", "Hello world", [] (exception* error) {
auto future_data = fs::read("test.txt"); auto future_data = fs::read("test.txt");
try { try {
string data = wait(future_data); string data = await_(future_data);
} catch (exception& err) { } catch (exception& err) {
cout << err.what() << endl; cout << err.what() << endl;
} }
@ -264,7 +363,7 @@ try {
auto future_status = fs::write("test.txt", "Hello world"); auto future_status = fs::write("test.txt", "Hello world");
try { try {
wait(future_status); await_(future_status);
} catch (exception& err) { } catch (exception& err) {
cout << err.what() << endl; cout << err.what() << endl;
} }

@ -1,7 +1,8 @@
#ifndef _ASYNCO_ #ifndef _ASYNCO_
#define _ASYNCO_ #define _ASYNCO_
#include "runner.hpp" #include "engine.hpp"
#include <iostream>
using namespace std; using namespace std;
@ -12,10 +13,9 @@ namespace asynco {
* Run the function asynchronously * Run the function asynchronously
*/ */
template<class F, class... Args> template<class F, class... Args>
auto atask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> { auto async_(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type; using return_type = typename result_of<F(Args...)>::type;
future<return_type> res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward<F>(f), forward<Args>(args)...)));
future<return_type> res = _asyncon.put_task(bind(forward<F>(f), forward<Args>(args)...));
return res; return res;
} }
@ -23,7 +23,7 @@ auto atask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type
* Block until the asynchronous call completes * Block until the asynchronous call completes
*/ */
template<typename T> template<typename T>
T wait(future<T>& r) { T await_(future<T>& r) {
return r.get(); return r.get();
} }
@ -31,7 +31,29 @@ T wait(future<T>& r) {
* Block until the asynchronous call completes * Block until the asynchronous call completes
*/ */
template<typename T> template<typename T>
T wait(future<T>&& r) { T await_(future<T>&& r) {
return move(r).get();
}
/**
* Block until the asynchronous call completes or time expired
*/
template<typename T>
T await_(future<T>& r, uint64_t time) {
if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
throw runtime_error("Asynchronous execution timed out");
}
return r.get();
}
/**
* Block until the asynchronous call completes or time expired
*/
template<typename T>
T await_(future<T>&& r, uint64_t time) {
if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
throw runtime_error("Asynchronous execution timed out");
}
return move(r).get(); return move(r).get();
} }

@ -0,0 +1,19 @@
#ifndef _ASYNCO_DEFINE_
#define _ASYNCO_DEFINE_
namespace marcelb {
namespace asynco {
/**
* Alternative names of functions - mostly for the sake of more beautiful coloring of the code
*/
#define async_ marcelb::asynco::async_
#define await_ marcelb::asynco::await_
}
}
#endif

@ -0,0 +1,71 @@
#ifndef _ASYNCO_ENGINE_
#define _ASYNCO_ENGINE_
#include <vector>
#include <memory>
using namespace std;
#include <boost/asio.hpp>
namespace marcelb {
namespace asynco {
#define HW_CONCURRENCY_MINIMAL 4
/**
* Internal anonymous class for initializing the ASIO context and thread pool
* !!! It is anonymous to protect against use in the initialization of other objects of the same type !!!
*/
class Engine {
public:
boost::asio::io_context io_context;
void run() {
for (auto& runner : runners) {
runner.join();
}
}
private:
unique_ptr<boost::asio::io_service::work> work { [&] () {
return new boost::asio::io_service::work(io_context);
} ()};
vector<thread> runners { [&] () {
vector<thread> _runs;
unsigned int num_of_runners;
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
for (int i=0; i<num_of_runners; i++) {
_runs.push_back(thread ( [this] () {
io_context.run();
}));
}
return _runs;
} ()};
};
extern Engine _asynco_engine;
}
}
#endif

@ -1,66 +0,0 @@
#ifndef _EVENT_
#define _EVENT_
#include <iostream>
#include <map>
#include <vector>
#include <string>
#include <functional>
#include "runner.hpp"
using namespace std;
namespace marcelb {
namespace asynco {
namespace events {
/**
* Event class, for event-driven programming.
* These events are typed according to the arguments of the callback function
*/
template<typename... T>
class event {
private:
mutex m_eve;
unordered_map<string, vector<function<void(T...)>>> events;
public:
/**
* Defines event by key, and callback function
*/
void on(const string& key, function<void(T...)> callback) {
lock_guard _off(m_eve);
events[key].push_back(callback);
}
/**
* It emits an event and sends a callback function saved according to the key with the passed parameters
*/
template<typename... Args>
void emit(const string& key, Args... args) {
auto it_eve = events.find(key);
if (it_eve != events.end()) {
for (uint i =0; i<it_eve->second.size(); i++) {
auto callback = bind(it_eve->second[i], forward<Args>(args)...);
_asyncon.put_task(callback);
}
}
}
/**
* Remove an event listener from an event
*/
void off(const string& key) {
lock_guard _off(m_eve);
events.erase(key);
}
};
}
}
}
#endif

@ -3,13 +3,12 @@
#include "asynco.hpp" #include "asynco.hpp"
using namespace marcelb;
using namespace asynco;
#include <fstream> #include <fstream>
#include <iostream>
using namespace std; using namespace std;
using namespace marcelb;
using namespace asynco;
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {
@ -20,7 +19,7 @@ namespace fs {
*/ */
template<typename Callback> template<typename Callback>
void read(string path, Callback&& callback) { void read(string path, Callback&& callback) {
atask( [&path, callback] () { asynco::async_( [&path, callback] () {
string content; string content;
try { try {
string line; string line;
@ -49,7 +48,7 @@ void read(string path, Callback&& callback) {
* Asynchronous file reading * Asynchronous file reading
*/ */
future<string> read(string path) { future<string> read(string path) {
return atask( [&path] () { return asynco::async_( [&path] () {
string content; string content;
string line; string line;
ifstream file (path); ifstream file (path);
@ -73,7 +72,7 @@ future<string> read(string path) {
*/ */
template<typename Callback> template<typename Callback>
void write(string path, string content, Callback&& callback) { void write(string path, string content, Callback&& callback) {
atask( [&path, &content, callback] () { asynco::async_( [&path, &content, callback] () {
try { try {
ofstream file (path); ofstream file (path);
if (file.is_open()) { if (file.is_open()) {
@ -96,7 +95,7 @@ void write(string path, string content, Callback&& callback) {
* Asynchronous file writing with callback after write complete * Asynchronous file writing with callback after write complete
*/ */
future<void> write(string path, string content) { future<void> write(string path, string content) {
return atask( [&path, &content] () { return asynco::async_( [&path, &content] () {
ofstream file (path); ofstream file (path);
if (file.is_open()) { if (file.is_open()) {
file << content; file << content;

@ -1,239 +0,0 @@
#ifndef _ROTOR_
#define _ROTOT_
#include "runner.hpp"
#include "chrono"
#include <memory>
#include "iostream"
using namespace std;
using namespace marcelb;
using namespace asynco;
namespace marcelb {
namespace asynco {
/**
* Get the time in ms from the epoch
*/
int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
int64_t rtime_us() {
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
namespace {
/**
* Intern class for timer async loop
*/
class timer_core {
public:
mutex hangon;
condition_variable cv;
function<void()> callback;
int64_t time;
int64_t next;
bool repeat;
bool stop;
/**
* Timer constructor, receives a callback function and time
*/
timer_core( function<void()> _callback, int64_t _time, bool _repeat):
callback(_callback), time(_time*1000), repeat(_repeat), stop(false) {
next = rtime_us() + time;
}
/**
* Stop timer
*/
void clear() {
// lock_guard<mutex> hang(hangon);
stop = true;
cv.notify_one();
}
/**
* Destruktor of timer, call stop
*/
~timer_core() {
clear();
}
};
/**
* Event loop for time events
*/
class rotor {
vector<shared_ptr<timer_core>> tcores;
mutex te_m;
bool rotating = true;
int64_t sampling;
condition_variable te_cv;
/**
* Loop method, started by the constructor in a separate runner
* It checks the events on the stack and sends the expired ones to the runner
*/
void loop() {
while (rotating) {
vector<shared_ptr<timer_core>>::iterator next_tc;
shared_ptr<timer_core> next_ptr;
{
unique_lock<mutex> te_l(te_m);
te_cv.wait(te_l, [this]{ return !tcores.empty() || rotating; });
if (!rotating) {
break;
}
next_tc = min_element( tcores.begin(), tcores.end(),
[](shared_ptr<timer_core> a, shared_ptr<timer_core> b ) {
return a->next < b->next;
}
);
next_ptr = *next_tc;
}
unique_lock<mutex> next_l(next_ptr->hangon);
next_ptr->cv.wait_for(next_l, chrono::microseconds(next_ptr->next - rtime_us()), [&next_ptr] () {
return next_ptr->stop;
});
if (next_ptr->stop) {
remove(next_tc);
} else {
_asyncon.put_task(next_ptr->callback);
if (next_ptr->repeat) {
next_ptr->next += next_ptr->time;
}
else {
remove(next_tc);
}
}
}
}
/**
* The method deletes a non-repeating or stopped event from the stack
*/
void remove(vector<shared_ptr<timer_core>>::iterator it) {
lock_guard<mutex> lock(te_m);
tcores.erase(it);
// te_cv.notify_one();
}
public:
/**
* Constructor for the rotor, starts the given loop by occupying one runner
*/
rotor() {
_asyncon.put_task( [&] () {
loop();
});
};
/**
* Adds a time event to the stack
*/
void insert(shared_ptr<timer_core> tcore) {
lock_guard<mutex> lock(te_m);
tcores.push_back(tcore);
te_cv.notify_one();
};
/**
* Returns the number of active events
*/
int active() {
return tcores.size();
}
/**
* Stops all active events and stops the rotor
*/
~rotor() {
for (int i=0; i<tcores.size(); i++) {
tcores[i]->clear();
}
rotating = false;
}
};
}
/**
* It is intended that there is only one global declaration
*/
static rotor _rotor;
/**
* Core class for pure async timer functions
*/
class _timer_intern {
shared_ptr<timer_core> tcore;
public:
_timer_intern(function<void()> _callback, int64_t _time, bool repeat) {
tcore = make_shared<timer_core>(_callback, _time, repeat);
_rotor.insert(tcore);
}
/**
* Stop interval
*/
void clear() {
tcore->clear();
}
};
/**
* Class interval for periodic execution of the callback in time in ms
*/
class interval : public _timer_intern {
public:
/**
* The constructor receives a callback function and an interval time
*/
interval( function<void()> _callback, int64_t _time):
_timer_intern(_callback, _time, true) {
}
};
/**
* Class interval for delayed callback execution in ms
*/
class timeout : public _timer_intern {
public:
/**
* The constructor receives a callback function and a delay time
*/
timeout( function<void()> _callback, int64_t delay):
_timer_intern(_callback, delay, false) {
}
};
}
}
#endif

@ -1,136 +0,0 @@
#ifndef _RUNNER_
#define _RUNNER_
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
using namespace std;
namespace marcelb {
namespace asynco {
#define HW_CONCURRENCY_MINIMAL 4
/**
* The runner class implements multithread, task stack and event loop for asynchronous execution of tasks
*/
class runner {
private:
vector<thread> runners;
queue<function<void()>> tasks;
mutex q_io;
condition_variable cv;
bool stop;
public:
/**
* The constructor starts as many threads as the system has cores,
* and runs an event loop inside each one.
* Each event loop waits for tasks from the stack and executes them.
*/
runner(unsigned int _num_of_runners = 0) : stop(false) {
unsigned int num_of_runners = _num_of_runners;
if (num_of_runners == 0) {
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
}
for (size_t i = 0; i < num_of_runners; ++i) {
runners.emplace_back( thread([&] {
while (!stop) {
function<void()> task;
{
unique_lock<mutex> lock(q_io);
cv.wait(lock, [this] { return stop || !tasks.empty(); });
// if (stop && tasks.empty())
if (stop)
return;
task = move(tasks.front());
tasks.pop();
}
task();
}
}));
}
}
/**
* With the method, we send the callback function and its arguments to the task stack
*/
template<class F, class... Args>
auto put_task(F&& f, Args&&... args)
-> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type;
auto task = make_shared<packaged_task<return_type()>>(bind(forward<F>(f), forward<Args>(args)...));
future<return_type> res = task->get_future();
{
unique_lock<mutex> lock(q_io);
if (stop) {
throw runtime_error("Pool is stoped!");
}
tasks.emplace([task]() { (*task)(); });
}
cv.notify_one();
return res;
}
/**
* Returns the number of tasks the runner has to perform
*/
unsigned int count_tasks() {
return tasks.size();
}
/**
* Returns the number of threads used by the runner
*/
unsigned int count_threads() {
return runners.size();
}
/**
* The destructor stops all loops and stops all threads
*/
~runner() {
{
unique_lock<mutex> lock(q_io);
stop = true;
}
cv.notify_all();
for (thread& runner : runners) {
runner.join();
}
runners.clear();
}
};
/**
* Internal global library variable
*/
static runner _asyncon;
}
}
#endif

@ -0,0 +1,161 @@
#ifndef _ASYNCO_TIMERS_
#define _ASYNCO_TIMERS_
#include <chrono>
using namespace std;
#include "asynco.hpp"
namespace marcelb {
namespace asynco {
/**
* Get the time in ms from the epoch
*/
int64_t rtime_ms();
/**
* Get the time in us from the epoch
*/
int64_t rtime_us();
/**
* Core timer class for construct time async functions
*/
class timer {
boost::asio::steady_timer st;
bool _stop = false;
bool repeate;
function<void()> callback;
uint64_t time;
uint64_t _ticks = 0;
/**
* A method to assign a callback wrapper and a reinitialization algorithm
*/
void init();
public:
/**
* The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer
*/
timer (function<void()> _callback, uint64_t _time, bool _repeate);
/**
* Stop timer
* The stop flag is set and timer remove it from the queue
*/
void stop();
/**
* Run callback now
* Forces the callback function to run independently of the timer
*/
void now();
/**
* Get the number of times the timer callback was runned
*/
uint64_t ticks();
/**
* The logic status of the timer stop state
*/
bool stoped();
/**
* The destructor stops the timer
*/
~timer();
};
/**
* Class periodic for periodic execution of the callback in time in ms
*/
class periodic {
shared_ptr<timer> _timer;
public:
/**
* Constructor initializes a shared pointer of type timer
*/
periodic(function<void()> callback, uint64_t time);
/**
* Stop periodic
* The stop flag is set and periodic remove it from the queue
*/
void stop();
/**
* Run callback now
* Forces the callback function to run independently of the periodic
*/
void now();
/**
* Get the number of times the periodic callback was runned
*/
uint64_t ticks();
/**
* The logic status of the periodic stop state
*/
bool stoped();
/**
* The destructor stops the periodic
*/
~periodic();
};
/**
* Class delayed for delayed callback execution in ms
*/
class delayed {
shared_ptr<timer> _timer;
public:
/**
* Constructor initializes a shared pointer of type timer
*/
delayed(function<void()> callback, uint64_t time);
/**
* Stop delayed
* The stop flag is set and delayed remove it from the queue
*/
void stop();
/**
* Run callback now
* Forces the callback function to run independently of the delayed
*/
void now();
/**
* Get is the delayed callback runned
*/
bool expired();
/**
* The logic status of the delayed stop state
*/
bool stoped();
/**
* The destructor stops the delayed
*/
~delayed();
};
shared_ptr<periodic> Periodic(function<void()> callback, uint64_t time);
shared_ptr<delayed> Delayed(function<void()> callback, uint64_t time);
}
}
#endif

@ -0,0 +1,92 @@
#ifndef _ASYNCO_TRIGGER_
#define _ASYNCO_TRIGGER_
#include <map>
#include <vector>
#include <string>
#include <functional>
using namespace std;
#include "engine.hpp"
namespace marcelb {
namespace asynco {
namespace triggers {
/**
* trigger class, for event-driven programming.
* These events are typed according to the arguments of the callback function
*/
template<typename... T>
class trigger {
private:
mutex m_eve;
unordered_map<string, vector<function<void(T...)>>> triggers;
public:
/**
* Defines event by key, and callback function
*/
void on(const string& key, function<void(T...)> callback) {
lock_guard _off(m_eve);
triggers[key].push_back(callback);
}
/**
* It emits an event and sends a callback function saved according to the key with the passed parameters
*/
template<typename... Args>
void tick(const string& key, Args... args) {
auto it_eve = triggers.find(key);
if (it_eve != triggers.end()) {
for (uint i =0; i<it_eve->second.size(); i++) {
auto callback = bind(it_eve->second[i], forward<Args>(args)...);
asynco::async_(callback);
}
}
}
/**
* Remove an trigger listener from an event
*/
void off(const string& key) {
lock_guard _off(m_eve);
triggers.erase(key);
}
/**
* Remove all trigger listener
*/
void off() {
lock_guard _off(m_eve);
triggers.clear();
}
/**
* Get num of listeners by an trigger key
*/
unsigned int listeners(const string& key) {
return triggers[key].size();
}
/**
* Get num of all listeners
*/
unsigned int listeners() {
unsigned int listeners = 0;
for (auto& ev : triggers) {
listeners += ev.second.size();
}
return listeners;
}
};
}
}
}
#endif

@ -0,0 +1,7 @@
#include "../lib/engine.hpp"
namespace marcelb::asynco {
Engine _asynco_engine;
};

@ -0,0 +1,145 @@
#include "../lib/timers.hpp"
namespace marcelb::asynco {
int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
int64_t rtime_us() {
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
void timer::init() {
st.async_wait( [this] (const boost::system::error_code&) {
if (!_stop) {
callback();
if (repeate) {
st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time));
init();
}
_ticks++;
}
});
}
timer::timer (function<void()> _callback, uint64_t _time, bool _repeate) :
st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)),
_stop(false),
repeate(_repeate),
callback(_callback),
time(_time) {
init();
}
void timer::stop() {
_stop = true;
st.cancel();
}
void timer::now() {
st.cancel();
}
uint64_t timer::ticks() {
return _ticks;
}
bool timer::stoped() {
return _stop;
}
timer::~timer() {
stop();
}
periodic::periodic(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, true)) {
}
void periodic::stop() {
_timer->stop();
}
void periodic::now() {
_timer->now();
}
uint64_t periodic::ticks() {
return _timer->ticks();
}
bool periodic::stoped() {
return _timer->stoped();
}
periodic::~periodic() {
stop();
}
delayed::delayed(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, false)) {
}
void delayed::stop() {
_timer->stop();
}
void delayed::now() {
_timer->now();
}
bool delayed::expired() {
return bool(_timer->ticks());
}
bool delayed::stoped() {
return _timer->stoped();
}
delayed::~delayed() {
stop();
}
mutex p_io, d_io;
vector<shared_ptr<periodic>> periodic_calls_container;
vector<shared_ptr<delayed>> delayed_calls_container;
shared_ptr<periodic> Periodic(function<void()> callback, uint64_t time) {
shared_ptr<periodic> periodic_ptr(make_shared<periodic>(callback, time));
async_ ( [&, periodic_ptr](){
lock_guard<mutex> lock(p_io);
periodic_calls_container.push_back(periodic_ptr);
for (uint32_t i=0; i<periodic_calls_container.size(); i++) {
if (periodic_calls_container[i]->stoped()) {
periodic_calls_container.erase(periodic_calls_container.begin()+i);
i--;
}
}
});
return periodic_ptr;
}
shared_ptr<delayed> Delayed(function<void()> callback, uint64_t time) {
shared_ptr<delayed> delayed_ptr(make_shared<delayed>(callback, time));
async_ ( [&, delayed_ptr](){
lock_guard<mutex> lock(p_io);
delayed_calls_container.push_back(delayed_ptr);
for (uint32_t i=0; i<delayed_calls_container.size(); i++) {
if (delayed_calls_container[i]->stoped() || delayed_calls_container[i]->expired()) {
delayed_calls_container.erase(delayed_calls_container.begin()+i);
i--;
}
}
});
return delayed_ptr;
}
};

@ -0,0 +1 @@
g++ test.cpp ../src/* -o test

@ -1,22 +1,28 @@
// #define NUM_OF_RUNNERS 2 // // #define NUM_OF_RUNNERS 2
#include "../lib/asynco.hpp" #include "../lib/asynco.hpp"
#include "../lib/event.hpp" #include "../lib/trigger.hpp"
#include "../lib/rotor.hpp"
#include "../lib/filesystem.hpp" #include "../lib/filesystem.hpp"
#include "../lib/timers.hpp"
#include "../lib/define.hpp"
using namespace marcelb::asynco;
using namespace triggers;
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <thread>
#include <future>
#include <vector>
using namespace std; using namespace std;
using namespace marcelb::asynco;
using namespace events;
using namespace asynco;
using namespace this_thread; using namespace this_thread;
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { delayed t( [&]() {
_promise.set_value(); _promise.set_value();
}, _time); }, _time);
@ -25,7 +31,7 @@ void sleep_to (int _time) {
void promise_reject (int _time) { void promise_reject (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { delayed t( [&]() {
try { try {
// simulate except // simulate except
throw runtime_error("Error simulation"); throw runtime_error("Error simulation");
@ -51,9 +57,9 @@ class clm {
// ------------------ EXTEND OWN CLASS WITH EVENTS ------------------- // ------------------ EXTEND OWN CLASS WITH EVENTS -------------------
class myOwnClass : public event<int> { class myOwnClass : public trigger<int> {
public: public:
myOwnClass() : event() {}; myOwnClass() : trigger() {};
}; };
@ -64,86 +70,93 @@ int main () {
// --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- // --------------- TIME ASYNCHRONOUS FUNCTIONS --------------
/** /**
* Init interval and timeout; clear interval and timeout * Init periodic and delayed; clear periodic and delayed
*/ */
vector<interval> intervals; // periodic inter1 ([&]() {
// cout << "periodic prvi " << rtime_ms() - start << endl;
// }, 1000);
for(int i=0; i<10; i++) { // periodic inter2 ([&]() {
intervals.push_back(interval( [i, &start]() { // cout << "periodic drugi " << rtime_ms() - start << endl;
cout << "interval " << i << " end: " << rtime_ms() - start << endl; // }, 2000);
}, (i%5 +1)*1000));
}
// interval( [&] () { // periodic inter3 ([&]() {
// cout << "interval 1: " << rtime_ms() - start << endl; // cout << "periodic treći " << rtime_ms() - start << endl;
// }, 50); // }, 1000);
// interval( [&] () { // periodic inter4 ([&]() {
// cout << "interval 1: " << rtime_ms() - start << endl; // // cout << "periodic cetvrti " << rtime_ms() - start << endl;
// }, 100); // cout << "Ticks " << inter3.ticks() << endl;
// }, 500);
// interval( [&] () { // periodic inter5 ([&]() {
// cout << "interval 2: " << rtime_ms() - start << endl; // cout << "periodic peti " << rtime_ms() - start << endl;
// }, 200); // }, 2000);
// interval( [&] () { // periodic inter6 ([&]() {
// cout << "interval 3: " << rtime_ms() - start << endl; // cout << "periodic sesti " << rtime_ms() - start << endl;
// }, 300); // }, 3000);
// delayed time1 ( [&] () {
// cout << "Close periodic 1 i 2 " << rtime_ms() - start << endl;
// inter1.stop();
// cout << "inter1.stop " << endl;
// inter2.stop();
// cout << "inter2.stop " << endl;
// }, 8000);
// interval( [&] () {
// cout << "interval 4: " << rtime_ms() - start << endl;
// }, 400);
// interval inter1 ([&]() { // delayed time2 ([&] () {
// cout << "interval prvi " << rtime_ms() - start << endl; // cout << "Close periodic 3 " << rtime_ms() - start << endl;
// }, 1000); // inter3.stop();
// cout << "Stoped " << inter3.stoped() << endl;
// // time1.stop();
// }, 5000);
// interval inter2 ([&]() {
// cout << "interval drugi " << rtime_ms() - start << endl;
// }, 2000);
// interval inter3 ([&]() { // if (time2.expired()) {
// cout << "interval treći " << rtime_ms() - start << endl; // cout << "isteko " << endl;
// }, 3000); // } else {
// cout << "nije isteko " << endl;
// }
// interval inter4 ([&]() { // // sleep(6);
// cout << "interval cetvrti " << rtime_ms() - start << endl;
// }, 1000);
// interval inter5 ([&]() { // if (time2.expired()) {
// cout << "interval peti " << rtime_ms() - start << endl; // cout << "isteko " << endl;
// }, 2000); // } else {
// cout << "nije isteko " << endl;
// }
// interval inter6 ([&]() { // auto d = Delayed( [](){
// cout << "interval sesti " << rtime_ms() - start << endl; // cout << "Delayed" << endl;
// }, 3000); // }, 2000);
// timeout time1 ( [&] () { // auto p = Periodic( [](){
// cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; // cout << "Periodic" << endl;
// // inter1.clear(); // }, 700);
// // cout << "inter1.stop " << inter1.stop << endl;
// // inter2.clear();
// // cout << "inter2.stop " << inter2.stop << endl;
// }, 5000);
// Periodic( [&] (){
// cout << "Delayed expire " << d->expired() << endl;
// cout << "Periodic ticks " << p->ticks() << endl;
// cout << "Delayed stoped " << d->stoped() << endl;
// cout << "Periodic stoped " << p->stoped() << endl;
// }, 1000);
// timeout time2 ([&] () { // Delayed( [&](){
// cout << "Close interval 3 " << rtime_ms() - start << endl; // p->stop();
// // inter3.clear(); // }, 10000);
// time1.clear();
// }, 2000);
// // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- // // // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
// /** // // /**
// * Run an function asyncronic // // * Run an function asyncronic
// */ // // */
// atask( []() { // async_ ( []() {
// sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
// cout << "atask 1" << endl; // cout << "asynco 1" << endl;
// return 5; // return 5;
// }); // });
@ -151,51 +164,56 @@ int main () {
// * Call not lambda function // * Call not lambda function
// */ // */
// atask (notLambdaFunction); // async_ (notLambdaFunction);
// wait ( // await_ (
// atask ( // async_ (
// notLambdaFunction // notLambdaFunction
// ) // )
// ); // );
// /**
// * Call class method // // async(launch::async, [] () {
// */ // // cout << "Another thread in async style!" << endl;
// // });
// // /**
// // * Call class method
// // */
// clm classes; // clm classes;
// atask( [&classes] () { // async_ ( [&classes] () {
// classes.classMethode(); // classes.classMethode();
// }); // });
// sleep(5); // sleep(5);
// /** // // /**
// * Wait after runned as async // // * await_ after runned as async
// */ // // */
// auto a = atask( []() { // auto aa = async_ ( []() {
// sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
// cout << "atask 2" << endl; // cout << "async_ 2" << endl;
// return 5; // return 5;
// }); // });
// cout << wait(a) << endl; // cout << await_(aa) << endl;
// cout << "print after atask 2" << endl; // cout << "print after async_ 2" << endl;
// /** // /**
// * Wait async function call and use i cout // * await_ async function call and use i cout
// */ // */
// cout << wait(atask( [] () { // cout << await_(async_ ( [] () {
// sleep_for(chrono::seconds(1)); // only for simulate log duration function // sleep_for(chrono::seconds(1)); // only for simulate log duration function
// cout << "wait end" << endl; // cout << "await_ end" << endl;
// return 4; // return 4;
// })) << endl; // })) << endl;
// /** // /**
// * Sleep with timeout sleep implement // * Sleep with delayed sleep implement
// */ // */
// sleep_to(3000); // sleep_to(3000);
@ -219,22 +237,76 @@ int main () {
// */ // */
// atask( [] { // async_ ( [] {
// cout << "idemo ..." << endl; // cout << "idemo ..." << endl;
// atask( [] { // async_ ( [] {
// cout << "ugdnježdena async funkcija " << endl; // cout << "ugdnježdena async funkcija " << endl;
// }); // });
// }); // });
// // -------------------------- AWAIT ALL ----------------------------------
// auto a = async_ ( []() {
// cout << "A" << endl;
// return 3;
// });
// auto b = async_ ( []() {
// cout << "B" << endl;
// throw runtime_error("Test exception");
// return;
// });
// auto c = async_ ( []() {
// cout << "C" << endl;
// return "Hello";
// });
// int a_;
// string c_;
// auto await_all = [&] () {
// a_ = await_(a);
// await_(b);
// c_ = await_(c);
// };
// try {
// await_all();
// cout << "a_ " << a_ << " c_ " << c_ << endl;
// } catch (const exception& exc) {
// cout << exc.what() << endl;
// }
// // // same type
// vector<future<void>> fut_vec;
// for (int i=0; i<5; i++) {
// fut_vec.push_back(
// async_ ( [i]() {
// cout << "Async_ " << i << endl;
// })
// );
// }
// auto await_all2 = [&] () {
// for (int i=0; i<fut_vec.size(); i++) {
// await_ (fut_vec[i]);
// }
// };
// await_all2();
// // --------------- EVENTS ------------------- // // --------------- EVENTS -------------------
// /** // /**
// * initialization of typed events // * initialization of typed events
// */ // */
// event<int, int> ev2int; // trigger<int, int> ev2int;
// event<int, string> evintString; // trigger<int, string> evintString;
// event<> evoid; // trigger<> evoid;
// ev2int.on("sum", [](int a, int b) { // ev2int.on("sum", [](int a, int b) {
// cout << "Sum " << a+b << endl; // cout << "Sum " << a+b << endl;
@ -258,22 +330,28 @@ int main () {
// cout << "Void emited " << emited2 << endl; // cout << "Void emited " << emited2 << endl;
// }); // });
// evoid.emit("void"); // evoid.tick("void");
// sleep(1); // sleep(1);
// /** // /**
// * Emit // * Emit
// */ // */
// ev2int.emit("sum", 5, 8); // ev2int.tick("sum", 5, 8);
// sleep(1); // sleep(1);
// evintString.emit("substract", 3, to_string(2)); // evintString.tick("substract", 3, to_string(2));
// sleep(1); // sleep(1);
// evoid.off("void"); // evoid.off("void");
// evoid.emit("void"); // evoid.tick("void");
// cout << "Ukupno 2 int " << ev2int.listeners() << endl;
// cout << "Ukupno evintString " << evintString.listeners() << endl;
// cout << "Ukupno evoid " << evoid.listeners() << endl;
// cout << "Ukupno 2 int " << ev2int.listeners("sum") << endl;
// /** // /**
// * Own class // * Own class
@ -281,8 +359,8 @@ int main () {
// myOwnClass myclass; // myOwnClass myclass;
// timeout t( [&] { // delayed t( [&] {
// myclass.emit("constructed", 1); // myclass.tick("constructed", 1);
// }, 200); // }, 200);
// myclass.on("constructed", [] (int i) { // myclass.on("constructed", [] (int i) {
@ -295,14 +373,32 @@ int main () {
// try { // try {
// auto data = wait(status); // auto data = await_(status);
// cout << data; // cout << data;
// } catch (exception& err) { // } catch (exception& err) {
// cout << err.what() << endl; // cout << err.what() << endl;
// } // }
cout << "Sleep" << endl;
sleep(100000); // only for testing // string data_;
// auto start_read = rtime_us();
// fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) {
// if (error) {
// cout << "Error " << error->what() << endl;
// } else {
// // cout << "Data " << endl << data << endl;
// // data_ = data;
// // cout << "Data_" << data_ << endl;
// cout << "read " << rtime_us() - start_read << endl;
// }
// });
// // ----------------------------------------------------------------------------------------------------
cout << "Run" << endl;
_asynco_engine.run();
return 0; return 0;
} }

Loading…
Cancel
Save