Switch atask, events, timers to asio

dev-asio-engine 0.2-asio-engine
marcelb 8 months ago
parent 14663d631a
commit e4fbabd530
  1. 25
      README.md
  2. 52
      lib/asynco.hpp
  3. 5
      lib/event.hpp
  4. 7
      lib/filesystem.hpp
  5. 237
      lib/rotor.hpp
  6. 136
      lib/runner.hpp
  7. 223
      lib/timers.hpp
  8. 76
      test/test.cpp

@ -15,6 +15,7 @@ A C++ library for event-driven asynchronous multi-threaded programming.
- Event loops - Event loops
- Multiple parallel execution loops - Multiple parallel execution loops
- Asynchronous file IO - Asynchronous file IO
- Based on ASIO (Boost Asio)
## Installation ## Installation
Just download the latest release and unzip it into your project. Just download the latest release and unzip it into your project.
@ -24,14 +25,17 @@ Just download the latest release and unzip it into your project.
#include "asynco/lib/asynco.hpp" // atask(), wait() #include "asynco/lib/asynco.hpp" // atask(), wait()
#include "asynco/lib/event.hpp" // event #include "asynco/lib/event.hpp" // event
#include "asynco/lib/rotor.hpp" // interval, timeout #include "asynco/lib/timers.hpp" // interval, timeout
#include "asynco/lib/runner.hpp" // for own loop
#include "asynco/lib/filesystem.hpp" // for async read and write files #include "asynco/lib/filesystem.hpp" // for async read and write files
using namespace marcelb; using namespace marcelb;
using namespace asynco; using namespace asynco;
using namespace events; using namespace events;
// At the end of the main function, always set
_asynco_engine.run();
return 0;
``` ```
## Usage ## Usage
@ -45,7 +49,13 @@ interval inter1 ([]() {
}, 1000); }, 1000);
// stop interval // stop interval
inter1.clear(); inter1.stop();
// how many times it has expired
int t = inter1.ticks();
// is it stopped
bool stoped = inter1.stoped();
// start timeout // start timeout
timeout time1 ( [] () { timeout time1 ( [] () {
@ -53,7 +63,14 @@ timeout time1 ( [] () {
}, 10000); }, 10000);
// stop timeout // stop timeout
time1.clear(); time1.stop();
// is it expired
int t = time1.expired();
// is it stopped
bool stoped = time1.stoped();
``` ```
Make functions asynchronous Make functions asynchronous

@ -1,21 +1,67 @@
#ifndef _ASYNCO_ #ifndef _ASYNCO_
#define _ASYNCO_ #define _ASYNCO_
#include "runner.hpp" #include <boost/asio.hpp>
#include <iostream>
using namespace std; using namespace std;
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {
#define HW_CONCURRENCY_MINIMAL 4
/**
* Internal anonymous class for initializing the ASIO context and thread pool
* !!! It is anonymous to protect against use in the initialization of other objects of the same type !!!
*/
class {
public:
boost::asio::io_context io_context;
void run() {
for (auto& runner : runners) {
runner.join();
}
}
private:
unique_ptr<boost::asio::io_service::work> work { [&] () {
return new boost::asio::io_service::work(io_context);
} ()};
vector<thread> runners { [&] () {
vector<thread> _runs;
unsigned int num_of_runners;
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
for (int i=0; i<num_of_runners; i++) {
_runs.push_back(thread ( [this] () {
io_context.run();
}));
}
return _runs;
} ()};
} _asynco_engine;
/** /**
* Run the function asynchronously * Run the function asynchronously
*/ */
template<class F, class... Args> template<class F, class... Args>
auto atask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> { auto atask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type; using return_type = typename result_of<F(Args...)>::type;
future<return_type> res = _asynco_engine.io_context.post(boost::asio::use_future(bind(forward<F>(f), forward<Args>(args)...)));
future<return_type> res = _asyncon.put_task(bind(forward<F>(f), forward<Args>(args)...));
return res; return res;
} }

@ -1,15 +1,14 @@
#ifndef _EVENT_ #ifndef _EVENT_
#define _EVENT_ #define _EVENT_
#include <iostream>
#include <map> #include <map>
#include <vector> #include <vector>
#include <string> #include <string>
#include <functional> #include <functional>
#include "runner.hpp"
using namespace std; using namespace std;
#include "asynco.hpp"
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {
namespace events { namespace events {
@ -43,7 +42,7 @@ class event {
if (it_eve != events.end()) { if (it_eve != events.end()) {
for (uint i =0; i<it_eve->second.size(); i++) { for (uint i =0; i<it_eve->second.size(); i++) {
auto callback = bind(it_eve->second[i], forward<Args>(args)...); auto callback = bind(it_eve->second[i], forward<Args>(args)...);
_asyncon.put_task(callback); atask(callback);
} }
} }
} }

@ -3,13 +3,12 @@
#include "asynco.hpp" #include "asynco.hpp"
using namespace marcelb;
using namespace asynco;
#include <fstream> #include <fstream>
#include <iostream>
using namespace std; using namespace std;
using namespace marcelb;
using namespace asynco;
namespace marcelb { namespace marcelb {
namespace asynco { namespace asynco {

@ -1,237 +0,0 @@
#ifndef _ROTOR_
#define _ROTOT_
#include "runner.hpp"
#include "chrono"
#include <memory>
#include "iostream"
using namespace std;
using namespace marcelb;
using namespace asynco;
namespace marcelb {
namespace asynco {
/**
* Get the time in ms from the epoch
*/
int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
int64_t rtime_us() {
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
namespace {
/**
* Intern class for timer async loop
*/
class timer_core {
public:
mutex hangon;
condition_variable cv;
function<void()> callback;
int64_t time;
int64_t next;
bool repeat;
bool stop;
/**
* Timer constructor, receives a callback function and time
*/
timer_core( function<void()> _callback, int64_t _time, bool _repeat):
callback(_callback), time(_time*1000), repeat(_repeat), stop(false) {
next = rtime_us() + time;
}
/**
* Stop timer
*/
void clear() {
// lock_guard<mutex> hang(hangon);
stop = true;
cv.notify_one();
}
/**
* Destruktor of timer, call stop
*/
~timer_core() {
clear();
}
};
/**
* Event loop for time events
*/
class rotor {
vector<shared_ptr<timer_core>> tcores;
mutex te_m;
bool rotating = true;
int64_t sampling;
condition_variable te_cv;
/**
* Loop method, started by the constructor in a separate runner
* It checks the events on the stack and sends the expired ones to the runner
*/
void loop() {
while (rotating) {
vector<shared_ptr<timer_core>>::iterator next_tc;
shared_ptr<timer_core> next_ptr;
{
unique_lock<mutex> te_l(te_m);
te_cv.wait(te_l, [this]{ return !tcores.empty() || rotating; });
if (!rotating) {
break;
}
next_tc = min_element( tcores.begin(), tcores.end(),
[](shared_ptr<timer_core> a, shared_ptr<timer_core> b ) {
return a->next < b->next;
}
);
next_ptr = *next_tc;
}
unique_lock<mutex> next_l(next_ptr->hangon);
next_ptr->cv.wait_for(next_l, chrono::microseconds(next_ptr->next - rtime_us()), [&next_ptr] () {
return next_ptr->stop;
});
if (next_ptr->stop) {
remove(next_tc);
} else {
_asyncon.put_task(next_ptr->callback);
if (next_ptr->repeat) {
next_ptr->next += next_ptr->time;
}
else {
remove(next_tc);
}
}
}
}
/**
* The method deletes a non-repeating or stopped event from the stack
*/
void remove(vector<shared_ptr<timer_core>>::iterator it) {
lock_guard<mutex> lock(te_m);
tcores.erase(it);
// te_cv.notify_one();
}
public:
/**
* Constructor for the rotor, starts the given loop by occupying one runner
*/
rotor() {
_asyncon.put_task( [&] () {
loop();
});
};
/**
* Adds a time event to the stack
*/
void insert(shared_ptr<timer_core> tcore) {
lock_guard<mutex> lock(te_m);
tcores.push_back(tcore);
te_cv.notify_one();
};
/**
* Returns the number of active events
*/
int active() {
return tcores.size();
}
/**
* Stops all active events and stops the rotor
*/
~rotor() {
for (int i=0; i<tcores.size(); i++) {
tcores[i]->clear();
}
rotating = false;
}
};
/**
* It is intended that there is only one global declaration
*/
static rotor _rotor;
}
/**
* Core class for pure async timer functions
*/
class _timer_intern {
shared_ptr<timer_core> tcore;
public:
_timer_intern(function<void()> _callback, int64_t _time, bool repeat) {
tcore = make_shared<timer_core>(_callback, _time, repeat);
_rotor.insert(tcore);
}
/**
* Stop interval
*/
void clear() {
tcore->clear();
}
};
/**
* Class interval for periodic execution of the callback in time in ms
*/
class interval : public _timer_intern {
public:
/**
* The constructor receives a callback function and an interval time
*/
interval( function<void()> _callback, int64_t _time):
_timer_intern(_callback, _time, true) {
}
};
/**
* Class interval for delayed callback execution in ms
*/
class timeout : public _timer_intern {
public:
/**
* The constructor receives a callback function and a delay time
*/
timeout( function<void()> _callback, int64_t delay):
_timer_intern(_callback, delay, false) {
}
};
}
}
#endif

@ -1,136 +0,0 @@
#ifndef _RUNNER_
#define _RUNNER_
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
using namespace std;
namespace marcelb {
namespace asynco {
#define HW_CONCURRENCY_MINIMAL 4
/**
* The runner class implements multithread, task stack and event loop for asynchronous execution of tasks
*/
class runner {
private:
vector<thread> runners;
queue<function<void()>> tasks;
mutex q_io;
condition_variable cv;
bool stop;
public:
/**
* The constructor starts as many threads as the system has cores,
* and runs an event loop inside each one.
* Each event loop waits for tasks from the stack and executes them.
*/
runner(unsigned int _num_of_runners = 0) : stop(false) {
unsigned int num_of_runners = _num_of_runners;
if (num_of_runners == 0) {
#ifdef NUM_OF_RUNNERS
num_of_runners = NUM_OF_RUNNERS;
#else
num_of_runners = thread::hardware_concurrency();
if (num_of_runners < HW_CONCURRENCY_MINIMAL) {
num_of_runners = HW_CONCURRENCY_MINIMAL;
}
#endif
}
for (size_t i = 0; i < num_of_runners; ++i) {
runners.emplace_back( thread([&] {
while (!stop) {
function<void()> task;
{
unique_lock<mutex> lock(q_io);
cv.wait(lock, [this] { return stop || !tasks.empty(); });
// if (stop && tasks.empty())
if (stop)
return;
task = move(tasks.front());
tasks.pop();
}
task();
}
}));
}
}
/**
* With the method, we send the callback function and its arguments to the task stack
*/
template<class F, class... Args>
auto put_task(F&& f, Args&&... args)
-> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type;
auto task = make_shared<packaged_task<return_type()>>(bind(forward<F>(f), forward<Args>(args)...));
future<return_type> res = task->get_future();
{
unique_lock<mutex> lock(q_io);
if (stop) {
throw runtime_error("Pool is stoped!");
}
tasks.emplace([task]() { (*task)(); });
}
cv.notify_one();
return res;
}
/**
* Returns the number of tasks the runner has to perform
*/
unsigned int count_tasks() {
return tasks.size();
}
/**
* Returns the number of threads used by the runner
*/
unsigned int count_threads() {
return runners.size();
}
/**
* The destructor stops all loops and stops all threads
*/
~runner() {
{
unique_lock<mutex> lock(q_io);
stop = true;
}
cv.notify_all();
for (thread& runner : runners) {
runner.join();
}
runners.clear();
}
};
/**
* Internal global library variable
*/
static runner _asyncon;
}
}
#endif

@ -0,0 +1,223 @@
#ifndef _ROTOR_
#define _ROTOT_
#include "asynco.hpp"
#include <chrono>
using namespace std;
using namespace marcelb;
using namespace asynco;
namespace marcelb {
namespace asynco {
/**
* Get the time in ms from the epoch
*/
int64_t rtime_ms() {
return chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
/**
* Get the time in us from the epoch
*/
int64_t rtime_us() {
return chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now()
.time_since_epoch())
.count();
}
/**
* Core timer class for construct time async functions
*/
class timer {
boost::asio::steady_timer st;
bool _stop = false;
bool repeate;
function<void()> callback;
uint64_t time;
uint64_t _ticks = 0;
/**
* A method to assign a callback wrapper and a reinitialization algorithm
*/
void init() {
st.async_wait( [this] (const boost::system::error_code&) {
if (!_stop) {
callback();
if (repeate) {
st = boost::asio::steady_timer(_asynco_engine.io_context, boost::asio::chrono::milliseconds(time));
init();
}
_ticks++;
}
});
}
public:
/**
* The constructor creates the steady_timer and accompanying variables and runs a method to initialize the timer
*/
timer (function<void()> _callback, uint64_t _time, bool _repeate) :
st(_asynco_engine.io_context, boost::asio::chrono::milliseconds(_time)),
_stop(false),
repeate(_repeate),
callback(_callback),
time(_time) {
init();
}
/**
* Stop timer
* The stop flag is set and timer remove it from the queue
*/
void stop() {
_stop = true;
st.cancel();
}
/**
* Run callback now
* Forces the callback function to run independently of the timer
*/
void now() {
st.cancel();
}
/**
* Get the number of times the timer callback was runned
*/
uint64_t ticks() {
return _ticks;
}
/**
* The logic status of the timer stop state
*/
bool stoped() {
return _stop;
}
/**
* The destructor stops the timer
*/
~timer() {
stop();
}
};
/**
* Class interval for periodic execution of the callback in time in ms
*/
class interval {
shared_ptr<timer> _timer;
public:
/**
* Constructor initializes a shared pointer of type timer
*/
interval(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, true)) {
}
/**
* Stop interval
* The stop flag is set and interval remove it from the queue
*/
void stop() {
_timer->stop();
}
/**
* Run callback now
* Forces the callback function to run independently of the interval
*/
void now() {
_timer->now();
}
/**
* Get the number of times the interval callback was runned
*/
uint64_t ticks() {
return _timer->ticks();
}
/**
* The logic status of the interval stop state
*/
bool stoped() {
return _timer->stoped();
}
/**
* The destructor stops the interval
*/
~interval() {
stop();
}
};
/**
* Class timeout for delayed callback execution in ms
*/
class timeout {
shared_ptr<timer> _timer;
public:
/**
* Constructor initializes a shared pointer of type timer
*/
timeout(function<void()> callback, uint64_t time) :
_timer(make_shared<timer> (callback, time, false)) {
}
/**
* Stop timeout
* The stop flag is set and timeout remove it from the queue
*/
void stop() {
_timer->stop();
}
/**
* Run callback now
* Forces the callback function to run independently of the timeout
*/
void now() {
_timer->now();
}
/**
* Get the number of times the timeout callback was runned
*/
bool expired() {
return bool(_timer->ticks());
}
/**
* The logic status of the timeout stop state
*/
bool stoped() {
return _timer->stoped();
}
/**
* The destructor stops the timeout
*/
~timeout() {
stop();
}
};
}
}
#endif

@ -2,20 +2,20 @@
#include "../lib/asynco.hpp" #include "../lib/asynco.hpp"
#include "../lib/event.hpp" #include "../lib/event.hpp"
// #include "../lib/rotor.hpp"
#include "../lib/filesystem.hpp" #include "../lib/filesystem.hpp"
#include "../lib/timers.hpp" #include "../lib/timers.hpp"
using namespace marcelb::asynco;
using namespace events;
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <thread> #include <thread>
using namespace std; using namespace std;
using namespace marcelb::asynco;
using namespace events;
using namespace asynco;
using namespace this_thread; using namespace this_thread;
void sleep_to (int _time) { void sleep_to (int _time) {
promise<void> _promise; promise<void> _promise;
timeout t( [&]() { timeout t( [&]() {
@ -65,19 +65,9 @@ int main () {
// --------------- TIME ASYNCHRONOUS FUNCTIONS -------------- // --------------- TIME ASYNCHRONOUS FUNCTIONS --------------
/** // /**
* Init interval and timeout; clear interval and timeout // * Init interval and timeout; clear interval and timeout
*/ // */
// vector<interval> intervals;
// for(int i=0; i<1000; i++) {
// intervals.push_back(interval( [i, &start]() {
// cout << "interval " << i << " end: " << rtime_ms() - start << endl;
// }, (i%5 +1)*1000));
// }
// interval inter1 ([&]() { // interval inter1 ([&]() {
// cout << "interval prvi " << rtime_ms() - start << endl; // cout << "interval prvi " << rtime_ms() - start << endl;
@ -89,11 +79,12 @@ int main () {
// interval inter3 ([&]() { // interval inter3 ([&]() {
// cout << "interval treći " << rtime_ms() - start << endl; // cout << "interval treći " << rtime_ms() - start << endl;
// }, 3000); // }, 1000);
// interval inter4 ([&]() { // interval inter4 ([&]() {
// cout << "interval cetvrti " << rtime_ms() - start << endl; // // cout << "interval cetvrti " << rtime_ms() - start << endl;
// }, 1000); // cout << "Ticks " << inter3.ticks() << endl;
// }, 500);
// interval inter5 ([&]() { // interval inter5 ([&]() {
// cout << "interval peti " << rtime_ms() - start << endl; // cout << "interval peti " << rtime_ms() - start << endl;
@ -105,7 +96,7 @@ int main () {
// timeout time1 ( [&] () { // timeout time1 ( [&] () {
// cout << "Close interval 1 i 2 " << rtime_ms() - start << endl; // cout << "Close interval 1 i 2 " << rtime_ms() - start << endl;
// // inter1.stop(); // inter1.stop();
// cout << "inter1.stop " << endl; // cout << "inter1.stop " << endl;
// inter2.stop(); // inter2.stop();
// cout << "inter2.stop " << endl; // cout << "inter2.stop " << endl;
@ -113,15 +104,26 @@ int main () {
// timeout time2 ([&] () { // timeout time2 ([&] () {
// // cout << "Close interval 3 " << rtime_ms() - start << endl; // cout << "Close interval 3 " << rtime_ms() - start << endl;
// inter3.stop(); // inter3.stop();
// cout << "Stoped " << inter3.stoped() << endl;
// // time1.stop(); // // time1.stop();
// }, 2000); // }, 5000);
// interval ( [] () { // if (time2.expired()) {
// cout << "BROJ TAJMERA " << _intern_asynco_timer_globals.timers.size() << endl; // cout << "isteko " << endl;
// }, 5000); // } else {
// cout << "nije isteko " << endl;
// }
// // sleep(6);
// if (time2.expired()) {
// cout << "isteko " << endl;
// } else {
// cout << "nije isteko " << endl;
// }
// // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS ------------------------- // // // ------------------------ MAKE FUNCTIONS ASYNCHRONOUS -------------------------
@ -214,7 +216,7 @@ int main () {
// }); // });
// }); // });
// // --------------- EVENTS ------------------- // // // --------------- EVENTS -------------------
// /** // /**
// * initialization of typed events // * initialization of typed events
@ -290,8 +292,8 @@ int main () {
// } // }
string data_; // string data_;
auto start_read = rtime_us(); // auto start_read = rtime_us();
// fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) { // fs::read("test1.txt", [&data_, &start_read] (string data, exception* error) {
// if (error) { // if (error) {
@ -304,19 +306,11 @@ int main () {
// } // }
// }); // });
fs::read2("test1.txt", [&data_, &start_read] (string data, exception* error) {
if (error) {
cout << "Error " << error->what() << endl;
} else {
// cout << "Data " << endl << data << endl;
// data_ = data;
// cout << "Data_" << data_ << endl;
cout << "read " << rtime_us() - start_read << endl;
}
});
cout << "Sleep" << endl; // ----------------------------------------------------------------------------------------------------
sleep(100000); // only for testing
cout << "Run" << endl;
_asynco_engine.run();
return 0; return 0;
} }

Loading…
Cancel
Save