diff --git a/src/pool.cpp b/.gitignore similarity index 100% rename from src/pool.cpp rename to .gitignore diff --git a/.vscode/settings.json b/.vscode/settings.json index 3d201ae..7d75b1d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -22,6 +22,9 @@ "tuple": "cpp", "type_traits": "cpp", "utility": "cpp", - "future": "cpp" + "future": "cpp", + "*.ipp": "cpp", + "bitset": "cpp", + "algorithm": "cpp" } } \ No newline at end of file diff --git a/example/asynco.hpp b/example/asynco.hpp new file mode 100644 index 0000000..7803997 --- /dev/null +++ b/example/asynco.hpp @@ -0,0 +1,164 @@ +#ifndef _ASYNCO_ +#define _ASYNCO_ + +#include +#include +#include +#include + +using namespace std; + +namespace marcelb { + +class interval; +class timeout; + +class loop_core { + vector intervals; + vector timeouts; + time_t sampling; + mutex i_m, t_m; + future bot; + + loop_core() { + bot = async(launch::async, [this] () { + loop(); + }); + // on_async.put_task( [this] () { + // loop(); + // }); + } + + void run(interval& _interval) { + lock_guard lock(i_m); + intervals.push_back(_interval); + update_sampling(); + } + + void run(timeout& _timeout) { + lock_guard lock(t_m); + timeouts.push_back(_timeout); + update_sampling(); + } + + void loop() { + while (true) { + for (auto& _interval : intervals) { + int64_t now = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); + if (now - _interval.execute >= _interval._duration) { + _interval.callback(); + _interval.execute = now; + } + } + + for (int i=0; i(chrono::system_clock::now().time_since_epoch()).count(); + if (now - timeouts[i]._construct >= timeouts[i]._duration) { + auto& _timeout = timeouts[i]; + { + lock_guard lock(t_m); + timeouts.erase(timeouts.begin() + i); + } + _timeout.callback(); + } + } + this_thread::sleep_for(chrono::milliseconds(sampling)); + } + } + + void update_sampling() { + sampling = 0; + for (auto& _interval : intervals) { + sampling += _interval._duration; + } + for (auto& _timeout : timeouts) { + sampling += _timeout._duration; + } + sampling /= (intervals.size() + timeouts.size())*2; + } + + +}; + +loop_core co_loop; + + + + +class interval { + public: + bool run = true; + function callback; + const time_t _duration; + time_t execute = 0; + + // public: + interval(function func, const time_t duration): callback(func), _duration(duration) { + #ifndef ON_ASYNC + throw string("Not on_async defined!"); + #endif + + auto task = [&] () { + while (run) { + // this_thread::sleep_for(chrono::milliseconds(_duration)); + // if (run) { + callback(); + // } + } + }; + + // on_async.put_task(task); + co_loop.run(this*); + } + + void clear() { + run = false; + } + + ~interval() { + clear(); + } +}; + +class timeout { + public: + bool run = true; + function callback; + const time_t _duration; + int64_t _construct = + chrono::duration_cast(chrono::system_clock::now().time_since_epoch()) + .count(); + + // public: + timeout(function f, const time_t duration): callback(f), _duration(duration) { + #ifndef ON_ASYNC + throw string("Not on_async defined!"); + #endif + + auto task = [&] () { + // int64_t _start = + // chrono::duration_cast(chrono::system_clock::now().time_since_epoch()) + // .count(); + // this_thread::sleep_for(chrono::milliseconds(_duration - (_start - _construct))); + if (run) { + callback(); + } + }; + + // on_async.put_task(task); + co_loop.run(this*); + } + + void clear() { + run = false; + } + + ~timeout() { + clear(); + } +}; + + +} + +#endif \ No newline at end of file diff --git a/lib/asynco.hpp b/lib/asynco.hpp index 9b68337..1c67552 100644 --- a/lib/asynco.hpp +++ b/lib/asynco.hpp @@ -12,27 +12,101 @@ namespace marcelb { AsyncLoop on_async; #endif +class interval; +class timeout; + +class loop_core { + static vector intervals; + static vector timeouts; + time_t sampling; + mutex i_m, t_m; + + loop_core() { + on_async.put_task( [this] () { + loop(); + }); + } + + void run(interval& _interval) { + lock_guard lock(i_m); + intervals.push_back(_interval); + update_sampling(); + } + + void run(timeout& _timeout) { + lock_guard lock(t_m); + timeouts.push_back(_timeout); + update_sampling(); + } + + void loop() { + while (true) { + for (auto& _interval : intervals) { + int64_t now = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); + if (now - _interval.execute >= _interval._duration) { + _interval.callback(); + _interval.execute = now; + } + } + + for (int i=0; i(chrono::system_clock::now().time_since_epoch()).count(); + if (now - timeouts[i]._construct >= timeouts[i]._duration) { + auto& _timeout = timeouts[i]; + { + lock_guard lock(t_m); + timeouts.erase(timeouts.begin() + i); + } + _timeout.callback(); + } + } + sleep_for(chrono::milliseconds(sampling)); + } + } + + void update_sampling() { + sampling = 0; + for (auto& _interval : intervals) { + sampling += _interval._duration; + } + for (auto& _timeout : timeouts) { + sampling += _timeout._duration; + } + sampling /= (intervals.size() + timeouts.size())*2; + } + + +}; + +loop_core co_loop; + + + + class interval { + public: bool run = true; function callback; const time_t _duration; + time_t execute = 0; - public: + // public: interval(function func, const time_t duration): callback(func), _duration(duration) { #ifndef ON_ASYNC - throw string("Not on_async defined!"); + throw string("Not on_async defined!"); #endif auto task = [&] () { while (run) { - this_thread::sleep_for(chrono::milliseconds(_duration)); - if (run) { + // this_thread::sleep_for(chrono::milliseconds(_duration)); + // if (run) { callback(); - } + // } } }; - on_async.put_task(task); + // on_async.put_task(task); + co_loop.run(this*); } void clear() { @@ -45,24 +119,32 @@ class interval { }; class timeout { + public: bool run = true; function callback; const time_t _duration; + int64_t _construct = + chrono::duration_cast(chrono::system_clock::now().time_since_epoch()) + .count(); - public: + // public: timeout(function f, const time_t duration): callback(f), _duration(duration) { #ifndef ON_ASYNC - throw string("Not on_async defined!"); + throw string("Not on_async defined!"); #endif auto task = [&] () { - this_thread::sleep_for(chrono::milliseconds(_duration)); + // int64_t _start = + // chrono::duration_cast(chrono::system_clock::now().time_since_epoch()) + // .count(); + // this_thread::sleep_for(chrono::milliseconds(_duration - (_start - _construct))); if (run) { callback(); } }; - on_async.put_task(task); + // on_async.put_task(task); + co_loop.run(this*); } void clear() { diff --git a/lib/event.hpp b/lib/event.hpp index cfbdd7e..3c774c2 100644 --- a/lib/event.hpp +++ b/lib/event.hpp @@ -5,34 +5,38 @@ #include #include #include +#include "loop.hpp" using namespace std; namespace marcelb { +#ifndef ON_ASYNC +#define ON_ASYNC +AsyncLoop on_async; +#endif + + +template class event { - map&)>> events; -public: - - template - void on(const string& key, function f) { - // events[key] = [f](Args... args) { - // f(args...); - // }; + private: + unordered_map> events; + + public: + void on(const string& key, function callback) { + events[key] = callback; } - template - void emit(const string& key, Args&&... args) { - if (events.find(key) == events.end()) { - cout << "No defined listener for event: " << key << endl; - return; - } - else { - for (auto& func : events[key]) { - func(forward(args)...); - } + template + void emit(const string& key, Args... args) { + auto it = events.find(key); + if (it != events.end()) { + auto callback = bind(it->second, forward(args)...); + on_async.put_task(callback); } } + + }; } diff --git a/lib/loop.hpp b/lib/loop.hpp index 3dfd37e..3a051cc 100644 --- a/lib/loop.hpp +++ b/lib/loop.hpp @@ -80,8 +80,9 @@ class AsyncLoop { stop = true; } cv.notify_all(); - for (thread& worker : workers) + for (thread& worker : workers) { worker.join(); + } } }; diff --git a/test/test b/test/test index eef6b04..ff1ea53 100755 Binary files a/test/test and b/test/test differ diff --git a/test/test.cpp b/test/test.cpp index 04fbf06..f48ca4e 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -1,7 +1,8 @@ -#include "../lib/loop.hpp" -#include "../lib/asynco.hpp" -// #include "../lib/event.hpp" +// #include "../lib/loop.hpp" +// #include "../lib/asynco.hpp" +// #include "../example/asynco.hpp" +#include "../lib/event.hpp" #include #include @@ -11,37 +12,37 @@ using namespace this_thread; #ifndef ON_ASYNC #define ON_ASYNC -AsyncLoop on_async; +AsyncLoop on_async(8); #endif int main () { - auto start = chrono::high_resolution_clock::now(); + // auto start = chrono::high_resolution_clock::now(); - interval inter1 ([&]() { - cout << "interval prvi " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; - }, 1000); + // interval inter1 ([&]() { + // cout << "interval prvi " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // }, 1000); - cout << "Blokira stoka" << endl; + // interval inter2 ([&]() { + // cout << "interval drugi " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // }, 2000); - interval inter2 ([&]() { - cout << "interval drugi " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; - }, 2000); + // interval inter3 ([&]() { + // cout << "interval treći " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // }, 3000); - interval inter3 ([&]() { - cout << "interval treći " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; - }, 3000); + // timeout time1 ( [&] () { + // cout << "Close interval 1 i 2 " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // inter1.clear(); + // inter2.clear(); + // }, 10000); - timeout time1 ( [&] () { - cout << "Close interval 1 i 2 " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; - inter1.clear(); - inter2.clear(); - }, 10000); + // timeout time2 ([&] () { + // cout << "Close interval 3 " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + // inter3.clear(); + // }, 2000); - timeout time2 ([&] () { - cout << "Close interval 3 " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; - inter3.clear(); - }, 20000); + // cout << "zadataka: " << on_async.count_tasks() << " niti: " << on_async.count_threads() << endl; // for (int i = 0; i < 8; ++i) { @@ -87,15 +88,24 @@ int main () { // cout << wait(run1) << endl; - // event dog; + // event ev2int; + // event evintString; - // dog.on("roge", [](int a, int b) { - // cout << "Rogeee" << a << b << endl; + // ev2int.on("sum", [](int a, int b) { + // cout << "Sum " << a+b << endl; + // }); + + // evintString.on("substract", [](int a, string b) { + // cout << "Substract " << a-stoi(b) << endl; // }); // sleep(5); - // dog.emit("roge", 5, 8); + // ev2int.emit("sum", 5, 8); + + // sleep(2); + // evintString.emit("substract", 3, to_string(2)); + sleep(1000);