#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "socket_wrapper.h" #include "simple_circular_buffer.h" #include "display_bar.h" #ifndef DEFAULT_MAX_LISTEN_SOCKET static const int MAX_LISTEN_SOCKET = 16; #else static const int MAX_LISTEN_SOCKET = DEFAULT_MAX_LISTEN_SOCKET; #endif #ifdef USE_SENDFILE #include #ifndef DEFAULT_SEND_FILE_CHUNK_SIZE const size_t SEND_FILE_CHUNK_SIZE = 0x100000; /*1MB*/ #else const size_t SEND_FILE_CHUNK_SIZE = DEFAULT_SEND_FILE_CHUNK_SIZE; /*1MB*/ #endif #endif #ifndef DEFAULT_SERVER_PORT static const in_port_t SERVER_PORT = 9091; #else static const in_port_t SERVER_PORT = DEFAULT_SERVER_PORT; #endif #ifndef DEFAULT_MAX_PATH_SIZE /*0 < x < MAX_PATH_SIZE*/ static const uint16_t MAX_PATH_SIZE = 256; #else static const uint16_t MAX_PATH_SIZE = DEFAULT_MAX_PATH_SIZE; #endif #ifndef DEFAULT_TIMEOUT static const int TIMEOUT = 5; #else static const int TIMEOUT = DEFAULT_TIMEOUT; #endif enum{ #ifndef DEFAULT_WORK_QUEUE_SIZE WORK_QUEUE_SIZE = 10, #else WORK_QUEUE_SIZE = DEFAULT_WORK_QUEUE_SIZE, #endif #ifndef DEFAULT_MAX_THREAD_NUMBER MAX_THREAD_NUMBER = 10 #else MAX_THREAD_NUMBER = DEFAULT_MAX_THREAD_NUMBER #endif }; #ifdef SLOW_SERVER //micro second unit static const int SLOW_SERVER_TIME = SLOW_SERVER; #endif //#define USE_TRACE #ifdef USE_TRACE #include "timerhelper.h" enum{ #ifndef DEFAULT_TOP_TRACE_TIMER_ID Top_Trace_Timer_ID = CLOCK_REALTIME, #else Top_Trace_Timer_ID = DEFAULT_TOP_TRACE_TIMER_ID, #endif #ifndef DEFAULT_BOTTOM_TRACE_TIMER_ID Bottom_Trace_Timer_ID = CLOCK_THREAD_CPUTIME_ID #else Bottom_Trace_Timer_ID = DEFAULT_BOTTOM_TRACE_TIMER_ID #endif }; static inline void report_resolution() { struct timespec top_res,bottom_res; clock_getres(Top_Trace_Timer_ID,&top_res); clock_getres(Bottom_Trace_Timer_ID,&bottom_res); fprintf(stderr,"top res: %ld, bottom res: %ld\n",top_res.tv_nsec,bottom_res.tv_nsec); } #endif static bool use_gui = false; static inline void server_perror(const char * msg){ if(use_gui) myd_perror(msg); else perror(msg); } static inline int server_fprintf(int line,FILE * file,const char * msg,...){ va_list va; va_start(va,msg); return use_gui ? myd_vfprintf(line,file,msg,va) : vfprintf(file,msg,va); } /*======== *Operation *========*/ /** * send user error message * 80 character limit * thread safe */ int send_fail(int sock,const char * msg){ struct TransferResult res; res.res = RES_USR_ERR; res.file_size = 0; res.err_number = 0; res.error_msg_size = strlen(msg); //os will be combining if tcp_autocorking emabled. if(send(sock,&res,sizeof(res),0) < 0){ server_perror("error msg send"); return -1; } if (send(sock,msg,res.error_msg_size,0) < 0){ server_perror("error msg send"); return -1; } return 0; } /** * send errno to client * thread safe */ int send_errno(int sock){ struct TransferResult r; r.res = RES_ERR; r.err_number = errno; r.file_size = 0; r.error_msg_size = 0; if(send(sock,&r,sizeof(r),0)){ server_perror("errno send"); return -1; } return 0; } /** * return fd, if success. otherwise, return -1. * thread safe */ int read_request(int sock,uint8_t * buf,size_t bufsize){ struct ReadOp p; int fd; ssize_t n = recv_until_byte(sock,&p,sizeof(p),TIMEOUT); if (n < 0){ if (n == -2) fprintf(stderr,"timeout!"); else server_perror("receive fail"); return -1; } if(bufsize <= ((size_t)p.file_url_size) + sizeof(p) + 1){ send_fail(sock,"buffer overflow"); return -1; } else if(p.file_url_size + 1 > MAX_PATH_SIZE){ send_fail(sock,"max path fail"); return -1; } else if(p.file_url_size == 0){ send_fail(sock,"filename zero fail"); return -1; } n = recv_until_byte(sock,buf,p.file_url_size,TIMEOUT); buf[p.file_url_size] = '\0'; //truncate server_fprintf(1,stdout,"str size: %d, request %s\n",p.file_url_size,buf); if(strchr((char *)buf,'/') != NULL){ send_fail(sock,"Illegal character /"); return -1; } fd = open((char *)buf,O_RDONLY); if(fd < 0){ send_errno(sock); close(fd); return -1; } return fd; } /** * send response to client * thread safe */ int send_response(int sock,int fd, uint8_t * buf, size_t bufsize){ struct TransferResult r; struct stat st; off_t offset = 0; ssize_t readed = 0; progress_bar_t pbar; r.res = RES_OK; r.err_number = 0; r.error_msg_size = 0; if(fstat(fd,&st) < 0){ return send_errno(sock); } if(S_ISDIR(st.st_mode)){ return send_fail(sock,"is a directory"); } r.file_size = st.st_size; if(send(sock,&r,sizeof(r),0)<0){ server_perror("send fail"); return -1; } if(use_gui) init_progress_bar(&pbar,10); #ifdef USE_SENDFILE while (r.file_size != offset) { size_t count = SEND_FILE_CHUNK_SIZE < (r.file_size - offset) ? SEND_FILE_CHUNK_SIZE : (r.file_size - offset); if((readed = sendfile(sock,fd,&offset,count)) < 0){ server_perror("send file fail"); return -1; } if(use_gui) DisplayProgressBar(&pbar,offset,r.file_size,"",false); #ifdef SLOW_SERVER usleep(SLOW_SERVER_TIME); #endif } #else while (offset < r.file_size) { if(use_gui) DisplayProgressBar(&pbar,offset,r.file_size,"",false); readed = bufsize < (r.file_size - offset) ? bufsize : r.file_size - offset; if(read(fd,buf,readed)<0){ server_perror("send response read fail"); return -1; } if(send(sock,buf,readed,0)<0){ server_perror("send response send fail"); return -1; } offset += readed; #ifdef SLOW_SERVER usleep(SLOW_SERVER_TIME); #endif } if(use_gui) DisplayProgressBar(&pbar,offset,r.file_size,"",true); #endif return 0; } const char * help(const char * n){ const char * msg = "USASE : %s [Option] ...\n" "Options and arguments: \n" "-p port\t:set to port binding. couldn't set to 0\n" "-h\t:print help message.\n" "--progress_bar\t: show pretty progress bar"; printf(msg,n); return msg; } /** return 0 ok. otherwise invalid format*/ int parse_args(int argc,const char * argv[] , in_port_t * port){ int pos = 1; const char * opt; while (pos < argc){ opt = argv[pos++]; if (strcmp(opt,"-h") == 0 || strcmp(opt,"--help") == 0){ help(argv[0]); return 0; } else if(strcmp(opt,"-p") == 0 || strcmp(opt,"--port") == 0){ if (pos < argc){ const char * value = argv[pos++]; *port = atoi(value); if (port == 0){ // either not number or zero fprintf(stderr,"argument is either not number or zero\n"); return 2; } } else{ fprintf(stderr,"need argument\n"); return 2; //failed to find argument. } } else if(strcmp(opt,"--progress_bar") == 0){ use_gui = true; } else{ fprintf(stderr,"unknown option\n"); help(argv[0]); return 2; } } return 0; } //============ //Simple Thread Pool //============ #ifndef USE_NO_QUEUE typedef struct SharedState{ //empty if less than 0 queue_struct(int,WORK_QUEUE_SIZE) socks; #ifdef USE_TRACE queue_struct(struct timespec,WORK_QUEUE_SIZE) trace_timer; #endif pthread_mutex_t sock_mutex; pthread_cond_t ready; //int progress[MAX_THREAD_NUMBER]; } shared_state_t; void init_shared_state(shared_state_t * state) { queue_init(&state->socks); #ifdef USE_TRACE queue_init(&state->trace_timer); #endif pthread_mutex_init(&state->sock_mutex,NULL); pthread_cond_init(&state->ready,NULL); } #endif //argument for thread worker typedef struct WorkerArgument { int id; int bufsize; uint8_t * buf; #ifdef USE_NO_QUEUE int csock; #ifdef USE_TRACE struct timespec ts; #endif #endif } worker_argument_t; __attribute_malloc__ worker_argument_t * create_worker_argument(int id, int bufsize #ifdef USE_NO_QUEUE , int csock #endif ){ worker_argument_t * ret = (worker_argument_t *)malloc(sizeof(worker_argument_t)); if (ret == NULL) return ret; ret->id = id; #ifdef USE_NO_QUEUE ret->csock = csock; #endif ret->bufsize = bufsize; ret->buf = (uint8_t *)malloc(sizeof(*ret->buf)*bufsize); if(ret->buf == NULL){ free(ret); ret = NULL; } return ret; } void destory_worker_argument(worker_argument_t * arg){ free(arg->buf); free(arg); } #ifndef USE_NO_QUEUE static shared_state_t globalState; void * worker_proc(void * data){ worker_argument_t * args = (worker_argument_t *)data; int fd, csock; #ifdef USE_TRACE struct timespec ts_top_begin,ts_top_end, ts_bottom_begin, ts_bottom_end; #endif for(;;){ pthread_mutex_lock(&globalState.sock_mutex); //wait until request come. while (queue_isempty(&globalState.socks)){ pthread_cond_wait(&globalState.ready,&globalState.sock_mutex); } csock = dequeue(&globalState.socks); //dequeue socket descriptor for request. #ifdef USE_TRACE ts_top_begin = dequeue(&globalState.trace_timer); #endif pthread_mutex_unlock(&globalState.sock_mutex); #ifdef USE_TRACE clock_gettime(Top_Trace_Timer_ID,&ts_top_end); clock_gettime(Bottom_Trace_Timer_ID,&ts_bottom_begin); #endif //process request. if((fd = read_request(csock,args->buf,args->bufsize)) > 0){ send_response(csock,fd,args->buf,args->bufsize); close(fd); } #ifdef USE_TRACE clock_gettime(Bottom_Trace_Timer_ID,&ts_bottom_end); struct timespec tophalf = timespec_sub(ts_top_end,ts_top_begin); struct timespec bottomhalf = timespec_sub(ts_bottom_end,ts_bottom_begin); server_fprintf(1,stderr,"top : %ld ns, bottom : %ld ns\n",tophalf.tv_nsec,bottomhalf.tv_nsec); #endif if(close(csock) < 0) server_perror("csock close error"); } destory_worker_argument(args); return NULL; } static pthread_t worker_threads[MAX_THREAD_NUMBER]; #else void * worker_proc(void * data){ worker_argument_t * args = (worker_argument_t *)data; int fd, csock; csock = args->csock; #ifdef USE_TRACE struct timespec ts_top_begin,ts_top_end, ts_bottom_begin, ts_bottom_end; ts_top_begin = args->ts; clock_gettime(Top_Trace_Timer_ID,&ts_top_end); clock_gettime(Bottom_Trace_Timer_ID,&ts_bottom_begin); #endif if((fd = read_request(csock,args->buf,args->bufsize)) > 0){ send_response(csock,fd,args->buf,args->bufsize); close(fd); } #ifdef USE_TRACE clock_gettime(Bottom_Trace_Timer_ID,&ts_bottom_end); struct timespec tophalf = timespec_sub(ts_top_end,ts_top_begin); struct timespec bottomhalf = timespec_sub(ts_bottom_end,ts_bottom_begin); server_fprintf(1,stderr,"top : %ld ns, bottom : %ld ns\n",tophalf.tv_nsec,bottomhalf.tv_nsec); #endif if(close(csock) < 0) server_perror("csock close error"); destory_worker_argument(args); return NULL; } #endif static int sock; void safe_exit(){ close(sock); } int main(int argc, const char *argv[]){ struct sockaddr_in addr; struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int csock; int bufsize; int i = 0; in_port_t binding_port_number = SERVER_PORT; if (argc > 1){ int d = parse_args(argc,argv,&binding_port_number); if(d != 0 ) return d; } if(use_gui) ready_progress_bar(); #ifdef USE_TRACE report_resolution(); #endif sock = socket(AF_INET,SOCK_STREAM,0); atexit(safe_exit); if(sock < 0){ server_perror("sock create fail"); return 1; } else { int option = 1; if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option)) < 0){ server_perror("setsockopt"); } } bufsize = getBufferSizeFrom(sock); #ifndef USE_NO_QUEUE init_shared_state(&globalState); for (i = 0; i < MAX_THREAD_NUMBER; i++) { worker_argument_t * args = create_worker_argument(i,bufsize); if (args == NULL) { fprintf(stderr,"malloc: lack of memory"); return 1; } pthread_create(&worker_threads[i],NULL,worker_proc,args); } #endif addr.sin_addr.s_addr = htonl(INADDR_ANY); /*0.0.0.0 모든 네트워크 인터페이스에 묶임.*/ addr.sin_family = AF_INET; addr.sin_port = htons(binding_port_number); if(bind(sock, (struct sockaddr *)&addr,sizeof(addr)) < 0){ server_perror("bind failed"); return 1; } else { char ip_buf[INET_ADDRSTRLEN]; const char * msg = inet_ntop(AF_INET,&addr.sin_addr,ip_buf,sizeof(ip_buf)); assert(msg != NULL); server_fprintf(1,stdout,"server bind on %s:%d\n",msg ,binding_port_number); } if(listen(sock,MAX_LISTEN_SOCKET) < 0){ server_perror("listen failed"); return 1; } while ((csock = accept(sock, (struct sockaddr *)&client_addr,&client_addr_len)) >= 0) { char ip_buf[INET_ADDRSTRLEN]; const char * msg = inet_ntop(AF_INET,&client_addr.sin_addr,ip_buf,sizeof(ip_buf)); server_fprintf(1,stdout,"Connected on : %s:%d\n",msg == NULL ? "(null)" : msg , ntohs(addr.sin_port)); #ifdef USE_TRACE struct timespec ts_top_begin; clock_gettime(Top_Trace_Timer_ID, &ts_top_begin); #endif #ifndef USE_NO_QUEUE for(;;){ pthread_mutex_lock(&globalState.sock_mutex); if (queue_isfull(&globalState.socks)){ pthread_mutex_unlock(&globalState.sock_mutex); #ifdef _GNU_SOURCE pthread_yield(); #else usleep(400); #endif continue; } else { enqueue(&globalState.socks,csock); #ifdef USE_TRACE enqueue(&globalState.trace_timer,ts_top_begin); #endif } break; } pthread_mutex_unlock(&globalState.sock_mutex); pthread_cond_signal(&globalState.ready); #else pthread_t thread_a; worker_argument_t * args = create_worker_argument(i++,bufsize,csock); #ifdef USE_TRACE args->ts = ts_top_begin; #endif pthread_create(&thread_a,NULL,worker_proc,args); pthread_detach(thread_a); #endif } return 1; }