From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on gnuweeb.org X-Spam-Level: X-Spam-Status: No, score=-0.8 required=5.0 tests=ALL_TRUSTED,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF,NO_DNS_FOR_FROM,URIBL_BLOCKED autolearn=no autolearn_force=no version=3.4.6 Received: from integral2.. (unknown [36.81.65.188]) by gnuweeb.org (Postfix) with ESMTPSA id B359B7E385; Fri, 8 Jul 2022 12:10:59 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=gnuweeb.org; s=default; t=1657282262; bh=utWIepQArvlU8L2lQjU+pgttWeWaaKGpieWIviJDUGQ=; h=From:To:Cc:Subject:Date:In-Reply-To:References:From; b=MZ8JjHt20wUNDVoobnOxCrDSnVrDJ7zks0tXRquBXr6f+7rpLwmnz05w3wtwZ4MYY sb24M7sm2UxxRyANDm9OJ24bFNr7+pfJVNAP+fWppFOrDDswNneGUsqM28Xa9Rw3Qb BDvmknkjA6pd77mfHUzqGkVhqPgMQmp+gDlvbE8vcynMW6y9uEkBfHn0PuReNuGZqT 3uIYFmvogTMAabPmA96+JSj3TfxhJ8ydMrCMwlrf6VPxsEo6RDGfi1+4nBYmfIWNEV T8NEzIif0UTfI2uSWCOKUrUfF6N6LcQ7mKL+zydy2oTuhw8/abMxTF9wW5RrSwoPLL u2DU2DTDhMFJw== From: Ammar Faizi To: GNU/Weeb Mailing List Cc: Ammar Faizi , Alviro Iskandar Setiawan , Arthur Lapz , Fernanda Ma'rouf , Sprite , Yonle Subject: [PATCH gwhttpd 09/14] gwhttpd: Add directory listing and download file support Date: Fri, 8 Jul 2022 19:10:20 +0700 Message-Id: <20220708121025.926162-10-ammarfaizi2@gnuweeb.org> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20220708121025.926162-1-ammarfaizi2@gnuweeb.org> References: <20220708121025.926162-1-ammarfaizi2@gnuweeb.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: A huge changes, full refactoring. Let's start again from this point. Signed-off-by: Ammar Faizi --- Makefile | 4 +- gwhttpd.cpp | 1680 +++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 1371 insertions(+), 313 deletions(-) diff --git a/Makefile b/Makefile index 28cc0c8..cfbd377 100644 --- a/Makefile +++ b/Makefile @@ -2,11 +2,11 @@ CXXFLAGS = -ggdb3 -Wall -Wextra -O2 -fno-exceptions ifeq ($(SANITIZE_BUILD),1) - CXXFLAGS += "-fsanitize=address -DUSE_ASAN=1" + CXXFLAGS += -fsanitize=address -DUSE_ASAN=1 endif gwhttpd: gwhttpd.cpp - $(CXX) $(CXXFLAGS) -o $(@) $(^) + $(CXX) $(CXXFLAGS) -o $(@) $(^) -lpthread clean: rm -vf gwhttpd diff --git a/gwhttpd.cpp b/gwhttpd.cpp index bb8dc93..0e4c558 100644 --- a/gwhttpd.cpp +++ b/gwhttpd.cpp @@ -6,6 +6,8 @@ #define _GNU_SOURCE #endif +#include +#include #include #include #include @@ -13,22 +15,130 @@ #include #include #include +#include +#include #include +#include #include +#include +#include #include #include #include #include +#include +#include +#define noinline __attribute__((__noinline__)) +#define __hot __attribute__((__hot__)) +#define __cold __attribute__((__cold__)) #define likely(COND) __builtin_expect(!!(COND), 1) #define unlikely(COND) __builtin_expect(!!(COND), 0) -#define NR_EPOLL_EVT 32 -#define NR_MAX_CLIENTS 4096 +#define NR_WORKERS 16 +#define NR_EPOLL_EVT 512 +#define NR_MAX_CLIENTS 10240 #define IPV4_LEN (sizeof("xxx.xxx.xxx.xxx")) #define FCIP "%s:%u" -#define FCIP_ARG(P) (P)->src_addr, (P)->src_port +#define FCIP_ARG(P) ((P)->src_addr), ((P)->src_port) + +#if defined(__x86_64__) +#define __page_size_aligned_in_smp __attribute__((__aligned__(4096))) +#else +#define __page_size_aligned_in_smp +#endif + +template +struct wq_stack { + T *arr_; + size_t pos_; + size_t max_; + + inline wq_stack(size_t max): + pos_(max), + max_(max) + { + arr_ = new T[max]; + } + + inline ~wq_stack(void) + { + delete[] arr_; + } + + inline int64_t push(T val) + { + arr_[--pos_] = val; + return pos_; + } + + inline T top(void) + { + return arr_[pos_]; + } + + inline T pop(void) + { + return arr_[pos_++]; + } + + inline bool empty(void) + { + return max_ == pos_; + } +}; + + +template +struct wq_queue { + T *arr_; + size_t front_; + size_t rear_; + size_t and_; + + inline wq_queue(size_t want_max): + front_(0), + rear_(0) + { + size_t max = 1; + while (max < want_max) + max *= 2; + + and_ = max - 1; + arr_ = new T[max]; + } + + inline ~wq_queue(void) + { + delete[] arr_; + } + + inline size_t size(void) + { + if (rear_ >= front_) + return rear_ - front_; + else + return front_ - rear_; + } + + inline int64_t push(T val) + { + arr_[rear_ & and_] = val; + return rear_++; + } + + inline T front(void) + { + return arr_[front_ & and_]; + } + + inline T pop(void) + { + return arr_[front_++ & and_]; + } +}; enum http_method { + HTTP_NOP = 0, HTTP_GET, HTTP_POST, HTTP_PATCH, @@ -36,43 +146,84 @@ enum http_method { HTTP_DELETE, }; +enum http_action { + HTTP_ACT_NONE = 0, + HTTP_ACT_DIRLIST, + HTTP_ACT_FILE_STREAM, +}; + +struct stream_dir_list_data { + DIR *dir; + char path[4096]; +}; + +struct stream_file_data { + char *map; + off_t size; + off_t cur_off; +}; + struct client_sess { int fd; - size_t buf_size; - char buf[4096]; + uint32_t idx; + void *priv_data; + + enum http_action action; enum http_method method; char *uri; char *qs; char *http_ver; + char *header; char *body; bool got_http_header; - struct sockaddr_in addr; - uint16_t src_port; char src_addr[IPV4_LEN]; + uint16_t src_port; + char recv_buf[4096]; + size_t rbuf_len; + bool need_epl_del; + std::unordered_map *http_headers; +} __page_size_aligned_in_smp; + +struct server_state; +struct worker { + int epl_fd; + struct epoll_event events[NR_EPOLL_EVT]; uint32_t idx; -}; + struct server_state *state; + wq_queue *buf_queue; + std::thread thread; + volatile bool need_join; +} __page_size_aligned_in_smp; struct server_state { volatile bool stop; + std::atomic wrk_idx_use; int tcp_fd; - int epl_fd; - struct epoll_event events[NR_EPOLL_EVT]; - std::mutex *sfi_lock; - std::stack *sess_free_idx; - struct client_sess sess[NR_MAX_CLIENTS]; + int sig; /* - * Signal caught by the interrupt handler. + * Stack of free session indexes. */ - int sig; + wq_stack *sess_free; + std::mutex *sess_free_lock; + + /* + * Array of client sessions. + */ + struct client_sess sess[NR_MAX_CLIENTS]; + std::atomic nr_on_thread; + + struct worker *workers; + + const char *bind_addr; + uint16_t bind_port; }; -static struct server_state *g_state; +static struct server_state *g_state = NULL; -static void interrupt_handler(int sig) +static __cold void signal_handler_func(int sig) { - putchar('\n'); - printf("Got signal: %d\n", sig); + printf("\nGot signal: %d\n", sig); if (!g_state) return; @@ -80,94 +231,223 @@ static void interrupt_handler(int sig) g_state->sig = sig; } -static int init_state(struct server_state *state) +static __cold struct server_state *alloc_state(void) +{ + struct server_state *state; + +#ifdef USE_ASAN + state = new struct server_state; + if (!state) { + errno = ENOMEM; + perror("new()"); + return NULL; + } +#else /* #ifdef USE_ASAN */ + state = (struct server_state *)mmap(NULL, sizeof(*state), + PROT_READ|PROT_WRITE, + MAP_ANONYMOUS|MAP_PRIVATE, -1, 0); + if (state == MAP_FAILED) { + perror("mmap"); + return NULL; + } + mlock(state, sizeof(*state)); +#endif /* #ifdef USE_ASAN */ + + return state; +} + +static __cold void free_state(struct server_state *state) +{ +#ifdef USE_ASAN + delete state; +#else + munlock(state, sizeof(*state)); + munmap(state, sizeof(*state)); +#endif +} + +static __cold int setup_signal_handler(void) { struct sigaction act; - uint32_t i; int ret; - memset(state, 0, sizeof(*state)); - state->tcp_fd = -1; - state->epl_fd = -1; - state->sig = -1; - memset(&act, 0, sizeof(act)); - act.sa_handler = interrupt_handler; + act.sa_handler = signal_handler_func; ret = sigaction(SIGINT, &act, NULL); - ret |= sigaction(SIGHUP, &act, NULL); - ret |= sigaction(SIGTERM, &act, NULL); + if (unlikely(ret)) + goto err; + ret = sigaction(SIGHUP, &act, NULL); + if (unlikely(ret)) + goto err; + ret = sigaction(SIGTERM, &act, NULL); + if (unlikely(ret)) + goto err; + act.sa_handler = SIG_IGN; - ret |= sigaction(SIGPIPE, &act, NULL); - if (ret) { - fprintf(stderr, "Failed to set up the interrupt handler.\n"); + ret = sigaction(SIGPIPE, &act, NULL); + if (unlikely(ret)) + goto err; + + return 0; +err: + perror("sigaction"); + return -errno; +} + +static __cold int init_state(struct server_state **state_p) +{ + struct server_state *state; + uint32_t i; + int ret; + + ret = setup_signal_handler(); + if (unlikely(ret)) return ret; - } - state->sfi_lock = new std::mutex; - if (!state->sfi_lock) { - fprintf(stderr, "Cannot allocate mutex\n"); + state = alloc_state(); + if (unlikely(!state)) return -ENOMEM; - } - state->sess_free_idx = new __typeof__(*state->sess_free_idx); - if (!state->sess_free_idx) { - fprintf(stderr, "Cannot allocate sess_free_idx\n"); - delete state->sfi_lock; + state->tcp_fd = -1; + + state->sess_free = new wq_stack(NR_MAX_CLIENTS); + if (unlikely(!state->sess_free)) { + errno = ENOMEM; + perror("state->sess_free = new()"); return -ENOMEM; } - for (i = NR_MAX_CLIENTS - 1; i--; ) { + state->sess_free_lock = new std::mutex; + if (unlikely(!state->sess_free_lock)) { + errno = ENOMEM; + perror("state->sess_free_lock = new()"); + goto out_err_sess_free_lock; + } + + state->workers = new struct worker[NR_WORKERS]; + if (unlikely(!state->workers)) { + ret = -ENOMEM; + errno = -ret; + perror("state->workers = new()"); + goto out_err_workers; + } + + i = NR_MAX_CLIENTS; + while (i--) { state->sess[i].fd = -1; state->sess[i].idx = i; - state->sess_free_idx->push(i); + state->sess_free->push(i); + } + + i = NR_WORKERS; + while (i--) + state->workers[i].buf_queue = NULL; + + i = NR_WORKERS; + while (i--) { + wq_queue *p = new wq_queue(NR_MAX_CLIENTS); + if (unlikely(!p)) + goto out_err_wq_queue; + state->workers[i].idx = i; + state->workers[i].epl_fd = -1; + state->workers[i].state = state; + state->workers[i].need_join = false; + state->workers[i].buf_queue = p; + } + + state->stop = false; + atomic_store(&state->nr_on_thread, 0u); + atomic_store(&state->wrk_idx_use, 0u); + *state_p = state; + g_state = state; + return 0; + +out_err_wq_queue: + while (i < NR_WORKERS) { + delete state->workers[i].buf_queue; + state->workers[i].buf_queue = NULL; + i++; + } +out_err_workers: + delete state->sess_free_lock; + state->sess_free_lock = NULL; +out_err_sess_free_lock: + delete state->sess_free; + state->sess_free = NULL; + return ret; +} + +static __cold int set_socket_options(int tcp_fd) +{ + int val; + const void *y = (const void *)&val; + const char *on, *ov; + int ret; + + val = 1; + ret = setsockopt(tcp_fd, SOL_SOCKET, SO_REUSEADDR, &y, sizeof(val)); + if (ret < 0) { + on = "SOL_SOCKET"; + ov = "SO_REUSEADDR"; + goto out_err; } return 0; + +out_err: + ret = errno; + fprintf(stderr, "setsockopt(%d, %s, %s, ...): %s\n", tcp_fd, on, ov, + strerror(ret)); + return -ret; } -static int init_socket(struct server_state *state, const char *addr, - uint16_t port) +static __cold int init_socket(struct server_state *state) { struct sockaddr_in saddr; int tcp_fd; int ret; - int y; + + if (unlikely(!state->bind_addr)) { + fprintf(stderr, "Error: state->bind_addr is empty!\n"); + return -EINVAL; + } + + if (unlikely(!state->bind_port)) { + fprintf(stderr, "Error: state->bind_port is empty!\n"); + return -EINVAL; + } tcp_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); - if (tcp_fd < 0) { + if (unlikely(tcp_fd < 0)) { ret = -errno; perror("socket"); return ret; } - y = 1; - ret = setsockopt(tcp_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&y, sizeof(y)); - if (ret < 0) { - ret = -errno; - perror("setsockopt"); + ret = set_socket_options(tcp_fd); + if (unlikely(ret)) goto out_err; - } memset(&saddr, 0, sizeof(saddr)); saddr.sin_family = AF_INET; - saddr.sin_port = htons(port); - saddr.sin_addr.s_addr = inet_addr(addr); + saddr.sin_addr.s_addr = inet_addr(state->bind_addr); + saddr.sin_port = htons(state->bind_port); ret = bind(tcp_fd, (struct sockaddr *)&saddr, sizeof(saddr)); - if (ret < 0) { + if (unlikely(ret < 0)) { ret = -errno; perror("bind"); goto out_err; } - ret = listen(tcp_fd, 1000); - if (ret < 0) { + ret = listen(tcp_fd, 4096); + if (unlikely(ret < 0)) { ret = -errno; perror("listen"); - goto out_err; + goto out_err; } - printf("Listening on %s:%u...\n", addr, port); + printf("Listening on %s:%u...\n", state->bind_addr, state->bind_port); state->tcp_fd = tcp_fd; return 0; @@ -176,178 +456,244 @@ out_err: return ret; } -static int init_epoll(struct server_state *state) +static __cold void wait_for_ready_state(struct worker *worker) { - struct epoll_event evt; - int epl_fd; - int ret; + uint32_t i = 0; + uint32_t on; - epl_fd = epoll_create(255); - if (epl_fd < 0) { - ret = -errno; - perror("epoll_create"); - return ret; + while (true) { + on = atomic_load(&worker->state->nr_on_thread); + if (on >= NR_WORKERS || worker->state->stop) + break; + + usleep(1000); + if (i++ < 10000) + continue; + + fprintf(stderr, "Timedout while waiting for ready state"); + worker->state->stop = true; + return; } +} + +static int install_infd_to_worker(int fd, struct worker *worker, + epoll_data_t data) +{ + struct epoll_event evt; + int ret; memset(&evt, 0, sizeof(evt)); evt.events = EPOLLIN | EPOLLPRI; - ret = epoll_ctl(epl_fd, EPOLL_CTL_ADD, state->tcp_fd, &evt); + evt.data = data; + ret = epoll_ctl(worker->epl_fd, EPOLL_CTL_ADD, fd, &evt); if (ret < 0) { ret = -errno; perror("epoll_ctl"); return ret; } - - state->epl_fd = epl_fd; - return 0; -} - -static void put_sess_idx(uint32_t idx, struct server_state *state) -{ - state->sfi_lock->lock(); - state->sess_free_idx->push(idx); - state->sfi_lock->unlock(); + return ret; } static int64_t get_sess_idx(struct server_state *state) { - int64_t ret; - state->sfi_lock->lock(); - if (unlikely(state->sess_free_idx->empty())) { - state->sfi_lock->unlock(); + int64_t idx; + + state->sess_free_lock->lock(); + if (unlikely(state->sess_free->empty())) { + state->sess_free_lock->unlock(); return -EAGAIN; } - ret = state->sess_free_idx->top(); - state->sess_free_idx->pop(); - state->sfi_lock->unlock(); - return ret; + idx = (int64_t)state->sess_free->pop(); + state->sess_free_lock->unlock(); + return idx; } -static int delete_client(int epl_fd, struct client_sess *sess) +static void put_sess_idx(uint32_t idx, struct server_state *state) +{ + state->sess_free_lock->lock(); + state->sess_free->push(idx); + state->sess_free_lock->unlock(); +} + +static int uninstall_fd_from_worker(int fd, struct worker *worker) { int ret; - ret = epoll_ctl(epl_fd, EPOLL_CTL_DEL, sess->fd, NULL); + ret = epoll_ctl(worker->epl_fd, EPOLL_CTL_DEL, fd, NULL); if (ret < 0) { ret = -errno; perror("epoll_ctl"); return ret; } - return 0; + return ret; } -static int register_new_client(int epl_fd, struct client_sess *sess) +static void close_sess(struct client_sess *sess, struct worker *worker) { - struct epoll_event evt; - int ret; + printf("Closing session " FCIP " (idx = %u)\n", FCIP_ARG(sess), + sess->idx); - memset(&evt, 0, sizeof(evt)); - evt.events = EPOLLIN | EPOLLPRI; - evt.data.ptr = (void *)sess; - ret = epoll_ctl(epl_fd, EPOLL_CTL_ADD, sess->fd, &evt); - if (ret < 0) { - ret = -errno; - perror("epoll_ctl"); - return ret; + if (sess->need_epl_del) + uninstall_fd_from_worker(sess->fd, worker); + + close(sess->fd); + sess->fd = -1; + sess->action = HTTP_ACT_NONE; + if (sess->http_headers) { + delete sess->http_headers; + sess->http_headers = NULL; } - return 0; + put_sess_idx(sess->idx, worker->state); } -static int handle_new_client(struct server_state *state) -{ +static int _handle_new_client(int tcp_fd, struct worker *worker) +{ + struct server_state *state = worker->state; struct sockaddr_in caddr; socklen_t addrlen = sizeof(caddr); struct client_sess *sess; + epoll_data_t epld; + uint32_t wrk_idx; int64_t idx; int cli_fd; int ret; memset(&caddr, 0, sizeof(caddr)); - cli_fd = accept(state->tcp_fd, (struct sockaddr *)&caddr, &addrlen); + cli_fd = accept(tcp_fd, (struct sockaddr *)&caddr, &addrlen); if (unlikely(cli_fd < 0)) { ret = errno; - perror("accept"); - if (ret == EAGAIN || ret == EMFILE) - return 0; + if (ret != EAGAIN) + perror("accept"); return -ret; } idx = get_sess_idx(state); if (unlikely(idx == -EAGAIN)) { close(cli_fd); - fprintf(stderr, "Cannot handle a new client\n"); - return 0; + fprintf(stderr, "Client session is full, cannot handle a new " + "client\n"); + return -EAGAIN; } sess = &state->sess[idx]; sess->fd = cli_fd; - sess->buf_size = 0; + sess->rbuf_len = 0; sess->got_http_header = false; - sess->addr = caddr; + sess->action = HTTP_ACT_NONE; + sess->method = HTTP_NOP; - sess->src_addr[0] = '\0'; - sess->src_port = 0; if (addrlen <= sizeof(caddr)) { const char *p; - sess->src_port = ntohs(caddr.sin_port); p = inet_ntop(AF_INET, &caddr.sin_addr.s_addr, sess->src_addr, sizeof(sess->src_addr)); - if (!p) { + if (p) + sess->src_port = ntohs(caddr.sin_port); + else perror("inet_ntop"); - fprintf(stderr, "inet_ntop() error?\n"); - } } else { + sess->src_addr[0] = '\0'; + sess->src_port = 0; fprintf(stderr, "Warning, accept overflow!\n"); } - ret = register_new_client(state->epl_fd, sess); + epld.ptr = (void *)sess; + wrk_idx = atomic_fetch_add(&state->wrk_idx_use, 1u) % NR_WORKERS; + ret = install_infd_to_worker(cli_fd, &state->workers[wrk_idx], epld); if (unlikely(ret < 0)) return ret; + sess->need_epl_del = true; printf("Got a new client (idx = %u) " FCIP "\n", sess->idx, FCIP_ARG(sess)); return 0; } -static ssize_t send_to_client(struct client_sess *sess, const char *buf, - size_t len) +static __hot int handle_new_client(struct worker *worker) +{ + int tcp_fd = worker->state->tcp_fd; + uint32_t acc_iter = 512; + int ret; + + while (acc_iter--) { + ret = _handle_new_client(tcp_fd, worker); + if (likely(!ret)) + continue; + + if (likely(ret == -EAGAIN) || unlikely(ret == -EMFILE)) + return 0; + + return ret; + } + return 0; +} + +static int poll_wait_for_fd_be_writable(int fd) +{ + struct pollfd fds[1]; + int ret; + + fds[0].fd = fd; + fds[0].events = POLLOUT; + fds[0].revents = 0; + + ret = poll(fds, 1, -1); + if (unlikely(ret < 0)) { + ret = errno; + if (ret != EINTR) + return -ret; + } + return 0; +} + +static ssize_t send_to_sess(struct client_sess *sess, const char *buf, + size_t len) { - constexpr uint32_t max_try = 10; - uint32_t try_count = 0; + const char *bptr = buf; + ssize_t total_sent = 0; + size_t blen = len; + int fd = sess->fd; ssize_t ret; - int tmp; repeat: - if (unlikely(try_count++ >= max_try)) - return -ENETDOWN; - ret = send(sess->fd, buf, len, MSG_DONTWAIT); + ret = send(fd, bptr, blen, MSG_DONTWAIT); if (unlikely(ret < 0)) { - tmp = errno; - if (tmp == EAGAIN) - goto repeat; - perror("send"); - return -tmp; - } else if (unlikely(ret == 0)) { - return -ENETDOWN; - } else if (unlikely((size_t)ret < len)) { - buf = &buf[len]; - len -= (size_t)ret; + int ret = errno; + if (unlikely(ret != EAGAIN)) { + perror("send"); + return -ret; + } + + ret = poll_wait_for_fd_be_writable(fd); + if (unlikely(ret)) + return ret; + goto repeat; } - return ret; + + total_sent += ret; + if (unlikely((size_t)total_sent < len)) { + bptr = &buf[total_sent]; + blen = len - (size_t)total_sent; + goto repeat; + } + + return total_sent; } -static void send_http_error(int code, struct client_sess *sess) +static ssize_t send_http_error(struct client_sess *sess, int code, + const char *errstr) { char buf[128]; int tmp; tmp = snprintf(buf, sizeof(buf), - "HTTP/1.1 %d\r\n" + "HTTP/1.1 %d%s%s\r\n" + "Connection: closed\r\n" "Content-Type: text/plain\r\n\r\n" "HTTP Error %d", - code, code); - send_to_client(sess, buf, (size_t)tmp); + code, (errstr ? " " : ""), (errstr ? errstr : ""), + code); + return send_to_sess(sess, buf, (size_t)tmp); } static char *parse_http_method(char *buf, enum http_method *method_p) @@ -385,20 +731,21 @@ static char *parse_query_string(char *uri, char *end_uri) while (uri < end_uri) { if (*uri++ != '?') continue; + uri[-1] = '\0'; if (uri == end_uri) /* * We got an empty query string: * "http://somehwere.com/path?" */ return NULL; - return uri; + return uri[0] ? uri : NULL; } return NULL; } static int parse_http_header(struct client_sess *sess) { - char *buf = sess->buf; + char *buf = sess->recv_buf; char *end; char *ret; @@ -412,7 +759,7 @@ static int parse_http_header(struct client_sess *sess) * Don't fail here if still have enough buffer, * we will wait for the next recv() iteration. */ - if (sess->buf_size >= sizeof(sess->buf) - 1) + if (sess->rbuf_len >= sizeof(sess->recv_buf) - 1) goto bad_req; return 0; @@ -435,6 +782,8 @@ static int parse_http_header(struct client_sess *sess) * ^ * @ret */ + if (unlikely(ret[0] != '/')) + goto bad_req; sess->uri = ret; ret = strstr(sess->uri, " "); if (unlikely(!ret)) @@ -455,138 +804,715 @@ static int parse_http_header(struct client_sess *sess) goto bad_req; ret[0] = '\0'; + sess->header = &ret[1]; sess->got_http_header = true; return 0; bad_req: - send_http_error(400, sess); + send_http_error(sess, 400, "Bad Request"); return -EBADMSG; } -static void close_sess(struct client_sess *sess, struct server_state *state) +static ssize_t http_redirect(struct client_sess *sess, const char *location) { - printf("Closing session " FCIP " (idx = %u)\n", FCIP_ARG(sess), - sess->idx); - delete_client(state->epl_fd, sess); - close(sess->fd); - put_sess_idx(sess->idx, state); -} + size_t need_len; + ssize_t ret; + char *buf; + int len; -#define HTTP_200_HTML "HTTP/1.1 200\r\nContent-Type: text/html\r\n\r\n" -#define HTTP_200_TEXT "HTTP/1.1 200\r\nContent-Type: text/plain\r\n\r\n" + need_len = strlen(location) * 2 + 256; + buf = new char[need_len]; + if (unlikely(!buf)) + return -ENOMEM; -static int route_show_index(struct client_sess *sess, - struct server_state *state) -{ - static const char buf[] = - HTTP_200_HTML - "" - "" - "" - "

