Working on socket pool..

pool
mbandic 1 year ago
parent c030b22446
commit 5fa791cf5e
  1. 12
      .vscode/settings.json
  2. 23
      lib/tcp_socket.hpp
  3. 183
      src/tcp_socket.cpp
  4. 16
      test/client.cpp
  5. BIN
      test/client.o
  6. 47
      test/server.cpp
  7. BIN
      test/server.o

@ -45,6 +45,16 @@
"streambuf": "cpp",
"cinttypes": "cpp",
"typeinfo": "cpp",
"thread": "cpp"
"thread": "cpp",
"ctime": "cpp",
"chrono": "cpp",
"condition_variable": "cpp",
"optional": "cpp",
"ratio": "cpp",
"fstream": "cpp",
"mutex": "cpp",
"semaphore": "cpp",
"sstream": "cpp",
"stop_token": "cpp"
}
}

@ -53,7 +53,9 @@ class server {
void sync(void (*handlecli)(client&), const uint timeout = 100);
void async(const uint limit, void (*handlecli)(client&, mutex&), const uint timeout = 100);
void pool(const uint limit, void (*handlecli)(client&, mutex&), const uint timeout = 100);
// void syncPool(void (*handlecli)(client&), const uint timeout = 100);
void asyncPool(const uint limit, void (*handlecli)(client&), const uint timeout = 100);
};
@ -88,8 +90,12 @@ class client {
#endif
struct sockaddr_in addr;
SSL* ssl = NULL;
string _address;
ushort _port;
uint _timeout;
SSL_CTX* _securefds = NULL;
// server s klijentima
const server* srv;
const server* srv = NULL;
// klijent sa serverom
string ipv4;
string ipv6;
@ -101,6 +107,7 @@ class client {
~client ();
bool push (const string msg);
string pull (size_t byte_limit = 1024);
bool reconnect();
/**
* ustvari ne znam jel konekcija aktivna
@ -109,18 +116,20 @@ class client {
};
class Pool {
class clientPool {
public:
mutex io;
uint numcli;
vector<pair<mutex*, client*>> drops;
// konstruktor za klijente bez servera
Pool (const uint _numcli, const string address, const ushort port, const uint timeout = 100, SSL_CTX* securefds = NULL);
clientPool (const uint _numcli, const string address, const ushort port, const uint timeout = 100, SSL_CTX* securefds = NULL);
// konstruktor za klijente sa serverom
Pool (const server *_srv, const uint _numcli, const uint timeout = 100, SSL_CTX* securefds = NULL);
~Pool();
clientPool (const server *_srv, const uint _numcli, const uint timeout = 100, SSL_CTX* securefds = NULL);
~clientPool();
void run();
pair<mutex*, client*>* pickup();
void release(pair<mutex*, client*>* drop);
};

@ -83,6 +83,53 @@ void server::async(const uint limit, void (*handlecli)(client&, mutex&), const u
}
/**
*
*/
/**
* Metoda za sinkroni rad s klijentima, prima pokazivač na funkciju i timeout;
* Funkcija handlecli prima referencu tipa client - važno za definiranje funkcija koje se šalju;
* Nije moguće proslijediti druge parametre;
*/
// void server::syncPool(void (*handlecli)(client&), const uint timeout) {
// do {
// client cli(this, timeout, securefds);
// handlecli(cli);
// } while (true);
// }
/**
* Metoda za asinkdorni rad s klijentima, prima limit, pokazivač na funkciju i timeout;
* Funkcija handlecli prima referencu tipa client - važno za definiranje funkcija koje se šalju;
* Nije moguće proslijediti druge parametre;
*/
void server::asyncPool(const uint limit, void (*handlecli)(client&), const uint timeout) {
clientPool clipool(this, limit, timeout, securefds);
for (uint i=0; i<limit; i++) {
thr.push_back(thread( [&]() {
while (true) {
pair<mutex*, client*>* cli = clipool.pickup();
handlecli(*(cli->second));
clipool.release(cli);
}
}));
}
for (uint i=0; i<limit; i++) {
thr[i].join();
}
thr.clear();
}
/**
* Destruktor varijable tipa server
*/
@ -369,9 +416,13 @@ string client::pull(size_t byte_limit) {
if (received == -1) {
// Greška pri primanju poruke
cout << "Da vidim jel ovdje kad nije poslo " << endl;
break;
} else if (received == 0) {
// Veza je prekinuta
// Veza je prekinuta - treba pozvati destruktor
cout << "Destruktor " << endl;
// this->~client();
throw string("[WARNING] Socket closed remotely");
break;
}
@ -381,8 +432,111 @@ string client::pull(size_t byte_limit) {
return string(res);
}
bool client::reconnect() {
if (_address.empty() && srv != NULL) {
// srv = _srv;
socklen_t len = sizeof(struct sockaddr_in);
if ((conn = accept(srv->sock, (struct sockaddr *)&(srv->addr), (socklen_t*)&len)) < 0) {
throw string("[ERROR] Unable to accept client connection ");
}
#if __linux__
struct timeval tv;
tv.tv_sec = _timeout/1000;
tv.tv_usec = (_timeout%1000)*1000;
if (setsockopt(conn, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval))) {
throw string("[ERROR] Unable to set timeout ");
}
#elif _WIN32
DWORD tv = timeout;
if (setsockopt(conn, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv))) {
throw string("[ERROR] Unable to set timeout ");
}
#endif
Pool::Pool(const uint _numcli, const string address, const ushort port, const uint timeout, SSL_CTX* securefds) {
if (_securefds) {
ssl = SSL_new(_securefds);
if (!ssl) {
throw string("[ERROR] Creating SSL object ");
}
SSL_set_fd(ssl, conn);
// Perform SSL handshake
if (SSL_accept(ssl) <= 0) {
SSL_free(ssl);
throw string("[ERROR] Performing SSL handshake ");
}
}
char ipv4_buff[INET_ADDRSTRLEN];
char ipv6_buff[INET6_ADDRSTRLEN];
inet_ntop(AF_INET, &(srv->addr.sin_addr), ipv4_buff, INET_ADDRSTRLEN);
ipv4 = ipv4_buff;
inet_ntop(AF_INET6, &(srv->addr.sin_addr), ipv6_buff, INET6_ADDRSTRLEN);
ipv6 = ipv6_buff;
}
else {
#if _WIN32
if (WSAStartup(MAKEWORD(2,2),&wsa) != 0) {
throw string("[ERROR] Unable to set WinSock " + to_string(WSAGetLastError()));
}
#endif
conn = socket(AF_INET, SOCK_STREAM, 0);
if (conn < 0) {
throw string("[ERROR] Unable to open TCP socket ");
}
const string _address = isIPAddress(_address) ? _address : ipFromDomain(_address);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(_address.c_str());
addr.sin_port = htons(_port);
if (connect(conn, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) != 0) {
throw string("Unable to connect to server ");
}
#if __linux__
struct timeval tv;
tv.tv_sec = _timeout/1000;
tv.tv_usec = (_timeout%1000)*1000;
if (setsockopt(conn, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval))) {
throw string("[ERROR] Unable to set timeout ");
}
#elif _WIN32
DWORD tv = timeout;
if (setsockopt(conn, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv))) {
throw string("[ERROR] Unable to set timeout ");
}
#endif
if (_securefds) {
ssl = SSL_new(_securefds);
if (!ssl) {
throw string("[ERROR] Creating SSL object ");
}
SSL_set_fd(ssl, conn);
// Perform the SSL handshake
if (SSL_connect(ssl) <= 0) {
SSL_free(ssl);
throw string("[ERROR] Performing SSL handshake ");
}
}
}
}
clientPool::clientPool(const uint _numcli, const string address, const ushort port, const uint timeout, SSL_CTX* securefds) {
if (_numcli > 1 ) {
numcli = _numcli;
}
@ -397,7 +551,7 @@ Pool::Pool(const uint _numcli, const string address, const ushort port, const ui
}
Pool::Pool(const server *_srv, const uint _numcli, const uint timeout = 100, SSL_CTX* securefds = NULL) {
clientPool::clientPool(const server *_srv, const uint _numcli, const uint timeout, SSL_CTX* securefds) {
if (_numcli > 1 ) {
numcli = _numcli;
}
@ -406,12 +560,33 @@ Pool::Pool(const server *_srv, const uint _numcli, const uint timeout = 100, SSL
}
for (uint i=0; i<numcli; i++) {
cout << "init clients " << i << endl;
drops.push_back(make_pair(new mutex, new client(_srv, timeout, securefds)));
}
}
Pool::~Pool() {
pair<mutex*, client*>* clientPool::pickup() {
cout << "Uzimam clienta " << endl;
lock_guard<mutex> master(io);
while (true) {
for (uint i=0; i<drops.size(); i++) {
cout << "Pokušavam s " << i << endl;
if (drops[i].first->try_lock()) {
cout << "Odabrao sam " << i << endl;
return &drops[i];
}
}
}
}
void clientPool::release(pair<mutex*, client*>* drop) {
drop->first->unlock();
}
clientPool::~clientPool() {
numcli = 0;
drops.clear();

@ -1,9 +1,11 @@
#include <iostream>
#include <string>
#include <chrono>
#include "../lib/tcp_socket.hpp"
using namespace std;
using namespace chrono;
int main() {
@ -26,12 +28,20 @@ int main() {
// }
uint i = 0;
client mycli("localhost", 5000);
auto t1 = high_resolution_clock::now();
client mycli("localhost", 7000);
auto t2 = high_resolution_clock::now();
cout << "Connected: " << duration_cast<microseconds>(t2 - t1).count() << endl;
while (true) {
sleep(4);
mycli.push("Helllo " + to_string(i++));
cout << mycli.pull() << endl;
usleep(10000);
cout << "> " << mycli.pull() << endl;
auto t3 = high_resolution_clock::now();
cout << "Sending and recive: " << duration_cast<microseconds>(t3 - t2).count() << endl;
// usleep(10000);
sleep(1);
}
// secure crypto;

Binary file not shown.

@ -62,27 +62,42 @@ int main() {
// cli.push(fromclient);
// }, 200);
server myserver(5000, 100);
myserver.pool(10, [](client &cli, mutex &io) {
cout << "Klijent " << cli.ipv4 << endl;
string fromclient = cli.pull();
// io.lock();
cout << "S klijenta " << fromclient << endl;
// io.unlock();
// fromclient += teststr;
usleep(10000);
cli.push(fromclient);
}, 200);
// string teststr = " Idemooo";
// myserver.sync([](client &cli) {
server myserver(7000, 100);
// myserver.asyncPool(10, [](client &cli) {
// cout << "Klijent " << cli.ipv4 << endl;
// string fromclient = cli.pull();
// // io.lock();
// cout << "S klijenta " << fromclient << endl;
// // io.unlock();
// // fromclient += teststr;
// usleep(10000);
// cli.push(fromclient);
// });
// }, 200);
// string teststr = " Idemooo";
myserver.async(4, [](client &cli, mutex &io) {
while (true) {
try {
string fromclient = cli.pull();
cout << "> " << fromclient << endl;
// fromclient += teststr;
cli.push(fromclient);
} catch (const string err) {
cout << err << endl;
try {
cli.reconnect();
} catch (const string err) {
cout << err << endl;
}
}
}
});
}
catch(const string err) {

Binary file not shown.
Loading…
Cancel
Save