Delete rotor - uncomplited merge

dev
marcelb 5 months ago
parent 377e381037
commit e91c7f646d
  1. 239
      lib/rotor.hpp
  2. 374
      test/test.cpp

@ -1,239 +0,0 @@
#ifndef _ROTOR_
#define _ROTOT_
#include "runner.hpp"
#include "chrono"
#include <memory>
#include "iostream"
using namespace std;
using namespace marcelb;
using namespace asynco;
namespace marcelb {
namespace asynco {
/**
* Get the time in ms from the epoch
*/
int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
int64_t rtime_us() {
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
namespace {
/**
* Intern class for timer async loop
*/
class timer_core {
public:
mutex hangon;
condition_variable cv;
function<void()> callback;
int64_t time;
int64_t next;
bool repeat;
bool stop;
/**
* Timer constructor, receives a callback function and time
*/
timer_core( function<void()> _callback, int64_t _time, bool _repeat):
callback(_callback), time(_time*1000), repeat(_repeat), stop(false) {
next = rtime_us() + time;
}
/**
* Stop timer
*/
void clear() {
// lock_guard<mutex> hang(hangon);
stop = true;
cv.notify_one();
}
/**
* Destruktor of timer, call stop
*/
~timer_core() {
clear();
}
};
/**
* Event loop for time events
*/
class rotor {
vector<shared_ptr<timer_core>> tcores;
mutex te_m;
bool rotating = true;
int64_t sampling;
condition_variable te_cv;
/**
* Loop method, started by the constructor in a separate runner
* It checks the events on the stack and sends the expired ones to the runner
*/
void loop() {
while (rotating) {
vector<shared_ptr<timer_core>>::iterator next_tc;
shared_ptr<timer_core> next_ptr;
{
unique_lock<mutex> te_l(te_m);
te_cv.wait(te_l, [this]{ return !tcores.empty() || rotating; });
if (!rotating) {
break;
}
next_tc = min_element( tcores.begin(), tcores.end(),
[](shared_ptr<timer_core> a, shared_ptr<timer_core> b ) {
return a->next < b->next;
}
);
next_ptr = *next_tc;
}
unique_lock<mutex> next_l(next_ptr->hangon);
next_ptr->cv.wait_for(next_l, chrono::microseconds(next_ptr->next - rtime_us()), [&next_ptr] () {
return next_ptr->stop;
});
if (next_ptr->stop) {
remove(next_tc);
} else {
_asyncon.put_task(next_ptr->callback);
if (next_ptr->repeat) {
next_ptr->next += next_ptr->time;
}
else {
remove(next_tc);
}
}
}
}
/**
* The method deletes a non-repeating or stopped event from the stack
*/
void remove(vector<shared_ptr<timer_core>>::iterator it) {
lock_guard<mutex> lock(te_m);
tcores.erase(it);
// te_cv.notify_one();
}
public:
/**
* Constructor for the rotor, starts the given loop by occupying one runner
*/
rotor() {
_asyncon.put_task( [&] () {
loop();
});
};
/**
* Adds a time event to the stack
*/
void insert(shared_ptr<timer_core> tcore) {
lock_guard<mutex> lock(te_m);
tcores.push_back(tcore);
te_cv.notify_one();
};
/**
* Returns the number of active events
*/
int active() {
return tcores.size();
}
/**
* Stops all active events and stops the rotor
*/
~rotor() {
for (int i=0; i<tcores.size(); i++) {
tcores[i]->clear();
}
rotating = false;
}
};
}
/**
* It is intended that there is only one global declaration
*/
static rotor _rotor;
/**
* Core class for pure async timer functions
*/
class _timer_intern {
shared_ptr<timer_core> tcore;
public:
_timer_intern(function<void()> _callback, int64_t _time, bool repeat) {
tcore = make_shared<timer_core>(_callback, _time, repeat);
_rotor.insert(tcore);
}
/**
* Stop interval
*/
void clear() {
tcore->clear();
}
};
/**
* Class interval for periodic execution of the callback in time in ms
*/
class interval : public _timer_intern {
public:
/**
* The constructor receives a callback function and an interval time
*/
interval( function<void()> _callback, int64_t _time):
_timer_intern(_callback, _time, true) {
}
};
/**
* Class interval for delayed callback execution in ms
*/
class timeout : public _timer_intern {
public:
/**
* The constructor receives a callback function and a delay time
*/
timeout( function<void()> _callback, int64_t delay):
_timer_intern(_callback, delay, false) {
}
};
}
}
#endif

@ -65,246 +65,246 @@ int main () {
// --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- // --------------- TIME ASYNCHRONOUS FUNCTIONS --------------
// /** /**
// * Init interval and timeout; clear interval and timeout * Init interval and timeout; clear interval and timeout
// */ */
// interval inter1 ([&]() { interval inter1 ([&]() {
// cout << "interval prvi " << rtime_ms() - start << endl; cout << "interval prvi " << rtime_ms() - start << endl;
// }, 1000); }, 1000);
// interval inter2 ([&]() { interval inter2 ([&]() {
// cout << "interval drugi " << rtime_ms() - start << endl; cout << "interval drugi " << rtime_ms() - start << endl;
// }, 2000); }, 2000);
// interval inter3 ([&]() { interval inter3 ([&]() {
// cout << "interval treći " << rtime_ms() - start << endl; cout << "interval treći " << rtime_ms() - start << endl;
// }, 1000); }, 1000);
// interval inter4 ([&]() { interval inter4 ([&]() {
// // cout << "interval cetvrti " << rtime_ms() - start << endl; // cout << "interval cetvrti " << rtime_ms() - start << endl;
// cout << "Ticks " << inter3.ticks() << endl; cout << "Ticks " << inter3.ticks() << endl;
// }, 500); }, 500);
// interval inter5 ([&]() { interval inter5 ([&]() {
// cout << "interval peti " << rtime_ms() - start << endl; cout << "interval peti " << rtime_ms() - start << endl;
// }, 2000); }, 2000);
// interval inter6 ([&]() { interval inter6 ([&]() {
// cout << "interval sesti " << rtime_ms() - start << endl; cout << "interval sesti " << rtime_ms() - start << endl;
// }, 3000); }, 3000);
// timeout time1 ( [&] () { timeout time1 ( [&] () {
// cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; cout << "Close interval 1 i 2 " << rtime_ms() - start << endl;
// inter1.stop(); inter1.stop();
// cout << "inter1.stop " << endl; cout << "inter1.stop " << endl;
// inter2.stop(); inter2.stop();
// cout << "inter2.stop " << endl; cout << "inter2.stop " << endl;
// }, 8000); }, 8000);
// timeout time2 ([&] () { timeout time2 ([&] () {
// cout << "Close interval 3 " << rtime_ms() - start << endl; cout << "Close interval 3 " << rtime_ms() - start << endl;
// inter3.stop(); inter3.stop();
// cout << "Stoped " << inter3.stoped() << endl; cout << "Stoped " << inter3.stoped() << endl;
// // time1.stop(); // time1.stop();
// }, 5000); }, 5000);
// if (time2.expired()) { if (time2.expired()) {
// cout << "isteko " << endl; cout << "isteko " << endl;
// } else { } else {
// cout << "nije isteko " << endl; cout << "nije isteko " << endl;
// } }
// // sleep(6); // sleep(6);
// if (time2.expired()) { if (time2.expired()) {
// cout << "isteko " << endl; cout << "isteko " << endl;
// } else { } else {
// cout << "nije isteko " << endl; cout << "nije isteko " << endl;
// } }
// // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
// /** /**
// * Run an function asyncronic * Run an function asyncronic
// */ */
// atask( []() { atask( []() {
// sleep_for(2s); // only for simulate log duration function sleep_for(2s); // only for simulate log duration function
// cout << "atask 1" << endl; cout << "atask 1" << endl;
// return 5; return 5;
// }); });
// /** /**
// * Call not lambda function * Call not lambda function
// */ */
// atask (notLambdaFunction); atask (notLambdaFunction);
// wait ( wait (
// atask ( atask (
// notLambdaFunction notLambdaFunction
// ) )
// ); );
// /** /**
// * Call class method * Call class method
// */ */
// clm classes; clm classes;
// atask( [&classes] () { atask( [&classes] () {
// classes.classMethode(); classes.classMethode();
// }); });
// sleep(5); sleep(5);
// /** /**
// * Wait after runned as async * Wait after runned as async
// */ */
// auto a = atask( []() { auto a = atask( []() {
// sleep_for(2s); // only for simulate log duration function sleep_for(2s); // only for simulate log duration function
// cout << "atask 2" << endl; cout << "atask 2" << endl;
// return 5; return 5;
// }); });
// cout << wait(a) << endl; cout << wait(a) << endl;
// cout << "print after atask 2" << endl; cout << "print after atask 2" << endl;
// /** /**
// * Wait async function call and use i cout * Wait async function call and use i cout
// */ */
// cout << wait(atask( [] () { cout << wait(atask( [] () {
// sleep_for(chrono::seconds(1)); // only for simulate log duration function sleep_for(chrono::seconds(1)); // only for simulate log duration function
// cout << "wait end" << endl; cout << "wait end" << endl;
// return 4; return 4;
// })) << endl; })) << endl;
// /** /**
// * Sleep with timeout sleep implement * Sleep with timeout sleep implement
// */ */
// sleep_to(3000); sleep_to(3000);
// cout << "sleep_to " << rtime_ms() - start << endl; cout << "sleep_to " << rtime_ms() - start << endl;
// /** /**
// * Catch promise reject * Catch promise reject
// */ */
// try { try {
// promise_reject(3000); promise_reject(3000);
// } catch (runtime_error err) { } catch (runtime_error err) {
// cout<< err.what() << endl; cout<< err.what() << endl;
// } }
// cout << "promise_reject " << rtime_ms() - start << endl; cout << "promise_reject " << rtime_ms() - start << endl;
// /** /**
// * Nested asynchronous invocation * Nested asynchronous invocation
// */ */
// atask( [] { atask( [] {
// cout << "idemo ..." << endl; cout << "idemo ..." << endl;
// atask( [] { atask( [] {
// cout << "ugdnježdena async funkcija " << endl; cout << "ugdnježdena async funkcija " << endl;
// }); });
// }); });
// // // --------------- EVENTS ------------------- // // --------------- EVENTS -------------------
// /** /**
// * initialization of typed events * initialization of typed events
// */ */
// event<int, int> ev2int; event<int, int> ev2int;
// event<int, string> evintString; event<int, string> evintString;
// event<> evoid; event<> evoid;
// ev2int.on("sum", [](int a, int b) { ev2int.on("sum", [](int a, int b) {
// cout << "Sum " << a+b << endl; cout << "Sum " << a+b << endl;
// }); });
// ev2int.on("sum", [](int a, int b) { ev2int.on("sum", [](int a, int b) {
// cout << "Sum done" << endl; cout << "Sum done" << endl;
// }); });
// evintString.on("substract", [](int a, string b) { evintString.on("substract", [](int a, string b) {
// cout << "Substract " << a-stoi(b) << endl; cout << "Substract " << a-stoi(b) << endl;
// }); });
// evoid.on("void", []() { evoid.on("void", []() {
// cout << "Void emited" << endl; cout << "Void emited" << endl;
// }); });
// string emited2 = "2"; string emited2 = "2";
// evoid.on("void", [&]() { evoid.on("void", [&]() {
// cout << "Void emited " << emited2 << endl; cout << "Void emited " << emited2 << endl;
// }); });
// evoid.emit("void"); evoid.emit("void");
// sleep(1); sleep(1);
// /** /**
// * Emit * Emit
// */ */
// ev2int.emit("sum", 5, 8); ev2int.emit("sum", 5, 8);
// sleep(1); sleep(1);
// evintString.emit("substract", 3, to_string(2)); evintString.emit("substract", 3, to_string(2));
// sleep(1); sleep(1);
// evoid.off("void"); evoid.off("void");
// evoid.emit("void"); evoid.emit("void");
// /** /**
// * Own class * Own class
// */ */
// myOwnClass myclass; myOwnClass myclass;
// timeout t( [&] { timeout t( [&] {
// myclass.emit("constructed", 1); myclass.emit("constructed", 1);
// }, 200); }, 200);
// myclass.on("constructed", [] (int i) { myclass.on("constructed", [] (int i) {
// cout << "Constructed " << i << endl; cout << "Constructed " << i << endl;
// }); });
// auto status = fs::read("test1.txt"); auto status = fs::read("test1.txt");
// try { try {
// auto data = wait(status); auto data = wait(status);
// cout << data; cout << data;
// } catch (exception& err) { } catch (exception& err) {
// cout << err.what() << endl; cout << err.what() << endl;
// } }
// string data_; string data_;
// auto start_read = rtime_us(); auto start_read = rtime_us();
// fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) { fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) {
// if (error) { if (error) {
// cout << "Error " << error->what() << endl; cout << "Error " << error->what() << endl;
// } else { } else {
// // cout << "Data " << endl << data << endl; // cout << "Data " << endl << data << endl;
// // data_ = data; // data_ = data;
// // cout << "Data_" << data_ << endl; // cout << "Data_" << data_ << endl;
// cout << "read " << rtime_us() - start_read << endl; cout << "read " << rtime_us() - start_read << endl;
// } }
// }); });
// ---------------------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------------------

Loading…
Cancel
Save