diff --git a/README.md b/README.md index f16ce8c..5d69ce2 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,12 @@ A C++ library for event-driven asynchronous multi-threaded programming. - Object oriented - Small and easy to integrate - Header only -- Asynchronous launch functions -- Multithread parallel execution of tasks -- Timer functions: interval, timeout -- Events (on, emit) -- Event loop - +- Asynchronous programming +- Multithread +- Asynchronous timer functions: interval, timeout +- Typed events (on, emit) +- Event loops +- Parallel execution loops ## Installation Just download the latest release and unzip it into your project. @@ -37,7 +37,7 @@ Time asynchronous functions ```c++ // start interval -interval inter1 ([&]() { +interval inter1 ([]() { cout << "Interval 1" << endl; }, 1000); @@ -45,7 +45,7 @@ interval inter1 ([&]() { inter1.clear(); // start timeout -timeout time1 ( [&] () { +timeout time1 ( [] () { cout << "Timeout 1 " << endl; }, 10000); @@ -60,8 +60,8 @@ Make functions asynchronous */ auto res1 = on_async.put_task( [] () { - cout << "Jebiga " < lock(te_m); tevents.erase(tevents.begin()+position); + update_sampling(); } /** * Updates the idle time of the loop, according to twice the frequency of available events */ void update_sampling() { + if (tevents.empty()) { + sampling = 100; + return; + } sampling = tevents[0]->time; for (int i=0; i tevents[i]->time) { diff --git a/lib/runner.hpp b/lib/runner.hpp index 987a16c..b56ffd9 100644 --- a/lib/runner.hpp +++ b/lib/runner.hpp @@ -27,23 +27,20 @@ class runner { mutex q_io; condition_variable cv; bool stop; - - public: - + /** - * The constructor starts as many threads as the system has cores, - * and runs an event loop inside each one. - * Each event loop waits for tasks from the stack and executes them. + * Increase number of runners */ - runner(size_t pool_size = thread::hardware_concurrency()) : stop(false) { - for (size_t i = 0; i < pool_size; ++i) { - runners.emplace_back( thread([this] { - while (true) { + void increase_runners(unsigned int increase) { + for (size_t i = 0; i < increase; ++i) { + runners.emplace_back( thread([&] { + while (!stop) { function task; { unique_lock lock(q_io); cv.wait(lock, [this] { return stop || !tasks.empty(); }); - if (stop && tasks.empty()) + // if (stop && tasks.empty()) + if (stop) return; task = move(tasks.front()); tasks.pop(); @@ -54,6 +51,23 @@ class runner { } } + public: + + /** + * The constructor starts as many threads as the system has cores, + * and runs an event loop inside each one. + * Each event loop waits for tasks from the stack and executes them. + */ + runner(size_t pool_size = thread::hardware_concurrency()) : stop(false) { + if (pool_size < 4) { + pool_size = 4; + } + increase_runners(pool_size); + // start_all_runners(pool_size); + } + + + /** * With the method, we send the callback function and its arguments to the task stack */ @@ -78,6 +92,22 @@ class runner { return res; } + /** + * Change the number of runners + */ + void change_runners (unsigned int num_of_runners) { + if (num_of_runners == 0 || num_of_runners > 64) { + throw runtime_error("Not allowed runners size"); + } + + int difference = num_of_runners - count_threads(); + if (difference < 0) { // reduce + throw runtime_error("Is not allowed to reduce runners"); + } else if (difference > 0) { // increase + increase_runners(difference); + } + } + /** * Returns the number of tasks the runner has to perform */ @@ -104,6 +134,7 @@ class runner { for (thread& runner : runners) { runner.join(); } + runners.clear(); } }; diff --git a/test/test.cpp b/test/test.cpp index 55212df..083c8a8 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -41,6 +41,17 @@ void promise_reject (int _time) { return _promise.get_future().get(); } +void notLambdaFunction() { + cout << "Call to not lambda function" << endl; +} + +class clm { + public: + void classMethode() { + cout << "Call class method" << endl; + } +}; + // ------------------ EXTEND OWN CLASS WITH EVENTS ------------------- class myOwnClass : public event { @@ -50,6 +61,7 @@ class myOwnClass : public event { int main () { + on_async.change_runners(64); auto start = rtime_ms(); @@ -59,6 +71,26 @@ int main () { * Init interval and timeout; clear interval and timeout */ + // ovo ne radi + + // vector interv; + // vector tmout; + + // for (int i=0; i< 20; i++) { + // interv.push_back( interval( [i] () { + // cout << "interval " << i << endl; + // }, 1000)); + // tmout.push_back( timeout( [i] () { + // cout << "timeout " << i << endl; + // }, 1000*i)); + // } + + // ovo valja popravit + + // interval( [] () { + // cout << "interval " << endl; + // }, 1000); + // interval inter1 ([&]() { // cout << "interval prvi " << rtime_ms() - start << endl; // }, 1000); @@ -83,11 +115,11 @@ int main () { // time1.clear(); // }, 2000); - // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- + // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- - /** - * Put task directly and get returned value - it is not recommended to use it - */ + // /** + // * Put task directly and get returned value - it is not recommended to use it + // */ // auto res1 = on_async.put_task( [] () { // cout << "Jebiga " < ev2int; // event evintString; @@ -187,11 +238,11 @@ int main () { // cout << "Void emited" << endl; // }); - // sleep(1); + // // sleep(1); - /** - * Emit - */ + // /** + // * Emit + // */ // ev2int.emit("sum", 5, 8); @@ -201,9 +252,9 @@ int main () { // sleep(1); // evoid.emit("void"); - /** - * Own class - */ + // /** + // * Own class + // */ // myOwnClass myclass; @@ -215,7 +266,7 @@ int main () { // cout << "Constructed " << i << endl; // }); - sleep(10000); // only for testing + sleep(100000); // only for testing return 0; }