Timer functions, test, comments and readme

dev-own-engine
marcelb 9 months ago
parent d3c7b3a91b
commit 8fdf56d1bb
  1. 1
      .gitignore
  2. 216
      README.md
  3. 164
      example/asynco.hpp
  4. 159
      lib/asynco.hpp
  5. 20
      lib/event.hpp
  6. 212
      lib/rotor.hpp
  7. 47
      lib/runner.hpp
  8. BIN
      test/test
  9. 186
      test/test.cpp

1
.gitignore vendored

@ -0,0 +1 @@
test/test

@ -0,0 +1,216 @@
# Asynco
A C++ library for event-driven asynchronous multi-threaded programming.
## Features
- Object oriented
- Small and easy to integrate
- Header only
- Asynchronous launch functions
- Multithread parallel execution of tasks
- Timer functions: interval, timeout
- Events (on, emit)
- Event loop
## Installation
Just download the latest release and unzip it into your project.
```c++
#include "asynco/lib/asynco.hpp" // asynco(), wait()
#include "asynco/lib/event.hpp" // event
#include "asynco/lib/rotor.hpp" // interval, timeout
#include "asynco/lib/runner.hpp" // on_async
using namespace marcelb;
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
```
## Usage
Time asynchronous functions
```c++
// start interval
interval inter1 ([&]() {
cout << "Interval 1" << endl;
}, 1000);
// stop interval
inter1.clear();
// start timeout
timeout time1 ( [&] () {
cout << "Timeout 1 " << endl;
}, 10000);
// stop timeout
time1.clear();
```
Make functions asynchronous
```c++
/**
* Put task directly and get returned value - it is not recommended to use it
*/
auto res1 = on_async.put_task( [] () {
cout << "Jebiga " <<endl;
throw string ("jebiga!!");
});
try {
res1.get();
} catch (const string except) {
cout << except << endl;
}
/**
* Run an function asyncronic
*/
asynco( []() {
sleep_for(2s); // only for simulate log duration function
cout << "asynco" << endl;
return 5;
});
/**
* Wait after runned as async
*/
auto a = asynco( []() {
sleep_for(2s); // only for simulate log duration function
cout << "asynco" << endl;
return 5;
});
cout << wait(move(a)) << endl;
/**
* Wait async function call and use i cout
*/
cout << wait(asynco( [] () {
sleep_for(chrono::seconds(1)); // only for simulate log duration function
cout << "wait end" << endl;
return 4;
})) << endl;
/**
* Sleep with timeout sleep implement
*/
void sleep_to (int _time) {
promise<void> _promise;
timeout t( [&]() {
_promise.set_value();
}, _time);
return _promise.get_future().get();
}
sleep_to(3000);
/**
* Catch promise reject
*/
void promise_reject (int _time) {
promise<void> _promise;
timeout t( [&]() {
try {
// simulate except
throw runtime_error("Error simulation");
_promise.set_value();
} catch (...) {
_promise.set_exception(current_exception());
}
}, _time);
return _promise.get_future().get();
}
try {
promise_reject(3000);
} catch (runtime_error err) {
cout<< err.what() << endl;
}
```
Events
```c++
/**
* initialization of typed events
*/
event<int, int> ev2int;
event<int, string> evintString;
event<> evoid;
ev2int.on("sum", [](int a, int b) {
cout << "Sum " << a+b << endl;
});
evintString.on("substract", [](int a, string b) {
cout << "Substract " << a-stoi(b) << endl;
});
evoid.on("void", []() {
cout << "Void emited" << endl;
});
sleep(1);
/**
* Emit
*/
ev2int.emit("sum", 5, 8);
sleep(1);
evintString.emit("substract", 3, to_string(2));
sleep(1);
evoid.emit("void");
```
Extend own class whit events
```c++
class myOwnClass : public event<int> {
public:
myOwnClass() : event() {};
};
myOwnClass myclass;
timeout t( [&] {
myclass.emit("constructed", 1);
}, 200);
myclass.on("constructed", [] (int i) {
cout << "Constructed " << i << endl;
});
```
## License
[APACHE 2.0](http://www.apache.org/licenses/LICENSE-2.0/)
## Support & Feedback
For support and any feedback, contact the address: marcelb96@yahoo.com.
## Contributing
Contributions are always welcome!
Feel free to fork and start working with or without a later pull request. Or contact for suggest and request an option.

@ -1,164 +0,0 @@
#ifndef _ASYNCO_
#define _ASYNCO_
#include <vector>
#include <mutex>
#include <future>
#include <thread>
using namespace std;
namespace marcelb {
class interval;
class timeout;
class loop_core {
vector<interval> intervals;
vector<timeout> timeouts;
time_t sampling;
mutex i_m, t_m;
future<void> bot;
loop_core() {
bot = async(launch::async, [this] () {
loop();
});
// on_async.put_task( [this] () {
// loop();
// });
}
void run(interval& _interval) {
lock_guard<mutex> lock(i_m);
intervals.push_back(_interval);
update_sampling();
}
void run(timeout& _timeout) {
lock_guard<mutex> lock(t_m);
timeouts.push_back(_timeout);
update_sampling();
}
void loop() {
while (true) {
for (auto& _interval : intervals) {
int64_t now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
if (now - _interval.execute >= _interval._duration) {
_interval.callback();
_interval.execute = now;
}
}
for (int i=0; i<timeouts.size(); i++) {
int64_t now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
if (now - timeouts[i]._construct >= timeouts[i]._duration) {
auto& _timeout = timeouts[i];
{
lock_guard<mutex> lock(t_m);
timeouts.erase(timeouts.begin() + i);
}
_timeout.callback();
}
}
this_thread::sleep_for(chrono::milliseconds(sampling));
}
}
void update_sampling() {
sampling = 0;
for (auto& _interval : intervals) {
sampling += _interval._duration;
}
for (auto& _timeout : timeouts) {
sampling += _timeout._duration;
}
sampling /= (intervals.size() + timeouts.size())*2;
}
};
loop_core co_loop;
class interval {
public:
bool run = true;
function<void()> callback;
const time_t _duration;
time_t execute = 0;
// public:
interval(function<void()> func, const time_t duration): callback(func), _duration(duration) {
#ifndef ON_ASYNC
throw string("Not on_async defined!");
#endif
auto task = [&] () {
while (run) {
// this_thread::sleep_for(chrono::milliseconds(_duration));
// if (run) {
callback();
// }
}
};
// on_async.put_task(task);
co_loop.run(this*);
}
void clear() {
run = false;
}
~interval() {
clear();
}
};
class timeout {
public:
bool run = true;
function<void()> callback;
const time_t _duration;
int64_t _construct =
chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch())
.count();
// public:
timeout(function<void()> f, const time_t duration): callback(f), _duration(duration) {
#ifndef ON_ASYNC
throw string("Not on_async defined!");
#endif
auto task = [&] () {
// int64_t _start =
// chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch())
// .count();
// this_thread::sleep_for(chrono::milliseconds(_duration - (_start - _construct)));
if (run) {
callback();
}
};
// on_async.put_task(task);
co_loop.run(this*);
}
void clear() {
run = false;
}
~timeout() {
clear();
}
};
}
#endif

