Rewrite async with pool connection. Fix timeout bug in pull method and define exceptions

read-write-limit
marcelb 1 year ago
parent 031853d8ee
commit e2c72e1cd2
  1. 3
      .vscode/settings.json
  2. 33
      lib/tcp_socket.hpp
  3. 70
      src/tcp_socket.cpp
  4. 59
      test/client.cpp
  5. BIN
      test/client.o
  6. 36
      test/server.cpp
  7. BIN
      test/server.o

@ -45,6 +45,7 @@
"streambuf": "cpp", "streambuf": "cpp",
"cinttypes": "cpp", "cinttypes": "cpp",
"typeinfo": "cpp", "typeinfo": "cpp",
"thread": "cpp" "thread": "cpp",
"chrono": "cpp"
} }
} }

@ -7,9 +7,15 @@
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <string.h> #include <string.h>
#include <errno.h>
#include <chrono>
#include <stdexcept>
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include <openssl/err.h> #include <openssl/err.h>
#define SOCKET_TIMEOUT 1000; // timeout u us
#if __linux__ #if __linux__
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netdb.h> #include <netdb.h>
@ -25,6 +31,7 @@
#include "ip.hpp" #include "ip.hpp"
using namespace std; using namespace std;
using namespace chrono;
class client; class client;
// class secure; // class secure;
@ -52,7 +59,7 @@ class server {
~server (); ~server ();
void sync(void (*handlecli)(client&), const uint timeout = 100); void sync(void (*handlecli)(client&), const uint timeout = 100);
void async(const uint limit, void (*handlecli)(client&, mutex&), const uint timeout = 100); void async(const uint limit, void (*handlecli)(client&), const uint timeout = 100);
}; };
@ -87,6 +94,7 @@ class client {
#endif #endif
struct sockaddr_in addr; struct sockaddr_in addr;
SSL* ssl = NULL; SSL* ssl = NULL;
uint _timeout = 100; // timeout u ms
// server s klijentima // server s klijentima
const server* srv; const server* srv;
// klijent sa serverom // klijent sa serverom
@ -102,4 +110,27 @@ class client {
string pull (size_t byte_limit = 1024); string pull (size_t byte_limit = 1024);
}; };
class ConnectionException : public exception {
public:
ConnectionException(const string& message, const string& data, const bool& interrupted = false): message_(message), data_(data), interrupted_(interrupted) {}
virtual const char* what() const noexcept {
return message_.c_str();
}
const string& getData() const {
return data_;
}
const bool& isInterrupted() const {
return interrupted_;
}
private:
string message_;
string data_;
bool interrupted_;
};
#endif #endif

