Compare commits

..

14 Commits
0.3 ... dev

16 changed files with 690 additions and 360 deletions

5
.gitignore vendored
View File

@ -1,2 +1,3 @@
test/test build
test/*.txt .vscode
example

View File

@ -1,16 +0,0 @@
{
"configurations": [
{
"name": "Linux",
"includePath": [
"${workspaceFolder}/**"
],
"defines": [],
"compilerPath": "/usr/bin/gcc",
"cStandard": "c17",
"cppStandard": "gnu++17",
"intelliSenseMode": "linux-gcc-x64"
}
],
"version": 4
}

74
.vscode/settings.json vendored
View File

@ -1,74 +0,0 @@
{
"files.associations": {
"iostream": "cpp",
"functional": "cpp",
"thread": "cpp",
"chrono": "cpp",
"ostream": "cpp",
"condition_variable": "cpp",
"array": "cpp",
"atomic": "cpp",
"cwchar": "cpp",
"deque": "cpp",
"unordered_map": "cpp",
"vector": "cpp",
"exception": "cpp",
"initializer_list": "cpp",
"iosfwd": "cpp",
"mutex": "cpp",
"new": "cpp",
"ratio": "cpp",
"stdexcept": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"utility": "cpp",
"future": "cpp",
"*.ipp": "cpp",
"bitset": "cpp",
"algorithm": "cpp",
"string": "cpp",
"string_view": "cpp",
"fstream": "cpp",
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwctype": "cpp",
"any": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"codecvt": "cpp",
"compare": "cpp",
"complex": "cpp",
"concepts": "cpp",
"cstdint": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"iterator": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"system_error": "cpp",
"iomanip": "cpp",
"istream": "cpp",
"limits": "cpp",
"numbers": "cpp",
"semaphore": "cpp",
"sstream": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"cinttypes": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp",
"variant": "cpp"
}
}

28
.vscode/tasks.json vendored
View File

@ -1,28 +0,0 @@
{
"tasks": [
{
"type": "cppbuild",
"label": "C/C++: gcc build active file",
"command": "/usr/bin/g++",
"args": [
"-fdiagnostics-color=always",
"-g",
"${file}",
"-o",
"${fileDirname}/${fileBasenameNoExtension}"
],
"options": {
"cwd": "${fileDirname}"
},
"problemMatcher": [
"$gcc"
],
"group": {
"kind": "build",
"isDefault": true
},
"detail": "Task generated by Debugger."
}
],
"version": "2.0.0"
}

31
CMakeLists.txt Normal file
View File

@ -0,0 +1,31 @@
cmake_minimum_required(VERSION 3.10)
project(Asynco)
# Postavi verziju projekta
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Pronađi Boost biblioteku (ako nije uobičajeni direktorijum, postavi put)
find_package(Boost REQUIRED COMPONENTS system)
# Dodaj direktorijume sa zaglavljima
include_directories(lib)
# Dodaj biblioteku
add_library(asynco STATIC
src/engine.cpp
src/timers.cpp
)
# Linkaj Asynco biblioteku sa Boost-om
target_link_libraries(asynco Boost::system)
# Dodaj testove
add_subdirectory(test)
# Instaliraj biblioteku
# install(TARGETS asynco DESTINATION lib)
# install(FILES lib/asynco.hpp lib/define.hpp lib/engine.hpp lib/filesystem.hpp lib/timers.hpp lib/trigger.hpp DESTINATION include/asynco)
#

181
README.md
View File

@ -17,7 +17,7 @@ The asynchronous filesystem is provided solely to guide users on how to wrap any
- Asynchronous programming - Asynchronous programming
- Multithread - Multithread
- Asynchronous timer functions: periodic, delayed (like setInterval and setTimeout from JS) - Asynchronous timer functions: periodic, delayed (like setInterval and setTimeout from JS)
- Typed events (on, emit, off) - Typed events (on, tick, off) (like EventEmitter from JS: on, emit, etc)
- Event loops - Event loops
- Multiple parallel execution loops - Multiple parallel execution loops
- Asynchronous file IO - Asynchronous file IO
@ -27,16 +27,16 @@ The asynchronous filesystem is provided solely to guide users on how to wrap any
Just download the latest release and unzip it into your project. Just download the latest release and unzip it into your project.
```c++ ```c++
#define NUM_OF_RUNNERS 8 // To change the number of threads used by atask, without this it runs according to the number of cores #define NUM_OF_RUNNERS 8 // To change the number of threads used by asynco, without this it runs according to the number of cores
#include "asynco/lib/asynco.hpp" // atask(), wait() #include "asynco/lib/asynco.hpp" // async_ (), await_()
#include "asynco/lib/event.hpp" // event #include "asynco/lib/triggers.hpp" // trigger (event emitter)
#include "asynco/lib/timers.hpp" // periodic, delayed (like setInterval and setTimeout from JS) #include "asynco/lib/timers.hpp" // periodic, delayed (like setInterval and setTimeout from JS)
#include "asynco/lib/filesystem.hpp" // for async read and write files #include "asynco/lib/filesystem.hpp" // for async read and write files
using namespace marcelb; using namespace marcelb;
using namespace asynco; using namespace asynco;
using namespace events; using namespace triggers;
// At the end of the main function, always set // At the end of the main function, always set
_asynco_engine.run(); _asynco_engine.run();
@ -77,6 +77,28 @@ int t = time1.expired();
// is it stopped // is it stopped
bool stoped = time1.stoped(); bool stoped = time1.stoped();
// If you don't want to save in a variable, but you want to start a timer, use these functions
// And you can also save them, they are only of the shared pointer type
auto d = Delayed( [](){
cout << "Delayed" << endl;
}, 2000);
auto p = Periodic( [](){
cout << "Periodic" << endl;
}, 700);
Periodic( [&] (){
cout << "Delayed expire " << d->expired() << endl;
cout << "Periodic ticks " << p->ticks() << endl;
cout << "Delayed stoped " << d->stoped() << endl;
cout << "Periodic stoped " << p->stoped() << endl;
}, 1000);
Delayed( [&](){
p->stop();
}, 10000);
``` ```
Make functions asynchronous Make functions asynchronous
@ -85,9 +107,9 @@ Make functions asynchronous
* Run an lambda function asynchronously * Run an lambda function asynchronously
*/ */
atask( []() { async_ ( []() {
sleep_for(2s); // only for simulating long duration function sleep_for(2s); // only for simulating long duration function
cout << "atask" << endl; cout << "nonsync " << endl;
return 5; return 5;
}); });
@ -100,7 +122,7 @@ void notLambdaFunction() {
cout << "Call to not lambda function" << endl; cout << "Call to not lambda function" << endl;
} }
atask (notLambdaFunction); async_ (notLambdaFunction);
/** /**
* Run class method * Run class method
@ -114,34 +136,88 @@ class clm {
}; };
clm classes; clm classes;
atask( [&classes] () { async_ ( [&classes] () {
classes.classMethode(); classes.classMethode();
}); });
/** /**
* Wait after runned as async * await_ after runned as async
*/ */
auto a = atask( []() { auto a = async_ ( []() {
sleep_for(2s); // only for simulating long duration function sleep_for(2s); // only for simulating long duration function
cout << "atask" << endl; cout << "nonsync " << endl;
return 5; return 5;
}); });
cout << wait(a) << endl; cout << await_(a) << endl;
/** /**
* Wait async function call and use i cout * await_ async function call and use i cout
*/ */
cout << wait(atask( [] () { cout << await_(async_ ( [] () {
sleep_for(chrono::seconds(1)); // only for simulating long duration function sleep_for(chrono::seconds(1)); // only for simulating long duration function
cout << "wait end" << endl; cout << "await_ end" << endl;
return 4; return 4;
})) << endl; })) << endl;
/**
* Await all
**/
auto a = async_ ( []() {
cout << "A" << endl;
return 3;
});
auto b = async_ ( []() {
cout << "B" << endl;
throw runtime_error("Test exception");
return;
});
auto c = async_ ( []() {
cout << "C" << endl;
return "Hello";
});
int a_;
string c_;
auto await_all = [&] () {
a_ = await_(a);
await_(b);
c_ = await_(c);
};
try {
await_all();
cout << "a_ " << a_ << " c_ " << c_ << endl;
} catch (const exception& exc) {
cout << exc.what() << endl;
}
// // same type
vector<future<void>> fut_vec;
for (int i=0; i<5; i++) {
fut_vec.push_back(
async_ ( [i]() {
cout << "Async_ " << i << endl;
})
);
}
auto await_all = [&] () {
for (int i=0; i<fut_vec.size(); i++) {
await_ (fut_vec[i]);
}
};
/** /**
* Sleep with delayed sleep implement * Sleep with delayed sleep implement
*/ */
@ -189,9 +265,9 @@ Events
* initialization of typed events * initialization of typed events
*/ */
event<int, int> ev2int; trigger<int, int> ev2int;
event<int, string> evintString; trigger<int, string> evintString;
event<> evoid; trigger<> evoid;
ev2int.on("sum", [](int a, int b) { ev2int.on("sum", [](int a, int b) {
cout << "Sum " << a+b << endl; cout << "Sum " << a+b << endl;
@ -219,32 +295,32 @@ sleep(1);
* Emit * Emit
*/ */
ev2int.emit("sum", 5, 8); ev2int.tick("sum", 5, 8);
sleep(1); sleep(1);
evintString.emit("substract", 3, to_string(2)); evintString.tick("substract", 3, to_string(2));
sleep(1); sleep(1);
evoid.emit("void"); evoid.tick("void");
// Turn off the event listener // Turn off the event listener
evoid.off("void"); evoid.off("void");
evoid.emit("void"); // nothing is happening evoid.tick("void"); // nothing is happening
``` ```
Extend own class whit events Extend own class whit events
```c++ ```c++
class myOwnClass : public event<int> { class myOwnClass : public trigger<int> {
public: public:
myOwnClass() : event() {}; myOwnClass() : trigger() {};
}; };
myOwnClass myclass; myOwnClass myclass;
delayed t( [&] { delayed t( [&] {
myclass.emit("constructed", 1); myclass.tick("constructed", 1);
}, 200); }, 200);
myclass.on("constructed", [] (int i) { myclass.on("constructed", [] (int i) {
@ -253,6 +329,55 @@ myclass.on("constructed", [] (int i) {
``` ```
Implementing a class with multiple triggers of different types
```c++
class ClassWithTriggers {
trigger<int> emitter1;
trigger<string> emitter2;
public:
template<typename... T>
void on(const string& key, function<void(T...)> callback) {
if constexpr (sizeof...(T) == 1 && is_same_v<tuple_element_t<0, tuple<T...>>, int>) {
emitter1.on(key, callback);
}
else if constexpr (sizeof...(T) == 1 && is_same_v<tuple_element_t<0, tuple<T...>>, string>) {
emitter2.on(key, callback);
}
}
template <typename... Args>
void tick(const string& key, Args&&... args) {
if constexpr (sizeof...(Args) == 1 && is_same_v<tuple_element_t<0, tuple<Args...>>, int>) {
emitter1.tick(key, forward<Args>(args)...);
}
else if constexpr (sizeof...(Args) == 1 && is_same_v<tuple_element_t<0, tuple<Args...>>, string>) {
emitter2.tick(key, forward<Args>(args)...);
}
else {
static_assert(sizeof...(Args) == 0, "Unsupported number or types of arguments");
}
}
};
ClassWithTriggers mt;
mt.on<int>("int", function<void(int)>([&](int i) {
cout << "Emit int " << i << endl;
}));
mt.on<string>("string", function<void(string)>([&](string s) {
cout << "Emit string " << s << endl;
}));
mt.tick("int", 5);
mt.tick("string", string("Hello world"));
```
Asynchronous file IO Asynchronous file IO
```c++ ```c++
@ -279,7 +404,7 @@ fs::write("test1.txt", "Hello world", [] (exception* error) {
auto future_data = fs::read("test.txt"); auto future_data = fs::read("test.txt");
try { try {
string data = wait(future_data); string data = await_(future_data);
} catch (exception& err) { } catch (exception& err) {
cout << err.what() << endl; cout << err.what() << endl;
} }
@ -287,7 +412,7 @@ try {
auto future_status = fs::write("test.txt", "Hello world"); auto future_status = fs::write("test.txt", "Hello world");
try { try {
wait(future_status); await_(future_status);
} catch (exception& err) { } catch (exception& err) {
cout << err.what() << endl; cout << err.what() << endl;
} }

View File

@ -1,7 +1,7 @@
#ifndef _ASYNCO_ #ifndef _ASYNCO_
#define _ASYNCO_ #define _ASYNCO_
#include <boost/asio.hpp> #include "engine.hpp"
#include <iostream> #include <iostream>
using namespace std; using namespace std;
@ -9,57 +9,11 @@ using namespace std;
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {
#define HW_CONCURRENCY_MINIMAL 4
/**
* Internal anonymous class for initializing the ASIO context and thread pool
* !!! It is anonymous to protect against use in the initialization of other objects of the same type !!!
*/
class {
public:
boost::asio::io_context io_context;
void run() {
for (auto& runner : runners) {
runner.join();
}
}
private:
unique_ptr<boost::asio::io_service::work> work { [&] () {
return new boost::asio::io_service::work(io_context);
} ()};
vector<thread> runners { [&] () {
vector<thread> _runs;
unsigned int num_of_runners;
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
for (int i=0; i<num_of_runners; i++) {
_runs.push_back(thread ( [this] () {
io_context.run();
}));
}
return _runs;
} ()};
} _asynco_engine;
/** /**
* Run the function asynchronously * Run the function asynchronously
*/ */
template<class F, class... Args> template<class F, class... Args>
auto atask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> { auto async_(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type; using return_type = typename result_of<F(Args...)>::type;
future<return_type> res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward<F>(f), forward<Args>(args)...))); future<return_type> res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward<F>(f), forward<Args>(args)...)));
return res; return res;
@ -69,7 +23,7 @@ auto atask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type
* Block until the asynchronous call completes * Block until the asynchronous call completes
*/ */
template<typename T> template<typename T>
T wait(future<T>& r) { T await_(future<T>& r) {
return r.get(); return r.get();
} }
@ -77,7 +31,29 @@ T wait(future<T>& r) {
* Block until the asynchronous call completes * Block until the asynchronous call completes
*/ */
template<typename T> template<typename T>
T wait(future<T>&& r) { T await_(future<T>&& r) {
return move(r).get();
}
/**
* Block until the asynchronous call completes or time expired
*/
template<typename T>
T await_(future<T>& r, uint64_t time) {
if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
throw runtime_error("Asynchronous execution timed out");
}
return r.get();
}
/**
* Block until the asynchronous call completes or time expired
*/
template<typename T>
T await_(future<T>&& r, uint64_t time) {
if (r.wait_for(chrono::milliseconds(time)) == std::future_status::timeout) {
throw runtime_error("Asynchronous execution timed out");
}
return move(r).get(); return move(r).get();
} }

19
lib/define.hpp Normal file
View File

@ -0,0 +1,19 @@
#ifndef _ASYNCO_DEFINE_
#define _ASYNCO_DEFINE_
namespace marcelb {
namespace asynco {
/**
* Alternative names of functions - mostly for the sake of more beautiful coloring of the code
*/
#define async_ marcelb::asynco::async_
#define await_ marcelb::asynco::await_
}
}
#endif

71
lib/engine.hpp Normal file
View File

@ -0,0 +1,71 @@
#ifndef _ASYNCO_ENGINE_
#define _ASYNCO_ENGINE_
#include <vector>
#include <memory>
using namespace std;
#include <boost/asio.hpp>
namespace marcelb {
namespace asynco {
#define HW_CONCURRENCY_MINIMAL 4
/**
* Internal anonymous class for initializing the ASIO context and thread pool
* !!! It is anonymous to protect against use in the initialization of other objects of the same type !!!
*/
class Engine {
public:
boost::asio::io_context io_context;
void run() {
for (auto& runner : runners) {
runner.join();
}
}
private:
unique_ptr<boost::asio::io_service::work> work { [&] () {
return new boost::asio::io_service::work(io_context);
} ()};
vector<thread> runners { [&] () {
vector<thread> _runs;
unsigned int num_of_runners;
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
for (int i=0; i<num_of_runners; i++) {
_runs.push_back(thread ( [this] () {
io_context.run();
}));
}
return _runs;
} ()};
};
extern Engine _asynco_engine;
}
}
#endif

View File

@ -19,7 +19,7 @@ namespace fs {
*/ */
template<typename Callback> template<typename Callback>
void read(string path, Callback&& callback) { void read(string path, Callback&& callback) {
atask( [&path, callback] () { asynco::async_( [&path, callback] () {
string content; string content;
try { try {
string line; string line;
@ -48,7 +48,7 @@ void read(string path, Callback&& callback) {
* Asynchronous file reading * Asynchronous file reading
*/ */
future<string> read(string path) { future<string> read(string path) {
return atask( [&path] () { return asynco::async_( [&path] () {
string content; string content;
string line; string line;
ifstream file (path); ifstream file (path);
@ -72,7 +72,7 @@ future<string> read(string path) {
*/ */
template<typename Callback> template<typename Callback>
void write(string path, string content, Callback&& callback) { void write(string path, string content, Callback&& callback) {
atask( [&path, &content, callback] () { asynco::async_( [&path, &content, callback] () {
try { try {
ofstream file (path); ofstream file (path);
if (file.is_open()) { if (file.is_open()) {
@ -95,7 +95,7 @@ void write(string path, string content, Callback&& callback) {
* Asynchronous file writing with callback after write complete * Asynchronous file writing with callback after write complete
*/ */
future<void> write(string path, string content) { future<void> write(string path, string content) {
return atask( [&path, &content] () { return asynco::async_( [&path, &content] () {
ofstream file (path); ofstream file (path);
if (file.is_open()) { if (file.is_open()) {
file << content; file << content;

View File

@ -1,12 +1,10 @@
#ifndef _TIMERS_ #ifndef _ASYNCO_TIMERS_
#define _TIMERS_ #define _ASYNCO_TIMERS_
#include <chrono>
using namespace std;
#include "asynco.hpp" #include "asynco.hpp"
#include <chrono>
using namespace std;
using namespace marcelb;
using namespace asynco;
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {
@ -15,21 +13,13 @@ namespace asynco {
* Get the time in ms from the epoch * Get the time in ms from the epoch
*/ */
int64_t rtime_ms() { int64_t rtime_ms();
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
/** /**
* Get the time in us from the epoch * Get the time in us from the epoch
*/ */
int64_t rtime_us() { int64_t rtime_us();
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
/** /**
* Core timer class for construct time async functions * Core timer class for construct time async functions
@ -45,71 +35,40 @@ class timer {
/** /**
* A method to assign a callback wrapper and a reinitialization algorithm * A method to assign a callback wrapper and a reinitialization algorithm
*/ */
void init() { void init();
st.async_wait( [this] (const boost::system::error_code&) {
if (!_stop) {
callback();
if (repeate) {
st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time));
init();
}
_ticks++;
}
});
}
public: public:
/** /**
* The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer * The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer
*/ */
timer (function<void()> _callback, uint64_t _time, bool _repeate) : timer (function<void()> _callback, uint64_t _time, bool _repeate);
st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)),
_stop(false),
repeate(_repeate),
callback(_callback),
time(_time) {
init();
}
/** /**
* Stop timer * Stop timer
* The stop flag is set and timer remove it from the queue * The stop flag is set and timer remove it from the queue
*/ */
void stop() { void stop();
_stop = true;
st.cancel();
}
/** /**
* Run callback now * Run callback now
* Forces the callback function to run independently of the timer * Forces the callback function to run independently of the timer
*/ */
void now() { void now();
st.cancel();
}
/** /**
* Get the number of times the timer callback was runned * Get the number of times the timer callback was runned
*/ */
uint64_t ticks() { uint64_t ticks();
return _ticks;
}
/** /**
* The logic status of the timer stop state * The logic status of the timer stop state
*/ */
bool stoped() { bool stoped();
return _stop;
}
/** /**
* The destructor stops the timer * The destructor stops the timer
*/ */
~timer() { ~timer();
stop();
}
}; };
/** /**
@ -119,49 +78,37 @@ class periodic {
shared_ptr<timer> _timer; shared_ptr<timer> _timer;
public: public:
/** /**
* Constructor initializes a shared pointer of type timer * Constructor initializes a shared pointer of type timer
*/ */
periodic(function<void()> callback, uint64_t time) : periodic(function<void()> callback, uint64_t time);
_timer(make_shared<timer> (callback, time, true)) {
}
/** /**
* Stop periodic * Stop periodic
* The stop flag is set and periodic remove it from the queue * The stop flag is set and periodic remove it from the queue
*/ */
void stop() { void stop();
_timer->stop();
}
/** /**
* Run callback now * Run callback now
* Forces the callback function to run independently of the periodic * Forces the callback function to run independently of the periodic
*/ */
void now() { void now();
_timer->now();
}
/** /**
* Get the number of times the periodic callback was runned * Get the number of times the periodic callback was runned
*/ */
uint64_t ticks() { uint64_t ticks();
return _timer->ticks();
}
/** /**
* The logic status of the periodic stop state * The logic status of the periodic stop state
*/ */
bool stoped() { bool stoped();
return _timer->stoped();
}
/** /**
* The destructor stops the periodic * The destructor stops the periodic
*/ */
~periodic() { ~periodic();
stop();
}
}; };
/** /**
@ -171,52 +118,43 @@ class delayed {
shared_ptr<timer> _timer; shared_ptr<timer> _timer;
public: public:
/** /**
* Constructor initializes a shared pointer of type timer * Constructor initializes a shared pointer of type timer
*/ */
delayed(function<void()> callback, uint64_t time) : delayed(function<void()> callback, uint64_t time);
_timer(make_shared<timer> (callback, time, false)) {
}
/** /**
* Stop delayed * Stop delayed
* The stop flag is set and delayed remove it from the queue * The stop flag is set and delayed remove it from the queue
*/ */
void stop() { void stop();
_timer->stop();
}
/** /**
* Run callback now * Run callback now
* Forces the callback function to run independently of the delayed * Forces the callback function to run independently of the delayed
*/ */
void now() { void now();
_timer->now();
}
/** /**
* Get is the delayed callback runned * Get is the delayed callback runned
*/ */
bool expired() { bool expired();
return bool(_timer->ticks());
}
/** /**
* The logic status of the delayed stop state * The logic status of the delayed stop state
*/ */
bool stoped() { bool stoped();
return _timer->stoped();
}
/** /**
* The destructor stops the delayed * The destructor stops the delayed
*/ */
~delayed() { ~delayed();
stop();
}
}; };
shared_ptr<periodic> Periodic(function<void()> callback, uint64_t time);
shared_ptr<delayed> Delayed(function<void()> callback, uint64_t time);
} }
} }

View File

@ -1,5 +1,5 @@
#ifndef _EVENT_ #ifndef _ASYNCO_TRIGGER_
#define _EVENT_ #define _ASYNCO_TRIGGER_
#include <map> #include <map>
#include <vector> #include <vector>
@ -8,20 +8,20 @@
using namespace std; using namespace std;
#include "asynco.hpp" #include "engine.hpp"
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {
namespace events { namespace triggers {
/** /**
* Event class, for event-driven programming. * trigger class, for event-driven programming.
* These events are typed according to the arguments of the callback function * These events are typed according to the arguments of the callback function
*/ */
template<typename... T> template<typename... T>
class event { class trigger {
private: private:
mutex m_eve; mutex m_eve;
unordered_map<string, vector<function<void(T...)>>> events; unordered_map<string, vector<function<void(T...)>>> triggers;
public: public:
@ -30,45 +30,45 @@ class event {
*/ */
void on(const string& key, function<void(T...)> callback) { void on(const string& key, function<void(T...)> callback) {
lock_guard _off(m_eve); lock_guard _off(m_eve);
events[key].push_back(callback); triggers[key].push_back(callback);
} }
/** /**
* It emits an event and sends a callback function saved according to the key with the passed parameters * It emits an event and sends a callback function saved according to the key with the passed parameters
*/ */
template<typename... Args> template<typename... Args>
void emit(const string& key, Args... args) { void tick(const string& key, Args... args) {
auto it_eve = events.find(key); auto it_eve = triggers.find(key);
if (it_eve != events.end()) { if (it_eve != triggers.end()) {
for (uint i =0; i<it_eve->second.size(); i++) { for (uint i =0; i<it_eve->second.size(); i++) {
auto callback = bind(it_eve->second[i], forward<Args>(args)...); auto callback = bind(it_eve->second[i], forward<Args>(args)...);
atask(callback); asynco::async_(callback);
} }
} }
} }
/** /**
* Remove an event listener from an event * Remove an trigger listener from an event
*/ */
void off(const string& key) { void off(const string& key) {
lock_guard _off(m_eve); lock_guard _off(m_eve);
events.erase(key); triggers.erase(key);
} }
/** /**
* Remove all event listener * Remove all trigger listener
*/ */
void off() { void off() {
lock_guard _off(m_eve); lock_guard _off(m_eve);
events.clear(); triggers.clear();
} }
/** /**
* Get num of listeners by an event key * Get num of listeners by an trigger key
*/ */
unsigned int listeners(const string& key) { unsigned int listeners(const string& key) {
return events[key].size(); return triggers[key].size();
} }
/** /**
@ -76,7 +76,7 @@ class event {
*/ */
unsigned int listeners() { unsigned int listeners() {
unsigned int listeners = 0; unsigned int listeners = 0;
for (auto& ev : events) { for (auto& ev : triggers) {
listeners += ev.second.size(); listeners += ev.second.size();
} }
return listeners; return listeners;

7
src/engine.cpp Normal file
View File

@ -0,0 +1,7 @@
#include "../lib/engine.hpp"
namespace marcelb::asynco {
Engine _asynco_engine;
};

145
src/timers.cpp Normal file
View File

@ -0,0 +1,145 @@
#include "../lib/timers.hpp"
namespace marcelb::asynco {
int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
int64_t rtime_us() {
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
void timer::init() {
st.async_wait( [this] (const boost::system::error_code&) {
if (!_stop) {
callback();
if (repeate) {
st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time));
init();
}
_ticks++;
}
});
}
timer::timer (function<void()> _callback, uint64_t _time, bool _repeate) :
st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)),
_stop(false),
repeate(_repeate),
callback(_callback),
time(_time) {
init();
}
void timer::stop() {
_stop = true;
st.cancel();
}
void timer::now() {
st.cancel();
}
uint64_t timer::ticks() {
return _ticks;
}
bool timer::stoped() {
return _stop;
}
timer::~timer() {
stop();
}
periodic::periodic(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, true)) {
}
void periodic::stop() {
_timer->stop();
}
void periodic::now() {
_timer->now();
}
uint64_t periodic::ticks() {
return _timer->ticks();
}
bool periodic::stoped() {
return _timer->stoped();
}
periodic::~periodic() {
stop();
}
delayed::delayed(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, false)) {
}
void delayed::stop() {
_timer->stop();
}
void delayed::now() {
_timer->now();
}
bool delayed::expired() {
return bool(_timer->ticks());
}
bool delayed::stoped() {
return _timer->stoped();
}
delayed::~delayed() {
stop();
}
mutex p_io, d_io;
vector<shared_ptr<periodic>> periodic_calls_container;
vector<shared_ptr<delayed>> delayed_calls_container;
shared_ptr<periodic> Periodic(function<void()> callback, uint64_t time) {
shared_ptr<periodic> periodic_ptr(make_shared<periodic>(callback, time));
async_ ( [&, periodic_ptr](){
lock_guard<mutex> lock(p_io);
periodic_calls_container.push_back(periodic_ptr);
for (uint32_t i=0; i<periodic_calls_container.size(); i++) {
if (periodic_calls_container[i]->stoped()) {
periodic_calls_container.erase(periodic_calls_container.begin()+i);
i--;
}
}
});
return periodic_ptr;
}
shared_ptr<delayed> Delayed(function<void()> callback, uint64_t time) {
shared_ptr<delayed> delayed_ptr(make_shared<delayed>(callback, time));
async_ ( [&, delayed_ptr](){
lock_guard<mutex> lock(p_io);
delayed_calls_container.push_back(delayed_ptr);
for (uint32_t i=0; i<delayed_calls_container.size(); i++) {
if (delayed_calls_container[i]->stoped() || delayed_calls_container[i]->expired()) {
delayed_calls_container.erase(delayed_calls_container.begin()+i);
i--;
}
}
});
return delayed_ptr;
}
};

4
test/CMakeLists.txt Normal file
View File

@ -0,0 +1,4 @@
add_executable(asynco_test main.cpp)
# Linkaj test sa Asynco bibliotekom
target_link_libraries(asynco_test asynco Boost::system)

View File

@ -1,21 +1,25 @@
// // #define NUM_OF_RUNNERS 2 #define NUM_OF_RUNNERS 4
#include "../lib/asynco.hpp" #include "asynco.hpp"
#include "../lib/event.hpp" #include "trigger.hpp"
#include "../lib/filesystem.hpp" #include "filesystem.hpp"
#include "../lib/timers.hpp" #include "timers.hpp"
#include "define.hpp"
using namespace marcelb::asynco; using namespace marcelb::asynco;
using namespace events; using namespace triggers;
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <thread> #include <thread>
#include <future>
#include <vector>
using namespace std; using namespace std;
using namespace this_thread; using namespace this_thread;
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
delayed t( [&]() { delayed t( [&]() {
@ -53,9 +57,40 @@ class clm {
// ------------------ EXTEND OWN CLASS WITH EVENTS ------------------- // ------------------ EXTEND OWN CLASS WITH EVENTS -------------------
class myOwnClass : public event<int> { class myOwnClass : public trigger<int> {
public: public:
myOwnClass() : event() {}; myOwnClass() : trigger() {};
};
// ----------------- MULTIPLE TRIGGERS IN ONE CLASS ------------------
class ClassWithTriggers {
trigger<int> emitter1;
trigger<string> emitter2;
public:
template<typename... T>
void on(const string& key, function<void(T...)> callback) {
if constexpr (sizeof...(T) == 1 && is_same_v<tuple_element_t<0, tuple<T...>>, int>) {
emitter1.on(key, callback);
}
else if constexpr (sizeof...(T) == 1 && is_same_v<tuple_element_t<0, tuple<T...>>, string>) {
emitter2.on(key, callback);
}
}
template <typename... Args>
void tick(const string& key, Args&&... args) {
if constexpr (sizeof...(Args) == 1 && is_same_v<tuple_element_t<0, tuple<Args...>>, int>) {
emitter1.tick(key, forward<Args>(args)...);
}
else if constexpr (sizeof...(Args) == 1 && is_same_v<tuple_element_t<0, tuple<Args...>>, string>) {
emitter2.tick(key, forward<Args>(args)...);
}
else {
static_assert(sizeof...(Args) == 0, "Unsupported number or types of arguments");
}
}
}; };
@ -65,9 +100,9 @@ int main () {
// --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- // --------------- TIME ASYNCHRONOUS FUNCTIONS --------------
// /** /**
// * Init periodic and delayed; clear periodic and delayed * Init periodic and delayed; clear periodic and delayed
// */ */
// periodic inter1 ([&]() { // periodic inter1 ([&]() {
// cout << "periodic prvi " << rtime_ms() - start << endl; // cout << "periodic prvi " << rtime_ms() - start << endl;
@ -125,15 +160,34 @@ int main () {
// cout << "nije isteko " << endl; // cout << "nije isteko " << endl;
// } // }
// // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- // auto d = Delayed( [](){
// cout << "Delayed" << endl;
// }, 2000);
// /** // auto p = Periodic( [](){
// * Run an function asyncronic // cout << "Periodic" << endl;
// */ // }, 700);
// atask( []() { // Periodic( [&] (){
// cout << "Delayed expire " << d->expired() << endl;
// cout << "Periodic ticks " << p->ticks() << endl;
// cout << "Delayed stoped " << d->stoped() << endl;
// cout << "Periodic stoped " << p->stoped() << endl;
// }, 1000);
// Delayed( [&](){
// p->stop();
// }, 10000);
// // // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
// // /**
// // * Run an function asyncronic
// // */
// async_ ( []() {
// sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
// cout << "atask 1" << endl; // cout << "asynco 1" << endl;
// return 5; // return 5;
// }); // });
@ -141,46 +195,51 @@ int main () {
// * Call not lambda function // * Call not lambda function
// */ // */
// atask (notLambdaFunction); // async_ (notLambdaFunction);
// wait ( // await_ (
// atask ( // async_ (
// notLambdaFunction // notLambdaFunction
// ) // )
// ); // );
// /**
// * Call class method // // async(launch::async, [] () {
// */ // // cout << "Another thread in async style!" << endl;
// // });
// // /**
// // * Call class method
// // */
// clm classes; // clm classes;
// atask( [&classes] () { // async_ ( [&classes] () {
// classes.classMethode(); // classes.classMethode();
// }); // });
// sleep(5); // sleep(5);
// /** // // /**
// * Wait after runned as async // // * await_ after runned as async
// */ // // */
// auto a = atask( []() { // auto aa = async_ ( []() {
// sleep_for(2s); // only for simulate log duration function // sleep_for(2s); // only for simulate log duration function
// cout << "atask 2" << endl; // cout << "async_ 2" << endl;
// return 5; // return 5;
// }); // });
// cout << wait(a) << endl; // cout << await_(aa) << endl;
// cout << "print after atask 2" << endl; // cout << "print after async_ 2" << endl;
// /** // /**
// * Wait async function call and use i cout // * await_ async function call and use i cout
// */ // */
// cout << wait(atask( [] () { // cout << await_(async_ ( [] () {
// sleep_for(chrono::seconds(1)); // only for simulate log duration function // sleep_for(chrono::seconds(1)); // only for simulate log duration function
// cout << "wait end" << endl; // cout << "await_ end" << endl;
// return 4; // return 4;
// })) << endl; // })) << endl;
@ -209,22 +268,76 @@ int main () {
// */ // */
// atask( [] { // async_ ( [] {
// cout << "idemo ..." << endl; // cout << "idemo ..." << endl;
// atask( [] { // async_ ( [] {
// cout << "ugdnježdena async funkcija " << endl; // cout << "ugdnježdena async funkcija " << endl;
// }); // });
// }); // });
// // -------------------------- AWAIT ALL ----------------------------------
// auto a = async_ ( []() {
// cout << "A" << endl;
// return 3;
// });
// auto b = async_ ( []() {
// cout << "B" << endl;
// throw runtime_error("Test exception");
// return;
// });
// auto c = async_ ( []() {
// cout << "C" << endl;
// return "Hello";
// });
// int a_;
// string c_;
// auto await_all = [&] () {
// a_ = await_(a);
// await_(b);
// c_ = await_(c);
// };
// try {
// await_all();
// cout << "a_ " << a_ << " c_ " << c_ << endl;
// } catch (const exception& exc) {
// cout << exc.what() << endl;
// }
// // // same type
// vector<future<void>> fut_vec;
// for (int i=0; i<5; i++) {
// fut_vec.push_back(
// async_ ( [i]() {
// cout << "Async_ " << i << endl;
// })
// );
// }
// auto await_all2 = [&] () {
// for (int i=0; i<fut_vec.size(); i++) {
// await_ (fut_vec[i]);
// }
// };
// await_all2();
// // --------------- EVENTS ------------------- // // --------------- EVENTS -------------------
// /** // /**
// * initialization of typed events // * initialization of typed events
// */ // */
// event<int, int> ev2int; // trigger<int, int> ev2int;
// event<int, string> evintString; // trigger<int, string> evintString;
// event<> evoid; // trigger<> evoid;
// ev2int.on("sum", [](int a, int b) { // ev2int.on("sum", [](int a, int b) {
// cout << "Sum " << a+b << endl; // cout << "Sum " << a+b << endl;
@ -248,22 +361,22 @@ int main () {
// cout << "Void emited " << emited2 << endl; // cout << "Void emited " << emited2 << endl;
// }); // });
// evoid.emit("void"); // evoid.tick("void");
// sleep(1); // sleep(1);
// /** // /**
// * Emit // * Emit
// */ // */
// ev2int.emit("sum", 5, 8); // ev2int.tick("sum", 5, 8);
// sleep(1); // sleep(1);
// evintString.emit("substract", 3, to_string(2)); // evintString.tick("substract", 3, to_string(2));
// sleep(1); // sleep(1);
// evoid.off("void"); // evoid.off("void");
// evoid.emit("void"); // evoid.tick("void");
// cout << "Ukupno 2 int " << ev2int.listeners() << endl; // cout << "Ukupno 2 int " << ev2int.listeners() << endl;
@ -278,20 +391,38 @@ int main () {
// myOwnClass myclass; // myOwnClass myclass;
// delayed t( [&] { // delayed t( [&] {
// myclass.emit("constructed", 1); // myclass.tick("constructed", 1);
// }, 200); // }, 200);
// myclass.on("constructed", [] (int i) { // myclass.on("constructed", [] (int i) {
// cout << "Constructed " << i << endl; // cout << "Constructed " << i << endl;
// }); // });
// /**
// *
// * Use class with multiple triggers
// *
// */
ClassWithTriggers mt;
mt.on<int>("int", function<void(int)>([&](int i) {
cout << "Emit int " << i << endl;
}));
mt.on<string>("string", function<void(string)>([&](string s) {
cout << "Emit string " << s << endl;
}));
mt.tick("int", 5);
mt.tick("string", string("Hello world"));
// auto status = fs::read("test1.txt"); // auto status = fs::read("test1.txt");
// try { // try {
// auto data = wait(status); // auto data = await_(status);
// cout << data; // cout << data;
// } catch (exception& err) { // } catch (exception& err) {
// cout << err.what() << endl; // cout << err.what() << endl;