@ -1,162 +1,20 @@
#ifndef _ASYNCO_
#define _ASYNCO_
#include "loop.hpp"
#include "runner.hpp"
using namespace std;
namespace marcelb {
#ifndef ON_ASYNC
#define ON_ASYNC
AsyncLoop on_async;
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
class interval;
class timeout;
class loop_core {
static vector<interval> intervals;
static vector<timeout> timeouts;
time_t sampling;
mutex i_m, t_m;
loop_core() {
on_async.put_task( [this] () {
loop();
});
}
void run(interval& _interval) {
lock_guard<mutex> lock(i_m);
intervals.push_back(_interval);
update_sampling();
}
void run(timeout& _timeout) {
lock_guard<mutex> lock(t_m);
timeouts.push_back(_timeout);
update_sampling();
}
void loop() {
while (true) {
for (auto& _interval : intervals) {
int64_t now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
if (now - _interval.execute >= _interval._duration) {
_interval.callback();
_interval.execute = now;
}
}
for (int i=0; i<timeouts.size(); i++) {
int64_t now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
if (now - timeouts[i]._construct >= timeouts[i]._duration) {
auto& _timeout = timeouts[i];
{
lock_guard<mutex> lock(t_m);
timeouts.erase(timeouts.begin() + i);
}
_timeout.callback();
}
}
sleep_for(chrono::milliseconds(sampling));
}
}
void update_sampling() {
sampling = 0;
for (auto& _interval : intervals) {
sampling += _interval._duration;
}
for (auto& _timeout : timeouts) {
sampling += _timeout._duration;
}
sampling /= (intervals.size() + timeouts.size())*2;
}
};
loop_core co_loop;
class interval {
public:
bool run = true;
function<void()> callback;
const time_t _duration;
time_t execute = 0;
// public:
interval(function<void()> func, const time_t duration): callback(func), _duration(duration) {
#ifndef ON_ASYNC
throw string("Not on_async defined!");
#endif
auto task = [&] () {
while (run) {
// this_thread::sleep_for(chrono::milliseconds(_duration));
// if (run) {
callback();
// }
}
};
// on_async.put_task(task);
co_loop.run(this*);
}
void clear() {
run = false;
}
~interval() {
clear();
}
};
class timeout {
public:
bool run = true;
function<void()> callback;
const time_t _duration;
int64_t _construct =
chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch())
.count();
// public:
timeout(function<void()> f, const time_t duration): callback(f), _duration(duration) {
#ifndef ON_ASYNC
throw string("Not on_async defined!");
#endif
auto task = [&] () {
// int64_t _start =
// chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch())
// .count();
// this_thread::sleep_for(chrono::milliseconds(_duration - (_start - _construct)));
if (run) {
callback();
}
};
// on_async.put_task(task);
co_loop.run(this*);
}
void clear() {
run = false;
}
~timeout() {
clear();
}
};
/**
* Run the function asynchronously
*/
template<class F, class... Args>
auto asynco(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type;
@ -165,6 +23,9 @@ auto asynco(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::typ
return res;
}
/**
* Block until the asynchronous call completes
*/
template<typename T>
T wait(future<T> r) {
return r.get();

@ -5,28 +5,38 @@
#include <map>
#include <string>
#include <functional>
#include "loop.hpp"
#include "runner.hpp"
using namespace std;
namespace marcelb {
#ifndef ON_ASYNC
#define ON_ASYNC
AsyncLoop on_async;
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
/**
* Event class, for event-driven programming.
* These events are typed according to the arguments of the callback function
*/
template<typename... T>
class event {
private:
unordered_map<string, function<void(T...)>> events;
public:
/**
* Defines event by key, and callback function
*/
void on(const string& key, function<void(T...)> callback) {
events[key] = callback;
}
/**
* It emits an event and sends a callback function saved according to the key with the passed parameters
*/
template<typename... Args>
void emit(const string& key, Args... args) {
auto it = events.find(key);

@ -0,0 +1,212 @@
#ifndef _ROTOR_
#define _ROTOT_
#include "runner.hpp"
#include "chrono"
#include "iostream"
using namespace std;
using namespace marcelb;
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
namespace marcelb {
/**
* Get the time in ms from the epoch
*/
int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
/**
* Structure for time events
*/
struct time_event {
function<void()> callback;
int64_t init;
int64_t time;
bool repeat;
bool stop;
};
/**
* Event loop for time events
*/
class rotor {
vector<struct time_event *> tevents;
mutex te_m;
bool rotating = true;
int64_t sampling;
/**
* Loop method, started by the constructor in a separate runner
* It checks the events on the stack and sends the expired ones to the runner
*/
void loop() {
while (rotating) {
for (int i=0; i<tevents.size(); i++) {
if (tevents[i]->stop) {
remove(i);
i--;
}
else if (expired(tevents[i])) {
on_async.put_task(tevents[i]->callback);
if (tevents[i]->repeat) {
tevents[i]->init = rtime_ms();
}
else {
remove(i);
i--;
}
}
}
this_thread::sleep_for(chrono::milliseconds(sampling));
}
}
/**
* The method checks whether the time event has expired
*/
bool expired(struct time_event *tevent) {
return rtime_ms() - tevent->init >= tevent->time;
}
/**
* The method deletes a non-repeating or stopped event from the stack
*/
void remove(const int& position) {
lock_guard<mutex> lock(te_m);
tevents.erase(tevents.begin()+position);
}
/**
* Updates the idle time of the loop, according to twice the frequency of available events
*/
void update_sampling() {
sampling = tevents[0]->time;
for (int i=0; i<tevents.size(); i++) {
if (sampling > tevents[i]->time) {
sampling = tevents[i]->time;
}
}
sampling /= tevents.size()*2;
}
public:
/**
* Constructor for the rotor, starts the given loop by occupying one runner
*/
rotor() {
on_async.put_task( [&] () {
loop();
});
};
/**
* Adds a time event to the stack
*/
void insert(struct time_event *tevent) {
lock_guard<mutex> lock(te_m);
tevents.push_back(tevent);
update_sampling();
};
/**
* Returns the number of active events
*/
int active() {
return tevents.size();
}
/**
* Stops all active events and stops the rotor
*/
~rotor() {
for (int i=0; i<tevents.size(); i++) {
tevents[i]->stop = true;
}
rotating = false;
}
};
/**
* It is intended that there is only one global declaration
*/
rotor _rotor;
/**
* A class for all timer functions
*/
class timer_core {
public:
struct time_event t_event;
/**
* Timer constructor, receives a callback function and time
*/
timer_core( function<void()> _callback, int64_t _time):
t_event({ _callback, rtime_ms(), _time, false, false }) {
}
/**
* Stop timer
*/
void clear() {
t_event.stop = true;
}
/**
* Destruktor of timer, call stop
*/
~timer_core() {
clear();
}
};
/**
* Class interval for periodic execution of the callback in time in ms
*/
class interval : public timer_core {
public:
/**
* The constructor receives a callback function and an interval time
*/
interval( function<void()> _callback, int64_t _time): timer_core(_callback, _time) {
t_event.repeat = true;
_rotor.insert(&t_event);
}
};
/**
* Class interval for delayed callback execution in ms
*/
class timeout : public timer_core {
public:
/**
* The constructor receives a callback function and a delay time
*/
timeout( function<void()> _callback, int64_t delay): timer_core(_callback, delay) {
t_event.repeat = false;
_rotor.insert(&t_event);
}
};
}
#endif

@ -1,5 +1,5 @@
#ifndef _LOOP_
#define _LOOP_
#ifndef _RUNNER_
#define _RUNNER_
#include <thread>
#include <vector>
@ -13,22 +13,31 @@ using namespace std;
namespace marcelb {
#ifdef ON_ASYNC
extern AsyncLoop on_async;
#ifdef ON_RUNNER
extern runner on_async;
#endif
class AsyncLoop {
/**
* The runner class implements multithread, task stack and event loop for asynchronous execution of tasks
*/
class runner {
private:
vector<thread> workers;
vector<thread> runners;
queue<function<void()>> tasks;
mutex q_io;
condition_variable cv;
bool stop;
public:
AsyncLoop(size_t pool_size = thread::hardware_concurrency()) : stop(false) {
/**
* The constructor starts as many threads as the system has cores,
* and runs an event loop inside each one.
* Each event loop waits for tasks from the stack and executes them.
*/
runner(size_t pool_size = thread::hardware_concurrency()) : stop(false) {
for (size_t i = 0; i < pool_size; ++i) {
workers.emplace_back([this] {
runners.emplace_back( thread([this] {
while (true) {
function<void()> task;
{
@ -41,10 +50,13 @@ class AsyncLoop {
}
task();
}
});
}));
}
}
/**
* With the method, we send the callback function and its arguments to the task stack
*/
template<class F, class... Args>
auto put_task(F&& f, Args&&... args)
-> future<typename result_of<F(Args...)>::type> {
@ -66,22 +78,31 @@ class AsyncLoop {
return res;
}
/**
* Returns the number of tasks the runner has to perform
*/
unsigned int count_tasks() {
return tasks.size();
}
/**
* Returns the number of threads used by the runner
*/
unsigned int count_threads() {
return workers.size();
return runners.size();
}
~AsyncLoop() {
/**
* The destructor stops all loops and stops all threads
*/
~runner() {
{
unique_lock<mutex> lock(q_io);
stop = true;
}
cv.notify_all();
for (thread& worker : workers) {
worker.join();
for (thread& runner : runners) {
runner.join();
}
}

Binary file not shown.

@ -1,8 +1,9 @@
// #include "../lib/loop.hpp"
// #include "../lib/asynco.hpp"
// #include "../example/asynco.hpp"
#include "../lib/runner.hpp"
#include "../lib/asynco.hpp"
#include "../lib/event.hpp"
#include "../lib/rotor.hpp"
#include <iostream>
#include <unistd.h>
@ -10,49 +11,85 @@ using namespace std;
using namespace marcelb;
using namespace this_thread;
#ifndef ON_ASYNC
#define ON_ASYNC
AsyncLoop on_async(8);
#ifndef ON_RUNNER
#define ON_RUNNER
runner on_async;
#endif
void sleep_to (int _time) {
promise<void> _promise;
timeout t( [&]() {
_promise.set_value();
}, _time);
return _promise.get_future().get();
}
void promise_reject (int _time) {
promise<void> _promise;
timeout t( [&]() {
try {
// simulate except
throw runtime_error("Error simulation");
_promise.set_value();
} catch (...) {
_promise.set_exception(current_exception());
}
}, _time);
return _promise.get_future().get();
}
// ------------------ EXTEND OWN CLASS WITH EVENTS -------------------
class myOwnClass : public event<int> {
public:
myOwnClass() : event() {};
};
int main () {
// auto start = chrono::high_resolution_clock::now();
auto start = rtime_ms();
// --------------- TIME ASYNCHRONOUS FUNCTIONS --------------
/**
* Init interval and timeout; clear interval and timeout
*/
// interval inter1 ([&]() {
// cout << "interval prvi " << chrono::duration_cast<chrono::milliseconds>(chrono::high_resolution_clock::now() - start).count() << endl;
// cout << "interval prvi " << rtime_ms() - start << endl;
// }, 1000);
// interval inter2 ([&]() {
// cout << "interval drugi " << chrono::duration_cast<chrono::milliseconds>(chrono::high_resolution_clock::now() - start).count() << endl;
// cout << "interval drugi " << rtime_ms() - start << endl;
// }, 2000);
// interval inter3 ([&]() {
// cout << "interval treći " << chrono::duration_cast<chrono::milliseconds>(chrono::high_resolution_clock::now() - start).count() << endl;
// cout << "interval treći " << rtime_ms() - start << endl;
// }, 3000);
// timeout time1 ( [&] () {
// cout << "Close interval 1 i 2 " << chrono::duration_cast<chrono::milliseconds>(chrono::high_resolution_clock::now() - start).count() << endl;
// cout << "Close interval 1 i 2 " << rtime_ms() - start << endl;
// inter1.clear();
// inter2.clear();
// }, 10000);
// timeout time2 ([&] () {
// cout << "Close interval 3 " << chrono::duration_cast<chrono::milliseconds>(chrono::high_resolution_clock::now() - start).count() << endl;
// cout << "Close interval 3 " << rtime_ms() - start << endl;
// inter3.clear();
// time1.clear();
// }, 2000);
// cout << "zadataka: " << on_async.count_tasks() << " niti: " << on_async.count_threads() << endl;
// ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
// for (int i = 0; i < 8; ++i) {
// pool.put_task( [&] (int id) {
// this_thread::sleep_for(chrono::seconds(1));
// cout << a*i << endl;
// }, i);
// }
/**
* Put task directly and get returned value - it is not recommended to use it
*/
// auto res1 = pool.put_task( [] () {
// auto res1 = on_async.put_task( [] () {
// cout << "Jebiga " <<endl;
// throw string ("jebiga!!");
// });
@ -63,33 +100,80 @@ int main () {
// cout << except << endl;
// }
// auto res = pool.put_task( []() {
// this_thread::sleep_for(chrono::seconds(1));
// return 4;
/**
* Run an function asyncronic
*/
// asynco( []() {
// sleep_for(2s); // only for simulate log duration function
// cout << "asynco" << endl;
// return 5;
// });
/**
* Wait after runned as async
*/
// auto a = asynco( []() {
// sleep_for(2s); // only for simulate log duration function
// cout << "asynco" << endl;
// return 5;
// });
// cout << wait(move(a)) << endl;
/**
* Wait async function call and use i cout
*/
// cout << wait(asynco( [] () {
// sleep_for(chrono::seconds(1));
// cout << "RETURN" << endl;
// sleep_for(chrono::seconds(1)); // only for simulate log duration function
// cout << "wait end" << endl;
// return 4;
// })) << endl;
// asynco( []() {
// sleep_for(2s);
// cout << "RETURN 2" << endl;
// return 5;
// });
/**
* Sleep with timeout sleep implement
*/
// sleep_to(3000);
// cout << "sleep_to " << rtime_ms() - start << endl;
/**
* Catch promise reject
*/
// try {
// promise_reject(3000);
// } catch (runtime_error err) {
// cout<< err.what() << endl;
// }
// cout << "promise_reject " << rtime_ms() - start << endl;
// cout << wait(pool.put_task( []() {
// this_thread::sleep_for(chrono::seconds(1));
// return 7;
// }));
/**
* Nested asynchronous invocation
*/
// cout << wait(run1) << endl;
// asynco( [] {
// cout << "idemo ..." << endl;
// asynco( [] {
// cout << "ugdnježdena async funkcija " << endl;
// });
// });
// --------------- EVENTS -------------------
/**
* initialization of typed events
*/
// event<int, int> ev2int;
// event<int, string> evintString;
// event<> evoid;
// ev2int.on("sum", [](int a, int b) {
// cout << "Sum " << a+b << endl;
@ -99,15 +183,39 @@ int main () {
// cout << "Substract " << a-stoi(b) << endl;
// });
// sleep(5);
// evoid.on("void", []() {
// cout << "Void emited" << endl;
// });
// sleep(1);
/**
* Emit
*/
// ev2int.emit("sum", 5, 8);
// sleep(2);
// sleep(1);
// evintString.emit("substract", 3, to_string(2));
// sleep(1);
// evoid.emit("void");
/**
* Own class
*/
// myOwnClass myclass;
// timeout t( [&] {
// myclass.emit("constructed", 1);
// }, 200);
// myclass.on("constructed", [] (int i) {
// cout << "Constructed " << i << endl;
// });
sleep(1000);
sleep(10000); // only for testing
return 0;
}

Loading…
Cancel
Save