@ -64,22 +64,26 @@ void server::sync(void (*handlecli)(client&), const uint timeout) {
* Nije moguće proslijediti druge parametre; * Nije moguće proslijediti druge parametre;
*/ */
void server::async(const uint limit, void (*handlecli)(client&, mutex&), const uint timeout) { void server::async(const uint limit, void (*handlecli)(client&), const uint timeout) {
mutex io;
do {
for (uint i=0; i<limit; i++) { for (uint i=0; i<limit; i++) {
thr.push_back(thread([&](){ thr.push_back(thread([&](){
client cli(this, timeout, securefds); client *cli = new client(this, timeout, securefds);
handlecli(cli, io); while (true) {
try {
handlecli(*cli);
} catch (const ConnectionException err) {
if (err.isInterrupted()) {
cli->~client();
cli = new client(this, timeout, securefds);
}
}
}
})); }));
} }
for (uint i=0; i<limit; i++) { for (uint i=0; i<limit; i++) {
thr[i].join(); thr[i].join();
} }
thr.clear();
} while (true);
} }
@ -172,6 +176,7 @@ secure::~secure () {
*/ */
client::client(const string address, const ushort port, const uint timeout, SSL_CTX* securefds) { client::client(const string address, const ushort port, const uint timeout, SSL_CTX* securefds) {
_timeout = timeout;
#if _WIN32 #if _WIN32
if (WSAStartup(MAKEWORD(2,2),&wsa) != 0) { if (WSAStartup(MAKEWORD(2,2),&wsa) != 0) {
@ -196,8 +201,8 @@ client::client(const string address, const ushort port, const uint timeout, SSL_
#if __linux__ #if __linux__
struct timeval tv; struct timeval tv;
tv.tv_sec = timeout/1000; tv.tv_sec = 0;
tv.tv_usec = (timeout%1000)*1000; tv.tv_usec = SOCKET_TIMEOUT;
if (setsockopt(conn, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval))) { if (setsockopt(conn, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval))) {
throw string("[ERROR] Unable to set timeout "); throw string("[ERROR] Unable to set timeout ");
@ -237,6 +242,7 @@ client::client(const string address, const ushort port, const uint timeout, SSL_
client::client(const server *_srv, const uint timeout, SSL_CTX* securefds) { client::client(const server *_srv, const uint timeout, SSL_CTX* securefds) {
srv = _srv; srv = _srv;
socklen_t len = sizeof(struct sockaddr_in); socklen_t len = sizeof(struct sockaddr_in);
_timeout = timeout;
if ((conn = accept(srv->sock, (struct sockaddr *)&(srv->addr), (socklen_t*)&len)) < 0) { if ((conn = accept(srv->sock, (struct sockaddr *)&(srv->addr), (socklen_t*)&len)) < 0) {
throw string("[ERROR] Unable to accept client connection "); throw string("[ERROR] Unable to accept client connection ");
@ -244,8 +250,8 @@ client::client(const server *_srv, const uint timeout, SSL_CTX* securefds) {
#if __linux__ #if __linux__
struct timeval tv; struct timeval tv;
tv.tv_sec = timeout/1000; tv.tv_sec = 0;
tv.tv_usec = (timeout%1000)*1000; tv.tv_usec = SOCKET_TIMEOUT;
if (setsockopt(conn, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval))) { if (setsockopt(conn, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval))) {
throw string("[ERROR] Unable to set timeout "); throw string("[ERROR] Unable to set timeout ");
@ -258,9 +264,6 @@ client::client(const server *_srv, const uint timeout, SSL_CTX* securefds) {
} }
#endif #endif
if (securefds) { if (securefds) {
ssl = SSL_new(securefds); ssl = SSL_new(securefds);
if (!ssl) { if (!ssl) {
@ -338,8 +341,7 @@ bool client::push(const string msg) {
} }
if (sent == -1) { if (sent == -1) {
// Greška pri slanju poruke throw string("[ERRNO] (push) - Error code: " + to_string(errno) + " Detail: " + strerror(errno));
return false;
} }
total_sent += sent; total_sent += sent;
@ -352,11 +354,31 @@ bool client::push(const string msg) {
* Metoda klase client za primanje poruke preko soketa * Metoda klase client za primanje poruke preko soketa
* Prima dozvoljeni broj karaktera koji će primiti * Prima dozvoljeni broj karaktera koji će primiti
* Vraća string primljene poruke * Vraća string primljene poruke
*
* Funkcija baca izuzetke koji se moraju uhvatiti za pravilno rukovođenje vezom
* Potrebno je i baciti dalje taj izuzetak ukoliko se koriste server async metode
* PRILOG
* ----------------------------------------------------------------
* try {
fromclient = cli.pull();
}
catch(const ConnectionException except) {
if (except.isInterrupted()) {
throw except;
}
else {
cout << "[EXCEPT] " << except.what() << endl;
fromclient = except.getData();
}
}
* -----------------------------------------------------------------
*
*/ */
string client::pull(size_t byte_limit) { string client::pull(size_t byte_limit) {
char res[byte_limit] = {0}; char res[byte_limit] = {0};
size_t total_received = 0; size_t total_received = 0;
auto start = high_resolution_clock::now();
while (total_received < byte_limit) { while (total_received < byte_limit) {
ssize_t received = 0; ssize_t received = 0;
@ -367,15 +389,21 @@ string client::pull(size_t byte_limit) {
received = recv(conn, res + total_received, byte_limit - total_received, 0); received = recv(conn, res + total_received, byte_limit - total_received, 0);
} }
cout << "Primljeno " << received << endl;
if (received == -1) { if (received == -1) {
// Greška pri primanju poruke throw ConnectionException(strerror(errno), string(res, total_received));
break;
} else if (received == 0) { } else if (received == 0) {
// Veza je prekinuta throw ConnectionException("The socket is broken", string(res), true);
break;
} }
total_received += received; total_received += received;
auto cycle = high_resolution_clock::now();
if (duration_cast<milliseconds>(cycle - start).count() > _timeout) {
cout << "TIMEOUT" << endl;
throw ConnectionException("Timeout", string(res));
}
} }
return string(res); return string(res);

@ -1,29 +1,31 @@
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <chrono>
#include "../lib/tcp_socket.hpp" #include "../lib/tcp_socket.hpp"
using namespace std; using namespace std;
using namespace chrono;
int main() { int main() {
try { try {
uint n = 10000; // uint n = 10000;
vector<thread> thr; // vector<thread> thr;
for (uint i=0; i<n; i++) { // for (uint i=0; i<n; i++) {
thr.push_back(thread([](uint a){ // thr.push_back(thread([](uint a){
client myserver("127.0.0.1", 5000, 500); // client myserver("127.0.0.1", 5000, 500);
string sends = "Hello world " + to_string(a); // string sends = "Hello world " + to_string(a);
myserver.push(sends); // myserver.push(sends);
cout << myserver.pull() << endl; // cout << myserver.pull() << endl;
}, i)); // }, i));
} // }
for (uint i=0; i<n; i++) { // for (uint i=0; i<n; i++) {
thr[i].join(); // thr[i].join();
} // }
// secure crypto; // secure crypto;
// cout << "init cert " << endl; // cout << "init cert " << endl;
@ -40,7 +42,38 @@ int main() {
// cout << myserver.pull(); // cout << myserver.pull();
auto t1 = high_resolution_clock::now();
client mycli("127.0.0.1", 5000);
auto t2 = high_resolution_clock::now();
cout << "Connecting : " << duration_cast<microseconds>(t2 - t1).count() << endl;
while (true) {
auto t3 = high_resolution_clock::now();
mycli.push("Helooo");
auto t4 = high_resolution_clock::now();
cout << "Send : " << duration_cast<microseconds>(t4 - t3).count() << endl;
auto t5 = high_resolution_clock::now();
string msg;
try {
msg = mycli.pull();
} catch (const ConnectionException err) {
cout << err.what() << endl;
msg = err.getData();
}
cout << msg << endl;
auto t6 = high_resolution_clock::now();
cout << "Recive : " << duration_cast<microseconds>(t6 - t5).count() << endl;
// break;
} }
}
catch (const string err) { catch (const string err) {
cout << err << endl; cout << err << endl;
} }

Binary file not shown.

@ -1,8 +1,12 @@
#include <iostream> #include <iostream>
#include <chrono>
#include "../lib/tcp_socket.hpp" #include "../lib/tcp_socket.hpp"
using namespace std; using namespace std;
using namespace chrono;
int main() { int main() {
try{ try{
@ -52,15 +56,31 @@ int main() {
cout << "init server " << endl; cout << "init server " << endl;
server myserver(5000, 100); server myserver(5000, 100);
cout << "init client " << endl; cout << "init client " << endl;
myserver.async(8, [](client &cli, mutex &io) { myserver.async(8, [](client &cli) {
cout << "Klijent " << cli.ipv4 << endl; auto t3 = high_resolution_clock::now();
string fromclient = cli.pull(); string fromclient;
io.lock(); try {
cout << "S klijenta " << fromclient << endl; fromclient = cli.pull();
io.unlock(); }
// fromclient += teststr; catch(const ConnectionException except) {
if (except.isInterrupted()) {
throw except;
}
else {
cout << "[EXCEPT] " << except.what() << endl;
fromclient = except.getData();
}
}
auto t4 = high_resolution_clock::now();
cout << "Recive : " << duration_cast<microseconds>(t4 - t3).count() << endl;
cout << "> " << fromclient << endl;
auto t5 = high_resolution_clock::now();
cli.push(fromclient); cli.push(fromclient);
}, 200); auto t6 = high_resolution_clock::now();
cout << "Response : " << duration_cast<microseconds>(t6 - t5).count() << endl;
});
// string teststr = " Idemooo"; // string teststr = " Idemooo";

Binary file not shown.
Loading…
Cancel
Save