This is the index!

" - "" - ""; + len = snprintf(buf, need_len, + "HTTP/1.1 302\r\n" + "Connection: closed\r\n" + "Location: %s\r\n\r\nYou are redirected to %s\n\n", + location, location); - send_to_client(sess, buf, sizeof(buf) - 1); - close_sess(sess, state); - return 0; + ret = send_to_sess(sess, buf, (size_t)len); + delete[] buf; + return ret; } -static int route_show_hello(struct client_sess *sess, - struct server_state *state) +static void not_found_or_forbidden(int e, struct client_sess *sess) { - static const char buf[] = - HTTP_200_HTML - "" - "" - "" - "

Hello World!

" - "" - ""; - - send_to_client(sess, buf, sizeof(buf) - 1); - close_sess(sess, state); - return 0; + if (e == ENOENT) + send_http_error(sess, 404, "Not Found"); + else + send_http_error(sess, 403, "Forbidden"); } -static int handle_route_get(struct client_sess *sess, - struct server_state *state) +static int parse_http_header2(struct client_sess *sess) { - const char *uri = sess->uri; + char *hdr = sess->header + 1; + + if (!sess->http_headers) + sess->http_headers = new __typeof__(*sess->http_headers); + + while (1) { + char *keyval, *key, *val, *it; + + key = hdr; + keyval = strstr(key, "\r\n"); + if (!keyval) + break; + keyval[0] = '\0'; + + hdr = &keyval[2]; + val = strstr(key, ":"); + if (!val) + continue; + val[0] = '\0'; + val++; + + while (*val == ' ') + val++; + + it = key; + while (*it) { + *it = tolower((unsigned char)*it); + it++; + } - if (!strcmp(uri, "/")) - return route_show_index(sess, state); - if (!strcmp(uri, "/hello")) - return route_show_hello(sess, state); + sess->http_headers->emplace(key, val); + } - send_http_error(404, sess); - close_sess(sess, state); return 0; } -static int route_show_echo(struct client_sess *sess, struct server_state *state) +static char *open_file_for_stream(const char *file, struct stat *st, + struct client_sess *sess, + struct stream_file_data *sfd) { - char buf[4096] = HTTP_200_TEXT; - constexpr size_t max_body_len = sizeof(buf) - sizeof(HTTP_200_TEXT); + char *map; + int ret; + int fd; + + fd = open(file, O_RDONLY); + if (unlikely(fd < 0)) { + fd = errno; + not_found_or_forbidden(fd, sess); + fprintf(stderr, "Cannot open file: \"%s\": %s\n", file, + strerror(fd)); + return NULL; + } - size_t header_size = (size_t)(sess->body - sess->buf); - size_t body_len = sess->buf_size - header_size; - size_t send_len; + ret = fstat(fd, st); + if (unlikely(ret < 0)) { + ret = errno; + not_found_or_forbidden(ret, sess); + close(fd); + fprintf(stderr, "Cannot stat file: \"%s\": %s\n", file, + strerror(ret)); + return NULL; + } - if (body_len > max_body_len) - body_len = max_body_len; + map = (char *)mmap(NULL, st->st_size, PROT_READ, MAP_SHARED, fd, 0); + close(fd); + if (unlikely(map == MAP_FAILED)) { + ret = errno; + not_found_or_forbidden(ret, sess); + fprintf(stderr, "Cannot map file: \"%s\": %s\n", file, + strerror(ret)); + return NULL; + } - send_len = body_len + sizeof(HTTP_200_TEXT) - 1; - memcpy(&buf[sizeof(HTTP_200_TEXT) - 1], sess->body, body_len); - send_to_client(sess, buf, send_len); - close_sess(sess, state); - return 0; + sfd->size = st->st_size; + sfd->map = map; + sfd->cur_off = 0; + return map; } -static int handle_route_post(struct client_sess *sess, - struct server_state *state) +static int send_http_header_for_stream_file(struct client_sess *sess, + off_t start_offset, + off_t content_length) { - const char *uri = sess->uri; + char buf[1024]; + int ret; - if (!strcmp(uri, "/echo")) - return route_show_echo(sess, state); + ret = snprintf(buf, sizeof(buf), + "HTTP/1.1 %s\r\n" + "Content-Type: text/plain\r\n" + "Content-Range: bytes %lu-%lu/%lu\r\n" + "Content-Length: %zu\r\n\r\n", + start_offset ? "206 Partial Content" : "200 OK", + start_offset, content_length - 1, content_length, + content_length); + + ret = send_to_sess(sess, buf, (size_t)ret); + if (unlikely(ret < 0)) + return -EBADMSG; - send_http_error(404, sess); - close_sess(sess, state); return 0; } -static int handle_route(struct client_sess *sess, struct server_state *state) +static int stream_file_once(struct stream_file_data *sfd, + struct client_sess *sess) { + size_t send_len; int ret; - switch (sess->method) { + if (unlikely(sfd->cur_off >= sfd->size)) + return 1; + + send_len = (size_t)(sfd->size - sfd->cur_off); + ret = send_to_sess(sess, &sfd->map[sfd->cur_off], send_len); + if (unlikely(ret < 0)) + return -EBADMSG; + + return 1; +} + +constexpr off_t max_send_len_file = 1024 * 128; + +static int stream_file_loop(struct stream_file_data *sfd, + struct client_sess *sess, + struct worker *worker) +{ + size_t send_len; + int ret; + + if (unlikely(sfd->cur_off >= sfd->size)) { + ret = 1; + goto out; + } + + if (unlikely(!sess->priv_data)) { + struct stream_file_data *sfd_h; + + sfd_h = new struct stream_file_data; + if (unlikely(!sfd_h)) + return -ENOMEM; + + *sfd_h = *sfd; + sfd = sfd_h; + sess->priv_data = (void *)sfd_h; + madvise(sfd->map, sfd->size, MADV_SEQUENTIAL); + send_len = max_send_len_file; + } else { + send_len = (size_t)(sfd->size - sfd->cur_off); + if (send_len > max_send_len_file) + send_len = max_send_len_file; + } + + ret = send_to_sess(sess, &sfd->map[sfd->cur_off], send_len); + if (unlikely(ret < 0)) { + ret = -EBADMSG; + goto out; + } + sfd->cur_off += (off_t)ret; + ret = 1; + + if (sfd->cur_off < sfd->size) { + sess->action = HTTP_ACT_FILE_STREAM; + worker->buf_queue->push(sess->idx); + return 0; + } + +out: + munmap(sfd->map, sfd->size); + if (likely(sess->priv_data)) { + delete (struct stream_file_data *)sess->priv_data; + sess->priv_data = NULL; + } + + return ret; +} + +static void handle_range_header(struct client_sess *sess, off_t *start_offset_p) +{ + parse_http_header2(sess); + + const auto &http_headers = *sess->http_headers; + auto it = http_headers.find("range"); + if (it != http_headers.end()) { + const char *val = it->second.c_str(); + const char *start_bytes; + + start_bytes = strstr(val, "bytes="); + if (!start_bytes) + return; + + start_bytes += 6; + *start_offset_p = strtoll(start_bytes, NULL, 10); + } +} + +static void stream_file_bad_req(struct client_sess *sess, const char *msg) +{ + char buf[512]; + int ret; + + ret = snprintf(buf, sizeof(buf), + "HTTP/1.1 400 Bad Request\r\n" + "Connection: closed\r\n" + "Content-Type: text/plain\r\n\r\n" + "%s\n\n", msg); + + send_to_sess(sess, buf, (size_t)ret); +} + +static int start_stream_file(const char *file, struct client_sess *sess, + struct worker *worker) +{ + struct stream_file_data sfd; + off_t start_offset = 0; + struct stat st; + char *map; + int ret; + + map = open_file_for_stream(file, &st, sess, &sfd); + if (unlikely(!map)) + return -EBADMSG; + + handle_range_header(sess, &start_offset); + if (start_offset >= sfd.size || start_offset < 0) { + stream_file_bad_req(sess, "Bad range offset!"); + return -EBADMSG; + } + + sfd.cur_off = start_offset; + ret = send_http_header_for_stream_file(sess, start_offset, st.st_size); + if (unlikely(ret)) + return -EBADMSG; + + if (st.st_size <= max_send_len_file) { + ret = stream_file_once(&sfd, sess); + munmap(sfd.map, sfd.size); + } else { + ret = stream_file_loop(&sfd, sess, worker); + } + + return ret; +} + +static int stream_file(const char *file, struct client_sess *sess, + struct worker *worker) +{ + int ret; + + if (sess->action == HTTP_ACT_NONE) { + ret = start_stream_file(file, sess, worker); + if (unlikely(ret)) + return ret; + if (sess->action == HTTP_ACT_FILE_STREAM) + return 0; + } + return 1; +} + +#if 0 +static size_t htmlspecialchars(char *_output, size_t outlen, const char *_input, + size_t inlen) +{ + struct html_char_map { + const char to[8]; + const uint8_t len; + }; + + static const struct html_char_map html_map[0x100u] = { + ['<'] = {"<", 4}, + ['>'] = {">", 4}, + ['"'] = {""", 6}, + ['&'] = {"&", 5}, + }; + + + size_t j = 0; + uint8_t len = 0; + unsigned char *output = (unsigned char *)_output; + const unsigned char *input = (const unsigned char *)_input; + const unsigned char *in_end = input + inlen; + + while (likely(input < in_end)) { + const unsigned char *cp; + const struct html_char_map *map_to = &html_map[(size_t)*input]; + + if (likely(*map_to->to == '\0')) { + cp = input; + len = 1; + } else { + cp = (const unsigned char *)map_to->to; + len = map_to->len; + } + + if (unlikely((j + len - 1) >= outlen)) + break; + + memcpy(&output[j], cp, len); + j += len; + input++; + } + + if (likely(outlen > 0)) { + if (unlikely((j + 1) > outlen)) + j -= len; + output[++j] = '\0'; + } + + return j; +} +#endif + +static int redirect_on_no_trailing_slash(const char *path, + struct client_sess *sess) +{ + size_t len; + char *buf; + char *qs; + + len = strlen(path); + if (path[len - 1] == '/') + return 0; + + qs = sess->qs; + if (qs) + len += strlen(qs); + + len += 16; + buf = new char[len]; + if (unlikely(!buf)) + return -ENETDOWN; + + if (qs) + snprintf(buf, len, "%s/?%s", path, qs); + else + snprintf(buf, len, "%s/", path); + + http_redirect(sess, buf); + delete[] buf; + return 1; +} + +static int stream_dir_list_open(const char *path, struct client_sess *sess, + DIR **dir_p) +{ + DIR *dir; + int ret; + + dir = opendir(path); + if (unlikely(!dir)) { + ret = errno; + if (ret == ENOENT) + send_http_error(sess, 404, "Not Found"); + else + send_http_error(sess, 403, "Forbidden"); + + return -EBADMSG; + } + *dir_p = dir; + return 0; +} + +static int send_init_payload_for_stream_dir_list(struct client_sess *sess) +{ + constexpr static const char buf[] = + "HTTP/1.1 200\r\n" + "Content-Type: text/html\r\n" + "Connection: closed\r\n\r\n" + "\n" + "\n" + "" + "\n" + "\t

GNU/Weeb HTTP Server

\n" + "\t\n" + "\t\t" + "" + "" + "" + "\n"; + + int ret; + + ret = send_to_sess(sess, buf, sizeof(buf) - 1); + if (unlikely(ret < 0)) + return -EBADMSG; + + return 0; +} + +static int construct_file_row(const char *path, const char *file, char **pbuf_p, + size_t *capacity_p) +{ + char fpath[4096 + 256]; + const char *type; + struct stat st; + int ret; + + snprintf(fpath, sizeof(fpath), "%s/%s", path, file); + + ret = stat(fpath, &st); + if (unlikely(ret < 0)) { + ret = errno; + fprintf(stderr, "Can't stat \"%s\": %s\n", fpath, strerror(ret)); + return -ret; + } + + if (S_ISDIR(st.st_mode)) + type = "Directory"; + else if (S_ISREG(st.st_mode)) + type = "Regular File"; + else if (S_ISCHR(st.st_mode)) + type = "Char dev"; + else if (S_ISBLK(st.st_mode)) + type = "Block dev"; + else if (S_ISFIFO(st.st_mode)) + type = "FIFO"; + else if (S_ISLNK(st.st_mode)) + type = "Symlink"; + else if (S_ISSOCK(st.st_mode)) + type = "Socket"; + else + return -ENOTSUP; + + ret = snprintf(*pbuf_p, *capacity_p, + "\t\t" + "" + "" + "" + "\n", + file, S_ISDIR(st.st_mode) ? "/" : "", file, + type, + (st.st_mode & 07000) >> 9, + (st.st_mode & 00700) >> 6, + (st.st_mode & 00070) >> 3, + (st.st_mode & 00007) + ); + *pbuf_p += (size_t)ret; + *capacity_p -= (size_t)ret; + return 0; +} + +static int _stream_dir_list_loop(struct stream_dir_list_data *sdld, + char **pbuf_p, size_t *capacity_p) +{ + struct dirent *de; + const char *f; + int ret; + + de = readdir(sdld->dir); + if (unlikely(!de)) + return -1; + + f = de->d_name; + if (unlikely(!strcmp(f, ".") || !strcmp(f, ".."))) + return 0; + + ret = construct_file_row(sdld->path, f, pbuf_p, capacity_p); + if (unlikely(ret > 0)) + return 0; + + return 0; +} + +static int stream_dir_list_queue(struct stream_dir_list_data *sdld_s, + struct client_sess *sess, + struct worker *worker) +{ + if (unlikely(!sess->priv_data)) { + struct stream_dir_list_data *sdld; + + sdld = new struct stream_dir_list_data; + if (unlikely(!sdld)) + return -EBADMSG; + + *sdld = *sdld_s; + sess->priv_data = (void *)sdld; + } + + sess->action = HTTP_ACT_DIRLIST; + worker->buf_queue->push(sess->idx); + return 0; +} + +constexpr static const char dir_list_foot[] = + "\t
FilenameTypeMode
%s%s%d%d%d%d
\n\n\n"; + +static int send_footer_dir_list(struct client_sess *sess) +{ + int ret; + + ret = send_to_sess(sess, dir_list_foot, sizeof(dir_list_foot) - 1); + if (unlikely(ret < 0)) + return ret; + + return 0; +} + +static int stream_dir_list_loop(struct stream_dir_list_data *sdld, + struct client_sess *sess, + struct worker *worker) +{ + bool need_send_footer; + char buf[1024 * 128]; + size_t capacity = sizeof(buf) - 1; + size_t send_len; + char *pbuf = buf; + int ret; + + while (1) { + + if (unlikely(capacity < 8192)) { + send_len = sizeof(buf) - 1 - capacity; + ret = send_to_sess(sess, buf, send_len); + if (unlikely(ret < 0)) + goto out_close; + + ret = stream_dir_list_queue(sdld, sess, worker); + if (unlikely(ret)) + goto out_close; + + /* + * We are still in the queue loop. + */ + return 0; + } + + ret = _stream_dir_list_loop(sdld, &pbuf, &capacity); + if (unlikely(ret)) + break; + } + + if (capacity > sizeof(dir_list_foot) - 1) { + capacity -= sizeof(dir_list_foot) - 1; + memcpy(pbuf, dir_list_foot, sizeof(dir_list_foot) - 1); + need_send_footer = false; + } else { + need_send_footer = true; + } + + send_len = sizeof(buf) - 1 - capacity; + if (send_len > 0) { + ret = send_to_sess(sess, buf, send_len); + if (unlikely(ret < 0)) + goto out_close; + } + + if (need_send_footer) { + ret = send_footer_dir_list(sess); + if (unlikely(ret)) + goto out_close; + } + + ret = 1; + +out_close: + closedir(sdld->dir); + if (sess->priv_data) { + delete (struct stream_dir_list_data *)sess->priv_data; + sess->priv_data = NULL; + } + + return ret; +} + +static int stream_dir_list(const char *path, struct client_sess *sess, + struct worker *worker) +{ + struct stream_dir_list_data sdld; + int ret; + + /* + * If we are going to list the directory, the URI path must be + * ended with a trailing slash. We it's not, put the a trailing + * slash by redirecting the client. + */ + ret = redirect_on_no_trailing_slash(path, sess); + if (ret) + return ret; + + ret = stream_dir_list_open(path, sess, &sdld.dir); + if (unlikely(ret)) + return ret; + + ret = send_init_payload_for_stream_dir_list(sess); + if (unlikely(ret)) + return ret; + + snprintf(sdld.path, sizeof(sdld.path), "%s", path); + return stream_dir_list_loop(&sdld, sess, worker); +} + +static int handle_route_get(struct client_sess *sess, struct worker *worker) +{ + char path[4096 + 128]; + struct stat st; + int ret; + + if (likely(sess->need_epl_del)) { + sess->need_epl_del = false; + uninstall_fd_from_worker(sess->fd, worker); + } + + snprintf(path, sizeof(path), "./%s", sess->uri); + + /* + * Don't allow to step up to the parent directory. + */ + if (unlikely(strstr(path, "/.."))) { + send_http_error(sess, 404, "Not Found"); + return -EBADMSG; + } + + ret = stat(path, &st); + if (unlikely(ret < 0)) { + ret = errno; + fprintf(stderr, "Can't stat \"%s\": %s\n", path, strerror(ret)); + + if (ret == ENOENT) + send_http_error(sess, 404, "Not Found"); + else + send_http_error(sess, 403, "Forbidden"); + + return -EBADMSG; + } + + if (S_ISDIR(st.st_mode)) + return stream_dir_list(path, sess, worker); + + if (S_ISREG(st.st_mode)) + return stream_file(path, sess, worker); + + return -EBADMSG; +} + +static int handle_route(struct client_sess *sess, struct worker *worker) +{ + int ret; + + switch (sess->method) { case HTTP_GET: - ret = handle_route_get(sess, state); + ret = handle_route_get(sess, worker); break; case HTTP_POST: - ret = handle_route_post(sess, state); - break; + case HTTP_DELETE: + case HTTP_PATCH: + case HTTP_PUT: default: - send_http_error(405, sess); - close_sess(sess, state); - return 0; + send_http_error(sess, 405, "Method not allowed"); + return -EBADMSG; } return ret; } -static int _handle_client(struct client_sess *sess, struct server_state *state) +static int _handle_client(struct client_sess *sess, struct worker *worker) { int ret = 0; - sess->buf[sess->buf_size] = '\0'; if (!sess->got_http_header) { + sess->recv_buf[sess->rbuf_len] = '\0'; ret = parse_http_header(sess); - if (ret) + if (unlikely(ret)) + goto out; + if (unlikely(!sess->got_http_header)) goto out; - if (!sess->got_http_header) - return 0; } #if 0 @@ -595,68 +1521,107 @@ static int _handle_client(struct client_sess *sess, struct server_state *state) printf("HTTP version: %s\n", sess->http_ver); #endif - ret = handle_route(sess, state); + ret = handle_route(sess, worker); out: if (ret) { - close_sess(sess, state); - if (likely(ret == -EBADMSG || ret == -ENETDOWN)) + close_sess(sess, worker); + if (likely(ret == -EBADMSG || ret == -ENETDOWN || + ret == -ECONNRESET || ret == -EPIPE || + ret == 1)) ret = 0; } return ret; } -static int handle_client(struct client_sess *sess, struct server_state *state) +static int handle_client(struct client_sess *sess, struct worker *worker) { ssize_t recv_ret; size_t len; char *buf; int ret; - len = sizeof(sess->buf) - sess->buf_size - 1; - buf = &sess->buf[sess->buf_size]; + len = sizeof(sess->recv_buf) - 1 - sess->rbuf_len; + buf = &sess->recv_buf[sess->rbuf_len]; recv_ret = recv(sess->fd, buf, len, 0); - if (unlikely(recv_ret < 0)) { + if (unlikely(recv_ret <= 0)) { + + if (recv_ret == 0) { + close_sess(sess, worker); + return 0; + } + ret = errno; if (ret == EAGAIN) return 0; - perror("recv"); - return -ret; - } - if (unlikely(recv_ret == 0 && len != 0)) { - close_sess(sess, state); + close_sess(sess, worker); + perror("recv"); return 0; } + sess->rbuf_len += (size_t)recv_ret; + return _handle_client(sess, worker); +} + +static __hot int handle_event(struct epoll_event *event, + struct worker *worker) +{ + struct client_sess *sess; - sess->buf_size += (size_t)recv_ret; - return _handle_client(sess, state); + if (!event->data.ptr) + return handle_new_client(worker); + + sess = (struct client_sess *)event->data.ptr; + return handle_client(sess, worker); } -static int handle_event(struct epoll_event *event, struct server_state *state) +static __hot int handle_buf_queue(uint32_t idx, struct worker *worker) { + struct server_state *state = worker->state; + struct client_sess *sess = &state->sess[idx]; int ret = 0; - if (event->data.fd == 0) { - ret = handle_new_client(state); - } else { - struct client_sess *sess = - (struct client_sess *)event->data.ptr; + switch (sess->action) { + case HTTP_ACT_NONE: + ret = -ENETDOWN; + break; + case HTTP_ACT_DIRLIST: { + struct stream_dir_list_data *sdld; - ret = handle_client(sess, state); + sdld = (struct stream_dir_list_data *)sess->priv_data; + ret = stream_dir_list_loop(sdld, sess, worker); + break; } + case HTTP_ACT_FILE_STREAM: { + struct stream_file_data *sfd; - return ret; + sfd = (struct stream_file_data *)sess->priv_data; + ret = stream_file_loop(sfd, sess, worker); + break; + } + } + + if (ret) + close_sess(sess, worker); + + return 0; } -static int _run_server(int epl_fd, struct server_state *state) +static __hot int run_worker(int epl_fd, struct epoll_event *events, + struct worker *worker) { - struct epoll_event *events = state->events; + int timeout = 1000; + size_t qlen; + int nr_evt; int ret; int i; - ret = epoll_wait(epl_fd, events, NR_EPOLL_EVT, 1000); - if (unlikely(ret < 0)) { + qlen = worker->buf_queue->size(); + if (qlen) + timeout = 0; + + nr_evt = epoll_wait(epl_fd, events, NR_EPOLL_EVT, timeout); + if (unlikely(nr_evt < 0)) { ret = errno; perror("epoll_wait"); if (ret == EINTR) @@ -664,106 +1629,199 @@ static int _run_server(int epl_fd, struct server_state *state) return -ret; } - for (i = 0; i < ret; i++) { - ret = handle_event(&events[i], state); + for (i = 0; i < nr_evt; i++) { + ret = handle_event(&events[i], worker); if (unlikely(ret)) return ret; } - return ret; + while (qlen--) { + uint32_t idx; + + idx = worker->buf_queue->pop(); + ret = handle_buf_queue(idx, worker); + if (unlikely(ret)) + return ret; + } + + return 0; } -static int run_server(struct server_state *state) +static __hot int _worker_func(struct worker *worker) { - int epl_fd = state->epl_fd; + struct epoll_event *events = worker->events; + struct server_state *state = worker->state; + int epl_fd = worker->epl_fd; int ret = 0; while (likely(!state->stop)) { - ret = _run_server(epl_fd, state); - if (unlikely(ret)) - break; + ret = run_worker(epl_fd, events, worker); + if (likely(!ret)) + continue; + + state->stop = true; + break; } return ret; } -static void destroy_state(struct server_state *state) +static noinline int worker_func(struct worker *worker) +{ + int ret; + + if (worker->idx > 0) + worker->need_join = true; + + atomic_fetch_add(&worker->state->nr_on_thread, 1u); + wait_for_ready_state(worker); + ret = _worker_func(worker); + atomic_fetch_sub(&worker->state->nr_on_thread, 1u); + return ret; +} + +static int init_epoll_for_worker(struct worker *worker) +{ + int epl_fd; + + epl_fd = epoll_create(255); + if (unlikely(epl_fd < 0)) { + int ret = errno; + perror("epoll_create"); + return -ret; + } + + worker->epl_fd = epl_fd; + return 0; +} + +static __cold int wait_for_worker_online(struct worker *worker) +{ + uint32_t i = 0; + + while (!worker->need_join) { + usleep(1000); + if (i++ < 10000) + continue; + + fprintf(stderr, "Timedout while waiting for thread %u\n", + worker->idx); + worker->state->stop = true; + worker->thread.join(); + worker->need_join = false; + return -ETIMEDOUT; + } + return 0; +} + +static __cold int run_workers(struct server_state *state) +{ + struct worker *workers = state->workers; + epoll_data_t edt; + int ret = 0; + uint32_t i; + + i = NR_WORKERS; + while (i--) { + ret = init_epoll_for_worker(&workers[i]); + if (unlikely(ret)) + return ret; + } + + + i = NR_WORKERS; + /* + * Skip i == 0, we will run the worker + * on the main thread. + */ + while (--i > 0) { + workers[i].thread = std::thread(worker_func, &workers[i]); + ret = wait_for_worker_online(&workers[i]); + if (unlikely(ret)) + return ret; + } + + edt.ptr = NULL; + ret = install_infd_to_worker(state->tcp_fd, &workers[0], edt); + if (unlikely(ret)) + return ret; + + return worker_func(&workers[0]); +} + +static __cold void destroy_state(struct server_state *state) { uint32_t i; + int fd; - if (state->tcp_fd != -1) - close(state->tcp_fd); - if (state->epl_fd != -1) - close(state->epl_fd); + if (!state) + return; + + fd = state->tcp_fd; + if (fd != -1) + close(fd); + + /* + * Do not join @workers[0].thread, it doesn't + * have an LWP! + */ + for (i = 1; i < NR_WORKERS; i++) { + if (state->workers[i].need_join) + state->workers[i].thread.join(); + } + + for (i = 0; i < NR_WORKERS; i++) { + fd = state->workers[i].epl_fd; + if (fd != -1) + close(fd); + } for (i = 0; i < NR_MAX_CLIENTS; i++) { - int fd = state->sess[i].fd; + fd = state->sess[i].fd; if (fd != -1) close(fd); } - delete state->sess_free_idx; - delete state->sfi_lock; -#ifdef USE_ASAN - delete state; -#else - munmap(state, sizeof(*state)); -#endif - g_state = NULL; + i = NR_WORKERS; + while (i--) { + wq_queue *p = state->workers[i].buf_queue; + if (p) + delete p; + } + + if (state->workers) + delete[] state->workers; + if (state->sess_free_lock) + delete state->sess_free_lock; + if (state->sess_free) + delete state->sess_free; + + free_state(state); } -/** - * - * ./gwhttpd 0.0.0.0 8000 - * - */ int main(int argc, char *argv[]) { - struct server_state *state; + struct server_state *state = NULL; int ret; setvbuf(stdout, NULL, _IOLBF, 4096); - if (argc != 3) { printf("Usage: %s [bind_address] [bind_port]\n", argv[0]); return 0; } -#ifdef USE_ASAN - state = new struct server_state; - if (!state) { - perror("malloc"); - return ret; - } -#else - state = (struct server_state *)mmap(NULL, sizeof(*state), - PROT_READ|PROT_WRITE, - MAP_ANONYMOUS|MAP_PRIVATE, -1, 0); - if (state == MAP_FAILED) { - ret = errno; - perror("mmap"); + ret = init_state(&state); + if (unlikely(ret)) return ret; - } - mlock(state, sizeof(*state)); -#endif + state->bind_addr = argv[1]; + state->bind_port = (uint16_t)atoi(argv[2]); - g_state = state; - ret = init_state(state); - if (ret) - return ret; - ret = init_socket(state, argv[1], (uint16_t)atoi(argv[2])); - if (ret) - goto out; - ret = init_epoll(state); - if (ret) + ret = init_socket(state); + if (unlikely(ret)) goto out; - - ret = run_server(state); - + ret = run_workers(state); out: destroy_state(state); - if (ret < 0) - ret = -ret; return ret; } -- Ammar Faizi