commit 03b11840f44bd72c96dfb7861c078565111e2f92 Author: marcelb Date: Thu Feb 29 17:01:55 2024 +0100 AsyncLoop (ThreadPool, task execute loop), interval, timeout, asynco, wait diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..4039bef --- /dev/null +++ b/.vscode/c_cpp_properties.json @@ -0,0 +1,16 @@ +{ + "configurations": [ + { + "name": "Linux", + "includePath": [ + "${workspaceFolder}/**" + ], + "defines": [], + "compilerPath": "/usr/bin/gcc", + "cStandard": "c17", + "cppStandard": "gnu++17", + "intelliSenseMode": "linux-gcc-x64" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..3d201ae --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,27 @@ +{ + "files.associations": { + "iostream": "cpp", + "functional": "cpp", + "thread": "cpp", + "chrono": "cpp", + "ostream": "cpp", + "condition_variable": "cpp", + "array": "cpp", + "atomic": "cpp", + "cwchar": "cpp", + "deque": "cpp", + "unordered_map": "cpp", + "vector": "cpp", + "exception": "cpp", + "initializer_list": "cpp", + "iosfwd": "cpp", + "mutex": "cpp", + "new": "cpp", + "ratio": "cpp", + "stdexcept": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "future": "cpp" + } +} \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..2193d8d --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,28 @@ +{ + "tasks": [ + { + "type": "cppbuild", + "label": "C/C++: gcc build active file", + "command": "/usr/bin/g++", + "args": [ + "-fdiagnostics-color=always", + "-g", + "${file}", + "-o", + "${fileDirname}/${fileBasenameNoExtension}" + ], + "options": { + "cwd": "${fileDirname}" + }, + "problemMatcher": [ + "$gcc" + ], + "group": { + "kind": "build", + "isDefault": true + }, + "detail": "Task generated by Debugger." + } + ], + "version": "2.0.0" +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/lib/asynco.hpp b/lib/asynco.hpp new file mode 100644 index 0000000..9b68337 --- /dev/null +++ b/lib/asynco.hpp @@ -0,0 +1,93 @@ +#ifndef _ASYNCO_ +#define _ASYNCO_ + +#include "loop.hpp" + +using namespace std; + +namespace marcelb { + +#ifndef ON_ASYNC +#define ON_ASYNC +AsyncLoop on_async; +#endif + +class interval { + bool run = true; + function callback; + const time_t _duration; + + public: + interval(function func, const time_t duration): callback(func), _duration(duration) { + #ifndef ON_ASYNC + throw string("Not on_async defined!"); + #endif + + auto task = [&] () { + while (run) { + this_thread::sleep_for(chrono::milliseconds(_duration)); + if (run) { + callback(); + } + } + }; + + on_async.put_task(task); + } + + void clear() { + run = false; + } + + ~interval() { + clear(); + } +}; + +class timeout { + bool run = true; + function callback; + const time_t _duration; + + public: + timeout(function f, const time_t duration): callback(f), _duration(duration) { + #ifndef ON_ASYNC + throw string("Not on_async defined!"); + #endif + + auto task = [&] () { + this_thread::sleep_for(chrono::milliseconds(_duration)); + if (run) { + callback(); + } + }; + + on_async.put_task(task); + } + + void clear() { + run = false; + } + + ~timeout() { + clear(); + } +}; + + +template +auto asynco(F&& f, Args&&... args) -> future::type> { + using return_type = typename result_of::type; + + future res = on_async.put_task(bind(forward(f), forward(args)...)); + return res; +} + +template +T wait(future r) { + return r.get(); +} + +} + +#endif \ No newline at end of file diff --git a/lib/event.hpp b/lib/event.hpp new file mode 100644 index 0000000..cfbdd7e --- /dev/null +++ b/lib/event.hpp @@ -0,0 +1,40 @@ +#ifndef _EVENT_ +#define _EVENT_ + +#include +#include +#include +#include + +using namespace std; + +namespace marcelb { + +class event { + map&)>> events; +public: + + template + void on(const string& key, function f) { + // events[key] = [f](Args... args) { + // f(args...); + // }; + } + + template + void emit(const string& key, Args&&... args) { + if (events.find(key) == events.end()) { + cout << "No defined listener for event: " << key << endl; + return; + } + else { + for (auto& func : events[key]) { + func(forward(args)...); + } + } + } +}; + +} + +#endif \ No newline at end of file diff --git a/lib/loop.hpp b/lib/loop.hpp new file mode 100644 index 0000000..3dfd37e --- /dev/null +++ b/lib/loop.hpp @@ -0,0 +1,91 @@ +#ifndef _LOOP_ +#define _LOOP_ + +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +namespace marcelb { + +#ifdef ON_ASYNC +extern AsyncLoop on_async; +#endif + +class AsyncLoop { + private: + vector workers; + queue> tasks; + mutex q_io; + condition_variable cv; + bool stop; + + public: + AsyncLoop(size_t pool_size = thread::hardware_concurrency()) : stop(false) { + for (size_t i = 0; i < pool_size; ++i) { + workers.emplace_back([this] { + while (true) { + function task; + { + unique_lock lock(q_io); + cv.wait(lock, [this] { return stop || !tasks.empty(); }); + if (stop && tasks.empty()) + return; + task = move(tasks.front()); + tasks.pop(); + } + task(); + } + }); + } + } + + template + auto put_task(F&& f, Args&&... args) + -> future::type> { + using return_type = typename result_of::type; + + auto task = make_shared>(bind(forward(f), forward(args)...)); + future res = task->get_future(); + { + unique_lock lock(q_io); + + if (stop) { + throw runtime_error("Pool is stoped!"); + } + + tasks.emplace([task]() { (*task)(); }); + } + + cv.notify_one(); + return res; + } + + unsigned int count_tasks() { + return tasks.size(); + } + + unsigned int count_threads() { + return workers.size(); + } + + ~AsyncLoop() { + { + unique_lock lock(q_io); + stop = true; + } + cv.notify_all(); + for (thread& worker : workers) + worker.join(); + } + +}; + +} + +#endif \ No newline at end of file diff --git a/src/pool.cpp b/src/pool.cpp new file mode 100644 index 0000000..e69de29 diff --git a/test/test b/test/test new file mode 100755 index 0000000..eef6b04 Binary files /dev/null and b/test/test differ diff --git a/test/test.cpp b/test/test.cpp new file mode 100644 index 0000000..04fbf06 --- /dev/null +++ b/test/test.cpp @@ -0,0 +1,104 @@ + +#include "../lib/loop.hpp" +#include "../lib/asynco.hpp" +// #include "../lib/event.hpp" +#include +#include + +using namespace std; +using namespace marcelb; +using namespace this_thread; + +#ifndef ON_ASYNC +#define ON_ASYNC +AsyncLoop on_async; +#endif + +int main () { + + auto start = chrono::high_resolution_clock::now(); + + interval inter1 ([&]() { + cout << "interval prvi " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + }, 1000); + + cout << "Blokira stoka" << endl; + + interval inter2 ([&]() { + cout << "interval drugi " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + }, 2000); + + interval inter3 ([&]() { + cout << "interval treći " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + }, 3000); + + timeout time1 ( [&] () { + cout << "Close interval 1 i 2 " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + inter1.clear(); + inter2.clear(); + }, 10000); + + timeout time2 ([&] () { + cout << "Close interval 3 " << chrono::duration_cast(chrono::high_resolution_clock::now() - start).count() << endl; + inter3.clear(); + }, 20000); + + + // for (int i = 0; i < 8; ++i) { + // pool.put_task( [&] (int id) { + // this_thread::sleep_for(chrono::seconds(1)); + // cout << a*i << endl; + // }, i); + // } + + // auto res1 = pool.put_task( [] () { + // cout << "Jebiga " <