diff --git a/CMakeLists.txt b/CMakeLists.txt index b89631e..6522cee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,7 +14,8 @@ include_directories(lib) # Dodaj biblioteku add_library(asynco STATIC - src/engine.cpp + src/asynco.cpp + src/asynco_default.cpp src/timers.cpp ) diff --git a/README.md b/README.md index 6d36554..94fadae 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ using namespace marcelb; using namespace asynco; // At the end of the main function, always set -_asynco_engine.run(); +Asynco_Default_Runtime.run(); return 0; ``` diff --git a/lib/asynco.hpp b/lib/asynco.hpp index 149fa84..4b2d15e 100644 --- a/lib/asynco.hpp +++ b/lib/asynco.hpp @@ -1,10 +1,21 @@ #ifndef _ASYNCO_ #define _ASYNCO_ -#include "engine.hpp" +#include +#include +#include +#include +#include +#include #include using namespace std; +#include +using namespace boost::asio; + +#include "timers.hpp" +#include "trigger.hpp" + #if __cplusplus >= 202002L #include #include @@ -15,111 +26,144 @@ namespace marcelb { namespace asynco { /** - * Run the function asynchronously + * Asynco runtime */ -template -auto async_(F&& f, Args&&... args) -> future::type> { - using return_type = typename result_of::type; - future res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward(f), forward(args)...))); - return res; -} +class Asynco { + vector _runners; + unique_ptr _work; + + void init_loops_in_threads(uint8_t threads); + +public: + io_context io_ctx; + + // Asynco(uint8_t threads = thread::hardware_concurrency()); + + void run(uint8_t threads = thread::hardware_concurrency()); + + void join(); + + /** + * Run the function asynchronously + */ + template + auto async(F&& f, Args&&... args) -> future> { + cout << "async" << endl; + + using return_type = invoke_result_t; + future res = io_ctx.post(boost::asio::use_future(bind(forward(f), forward(args)...))); + return res; + } #if __cplusplus >= 202002L -/** - * Run the coroutine -*/ -template -std::future async_(boost::asio::awaitable _coroutine) { - std::promise promise; - auto future = promise.get_future(); + /** + * Run the coroutine + */ + template + future async(boost::asio::awaitable _coroutine) { + promise promise; + auto future = promise.get_future(); - co_spawn(_asynco_engine.io_context, [_coroutine = std::move(_coroutine), promise = std::move(promise)]() mutable -> boost::asio::awaitable { - try { - if constexpr (!std::is_void_v) { - T result = co_await std::move(_coroutine); - promise.set_value(std::move(result)); - } else { - co_await std::move(_coroutine); - promise.set_value(); // Za void ne postavljamo rezultat + co_spawn(io_ctx, [_coroutine = move(_coroutine), promise = move(promise)]() mutable -> boost::asio::awaitable { + try { + if constexpr (!is_void_v) { + T result = co_await move(_coroutine); + promise.set_value(move(result)); + } else { + co_await move(_coroutine); + promise.set_value(); // Za void ne postavljamo rezultat + } + } catch (...) { + promise.set_exception(current_exception()); // Postavljamo izuzetak } - } catch (...) { - promise.set_exception(std::current_exception()); // Postavljamo izuzetak - } - }, boost::asio::detached); - - return future; -} + }, boost::asio::detached); + return future; + } #endif -/** - * Block until the multiple asynchronous call completes - * Use only on no-void calls - */ - -template -auto await_(F&&... f) -> std::tuple::type...> { - return std::make_tuple(move(f).get()...); -} - -/** - * Block until the multiple asynchronous call completes - * Use only on no-void calls - */ - -template -auto await_(F&... f) -> std::tuple::type...> { - return std::make_tuple(f.get()...); -} - -/** - * Block until the asynchronous call completes - dont block asynco engine loop -*/ -template -T await_(future& r, uint16_t time_us = 10) { - while (r.wait_for(std::chrono::microseconds(time_us)) != std::future_status::ready) { - _asynco_engine.io_context.poll_one(); + /** + * Block until the asynchronous call completes - dont block asynco engine loop + */ + template + T await(future& r, uint16_t time_us = 10) { + while (r.wait_for(std::chrono::microseconds(time_us)) != future_status::ready) { + io_ctx.poll_one(); + } + return r.get(); } - return r.get(); -} -/** - * Block until the asynchronous call completes - dont block asynco engine loop -*/ -template -T await_(future&& r, uint16_t time_us = 10) { - while (r.wait_for(std::chrono::microseconds(time_us)) != std::future_status::ready) { - _asynco_engine.io_context.poll_one(); + /** + * Block until the asynchronous call completes - dont block asynco engine loop + */ + template + T await(future&& r, uint16_t time_us = 10) { + while (r.wait_for(std::chrono::microseconds(time_us)) != future_status::ready) { + io_ctx.poll_one(); + } + return move(r).get(); } - return move(r).get(); -} - -/** - * Run the function asynchronously an block until completes -*/ -template -auto await_(F&& f, Args&&... args) -> typename result_of::type { - return await_( - async_(f, args...) - ); -} + /** + * Run the function asynchronously an block until completes + */ + template + auto await(F&& f, Args&&... args) -> invoke_result_t { + return await( + async(f, args...) + ); + } #if __cplusplus >= 202002L - -/** - * Run the coruotine and wait - */ -template -T await_(boost::asio::awaitable _coroutine) { - return await_( - async_( - move(_coroutine) - )); -} - + /** + * Run the coruotine and wait + */ + template + T await(boost::asio::awaitable _coroutine) { + return await( + async( + move(_coroutine) + )); + } #endif + /** + * Block until the multiple asynchronous call completes + * Use only on no-void calls + */ + + template + auto await(F&&... f) -> tuple::type...> { + return make_tuple(move(f).get()...); + } + + /** + * Block until the multiple asynchronous call completes + * Use only on no-void calls + */ + + template + auto await(F&... f) -> tuple::type...> { + return make_tuple(await(f)...); + } + + Timer delayed(function callback, uint64_t time) ;/*{ + return Timer(io_ctx, callback, time, TimerType::Delayed); + }*/ + + Timer periodic(function callback, uint64_t time) ;/*{ + return Timer(io_ctx, callback, time, TimerType::Periodic); + }*/ + + template + Trigger trigger() { + return Trigger(this); + } + + +}; + + } } diff --git a/lib/asynco_default.hpp b/lib/asynco_default.hpp new file mode 100644 index 0000000..5d537b5 --- /dev/null +++ b/lib/asynco_default.hpp @@ -0,0 +1,112 @@ +#ifndef _ASYNCO_DEFAULT_ +#define _ASYNCO_DEFAULT_ + +#include "asynco.hpp" + +namespace marcelb { +namespace asynco { + + +extern Asynco Asynco_Default_Runtime; +// Asynco& Asynco_Default_Runtime(); + +/** + * Run the function asynchronously +*/ +template +auto async_(F&& f, Args&&... args) -> future> { + cout << "async_default" << endl; + + return Asynco_Default_Runtime.async(bind(forward(f), forward(args)...)); +} + +#if __cplusplus >= 202002L +/** + * Run the coroutine +*/ +template +std::future async_(boost::asio::awaitable _coroutine) { + return Asynco_Default_Runtime.async(move(_coroutine)); +} +#endif + +/** + * Block until the asynchronous call completes - dont block asynco engine loop +*/ +template +T await_(future& r, uint16_t time_us = 10) { + return Asynco_Default_Runtime.await(r, time_us); +} + +/** + * Block until the asynchronous call completes - dont block asynco engine loop +*/ +template +T await_(future&& r, uint16_t time_us = 10) { + return Asynco_Default_Runtime.await(r, time_us); +} + +/** + * Run the function asynchronously an block until completes +*/ +template +auto await_(F&& f, Args&&... args) -> invoke_result_t { + return Asynco_Default_Runtime.await(bind(forward(f), forward(args)...)); +} + + +#if __cplusplus >= 202002L +/** + * Run the coruotine and wait + */ +template +T await_(boost::asio::awaitable _coroutine) { + return Asynco_Default_Runtime.await(move(_coroutine)); +} +#endif + +/** + * Block until the multiple asynchronous call completes + * Use only on no-void calls + */ + +template +auto await_(F&&... f) -> std::tuple::type...> { + return Asynco_Default_Runtime.await(move(f)...); +} + +/** + * Block until the multiple asynchronous call completes + * Use only on no-void calls + */ + +template +auto await_(F&... f) -> std::tuple::type...> { + return Asynco_Default_Runtime.await(f...);; +} + +Timer delayed(function callback, uint64_t time); + +Timer periodic(function callback, uint64_t time); + +template +Trigger trigger() { + return Trigger(Asynco_Default_Runtime); +} + +/** + * Alternative names of functions - mostly for the sake of more beautiful coloring of the code + */ +#define async_ marcelb::asynco::async_ +#define await_ marcelb::asynco::await_ + +#if __cplusplus >= 202002L +#define asyncable boost::asio::awaitable +#endif + +} +} + + + +#endif \ No newline at end of file diff --git a/lib/define.hpp b/lib/define.hpp deleted file mode 100644 index b14acf4..0000000 --- a/lib/define.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef _ASYNCO_DEFINE_ -#define _ASYNCO_DEFINE_ - -namespace marcelb { -namespace asynco { - -/** - * Alternative names of functions - mostly for the sake of more beautiful coloring of the code - */ - -#define async_ marcelb::asynco::async_ -#define await_ marcelb::asynco::await_ - -#if __cplusplus >= 202002L -#define asyncable boost::asio::awaitable -#endif - -} -} - - - -#endif \ No newline at end of file diff --git a/lib/engine.hpp b/lib/engine.hpp deleted file mode 100644 index f9aa9a8..0000000 --- a/lib/engine.hpp +++ /dev/null @@ -1,71 +0,0 @@ -#ifndef _ASYNCO_ENGINE_ -#define _ASYNCO_ENGINE_ - -#include -#include -using namespace std; - -#include - -namespace marcelb { -namespace asynco { - -#define HW_CONCURRENCY_MINIMAL 4 - -/** - * Internal anonymous class for initializing the ASIO context and thread pool - * !!! It is anonymous to protect against use in the initialization of other objects of the same type !!! -*/ -class Engine { - public: - boost::asio::io_context io_context; - - void run() { - for (auto& runner : runners) { - runner.join(); - } - } - - private: - - unique_ptr work { [&] () { - return new boost::asio::io_service::work(io_context); - } ()}; - - vector runners { [&] () { - vector _runs; - unsigned int num_of_runners; - #ifdef NUM_OF_RUNNERS - num_of_runners = NUM_OF_RUNNERS; - #else - num_of_runners = thread::hardware_concurrency(); - if (num_of_runners < HW_CONCURRENCY_MINIMAL) { - num_of_runners = HW_CONCURRENCY_MINIMAL; - } - #endif - - for (int i=0; i -using namespace std; - - -namespace marcelb { -namespace asynco { -namespace fs { - -/** - * Asynchronous file reading with callback after read complete -*/ -template -void read(string path, Callback&& callback) { - asynco::async_( [&path, callback] () { - string content; - try { - string line; - ifstream file (path); - if (file.is_open()) { - line.clear(); - while ( getline (file,line) ) { - content += line + "\n"; - } - file.close(); - } - - else { - throw runtime_error("Unable to open file"); - } - - callback(content, nullptr); - } catch(exception& error) { - callback(content, &error); - } - }); -} - - -/** - * Asynchronous file reading -*/ -future read(string path) { - return asynco::async_( [&path] () { - string content; - string line; - ifstream file (path); - if (file.is_open()) { - line.clear(); - while ( getline (file,line) ) { - content += line + "\n"; - } - file.close(); - return content; - } - - else { - throw runtime_error("Unable to open file"); - } - }); -} - -/** - * Asynchronous file writing with callback after write complete -*/ -template -void write(string path, string content, Callback&& callback) { - asynco::async_( [&path, &content, callback] () { - try { - ofstream file (path); - if (file.is_open()) { - file << content; - file.close(); - } - else { - throw runtime_error("Unable to open file"); - } - - callback(nullptr); - } catch(exception& error) { - callback(&error); - } - }); -} - - -/** - * Asynchronous file writing with callback after write complete -*/ -future write(string path, string content) { - return asynco::async_( [&path, &content] () { - ofstream file (path); - if (file.is_open()) { - file << content; - file.close(); - return; - } - else { - throw runtime_error("Unable to open file"); - } - }); -} - -} -} -} - -#endif \ No newline at end of file diff --git a/lib/timers.hpp b/lib/timers.hpp index 7dc6fe5..2196811 100644 --- a/lib/timers.hpp +++ b/lib/timers.hpp @@ -2,9 +2,11 @@ #define _ASYNCO_TIMERS_ #include +#include using namespace std; -#include "asynco.hpp" +#include +using namespace boost::asio; namespace marcelb { namespace asynco { @@ -21,13 +23,19 @@ int64_t rtime_ms(); int64_t rtime_us(); +enum TimerType { + Delayed, + Periodic +}; + /** * Core timer class for construct time async functions */ class Timer { - boost::asio::steady_timer st; + io_context& io_ctx; + steady_timer st; bool _stop = false; - bool repeate; + TimerType type; function callback; uint64_t time; uint64_t _ticks = 0; @@ -41,7 +49,7 @@ class Timer { /** * The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer */ - Timer (function _callback, uint64_t _time, bool _repeate); + Timer (io_context& io_ctx, function _callback, uint64_t _time, TimerType _type = TimerType::Delayed); /** * Stop timer @@ -60,6 +68,11 @@ class Timer { */ uint64_t ticks(); + /** + * Get is the delayed callback runned + */ + bool expired(); + /** * The logic status of the timer stop state */ @@ -70,86 +83,6 @@ class Timer { ~Timer(); }; -/** - * Class periodic for periodic execution of the callback in time in ms -*/ -class Periodic { - shared_ptr _timer; - - public: - - /** - * Constructor initializes a shared pointer of type timer - */ - Periodic(function callback, uint64_t time); - - /** - * Stop periodic - * The stop flag is set and periodic remove it from the queue - */ - void stop(); - - /** - * Run callback now - * Forces the callback function to run independently of the periodic - */ - void now(); - - /** - * Get the number of times the periodic callback was runned - */ - uint64_t ticks(); - /** - * The logic status of the periodic stop state - */ - bool stoped(); - - /** - * The destructor stops the periodic - */ - ~Periodic(); -}; - -/** - * Class delayed for delayed callback execution in ms -*/ -class Delayed { - shared_ptr _timer; - - public: - - /** - * Constructor initializes a shared pointer of type timer - */ - Delayed(function callback, uint64_t time); - - /** - * Stop delayed - * The stop flag is set and delayed remove it from the queue - */ - void stop(); - - /** - * Run callback now - * Forces the callback function to run independently of the delayed - */ - void now(); - - /** - * Get is the delayed callback runned - */ - bool expired(); - /** - * The logic status of the delayed stop state - */ - bool stoped(); - /** - * The destructor stops the delayed - */ - ~Delayed(); - -}; - } } diff --git a/lib/trigger.hpp b/lib/trigger.hpp index 0b37ca1..f90f58b 100644 --- a/lib/trigger.hpp +++ b/lib/trigger.hpp @@ -8,10 +8,12 @@ using namespace std; -#include "engine.hpp" +#include "asynco.hpp" namespace marcelb { namespace asynco { +class Asynco; + /** * Trigger class, for event-driven programming. * These events are typed according to the arguments of the callback function @@ -19,9 +21,13 @@ namespace asynco { template class Trigger { private: + Asynco& engine; mutex m_eve; unordered_map>> triggers; + Trigger(Asynco& _engine) + : engine(_engine) {} + public: /** @@ -32,6 +38,7 @@ class Trigger { triggers[key].push_back(callback); } + /** * It emits an event and sends a callback function saved according to the key with the passed parameters */ @@ -41,7 +48,7 @@ class Trigger { if (it_eve != triggers.end()) { for (uint i =0; isecond.size(); i++) { auto callback = bind(it_eve->second[i], forward(args)...); - asynco::async_(callback); + engine.async(callback); } } } @@ -49,7 +56,7 @@ class Trigger { /** * Remove an Trigger listener from an event */ - void off(const string& key) { + void off(const string& key) { lock_guard _off(m_eve); triggers.erase(key); } @@ -62,7 +69,6 @@ class Trigger { triggers.clear(); } - /** * Get num of listeners by an Trigger key */ diff --git a/src/asynco.cpp b/src/asynco.cpp new file mode 100644 index 0000000..03fff93 --- /dev/null +++ b/src/asynco.cpp @@ -0,0 +1,42 @@ +#include "../lib/asynco.hpp" + +namespace marcelb::asynco { + + +void Asynco::init_loops_in_threads(uint8_t threads) { + for (int i=0; i(io_ctx); + cout << "Asynco" << endl; + init_loops_in_threads(threads); +} + +void Asynco::join() { + for (auto& runner : _runners) { + runner.join(); + } +} + +Timer Asynco::delayed(function callback, uint64_t time) { + return Timer(io_ctx, callback, time, TimerType::Delayed); +} + +Timer Asynco::periodic(function callback, uint64_t time) { + return Timer(io_ctx, callback, time, TimerType::Periodic); +} + + +}; diff --git a/src/asynco_default.cpp b/src/asynco_default.cpp new file mode 100644 index 0000000..1e705f3 --- /dev/null +++ b/src/asynco_default.cpp @@ -0,0 +1,21 @@ + +#include "../lib/asynco_default.hpp" + +namespace marcelb::asynco { + +Asynco Asynco_Default_Runtime; + +// Asynco& Asynco_Default_Runtime() { +// static Asynco _default; // ili koliko već treba +// return _default; +// } + +Timer delayed(function callback, uint64_t time) { + return Timer(Asynco_Default_Runtime.io_ctx, callback, time, TimerType::Delayed); +} + +Timer periodic(function callback, uint64_t time) { + return Timer(Asynco_Default_Runtime.io_ctx, callback, time, TimerType::Periodic); +} + +}; diff --git a/src/engine.cpp b/src/engine.cpp deleted file mode 100644 index 2dc7b75..0000000 --- a/src/engine.cpp +++ /dev/null @@ -1,7 +0,0 @@ -#include "../lib/engine.hpp" - -namespace marcelb::asynco { - -Engine _asynco_engine; - -}; diff --git a/src/timers.cpp b/src/timers.cpp index 04ff1fd..f004d2a 100644 --- a/src/timers.cpp +++ b/src/timers.cpp @@ -3,13 +3,13 @@ namespace marcelb::asynco { int64_t rtime_ms() { - return chrono::duration_cast(chrono::system_clock::now() + return std::chrono::duration_cast(std::chrono::system_clock::now() .time_since_epoch()) .count(); } int64_t rtime_us() { - return chrono::duration_cast(chrono::system_clock::now() + return std::chrono::duration_cast(std::chrono::system_clock::now() .time_since_epoch()) .count(); } @@ -18,8 +18,8 @@ void Timer::init() { st.async_wait( [this] (const boost::system::error_code&) { if (!_stop) { callback(); - if (repeate) { - st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time)); + if (type == TimerType::Periodic) { + st = steady_timer(io_ctx, boost::asio::chrono::milliseconds(time)); init(); } _ticks++; @@ -27,13 +27,14 @@ void Timer::init() { }); } -Timer::Timer (function _callback, uint64_t _time, bool _repeate) : - st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)), +Timer::Timer (io_context& _io_ctx, function _callback, uint64_t _time, TimerType _type): + io_ctx(_io_ctx), + st(io_ctx, boost::asio::chrono::milliseconds(_time)), _stop(false), - repeate(_repeate), + type(_type), callback(_callback), time(_time) { - + cout << "Timer" << endl; init(); } @@ -50,6 +51,10 @@ uint64_t Timer::ticks() { return _ticks; } +bool Timer::expired() { + return bool(_ticks); +} + bool Timer::stoped() { return _stop; } @@ -58,53 +63,4 @@ Timer::~Timer() { stop(); } -Periodic::Periodic(function callback, uint64_t time) : - _timer(make_shared (callback, time, true)) { -} - -void Periodic::stop() { - _timer->stop(); -} - -void Periodic::now() { - _timer->now(); -} - -uint64_t Periodic::ticks() { - return _timer->ticks(); -} - -bool Periodic::stoped() { - return _timer->stoped(); -} - -Periodic::~Periodic() { - stop(); -} - -Delayed::Delayed(function callback, uint64_t time) : - _timer(make_shared (callback, time, false)) { -} - -void Delayed::stop() { - _timer->stop(); -} - -void Delayed::now() { - _timer->now(); -} - -bool Delayed::expired() { - return bool(_timer->ticks()); -} - -bool Delayed::stoped() { - return _timer->stoped(); -} - -Delayed::~Delayed() { - stop(); -} - - }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 27c726e..bae1259 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,4 +1,14 @@ -add_executable(asynco_test main.cpp) +# add_executable(asynco_test main.cpp) + +# # Linkaj test sa Asynco bibliotekom +# target_link_libraries(asynco_test asynco Boost::system) + +add_executable(asynco_default main_default.cpp) # Linkaj test sa Asynco bibliotekom -target_link_libraries(asynco_test asynco Boost::system) +target_link_libraries(asynco_default asynco Boost::system) + +add_executable(asynco_asynco main_asynco.cpp) + +# Linkaj test sa Asynco bibliotekom +target_link_libraries(asynco_asynco asynco Boost::system) \ No newline at end of file diff --git a/test/main.cpp b/test/main.cpp index db14b7c..b8f66b9 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -619,7 +619,7 @@ int main () { cout << "-------------end main------------- " << rtime_ms() - start << endl; - _asynco_engine.run(); + Asynco_Default_Runtime.run(); return 0; } diff --git a/test/main_asynco.cpp b/test/main_asynco.cpp new file mode 100644 index 0000000..7b70bde --- /dev/null +++ b/test/main_asynco.cpp @@ -0,0 +1,19 @@ +#include "../lib/asynco.hpp" +using namespace marcelb::asynco; + +#include +using namespace std; + +int main() { + + Asynco asynco; + asynco.run(2); + + auto interval = asynco.periodic([](){ + cout << "idemo" << endl; + }, 1000); + + + asynco.join(); + return 0; +} \ No newline at end of file diff --git a/test/main_default.cpp b/test/main_default.cpp new file mode 100644 index 0000000..83be494 --- /dev/null +++ b/test/main_default.cpp @@ -0,0 +1,22 @@ +#include "../lib/asynco_default.hpp" +using namespace marcelb::asynco; + +#include +using namespace std; + +int main() { + Asynco_Default_Runtime.run(); + cout << "main" << endl; + + async_([](){ + cout << "idemo" << endl; + }); + + auto interval = periodic([&](){ + cout << "idemo" << endl; + }, 1000); + + + Asynco_Default_Runtime.join(); + return 0; +} \ No newline at end of file