From 12132229f0e1bb1a38c0170a43e5e88d1f15444c Mon Sep 17 00:00:00 2001 From: ubuntu201711081 <201711081@jbnu.ac.kr> Date: Sat, 5 Dec 2020 01:25:54 +0000 Subject: [PATCH] add threadpool server --- .vscode/settings.json | 3 +- Makefile | 2 +- README.md | 17 +++-- p-client.c | 7 ++- p-server.c | 4 +- pstest.sh | 2 +- server.c | 132 +++++++++++++++++++++++++++++++++------ simple_circular_buffer.h | 30 +++++++++ socket_wrapper.c | 39 +++++++----- 9 files changed, 192 insertions(+), 44 deletions(-) create mode 100644 simple_circular_buffer.h diff --git a/.vscode/settings.json b/.vscode/settings.json index 28c5256..25131cc 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,6 +3,7 @@ "_GNU_SOURCE" ], "files.associations": { - "socket_wrapper.h": "c" + "socket_wrapper.h": "c", + "stdalign.h": "c" } } \ No newline at end of file diff --git a/Makefile b/Makefile index 5d57802..d5ff166 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ socket_wrapper.o: socket_wrapper.c socket_wrapper.h client: socket_wrapper.o client.c $(CC) -o client client.c socket_wrapper.o $(CFLAGS) server: socket_wrapper.o server.c - $(CC) -o server server.c socket_wrapper.o $(CFLAGS) + $(CC) -o server server.c socket_wrapper.o $(CFLAGS) -pthread p-server: socket_wrapper.o p-server.c $(CC) -o p-server p-server.c socket_wrapper.o $(CFLAGS) p-client: socket_wrapper.o p-client.c diff --git a/README.md b/README.md index f6ec5f9..ec2c9a6 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,16 @@ Server OPTION and arguments: - `-h` :print help message. Available macro: -- DEFAULT_SERVER_PORT : 9091 -- DEFAULT_MAX_PATH_SIZE : 256(must be less than 1000) -- TIMEOUT : 5(second unit) +For server +- DEFAULT_MAX_LISTEN_SOCKET: 16 +- DEFAULT_SERVER_PORT: 9091 +- DEFAULT_MAX_PATH_SIZE: 256(must be less than 1000) - USE_SENDFILE - - DEFAULT_SEND_FILE_CHUNK_SIZE : 0x100000(1MB) -- SLOW_CLIENT(second unit) \ No newline at end of file + - DEFAULT_SEND_FILE_CHUNK_SIZE: 0x100000(1MB) +- DEFAULT_WORK_QUEUE_SIZE: 10 +- DEFAULT_MAX_THREAD_NUMBER: 10 +For client +- SLOW_CLIENT(second unit) +- DEFAULT_PROGRESS_BAR_WIDTH: 30 +For both +- TIMEOUT: 5(second unit) \ No newline at end of file diff --git a/p-client.c b/p-client.c index 34f6cd3..2fc1a88 100644 --- a/p-client.c +++ b/p-client.c @@ -28,8 +28,11 @@ static const int TIMEOUT = 5; #else static const int TIMEOUT = DEFAULT_TIMEOUT; #endif - +#ifndef DEFAULT_PROGRESS_BAR_WIDTH static const int PROGRESS_BAR_WIDTH = 30; +#else +static const int PROGRESS_BAR_WIDTH = DEFAULT_PROGRESS_BAR_WIDTH; +#endif /*======== *Operation *========*/ @@ -67,7 +70,7 @@ void DisplayProgressBar(size_t offset,size_t total,double cur_progress){ buf[i] = '='; else if(i == cur_pos) buf[i] = '>'; - else buf[i] = ' '; + else buf[i] = '.'; } printf("\r[%s]: %.2f%% bytes: %ld/%ld bytes",buf,cur_progress,total,offset); } diff --git a/p-server.c b/p-server.c index 3783414..a455a61 100644 --- a/p-server.c +++ b/p-server.c @@ -272,8 +272,8 @@ int main(int argc, const char *argv[]){ perror("accept error"); return 1; } - msg = inet_ntop(AF_INET,&addr.sin_addr,ip_buf,sizeof(ip_buf)); - fprintf(stderr,"Connected on : %s:%d\n",msg == NULL ? "(null)" : msg , ntohs(addr.sin_port)); + msg = inet_ntop(AF_INET,&client_addr.sin_addr,ip_buf,sizeof(ip_buf)); + fprintf(stderr,"Connected on : %s:%d\n",msg == NULL ? "(null)" : msg , ntohs(client_addr.sin_port)); pid = fork(); if(pid == 0){ if((fd = read_request(csock,buf,bufsize)) > 0){ diff --git a/pstest.sh b/pstest.sh index c8cb742..47ae48e 100755 --- a/pstest.sh +++ b/pstest.sh @@ -1,3 +1,3 @@ #! /bin/bash cd server_test -./p-server \ No newline at end of file +./server \ No newline at end of file diff --git a/server.c b/server.c index f606627..74e58d6 100644 --- a/server.c +++ b/server.c @@ -12,8 +12,16 @@ #include #include #include -#include "socket_wrapper.h" +#include +#include "socket_wrapper.h" +#include "simple_circular_buffer.h" + +#ifndef DEFAULT_MAX_LISTEN_SOCKET +static const int MAX_LISTEN_SOCKET = 16; +#else +static const int MAX_LISTEN_SOCKET = DEFAULT_MAX_LISTEN_SOCKET; +#endif #ifdef USE_SENDFILE #include #ifndef DEFAULT_SEND_FILE_CHUNK_SIZE @@ -39,6 +47,19 @@ static const int TIMEOUT = 5; static const int TIMEOUT = DEFAULT_TIMEOUT; #endif +enum{ +#ifndef DEFAULT_WORK_QUEUE_SIZE + WORK_QUEUE_SIZE = 10, + #else + WORK_QUEUE_SIZE = DEFAULT_WORK_QUEUE_SIZE, +#endif +#ifndef DEFAULT_MAX_THREAD_NUMBER + MAX_THREAD_NUMBER = 10 + #else + MAX_THREAD_NUMBER = DEFAULT_MAX_THREAD_NUMBER +#endif +}; + /*======== *Operation *========*/ @@ -204,18 +225,83 @@ int parse_args(int argc,const char * argv[] , in_port_t * port){ } return 0; } +//============ +//Simple Thread Pool +//============ +typedef struct SharedState{ + //empty if less than 0 + queue_struct(int,WORK_QUEUE_SIZE) socks; + pthread_mutex_t sock_mutex; + pthread_cond_t ready; + //int progress[MAX_THREAD_NUMBER]; +} shared_state_t; + +void init_shared_state(shared_state_t * state) { + queue_init(&state->socks); + pthread_mutex_init(&state->sock_mutex,NULL); + pthread_cond_init(&state->ready,NULL); +} +// +typedef struct WorkerArgument +{ + int id; + int bufsize; + uint8_t * buf; + + //pthread_mutex_t * cond_mutex; + //pthread_cond_t cond; + +} worker_argument_t; + +__attribute__((malloc)) worker_argument_t * create_worker_argument(int id, int bufsize){ + worker_argument_t * ret = (worker_argument_t *)malloc(sizeof(worker_argument_t)); + if (ret == NULL) return ret; + ret->id = id; + ret->bufsize = bufsize; + ret->buf = (uint8_t *)malloc(sizeof(*ret->buf)*bufsize); + if(ret->buf == NULL){ + free(ret); + ret = NULL; + } + return ret; +} + +static shared_state_t globalState; + +void * worker_proc(void * data){ + worker_argument_t * args = (worker_argument_t *)data; + int fd, csock; + for(;;){ + pthread_mutex_lock(&globalState.sock_mutex); + while (queue_isempty(&globalState.socks)){ + pthread_cond_wait(&globalState.ready,&globalState.sock_mutex); + } + csock = dequeue(&globalState.socks); + pthread_mutex_unlock(&globalState.sock_mutex); + if((fd = read_request(csock,args->buf,args->bufsize)) > 0){ + send_response(csock,fd,args->buf,args->bufsize); + close(fd); + } + + if(close(csock) < 0) + perror("csock close error"); + } +} + + +static pthread_t worker_threads[MAX_THREAD_NUMBER]; static int sock; void safe_exit(){ close(sock); } int main(int argc, const char *argv[]){ - uint8_t * buf; struct sockaddr_in addr; struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int csock; int bufsize; + int i; in_port_t binding_port_number = SERVER_PORT; if (argc > 1){ int d = parse_args(argc,argv,&binding_port_number); @@ -234,11 +320,15 @@ int main(int argc, const char *argv[]){ perror("setsockopt"); } } + init_shared_state(&globalState); bufsize = getBufferSizeFrom(sock); - buf = malloc(bufsize * sizeof(*buf)); - if (buf == NULL){ - fprintf(stderr,"lack of memory"); - return 1; + for (i = 0; i < MAX_THREAD_NUMBER; i++) { + worker_argument_t * args = create_worker_argument(i,bufsize); + if (args == NULL) { + fprintf(stderr,"malloc: lack of memory"); + return 1; + } + pthread_create(&worker_threads[i],NULL,worker_proc,args); } addr.sin_addr.s_addr = htonl(INADDR_ANY); /*0.0.0.0 모든 네트워크 인터페이스에 묶임.*/ @@ -255,27 +345,33 @@ int main(int argc, const char *argv[]){ fprintf(stderr,"server bind on %s:%d\n",msg ,binding_port_number); } - if(listen(sock,1) < 0){ + if(listen(sock,MAX_LISTEN_SOCKET) < 0){ perror("listen failed"); return 1; } while ((csock = accept(sock, (struct sockaddr *)&client_addr,&client_addr_len)) >= 0) { - int fd; char ip_buf[INET_ADDRSTRLEN]; - const char * msg = inet_ntop(AF_INET,&addr.sin_addr,ip_buf,sizeof(ip_buf)); + const char * msg = inet_ntop(AF_INET,&client_addr.sin_addr,ip_buf,sizeof(ip_buf)); fprintf(stderr,"Connected on : %s:%d\n",msg == NULL ? "(null)" : msg , ntohs(addr.sin_port)); - if((fd = read_request(csock,buf,bufsize)) > 0){ - send_response(csock,fd,buf,bufsize); - close(fd); + for(;;){ + pthread_mutex_lock(&globalState.sock_mutex); + if (queue_isfull(&globalState.socks)){ + pthread_mutex_unlock(&globalState.sock_mutex); +#ifdef _GNU_SOURCE + pthread_yield(); +#else + usleep(400); +#endif + continue; + } + else enqueue(&globalState.socks,csock); + break; } - - if(close(csock) < 0) - perror("csock close error"); - + pthread_mutex_unlock(&globalState.sock_mutex); + pthread_cond_signal(&globalState.ready); } - free(buf); - perror("accept error"); + return 1; } \ No newline at end of file diff --git a/simple_circular_buffer.h b/simple_circular_buffer.h new file mode 100644 index 0000000..5d3f505 --- /dev/null +++ b/simple_circular_buffer.h @@ -0,0 +1,30 @@ +#ifndef _SIMPLE_CIRCULAR_BUFFER_ +#define _SIMPLE_CIRCULAR_BUFFER_ + +#include + +#define queue_struct(queue_type,queue_size) struct{\ +queue_type data [(queue_size)+1];\ +size_t begin;\ +size_t end;\ +} + +#define queue_size(queue) ((sizeof((queue)->data)/sizeof((queue)->data[0])) - 1) +#define queue_isempty(queue) ((queue)->begin == (queue)->end) +#define queue_isfull(queue) ((queue)->begin == (((queue)->end + 1) % queue_size(queue))) + +#define queue_init(queue) do{\ + (queue)->begin = 0;\ + (queue)->end = 0;\ +}while(0) +//unchecked +#define enqueue(queue,element) do{ \ + (queue)->data[(queue)->end] = (element);\ + (queue)->end = ((queue)->end + 1) % (queue_size(queue) + 1);\ +}while(0) +//unchecked +#define dequeue(queue) \ +(((queue)->begin = ((queue)->begin + 1) % (queue_size(queue) + 1)) ,\ +(queue)->data[((queue)->begin + queue_size(queue)) % (queue_size(queue) + 1)]) + +#endif \ No newline at end of file diff --git a/socket_wrapper.c b/socket_wrapper.c index 7996ef6..9153133 100644 --- a/socket_wrapper.c +++ b/socket_wrapper.c @@ -26,21 +26,32 @@ ssize_t timeout_recv(int fd,void * buf,size_t n,int timeout) { ssize_t ret = 0; int poll_ret; + int try = 0; struct pollfd fd_single; - fd_single.fd = fd; - fd_single.events = POLL_IN; - poll_ret = (poll(&fd_single,1,timeout * 1000)); - if (poll_ret < 0) return -1; - else if(poll_ret == 0) return -2; - if (fd_single.revents & POLLHUP) //We'll treat hangups state like timeouts state. - return -2; - if ((fd_single.revents & POLLERR) || (fd_single.revents & POLLNVAL)) - return -1; - if (fd_single.revents & POLL_IN) - ret = recv(fd,buf,n,0); - assert(ret != 0); - return ret; - + timeout = timeout * 1000; + if (n == 0) return 0; + for(;;){ + fd_single.fd = fd; + fd_single.events = POLL_IN; + poll_ret = (poll(&fd_single,1,timeout)); + if (poll_ret < 0){ fprintf(stderr,"timeout %d\n",timeout); return -1;} + else if(poll_ret == 0) return -2; + if (fd_single.revents & POLLHUP) //We'll treat hangups state like timeouts state. + return -2; + if ((fd_single.revents & POLLERR) || (fd_single.revents & POLLNVAL)) + return -1; + if (fd_single.revents & POLL_IN){ + ret = recv(fd,buf,n,0); + if(ret != 0) return ret; + //try 3 times + if (try < 3){ + try++; + timeout /= 2; + continue; + } + return -2; + } + } assert(0 && "unreachable"); }