Support Boost.Asio coroutine, clean, rename

This commit is contained in:
marcelb 2025-03-28 23:00:01 +01:00
parent 1853318016
commit aca2724e0c
8 changed files with 322 additions and 188 deletions

View File

@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.10)
project(Asynco)
# Postavi verziju projekta
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Pronađi Boost biblioteku (ako nije uobičajeni direktorijum, postavi put)

View File

@ -22,6 +22,7 @@ The asynchronous filesystem is provided solely to guide users on how to wrap any
- Multiple parallel execution loops
- Asynchronous file IO
- Based on ASIO (Boost Asio)
- On C++20 support Boost.Asio coroutines
## Installation
Just download the latest release and unzip it into your project.
@ -36,7 +37,6 @@ Just download the latest release and unzip it into your project.
using namespace marcelb;
using namespace asynco;
using namespace triggers;
// At the end of the main function, always set
_asynco_engine.run();
@ -419,6 +419,55 @@ try {
```
## Coroutine
If `define.hpp` is included, you can initialize coroutines using `coroutine<T>`; if not, just use `boost::asio::awaitable<T>`.
```c++
coroutine<int> c2(int a) {
co_return a * 2;
}
```
To run the coroutine at runtime, simply call:
```c++
async_(c2(4));
```
Or using a lambda expression:
```c++
async_([]() -> coroutine<void> {
std::cout << "Hello" << std::endl;
co_await c2(4);
co_return;
}());
```
To retrieve results from coroutines, you can do so as you would from classical functions by calling `await_`:
```c++
int r = await_(
async_(
c2(10)
));
```
Timers and triggers work the same with coroutines; it is important to call the coroutine with `async_` in the callback, and to call `async_`, wrap it with a lambda expression:
```c++
Periodic p([]() {
async_(c2(34));
}, 2000);
```
If you need a result, you can also retrieve it with `await_`.
## License
[APACHE 2.0](http://www.apache.org/licenses/LICENSE-2.0/)

View File

@ -3,9 +3,14 @@
#include "engine.hpp"
#include <iostream>
using namespace std;
#if __cplusplus >= 202002L
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/use_awaitable.hpp>
#endif
namespace marcelb {
namespace asynco {
@ -19,6 +24,34 @@ auto async_(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::typ
return res;
}
#if __cplusplus >= 202002L
/**
* Run the coroutine asynchronously
*/
template <typename T>
std::future<T> async_(boost::asio::awaitable<T> _coroutine) {
std::promise<T> promise;
auto future = promise.get_future();
co_spawn(_asynco_engine.io_context, [_coroutine = std::move(_coroutine), promise = std::move(promise)]() mutable -> boost::asio::awaitable<void> {
try {
if constexpr (!std::is_void_v<T>) {
T result = co_await std::move(_coroutine);
promise.set_value(std::move(result));
} else {
co_await std::move(_coroutine);
promise.set_value(); // Za void ne postavljamo rezultat
}
} catch (...) {
promise.set_exception(std::current_exception()); // Postavljamo izuzetak
}
}, boost::asio::detached);
return future;
}
#endif
/**
* Block until the asynchronous call completes
*/
@ -38,24 +71,24 @@ T await_(future<T>&& r) {
/**
* Block until the asynchronous call completes or time expired
*/
template<typename T>
T await_(future<T>& r, uint64_t time) {
if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
throw runtime_error("Asynchronous execution timed out");
}
return r.get();
}
// template<typename T>
// T await_(future<T>& r, uint64_t time) {
// if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
// throw runtime_error("Asynchronous execution timed out");
// }
// return r.get();
// }
/**
* Block until the asynchronous call completes or time expired
*/
template<typename T>
T await_(future<T>&& r, uint64_t time) {
if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
throw runtime_error("Asynchronous execution timed out");
}
return move(r).get();
}
// template<typename T>
// T await_(future<T>&& r, uint64_t time) {
// if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
// throw runtime_error("Asynchronous execution timed out");
// }
// return move(r).get();
// }
}
}

View File

@ -11,6 +11,10 @@ namespace asynco {
#define async_ marcelb::asynco::async_
#define await_ marcelb::asynco::await_
#if __cplusplus >= 202002L
#define coroutine boost::asio::awaitable
#endif
}
}

View File

@ -24,7 +24,7 @@ int64_t rtime_us();
/**
* Core timer class for construct time async functions
*/
class timer {
class Timer {
boost::asio::steady_timer st;
bool _stop = false;
bool repeate;
@ -35,14 +35,13 @@ class timer {
/**
* A method to assign a callback wrapper and a reinitialization algorithm
*/
void init();
void init();
public:
/**
* The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer
*/
timer (function<void()> _callback, uint64_t _time, bool _repeate);
Timer (function<void()> _callback, uint64_t _time, bool _repeate);
/**
* Stop timer
@ -68,21 +67,21 @@ class timer {
/**
* The destructor stops the timer
*/
~timer();
~Timer();
};
/**
* Class periodic for periodic execution of the callback in time in ms
*/
class periodic {
shared_ptr<timer> _timer;
class Periodic {
shared_ptr<Timer> _timer;
public:
/**
* Constructor initializes a shared pointer of type timer
*/
periodic(function<void()> callback, uint64_t time);
Periodic(function<void()> callback, uint64_t time);
/**
* Stop periodic
@ -108,21 +107,21 @@ class periodic {
/**
* The destructor stops the periodic
*/
~periodic();
~Periodic();
};
/**
* Class delayed for delayed callback execution in ms
*/
class delayed {
shared_ptr<timer> _timer;
class Delayed {
shared_ptr<Timer> _timer;
public:
/**
* Constructor initializes a shared pointer of type timer
*/
delayed(function<void()> callback, uint64_t time);
Delayed(function<void()> callback, uint64_t time);
/**
* Stop delayed
@ -147,13 +146,10 @@ class delayed {
/**
* The destructor stops the delayed
*/
~delayed();
~Delayed();
};
shared_ptr<periodic> Periodic(function<void()> callback, uint64_t time);
shared_ptr<delayed> Delayed(function<void()> callback, uint64_t time);
}
}

View File

@ -11,14 +11,13 @@ using namespace std;
#include "engine.hpp"
namespace marcelb {
namespace asynco {
namespace triggers {
/**
* trigger class, for event-driven programming.
* Trigger class, for event-driven programming.
* These events are typed according to the arguments of the callback function
*/
template<typename... T>
class trigger {
class Trigger {
private:
mutex m_eve;
unordered_map<string, vector<function<void(T...)>>> triggers;
@ -48,7 +47,7 @@ class trigger {
}
/**
* Remove an trigger listener from an event
* Remove an Trigger listener from an event
*/
void off(const string& key) {
lock_guard _off(m_eve);
@ -56,7 +55,7 @@ class trigger {
}
/**
* Remove all trigger listener
* Remove all Trigger listener
*/
void off() {
lock_guard _off(m_eve);
@ -65,7 +64,7 @@ class trigger {
/**
* Get num of listeners by an trigger key
* Get num of listeners by an Trigger key
*/
unsigned int listeners(const string& key) {
return triggers[key].size();
@ -85,7 +84,6 @@ class trigger {
};
}
}
}

View File

@ -14,7 +14,7 @@ int64_t rtime_us() {
.count();
}
void timer::init() {
void Timer::init() {
st.async_wait( [this] (const boost::system::error_code&) {
if (!_stop) {
callback();
@ -27,7 +27,7 @@ void timer::init() {
});
}
timer::timer (function<void()> _callback, uint64_t _time, bool _repeate) :
Timer::Timer (function<void()> _callback, uint64_t _time, bool _repeate) :
st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)),
_stop(false),
repeate(_repeate),
@ -37,109 +37,74 @@ timer::timer (function<void()> _callback, uint64_t _time, bool _repeate) :
init();
}
void timer::stop() {
void Timer::stop() {
_stop = true;
st.cancel();
}
void timer::now() {
void Timer::now() {
st.cancel();
}
uint64_t timer::ticks() {
uint64_t Timer::ticks() {
return _ticks;
}
bool timer::stoped() {
bool Timer::stoped() {
return _stop;
}
timer::~timer() {
Timer::~Timer() {
stop();
}
periodic::periodic(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, true)) {
Periodic::Periodic(function<void()> callback, uint64_t time) :
_timer(make_shared<Timer> (callback, time, true)) {
}
void periodic::stop() {
void Periodic::stop() {
_timer->stop();
}
void periodic::now() {
void Periodic::now() {
_timer->now();
}
uint64_t periodic::ticks() {
uint64_t Periodic::ticks() {
return _timer->ticks();
}
bool periodic::stoped() {
bool Periodic::stoped() {
return _timer->stoped();
}
periodic::~periodic() {
Periodic::~Periodic() {
stop();
}
delayed::delayed(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, false)) {
Delayed::Delayed(function<void()> callback, uint64_t time) :
_timer(make_shared<Timer> (callback, time, false)) {
}
void delayed::stop() {
void Delayed::stop() {
_timer->stop();
}
void delayed::now() {
void Delayed::now() {
_timer->now();
}
bool delayed::expired() {
bool Delayed::expired() {
return bool(_timer->ticks());
}
bool delayed::stoped() {
bool Delayed::stoped() {
return _timer->stoped();
}
delayed::~delayed() {
Delayed::~Delayed() {
stop();
}
mutex p_io, d_io;
vector<shared_ptr<periodic>> periodic_calls_container;
vector<shared_ptr<delayed>> delayed_calls_container;
shared_ptr<periodic> Periodic(function<void()> callback, uint64_t time) {
shared_ptr<periodic> periodic_ptr(make_shared<periodic>(callback, time));
async_ ( [&, periodic_ptr](){
lock_guard<mutex> lock(p_io);
periodic_calls_container.push_back(periodic_ptr);
for (uint32_t i=0; i<periodic_calls_container.size(); i++) {
if (periodic_calls_container[i]->stoped()) {
periodic_calls_container.erase(periodic_calls_container.begin()+i);
i--;
}
}
});
return periodic_ptr;
}
shared_ptr<delayed> Delayed(function<void()> callback, uint64_t time) {
shared_ptr<delayed> delayed_ptr(make_shared<delayed>(callback, time));
async_ ( [&, delayed_ptr](){
lock_guard<mutex> lock(p_io);
delayed_calls_container.push_back(delayed_ptr);
for (uint32_t i=0; i<delayed_calls_container.size(); i++) {
if (delayed_calls_container[i]->stoped() || delayed_calls_container[i]->expired()) {
delayed_calls_container.erase(delayed_calls_container.begin()+i);
i--;
}
}
});
return delayed_ptr;
}
};

View File

@ -7,7 +7,6 @@
#include "define.hpp"
using namespace marcelb::asynco;
using namespace triggers;
#include <iostream>
#include <unistd.h>
@ -18,80 +17,92 @@ using namespace triggers;
using namespace std;
using namespace this_thread;
coroutine<int> c2 (int a) {
co_return a*2;
}
void sleep_to (int _time) {
promise<void> _promise;
delayed t( [&]() {
_promise.set_value();
}, _time);
return _promise.get_future().get();
}
coroutine<void> c () {
cout << "Ispisi" << endl;
co_await c2(0);
co_return;
}
void promise_reject (int _time) {
promise<void> _promise;
delayed t( [&]() {
try {
// simulate except
throw runtime_error("Error simulation");
_promise.set_value();
} catch (...) {
_promise.set_exception(current_exception());
}
}, _time);
return _promise.get_future().get();
}
void notLambdaFunction() {
cout << "Call to not lambda function" << endl;
}
// void sleep_to (int _time) {
// promise<void> _promise;
// Delayed t( [&]() {
// _promise.set_value();
// }, _time);
class clm {
public:
void classMethode() {
cout << "Call class method" << endl;
}
};
// return _promise.get_future().get();
// }
// ------------------ EXTEND OWN CLASS WITH EVENTS -------------------
// void promise_reject (int _time) {
// promise<void> _promise;
// Delayed t( [&]() {
// try {
// // simulate except
// throw runtime_error("Error simulation");
// _promise.set_value();
// } catch (...) {
// _promise.set_exception(current_exception());
// }
// }, _time);
class myOwnClass : public trigger<int> {
public:
myOwnClass() : trigger() {};
};
// return _promise.get_future().get();
// }
// void notLambdaFunction() {
// cout << "Call to not lambda function" << endl;
// }
// class clm {
// public:
// void classMethode() {
// cout << "Call class method" << endl;
// }
// };
// // ------------------ EXTEND OWN CLASS WITH EVENTS -------------------
// class myOwnClass : public Trigger<int> {
// public:
// myOwnClass() : Trigger() {};
// };
// ----------------- MULTIPLE TRIGGERS IN ONE CLASS ------------------
class ClassWithTriggers {
trigger<int> emitter1;
trigger<string> emitter2;
// class ClassWithTriggers {
// Trigger<int> emitter1;
// Trigger<string> emitter2;
public:
template<typename... T>
void on(const string& key, function<void(T...)> callback) {
if constexpr (sizeof...(T) == 1 && is_same_v<tuple_element_t<0, tuple<T...>>, int>) {
emitter1.on(key, callback);
}
else if constexpr (sizeof...(T) == 1 && is_same_v<tuple_element_t<0, tuple<T...>>, string>) {
emitter2.on(key, callback);
}
}
// public:
// template<typename... T>
// void on(const string& key, function<void(T...)> callback) {
// if constexpr (sizeof...(T) == 1 && is_same_v<tuple_element_t<0, tuple<T...>>, int>) {
// emitter1.on(key, callback);
// }
// else if constexpr (sizeof...(T) == 1 && is_same_v<tuple_element_t<0, tuple<T...>>, string>) {
// emitter2.on(key, callback);
// }
// }
template <typename... Args>
void tick(const string& key, Args&&... args) {
if constexpr (sizeof...(Args) == 1 && is_same_v<tuple_element_t<0, tuple<Args...>>, int>) {
emitter1.tick(key, forward<Args>(args)...);
}
else if constexpr (sizeof...(Args) == 1 && is_same_v<tuple_element_t<0, tuple<Args...>>, string>) {
emitter2.tick(key, forward<Args>(args)...);
}
else {
static_assert(sizeof...(Args) == 0, "Unsupported number or types of arguments");
}
}
};
// template <typename... Args>
// void tick(const string& key, Args&&... args) {
// if constexpr (sizeof...(Args) == 1 && is_same_v<tuple_element_t<0, tuple<Args...>>, int>) {
// emitter1.tick(key, forward<Args>(args)...);
// }
// else if constexpr (sizeof...(Args) == 1 && is_same_v<tuple_element_t<0, tuple<Args...>>, string>) {
// emitter2.tick(key, forward<Args>(args)...);
// }
// else {
// static_assert(sizeof...(Args) == 0, "Unsupported number or types of arguments");
// }
// }
// };
int main () {
@ -101,36 +112,36 @@ int main () {
// --------------- TIME ASYNCHRONOUS FUNCTIONS --------------
/**
* Init periodic and delayed; clear periodic and delayed
* Init Periodic and delayed; clear Periodic and delayed
*/
// periodic inter1 ([&]() {
// cout << "periodic prvi " << rtime_ms() - start << endl;
// Periodic inter1 ([&]() {
// cout << "Periodic prvi " << rtime_ms() - start << endl;
// }, 1000);
// periodic inter2 ([&]() {
// cout << "periodic drugi " << rtime_ms() - start << endl;
// Periodic inter2 ([&]() {
// cout << "Periodic drugi " << rtime_ms() - start << endl;
// }, 2000);
// periodic inter3 ([&]() {
// cout << "periodic treći " << rtime_ms() - start << endl;
// Periodic inter3 ([&]() {
// cout << "Periodic treći " << rtime_ms() - start << endl;
// }, 1000);
// periodic inter4 ([&]() {
// // cout << "periodic cetvrti " << rtime_ms() - start << endl;
// Periodic inter4 ([&]() {
// // cout << "Periodic cetvrti " << rtime_ms() - start << endl;
// cout << "Ticks " << inter3.ticks() << endl;
// }, 500);
// periodic inter5 ([&]() {
// cout << "periodic peti " << rtime_ms() - start << endl;
// Periodic inter5 ([&]() {
// cout << "Periodic peti " << rtime_ms() - start << endl;
// }, 2000);
// periodic inter6 ([&]() {
// cout << "periodic sesti " << rtime_ms() - start << endl;
// Periodic inter6 ([&]() {
// cout << "Periodic sesti " << rtime_ms() - start << endl;
// }, 3000);
// delayed time1 ( [&] () {
// cout << "Close periodic 1 i 2 " << rtime_ms() - start << endl;
// Delayed time1 ( [&] () {
// cout << "Close Periodic 1 i 2 " << rtime_ms() - start << endl;
// inter1.stop();
// cout << "inter1.stop " << endl;
// inter2.stop();
@ -138,8 +149,8 @@ int main () {
// }, 8000);
// delayed time2 ([&] () {
// cout << "Close periodic 3 " << rtime_ms() - start << endl;
// Delayed time2 ([&] () {
// cout << "Close Periodic 3 " << rtime_ms() - start << endl;
// inter3.stop();
// cout << "Stoped " << inter3.stoped() << endl;
// // time1.stop();
@ -244,7 +255,7 @@ int main () {
// })) << endl;
// /**
// * Sleep with delayed sleep implement
// * Sleep with Delayed sleep implement
// */
// sleep_to(3000);
@ -335,9 +346,9 @@ int main () {
// * initialization of typed events
// */
// trigger<int, int> ev2int;
// trigger<int, string> evintString;
// trigger<> evoid;
// Trigger<int, int> ev2int;
// Trigger<int, string> evintString;
// Trigger<> evoid;
// ev2int.on("sum", [](int a, int b) {
// cout << "Sum " << a+b << endl;
@ -390,7 +401,7 @@ int main () {
// myOwnClass myclass;
// delayed t( [&] {
// Delayed t( [&] {
// myclass.tick("constructed", 1);
// }, 200);
@ -404,18 +415,18 @@ int main () {
// *
// */
ClassWithTriggers mt;
// ClassWithTriggers mt;
mt.on<int>("int", function<void(int)>([&](int i) {
cout << "Emit int " << i << endl;
}));
// mt.on<int>("int", function<void(int)>([&](int i) {
// cout << "Emit int " << i << endl;
// }));
mt.on<string>("string", function<void(string)>([&](string s) {
cout << "Emit string " << s << endl;
}));
// mt.on<string>("string", function<void(string)>([&](string s) {
// cout << "Emit string " << s << endl;
// }));
mt.tick("int", 5);
mt.tick("string", string("Hello world"));
// mt.tick("int", 5);
// mt.tick("string", string("Hello world"));
// auto status = fs::read("test1.txt");
@ -446,6 +457,84 @@ int main () {
// // ----------------------------------------------------------------------------------------------------
// auto i = async_ ( []() -> coroutine<int> {
// cout << "aaaa" << endl;
// co_return 5;
// });
// auto i = async_ (retint());
// auto i_ = await_(i);
// cout << i_ << endl;
// Periodic a( []() -> coroutine<void> {
// cout << "corutina" << endl;
// // co_await retint();
// }, 2000);
// Periodic b( []() {
// cout << "funckija" << endl;
// }, 2000);
// Trigger<int, int> ev2int;
// Trigger<int, string> evintString;
Trigger<> evoid;
// ev2int.on("sum", [](int a, int b) -> coroutine<void> {
// cout << "Sum " << a+b << endl;
// });
// ev2int.on("sum", [](int a, int b) -> coroutine<void> {
// cout << "Sum done" << endl;
// });
// evintString.on("substract", [](int a, string b) -> coroutine<void> {
// cout << "Substract " << a-stoi(b) << endl;
// });
evoid.on("void", []() {
auto a = await_ (async_ (c2(34)));
cout << "A " << a << endl;
});
// auto c1 = []() -> coroutine<void> {
// cout << "Roge " << endl;
// co_return;
// };
// async_ ( c2(3));
async_ ([]() -> coroutine<void> {
cout << "Hello" << endl;
co_await c2(4);
co_return;
}());
Periodic p( []() {
auto a = await_ (async_ (c2(34)));
cout << "A " << a << endl;
}, 2000);
// await_( async_co2 ( [c1 = move(c1)]() -> coroutine<void> {
// cout << "Baba roga" << endl;
// co_await c1();
// }));
// string emited2 = "2";
// evoid.on("void", [&]() -> coroutine<void> {
// cout << "Void emited " << emited2 << endl;
// });
evoid.tick("void");
cout << "Run" << endl;
_asynco_engine.run();