From: Ammar Faizi <[email protected]>
To: GNU/Weeb Mailing List <[email protected]>
Cc: Ammar Faizi <[email protected]>,
Alviro Iskandar Setiawan <[email protected]>,
Arthur Lapz <[email protected]>,
Fernanda Ma'rouf <[email protected]>,
Sprite <[email protected]>, Yonle <[email protected]>
Subject: [PATCH gwhttpd 09/14] gwhttpd: Add directory listing and download file support
Date: Fri, 8 Jul 2022 19:10:20 +0700 [thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
A huge change: a full refactoring. Let's start again from this point.
Signed-off-by: Ammar Faizi <[email protected]>
---
Makefile | 4 +-
gwhttpd.cpp | 1680 +++++++++++++++++++++++++++++++++++++++++----------
2 files changed, 1371 insertions(+), 313 deletions(-)
diff --git a/Makefile b/Makefile
index 28cc0c8..cfbd377 100644
--- a/Makefile
+++ b/Makefile
@@ -2,11 +2,11 @@
CXXFLAGS = -ggdb3 -Wall -Wextra -O2 -fno-exceptions
ifeq ($(SANITIZE_BUILD),1)
- CXXFLAGS += "-fsanitize=address -DUSE_ASAN=1"
+ CXXFLAGS += -fsanitize=address -DUSE_ASAN=1
endif
gwhttpd: gwhttpd.cpp
- $(CXX) $(CXXFLAGS) -o $(@) $(^)
+ $(CXX) $(CXXFLAGS) -o $(@) $(^) -lpthread
clean:
rm -vf gwhttpd
diff --git a/gwhttpd.cpp b/gwhttpd.cpp
index bb8dc93..0e4c558 100644
--- a/gwhttpd.cpp
+++ b/gwhttpd.cpp
@@ -6,6 +6,8 @@
#define _GNU_SOURCE
#endif
+#include <poll.h>
+#include <fcntl.h>
#include <cstdio>
#include <cstdint>
#include <cstdlib>
@@ -13,22 +15,130 @@
#include <unistd.h>
#include <cerrno>
#include <stack>
+#include <queue>
+#include <atomic>
#include <mutex>
+#include <dirent.h>
#include <signal.h>
+#include <thread>
+#include <unordered_map>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#define noinline __attribute__((__noinline__))
+#define __hot __attribute__((__hot__))
+#define __cold __attribute__((__cold__))
#define likely(COND) __builtin_expect(!!(COND), 1)
#define unlikely(COND) __builtin_expect(!!(COND), 0)
-#define NR_EPOLL_EVT 32
-#define NR_MAX_CLIENTS 4096
+#define NR_WORKERS 16
+#define NR_EPOLL_EVT 512
+#define NR_MAX_CLIENTS 10240
#define IPV4_LEN (sizeof("xxx.xxx.xxx.xxx"))
#define FCIP "%s:%u"
-#define FCIP_ARG(P) (P)->src_addr, (P)->src_port
+#define FCIP_ARG(P) ((P)->src_addr), ((P)->src_port)
+
+#if defined(__x86_64__)
+#define __page_size_aligned_in_smp __attribute__((__aligned__(4096)))
+#else
+#define __page_size_aligned_in_smp
+#endif
+
+/*
+ * Fixed-capacity LIFO stack of T backed by a heap array.
+ *
+ * The stack grows downwards: pos_ starts at max_ (empty) and is
+ * decremented by every push(). None of push(), top() or pop() checks
+ * bounds, so the caller must not push more than max_ elements and must
+ * check empty() before calling top() or pop().
+ */
+template<typename T>
+struct wq_stack {
+ T *arr_;
+ size_t pos_;
+ size_t max_;
+
+ /* Allocate room for @max elements; the stack starts out empty. */
+ inline wq_stack(size_t max):
+ pos_(max),
+ max_(max)
+ {
+ arr_ = new T[max];
+ }
+
+ inline ~wq_stack(void)
+ {
+ delete[] arr_;
+ }
+
+ /* Store @val on top of the stack and return the new value of pos_. */
+ inline int64_t push(T val)
+ {
+ arr_[--pos_] = val;
+ return pos_;
+ }
+
+ /* Return the element at pos_ without removing it. */
+ inline T top(void)
+ {
+ return arr_[pos_];
+ }
+
+ /* Return the element at pos_ and advance pos_ past it. */
+ inline T pop(void)
+ {
+ return arr_[pos_++];
+ }
+
+ /* True when pos_ is back at max_, i.e. nothing is stored. */
+ inline bool empty(void)
+ {
+ return max_ == pos_;
+ }
+};
+
+
+/*
+ * Fixed-capacity FIFO ring of T.
+ *
+ * The capacity is @want_max rounded up to a power of two so that the
+ * free-running counters front_ and rear_ can be turned into slot
+ * indexes with a single AND against and_. Neither push() nor pop()
+ * checks for overflow or underflow: the caller must keep at most
+ * (and_ + 1) elements queued and must not pop from an empty queue.
+ */
+template<typename T>
+struct wq_queue {
+	T	*arr_;		/* Backing storage, and_ + 1 slots. */
+	size_t	front_;		/* Free-running index of the next pop(). */
+	size_t	rear_;		/* Free-running index of the next push(). */
+	size_t	and_;		/* Capacity - 1, used as the index mask. */
+
+	inline wq_queue(size_t want_max):
+		front_(0),
+		rear_(0)
+	{
+		size_t max = 1;
+		while (max < want_max)
+			max *= 2;
+
+		and_ = max - 1;
+		arr_ = new T[max];
+	}
+
+	inline ~wq_queue(void)
+	{
+		delete[] arr_;
+	}
+
+	/*
+	 * Number of queued elements.
+	 *
+	 * Both counters only ever increase and are unsigned, so the
+	 * modular difference rear_ - front_ is the element count even
+	 * after rear_ has wrapped around; no separate branch is needed
+	 * (and "front_ - rear_" would be the wrong value in that case).
+	 */
+	inline size_t size(void)
+	{
+		return rear_ - front_;
+	}
+
+	/* Append @val and return the counter value it was stored at. */
+	inline int64_t push(T val)
+	{
+		arr_[rear_ & and_] = val;
+		return rear_++;
+	}
+
+	/* Return the oldest element without removing it. */
+	inline T front(void)
+	{
+		return arr_[front_ & and_];
+	}
+
+	/* Remove and return the oldest element. */
+	inline T pop(void)
+	{
+		return arr_[front_++ & and_];
+	}
+};
enum http_method {
+ HTTP_NOP = 0,
HTTP_GET,
HTTP_POST,
HTTP_PATCH,
@@ -36,43 +146,84 @@ enum http_method {
HTTP_DELETE,
};
+enum http_action {
+ HTTP_ACT_NONE = 0,
+ HTTP_ACT_DIRLIST,
+ HTTP_ACT_FILE_STREAM,
+};
+
+struct stream_dir_list_data {
+ DIR *dir;
+ char path[4096];
+};
+
+struct stream_file_data {
+ char *map;
+ off_t size;
+ off_t cur_off;
+};
+
struct client_sess {
int fd;
- size_t buf_size;
- char buf[4096];
+ uint32_t idx;
+ void *priv_data;
+
+ enum http_action action;
enum http_method method;
char *uri;
char *qs;
char *http_ver;
+ char *header;
char *body;
bool got_http_header;
- struct sockaddr_in addr;
- uint16_t src_port;
char src_addr[IPV4_LEN];
+ uint16_t src_port;
+ char recv_buf[4096];
+ size_t rbuf_len;
+ bool need_epl_del;
+ std::unordered_map<std::string, std::string> *http_headers;
+} __page_size_aligned_in_smp;
+
+struct server_state;
+struct worker {
+ int epl_fd;
+ struct epoll_event events[NR_EPOLL_EVT];
uint32_t idx;
-};
+ struct server_state *state;
+ wq_queue<uint32_t> *buf_queue;
+ std::thread thread;
+ volatile bool need_join;
+} __page_size_aligned_in_smp;
struct server_state {
volatile bool stop;
+ std::atomic<uint32_t> wrk_idx_use;
int tcp_fd;
- int epl_fd;
- struct epoll_event events[NR_EPOLL_EVT];
- std::mutex *sfi_lock;
- std::stack<uint32_t> *sess_free_idx;
- struct client_sess sess[NR_MAX_CLIENTS];
+ int sig;
/*
- * Signal caught by the interrupt handler.
+ * Stack of free session indexes.
*/
- int sig;
+ wq_stack<uint32_t> *sess_free;
+ std::mutex *sess_free_lock;
+
+ /*
+ * Array of client sessions.
+ */
+ struct client_sess sess[NR_MAX_CLIENTS];
+ std::atomic<uint32_t> nr_on_thread;
+
+ struct worker *workers;
+
+ const char *bind_addr;
+ uint16_t bind_port;
};
-static struct server_state *g_state;
+static struct server_state *g_state = NULL;
-static void interrupt_handler(int sig)
+static __cold void signal_handler_func(int sig)
{
- putchar('\n');
- printf("Got signal: %d\n", sig);
+ printf("\nGot signal: %d\n", sig);
if (!g_state)
return;
@@ -80,94 +231,223 @@ static void interrupt_handler(int sig)
g_state->sig = sig;
}
-static int init_state(struct server_state *state)
+/*
+ * Allocate a zero-filled struct server_state.
+ *
+ * With USE_ASAN the object comes from operator new so the sanitizer can
+ * track it; otherwise it is an anonymous private mapping that is
+ * additionally mlock()ed on a best-effort basis (the mlock() result is
+ * intentionally not checked).
+ *
+ * Returns the new state, or NULL after printing the reason.
+ * Release it with free_state().
+ */
+static __cold struct server_state *alloc_state(void)
+{
+	struct server_state *state;
+
+#ifdef USE_ASAN
+	/*
+	 * Value-initialize ("()") so that the object starts out zeroed,
+	 * exactly like the anonymous mapping in the non-ASAN build.
+	 * close_sess() tests sess->http_headers, so that field must not
+	 * contain garbage for sessions that were never used.
+	 */
+	state = new struct server_state();
+	if (!state) {
+		errno = ENOMEM;
+		perror("new()");
+		return NULL;
+	}
+#else /* #ifdef USE_ASAN */
+	state = (struct server_state *)mmap(NULL, sizeof(*state),
+					    PROT_READ|PROT_WRITE,
+					    MAP_ANONYMOUS|MAP_PRIVATE, -1, 0);
+	if (state == MAP_FAILED) {
+		perror("mmap");
+		return NULL;
+	}
+	mlock(state, sizeof(*state));
+#endif /* #ifdef USE_ASAN */
+
+	return state;
+}
+
+static __cold void free_state(struct server_state *state)
+{
+#ifdef USE_ASAN
+ delete state;
+#else
+ munlock(state, sizeof(*state));
+ munmap(state, sizeof(*state));
+#endif
+}
+
+static __cold int setup_signal_handler(void)
{
struct sigaction act;
- uint32_t i;
int ret;
- memset(state, 0, sizeof(*state));
- state->tcp_fd = -1;
- state->epl_fd = -1;
- state->sig = -1;
-
memset(&act, 0, sizeof(act));
- act.sa_handler = interrupt_handler;
+ act.sa_handler = signal_handler_func;
ret = sigaction(SIGINT, &act, NULL);
- ret |= sigaction(SIGHUP, &act, NULL);
- ret |= sigaction(SIGTERM, &act, NULL);
+ if (unlikely(ret))
+ goto err;
+ ret = sigaction(SIGHUP, &act, NULL);
+ if (unlikely(ret))
+ goto err;
+ ret = sigaction(SIGTERM, &act, NULL);
+ if (unlikely(ret))
+ goto err;
+
act.sa_handler = SIG_IGN;
- ret |= sigaction(SIGPIPE, &act, NULL);
- if (ret) {
- fprintf(stderr, "Failed to set up the interrupt handler.\n");
+ ret = sigaction(SIGPIPE, &act, NULL);
+ if (unlikely(ret))
+ goto err;
+
+ return 0;
+err:
+ perror("sigaction");
+ return -errno;
+}
+
+static __cold int init_state(struct server_state **state_p)
+{
+ struct server_state *state;
+ uint32_t i;
+ int ret;
+
+ ret = setup_signal_handler();
+ if (unlikely(ret))
return ret;
- }
- state->sfi_lock = new std::mutex;
- if (!state->sfi_lock) {
- fprintf(stderr, "Cannot allocate mutex\n");
+ state = alloc_state();
+ if (unlikely(!state))
return -ENOMEM;
- }
- state->sess_free_idx = new __typeof__(*state->sess_free_idx);
- if (!state->sess_free_idx) {
- fprintf(stderr, "Cannot allocate sess_free_idx\n");
- delete state->sfi_lock;
+ state->tcp_fd = -1;
+
+ state->sess_free = new wq_stack<uint32_t>(NR_MAX_CLIENTS);
+ if (unlikely(!state->sess_free)) {
+ errno = ENOMEM;
+ perror("state->sess_free = new()");
return -ENOMEM;
}
- for (i = NR_MAX_CLIENTS - 1; i--; ) {
+ state->sess_free_lock = new std::mutex;
+ if (unlikely(!state->sess_free_lock)) {
+ errno = ENOMEM;
+ perror("state->sess_free_lock = new()");
+ goto out_err_sess_free_lock;
+ }
+
+ state->workers = new struct worker[NR_WORKERS];
+ if (unlikely(!state->workers)) {
+ ret = -ENOMEM;
+ errno = -ret;
+ perror("state->workers = new()");
+ goto out_err_workers;
+ }
+
+ i = NR_MAX_CLIENTS;
+ while (i--) {
state->sess[i].fd = -1;
state->sess[i].idx = i;
- state->sess_free_idx->push(i);
+ state->sess_free->push(i);
+ }
+
+ i = NR_WORKERS;
+ while (i--)
+ state->workers[i].buf_queue = NULL;
+
+ i = NR_WORKERS;
+ while (i--) {
+ wq_queue<uint32_t> *p = new wq_queue<uint32_t>(NR_MAX_CLIENTS);
+ if (unlikely(!p))
+ goto out_err_wq_queue;
+ state->workers[i].idx = i;
+ state->workers[i].epl_fd = -1;
+ state->workers[i].state = state;
+ state->workers[i].need_join = false;
+ state->workers[i].buf_queue = p;
+ }
+
+ state->stop = false;
+ atomic_store(&state->nr_on_thread, 0u);
+ atomic_store(&state->wrk_idx_use, 0u);
+ *state_p = state;
+ g_state = state;
+ return 0;
+
+out_err_wq_queue:
+ while (i < NR_WORKERS) {
+ delete state->workers[i].buf_queue;
+ state->workers[i].buf_queue = NULL;
+ i++;
+ }
+out_err_workers:
+ delete state->sess_free_lock;
+ state->sess_free_lock = NULL;
+out_err_sess_free_lock:
+ delete state->sess_free;
+ state->sess_free = NULL;
+ return ret;
+}
+
+/*
+ * Apply the socket options used by the listening socket (currently
+ * only SO_REUSEADDR).
+ *
+ * Returns 0 on success, or a negative errno value after printing which
+ * setsockopt() call failed.
+ */
+static __cold int set_socket_options(int tcp_fd)
+{
+	const char *on, *ov;
+	int val;
+	int ret;
+
+	/*
+	 * setsockopt() wants a pointer to the option value itself. Pass
+	 * &val directly; passing the address of a pointer variable that
+	 * holds &val would make the kernel read pointer bits as the flag.
+	 */
+	val = 1;
+	ret = setsockopt(tcp_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
+	if (ret < 0) {
+		on = "SOL_SOCKET";
+		ov = "SO_REUSEADDR";
+		goto out_err;
 	}
 	return 0;
+
+out_err:
+	ret = errno;
+	fprintf(stderr, "setsockopt(%d, %s, %s, ...): %s\n", tcp_fd, on, ov,
+		strerror(ret));
+	return -ret;
 }
-static int init_socket(struct server_state *state, const char *addr,
- uint16_t port)
+static __cold int init_socket(struct server_state *state)
{
struct sockaddr_in saddr;
int tcp_fd;
int ret;
- int y;
+
+ if (unlikely(!state->bind_addr)) {
+ fprintf(stderr, "Error: state->bind_addr is empty!\n");
+ return -EINVAL;
+ }
+
+ if (unlikely(!state->bind_port)) {
+ fprintf(stderr, "Error: state->bind_port is empty!\n");
+ return -EINVAL;
+ }
tcp_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
- if (tcp_fd < 0) {
+ if (unlikely(tcp_fd < 0)) {
ret = -errno;
perror("socket");
return ret;
}
- y = 1;
- ret = setsockopt(tcp_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&y, sizeof(y));
- if (ret < 0) {
- ret = -errno;
- perror("setsockopt");
+ ret = set_socket_options(tcp_fd);
+ if (unlikely(ret))
goto out_err;
- }
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
- saddr.sin_port = htons(port);
- saddr.sin_addr.s_addr = inet_addr(addr);
+ saddr.sin_addr.s_addr = inet_addr(state->bind_addr);
+ saddr.sin_port = htons(state->bind_port);
ret = bind(tcp_fd, (struct sockaddr *)&saddr, sizeof(saddr));
- if (ret < 0) {
+ if (unlikely(ret < 0)) {
ret = -errno;
perror("bind");
goto out_err;
}
- ret = listen(tcp_fd, 1000);
- if (ret < 0) {
+ ret = listen(tcp_fd, 4096);
+ if (unlikely(ret < 0)) {
ret = -errno;
perror("listen");
- goto out_err;
+ goto out_err;
}
- printf("Listening on %s:%u...\n", addr, port);
+ printf("Listening on %s:%u...\n", state->bind_addr, state->bind_port);
state->tcp_fd = tcp_fd;
return 0;
@@ -176,178 +456,244 @@ out_err:
return ret;
}
-static int init_epoll(struct server_state *state)
+static __cold void wait_for_ready_state(struct worker *worker)
{
- struct epoll_event evt;
- int epl_fd;
- int ret;
+ uint32_t i = 0;
+ uint32_t on;
- epl_fd = epoll_create(255);
- if (epl_fd < 0) {
- ret = -errno;
- perror("epoll_create");
- return ret;
+ while (true) {
+ on = atomic_load(&worker->state->nr_on_thread);
+ if (on >= NR_WORKERS || worker->state->stop)
+ break;
+
+ usleep(1000);
+ if (i++ < 10000)
+ continue;
+
+ fprintf(stderr, "Timedout while waiting for ready state");
+ worker->state->stop = true;
+ return;
}
+}
+
+static int install_infd_to_worker(int fd, struct worker *worker,
+ epoll_data_t data)
+{
+ struct epoll_event evt;
+ int ret;
memset(&evt, 0, sizeof(evt));
evt.events = EPOLLIN | EPOLLPRI;
- ret = epoll_ctl(epl_fd, EPOLL_CTL_ADD, state->tcp_fd, &evt);
+ evt.data = data;
+ ret = epoll_ctl(worker->epl_fd, EPOLL_CTL_ADD, fd, &evt);
if (ret < 0) {
ret = -errno;
perror("epoll_ctl");
return ret;
}
-
- state->epl_fd = epl_fd;
- return 0;
-}
-
-static void put_sess_idx(uint32_t idx, struct server_state *state)
-{
- state->sfi_lock->lock();
- state->sess_free_idx->push(idx);
- state->sfi_lock->unlock();
+ return ret;
}
static int64_t get_sess_idx(struct server_state *state)
{
- int64_t ret;
- state->sfi_lock->lock();
- if (unlikely(state->sess_free_idx->empty())) {
- state->sfi_lock->unlock();
+ int64_t idx;
+
+ state->sess_free_lock->lock();
+ if (unlikely(state->sess_free->empty())) {
+ state->sess_free_lock->unlock();
return -EAGAIN;
}
- ret = state->sess_free_idx->top();
- state->sess_free_idx->pop();
- state->sfi_lock->unlock();
- return ret;
+ idx = (int64_t)state->sess_free->pop();
+ state->sess_free_lock->unlock();
+ return idx;
}
-static int delete_client(int epl_fd, struct client_sess *sess)
+static void put_sess_idx(uint32_t idx, struct server_state *state)
+{
+ state->sess_free_lock->lock();
+ state->sess_free->push(idx);
+ state->sess_free_lock->unlock();
+}
+
+static int uninstall_fd_from_worker(int fd, struct worker *worker)
{
int ret;
- ret = epoll_ctl(epl_fd, EPOLL_CTL_DEL, sess->fd, NULL);
+ ret = epoll_ctl(worker->epl_fd, EPOLL_CTL_DEL, fd, NULL);
if (ret < 0) {
ret = -errno;
perror("epoll_ctl");
return ret;
}
- return 0;
+ return ret;
}
-static int register_new_client(int epl_fd, struct client_sess *sess)
+static void close_sess(struct client_sess *sess, struct worker *worker)
{
- struct epoll_event evt;
- int ret;
+ printf("Closing session " FCIP " (idx = %u)\n", FCIP_ARG(sess),
+ sess->idx);
- memset(&evt, 0, sizeof(evt));
- evt.events = EPOLLIN | EPOLLPRI;
- evt.data.ptr = (void *)sess;
- ret = epoll_ctl(epl_fd, EPOLL_CTL_ADD, sess->fd, &evt);
- if (ret < 0) {
- ret = -errno;
- perror("epoll_ctl");
- return ret;
+ if (sess->need_epl_del)
+ uninstall_fd_from_worker(sess->fd, worker);
+
+ close(sess->fd);
+ sess->fd = -1;
+ sess->action = HTTP_ACT_NONE;
+ if (sess->http_headers) {
+ delete sess->http_headers;
+ sess->http_headers = NULL;
}
- return 0;
+ put_sess_idx(sess->idx, worker->state);
}
-static int handle_new_client(struct server_state *state)
-{
+static int _handle_new_client(int tcp_fd, struct worker *worker)
+{
+ struct server_state *state = worker->state;
struct sockaddr_in caddr;
socklen_t addrlen = sizeof(caddr);
struct client_sess *sess;
+ epoll_data_t epld;
+ uint32_t wrk_idx;
int64_t idx;
int cli_fd;
int ret;
memset(&caddr, 0, sizeof(caddr));
- cli_fd = accept(state->tcp_fd, (struct sockaddr *)&caddr, &addrlen);
+ cli_fd = accept(tcp_fd, (struct sockaddr *)&caddr, &addrlen);
if (unlikely(cli_fd < 0)) {
ret = errno;
- perror("accept");
- if (ret == EAGAIN || ret == EMFILE)
- return 0;
+ if (ret != EAGAIN)
+ perror("accept");
return -ret;
}
idx = get_sess_idx(state);
if (unlikely(idx == -EAGAIN)) {
close(cli_fd);
- fprintf(stderr, "Cannot handle a new client\n");
- return 0;
+ fprintf(stderr, "Client session is full, cannot handle a new "
+ "client\n");
+ return -EAGAIN;
}
sess = &state->sess[idx];
sess->fd = cli_fd;
- sess->buf_size = 0;
+ sess->rbuf_len = 0;
sess->got_http_header = false;
- sess->addr = caddr;
+ sess->action = HTTP_ACT_NONE;
+ sess->method = HTTP_NOP;
- sess->src_addr[0] = '\0';
- sess->src_port = 0;
if (addrlen <= sizeof(caddr)) {
const char *p;
- sess->src_port = ntohs(caddr.sin_port);
p = inet_ntop(AF_INET, &caddr.sin_addr.s_addr, sess->src_addr,
sizeof(sess->src_addr));
- if (!p) {
+ if (p)
+ sess->src_port = ntohs(caddr.sin_port);
+ else
perror("inet_ntop");
- fprintf(stderr, "inet_ntop() error?\n");
- }
} else {
+ sess->src_addr[0] = '\0';
+ sess->src_port = 0;
fprintf(stderr, "Warning, accept overflow!\n");
}
- ret = register_new_client(state->epl_fd, sess);
+ epld.ptr = (void *)sess;
+ wrk_idx = atomic_fetch_add(&state->wrk_idx_use, 1u) % NR_WORKERS;
+ ret = install_infd_to_worker(cli_fd, &state->workers[wrk_idx], epld);
if (unlikely(ret < 0))
return ret;
+ sess->need_epl_del = true;
printf("Got a new client (idx = %u) " FCIP "\n", sess->idx,
FCIP_ARG(sess));
return 0;
}
-static ssize_t send_to_client(struct client_sess *sess, const char *buf,
- size_t len)
+/*
+ * Accept pending connections on the listening socket, at most 512 per
+ * call.
+ *
+ * The loop ends early as soon as _handle_new_client() returns non-zero.
+ * -EAGAIN and -EMFILE are reported to the caller as 0; any other error
+ * is returned unchanged.
+ */
+static __hot int handle_new_client(struct worker *worker)
+{
+ int tcp_fd = worker->state->tcp_fd;
+ uint32_t acc_iter = 512;
+ int ret;
+
+ while (acc_iter--) {
+ ret = _handle_new_client(tcp_fd, worker);
+ if (likely(!ret))
+ continue;
+
+ if (likely(ret == -EAGAIN) || unlikely(ret == -EMFILE))
+ return 0;
+
+ return ret;
+ }
+ return 0;
+}
+
+/*
+ * Block in poll() with no timeout until @fd reports an event for
+ * POLLOUT.
+ *
+ * Returns 0 when poll() returns successfully or was interrupted by a
+ * signal (EINTR), otherwise a negative errno value. The returned
+ * revents are not inspected here.
+ */
+static int poll_wait_for_fd_be_writable(int fd)
+{
+ struct pollfd fds[1];
+ int ret;
+
+ fds[0].fd = fd;
+ fds[0].events = POLLOUT;
+ fds[0].revents = 0;
+
+ ret = poll(fds, 1, -1);
+ if (unlikely(ret < 0)) {
+ ret = errno;
+ if (ret != EINTR)
+ return -ret;
+ }
+ return 0;
+}
+
+static ssize_t send_to_sess(struct client_sess *sess, const char *buf,
+ size_t len)
{
- constexpr uint32_t max_try = 10;
- uint32_t try_count = 0;
+ const char *bptr = buf;
+ ssize_t total_sent = 0;
+ size_t blen = len;
+ int fd = sess->fd;
ssize_t ret;
- int tmp;
repeat:
- if (unlikely(try_count++ >= max_try))
- return -ENETDOWN;
- ret = send(sess->fd, buf, len, MSG_DONTWAIT);
+ ret = send(fd, bptr, blen, MSG_DONTWAIT);
if (unlikely(ret < 0)) {
- tmp = errno;
- if (tmp == EAGAIN)
- goto repeat;
- perror("send");
- return -tmp;
- } else if (unlikely(ret == 0)) {
- return -ENETDOWN;
- } else if (unlikely((size_t)ret < len)) {
- buf = &buf[len];
- len -= (size_t)ret;
+ int ret = errno;
+ if (unlikely(ret != EAGAIN)) {
+ perror("send");
+ return -ret;
+ }
+
+ ret = poll_wait_for_fd_be_writable(fd);
+ if (unlikely(ret))
+ return ret;
+
goto repeat;
}
- return ret;
+
+ total_sent += ret;
+ if (unlikely((size_t)total_sent < len)) {
+ bptr = &buf[total_sent];
+ blen = len - (size_t)total_sent;
+ goto repeat;
+ }
+
+ return total_sent;
}
-static void send_http_error(int code, struct client_sess *sess)
+static ssize_t send_http_error(struct client_sess *sess, int code,
+ const char *errstr)
{
char buf[128];
int tmp;
tmp = snprintf(buf, sizeof(buf),
- "HTTP/1.1 %d\r\n"
+ "HTTP/1.1 %d%s%s\r\n"
+ "Connection: closed\r\n"
"Content-Type: text/plain\r\n\r\n"
"HTTP Error %d",
- code, code);
- send_to_client(sess, buf, (size_t)tmp);
+ code, (errstr ? " " : ""), (errstr ? errstr : ""),
+ code);
+ return send_to_sess(sess, buf, (size_t)tmp);
}
static char *parse_http_method(char *buf, enum http_method *method_p)
@@ -385,20 +731,21 @@ static char *parse_query_string(char *uri, char *end_uri)
while (uri < end_uri) {
if (*uri++ != '?')
continue;
+ uri[-1] = '\0';
if (uri == end_uri)
/*
* We got an empty query string:
* "http://somehwere.com/path?"
*/
return NULL;
- return uri;
+ return uri[0] ? uri : NULL;
}
return NULL;
}
static int parse_http_header(struct client_sess *sess)
{
- char *buf = sess->buf;
+ char *buf = sess->recv_buf;
char *end;
char *ret;
@@ -412,7 +759,7 @@ static int parse_http_header(struct client_sess *sess)
* Don't fail here if still have enough buffer,
* we will wait for the next recv() iteration.
*/
- if (sess->buf_size >= sizeof(sess->buf) - 1)
+ if (sess->rbuf_len >= sizeof(sess->recv_buf) - 1)
goto bad_req;
return 0;
@@ -435,6 +782,8 @@ static int parse_http_header(struct client_sess *sess)
* ^
* @ret
*/
+ if (unlikely(ret[0] != '/'))
+ goto bad_req;
sess->uri = ret;
ret = strstr(sess->uri, " ");
if (unlikely(!ret))
@@ -455,138 +804,715 @@ static int parse_http_header(struct client_sess *sess)
goto bad_req;
ret[0] = '\0';
+ sess->header = &ret[1];
sess->got_http_header = true;
return 0;
bad_req:
- send_http_error(400, sess);
+ send_http_error(sess, 400, "Bad Request");
return -EBADMSG;
}
-static void close_sess(struct client_sess *sess, struct server_state *state)
+static ssize_t http_redirect(struct client_sess *sess, const char *location)
{
- printf("Closing session " FCIP " (idx = %u)\n", FCIP_ARG(sess),
- sess->idx);
- delete_client(state->epl_fd, sess);
- close(sess->fd);
- put_sess_idx(sess->idx, state);
-}
+ size_t need_len;
+ ssize_t ret;
+ char *buf;
+ int len;
-#define HTTP_200_HTML "HTTP/1.1 200\r\nContent-Type: text/html\r\n\r\n"
-#define HTTP_200_TEXT "HTTP/1.1 200\r\nContent-Type: text/plain\r\n\r\n"
+ need_len = strlen(location) * 2 + 256;
+ buf = new char[need_len];
+ if (unlikely(!buf))
+ return -ENOMEM;
-static int route_show_index(struct client_sess *sess,
- struct server_state *state)
-{
- static const char buf[] =
- HTTP_200_HTML
- "<!DOCTYPE html>"
- "<html>"
- "<body>"
- "<h1>This is the index!</h1>"
- "</body>"
- "</html>";
+ len = snprintf(buf, need_len,
+ "HTTP/1.1 302\r\n"
+ "Connection: closed\r\n"
+ "Location: %s\r\n\r\nYou are redirected to %s\n\n",
+ location, location);
- send_to_client(sess, buf, sizeof(buf) - 1);
- close_sess(sess, state);
- return 0;
+ ret = send_to_sess(sess, buf, (size_t)len);
+ delete[] buf;
+ return ret;
}
-static int route_show_hello(struct client_sess *sess,
- struct server_state *state)
+static void not_found_or_forbidden(int e, struct client_sess *sess)
{
- static const char buf[] =
- HTTP_200_HTML
- "<!DOCTYPE html>"
- "<html>"
- "<body>"
- "<h1>Hello World!</h1>"
- "</body>"
- "</html>";
-
- send_to_client(sess, buf, sizeof(buf) - 1);
- close_sess(sess, state);
- return 0;
+ if (e == ENOENT)
+ send_http_error(sess, 404, "Not Found");
+ else
+ send_http_error(sess, 403, "Forbidden");
}
-static int handle_route_get(struct client_sess *sess,
- struct server_state *state)
+static int parse_http_header2(struct client_sess *sess)
{
- const char *uri = sess->uri;
+ char *hdr = sess->header + 1;
+
+ if (!sess->http_headers)
+ sess->http_headers = new __typeof__(*sess->http_headers);
+
+ while (1) {
+ char *keyval, *key, *val, *it;
+
+ key = hdr;
+ keyval = strstr(key, "\r\n");
+ if (!keyval)
+ break;
+ keyval[0] = '\0';
+
+ hdr = &keyval[2];
+ val = strstr(key, ":");
+ if (!val)
+ continue;
+ val[0] = '\0';
+ val++;
+
+ while (*val == ' ')
+ val++;
+
+ it = key;
+ while (*it) {
+ *it = tolower((unsigned char)*it);
+ it++;
+ }
- if (!strcmp(uri, "/"))
- return route_show_index(sess, state);
- if (!strcmp(uri, "/hello"))
- return route_show_hello(sess, state);
+ sess->http_headers->emplace(key, val);
+ }
- send_http_error(404, sess);
- close_sess(sess, state);
return 0;
}
-static int route_show_echo(struct client_sess *sess, struct server_state *state)
+static char *open_file_for_stream(const char *file, struct stat *st,
+ struct client_sess *sess,
+ struct stream_file_data *sfd)
{
- char buf[4096] = HTTP_200_TEXT;
- constexpr size_t max_body_len = sizeof(buf) - sizeof(HTTP_200_TEXT);
+ char *map;
+ int ret;
+ int fd;
+
+ fd = open(file, O_RDONLY);
+ if (unlikely(fd < 0)) {
+ fd = errno;
+ not_found_or_forbidden(fd, sess);
+ fprintf(stderr, "Cannot open file: \"%s\": %s\n", file,
+ strerror(fd));
+ return NULL;
+ }
- size_t header_size = (size_t)(sess->body - sess->buf);
- size_t body_len = sess->buf_size - header_size;
- size_t send_len;
+ ret = fstat(fd, st);
+ if (unlikely(ret < 0)) {
+ ret = errno;
+ not_found_or_forbidden(ret, sess);
+ close(fd);
+ fprintf(stderr, "Cannot stat file: \"%s\": %s\n", file,
+ strerror(ret));
+ return NULL;
+ }
- if (body_len > max_body_len)
- body_len = max_body_len;
+ map = (char *)mmap(NULL, st->st_size, PROT_READ, MAP_SHARED, fd, 0);
+ close(fd);
+ if (unlikely(map == MAP_FAILED)) {
+ ret = errno;
+ not_found_or_forbidden(ret, sess);
+ fprintf(stderr, "Cannot map file: \"%s\": %s\n", file,
+ strerror(ret));
+ return NULL;
+ }
- send_len = body_len + sizeof(HTTP_200_TEXT) - 1;
- memcpy(&buf[sizeof(HTTP_200_TEXT) - 1], sess->body, body_len);
- send_to_client(sess, buf, send_len);
- close_sess(sess, state);
- return 0;
+ sfd->size = st->st_size;
+ sfd->map = map;
+ sfd->cur_off = 0;
+ return map;
}
-static int handle_route_post(struct client_sess *sess,
- struct server_state *state)
+static int send_http_header_for_stream_file(struct client_sess *sess,
+ off_t start_offset,
+ off_t content_length)
{
- const char *uri = sess->uri;
+ char buf[1024];
+ int ret;
- if (!strcmp(uri, "/echo"))
- return route_show_echo(sess, state);
+ ret = snprintf(buf, sizeof(buf),
+ "HTTP/1.1 %s\r\n"
+ "Content-Type: text/plain\r\n"
+ "Content-Range: bytes %lu-%lu/%lu\r\n"
+ "Content-Length: %zu\r\n\r\n",
+ start_offset ? "206 Partial Content" : "200 OK",
+ start_offset, content_length - 1, content_length,
+ content_length);
+
+ ret = send_to_sess(sess, buf, (size_t)ret);
+ if (unlikely(ret < 0))
+ return -EBADMSG;
- send_http_error(404, sess);
- close_sess(sess, state);
return 0;
}
-static int handle_route(struct client_sess *sess, struct server_state *state)
+static int stream_file_once(struct stream_file_data *sfd,
+ struct client_sess *sess)
{
+ size_t send_len;
int ret;
- switch (sess->method) {
+ if (unlikely(sfd->cur_off >= sfd->size))
+ return 1;
+
+ send_len = (size_t)(sfd->size - sfd->cur_off);
+ ret = send_to_sess(sess, &sfd->map[sfd->cur_off], send_len);
+ if (unlikely(ret < 0))
+ return -EBADMSG;
+
+ return 1;
+}
+
+constexpr off_t max_send_len_file = 1024 * 128;
+
+/*
+ * Send the next chunk (at most max_send_len_file bytes) of a mapped
+ * file to @sess.
+ *
+ * On the first call for a session (sess->priv_data is NULL) @sfd is
+ * copied to the heap and remembered in sess->priv_data so that the
+ * transfer can be resumed later.
+ * NOTE(review): later calls are expected to pass sess->priv_data as
+ * @sfd; confirm against the caller that drains worker->buf_queue.
+ *
+ * Returns:
+ *   0         more data remains; the session index was pushed to
+ *             worker->buf_queue and sess->action is HTTP_ACT_FILE_STREAM.
+ *   1         the whole file has been sent.
+ *   -ENOMEM   the per-session state could not be allocated.
+ *   -EBADMSG  sending failed.
+ *
+ * For every return value other than 0 the mapping is unmapped and the
+ * per-session state is released.
+ */
+static int stream_file_loop(struct stream_file_data *sfd,
+			    struct client_sess *sess,
+			    struct worker *worker)
+{
+	size_t send_len;
+	ssize_t sent;
+	int ret;
+
+	if (unlikely(sfd->cur_off >= sfd->size)) {
+		ret = 1;
+		goto out;
+	}
+
+	if (unlikely(!sess->priv_data)) {
+		struct stream_file_data *sfd_h;
+
+		sfd_h = new struct stream_file_data;
+		if (unlikely(!sfd_h)) {
+			/* Do not leak the mapping owned by @sfd. */
+			ret = -ENOMEM;
+			goto out;
+		}
+
+		*sfd_h = *sfd;
+		sfd = sfd_h;
+		sess->priv_data = (void *)sfd_h;
+		madvise(sfd->map, sfd->size, MADV_SEQUENTIAL);
+	}
+
+	/*
+	 * Clamp on every call, including the first one: a Range request
+	 * may start close to the end of the file, in which case fewer
+	 * than max_send_len_file bytes remain inside the mapping.
+	 */
+	send_len = (size_t)(sfd->size - sfd->cur_off);
+	if (send_len > (size_t)max_send_len_file)
+		send_len = (size_t)max_send_len_file;
+
+	sent = send_to_sess(sess, &sfd->map[sfd->cur_off], send_len);
+	if (unlikely(sent < 0)) {
+		ret = -EBADMSG;
+		goto out;
+	}
+	sfd->cur_off += (off_t)sent;
+	ret = 1;
+
+	if (sfd->cur_off < sfd->size) {
+		sess->action = HTTP_ACT_FILE_STREAM;
+		worker->buf_queue->push(sess->idx);
+		return 0;
+	}
+
+out:
+	munmap(sfd->map, sfd->size);
+	if (likely(sess->priv_data)) {
+		delete (struct stream_file_data *)sess->priv_data;
+		sess->priv_data = NULL;
+	}
+
+	return ret;
+}
+
+/*
+ * Parse the request headers of @sess and, if a "range" header with a
+ * "bytes=" unit is present, store the number that follows "bytes=" in
+ * *@start_offset_p. The output is left untouched otherwise. The number
+ * is not validated here.
+ */
+static void handle_range_header(struct client_sess *sess, off_t *start_offset_p)
+{
+	parse_http_header2(sess);
+
+	const auto &hdrs = *sess->http_headers;
+	const auto range = hdrs.find("range");
+	if (range == hdrs.end())
+		return;
+
+	const char *spec = strstr(range->second.c_str(), "bytes=");
+	if (!spec)
+		return;
+
+	/* Skip the 6 characters of "bytes=". */
+	*start_offset_p = strtoll(spec + 6, NULL, 10);
+}
+
+/*
+ * Send a "400 Bad Request" plain-text response whose body is @msg.
+ * The result of the send is ignored (best effort).
+ */
+static void stream_file_bad_req(struct client_sess *sess, const char *msg)
+{
+	char buf[512];
+	int ret;
+
+	/* The connection option defined by HTTP is "close". */
+	ret = snprintf(buf, sizeof(buf),
+		       "HTTP/1.1 400 Bad Request\r\n"
+		       "Connection: close\r\n"
+		       "Content-Type: text/plain\r\n\r\n"
+		       "%s\n\n", msg);
+	if (unlikely(ret < 0))
+		return;
+
+	/*
+	 * snprintf() returns the length the output would have had; never
+	 * send more than what actually fits in buf.
+	 */
+	if (unlikely((size_t)ret >= sizeof(buf)))
+		ret = (int)sizeof(buf) - 1;
+
+	send_to_sess(sess, buf, (size_t)ret);
+}
+
+/*
+ * Begin serving @file to @sess.
+ *
+ * Maps the file, honours the start offset of an optional Range header,
+ * sends the response header and then either sends the whole remainder
+ * at once (files of at most max_send_len_file bytes) or hands over to
+ * stream_file_loop(), which takes ownership of the mapping.
+ *
+ * Returns the result of stream_file_once() / stream_file_loop(), or
+ * -EBADMSG when the file cannot be opened, the range offset is out of
+ * bounds or the header cannot be sent.
+ */
+static int start_stream_file(const char *file, struct client_sess *sess,
+			     struct worker *worker)
+{
+	struct stream_file_data sfd;
+	off_t start_offset = 0;
+	struct stat st;
+	char *map;
+	int ret;
+
+	map = open_file_for_stream(file, &st, sess, &sfd);
+	if (unlikely(!map))
+		return -EBADMSG;
+
+	handle_range_header(sess, &start_offset);
+	if (start_offset >= sfd.size || start_offset < 0) {
+		stream_file_bad_req(sess, "Bad range offset!");
+		ret = -EBADMSG;
+		goto out_unmap;
+	}
+
+	sfd.cur_off = start_offset;
+	ret = send_http_header_for_stream_file(sess, start_offset, st.st_size);
+	if (unlikely(ret)) {
+		ret = -EBADMSG;
+		goto out_unmap;
+	}
+
+	/* stream_file_loop() unmaps the file itself once it is done. */
+	if (st.st_size > max_send_len_file)
+		return stream_file_loop(&sfd, sess, worker);
+
+	ret = stream_file_once(&sfd, sess);
+
+out_unmap:
+	/* Every path that ends here still owns the mapping. */
+	munmap(sfd.map, sfd.size);
+	return ret;
+}
+
+/*
+ * Entry point for serving @file to @sess.
+ *
+ * When the session has no action in progress the transfer is started.
+ * A non-zero result of start_stream_file() is returned unchanged; 0 is
+ * returned when the transfer has been queued as HTTP_ACT_FILE_STREAM,
+ * and 1 in every other case.
+ */
+static int stream_file(const char *file, struct client_sess *sess,
+		       struct worker *worker)
+{
+	int err;
+
+	if (sess->action != HTTP_ACT_NONE)
+		return 1;
+
+	err = start_stream_file(file, sess, worker);
+	if (unlikely(err))
+		return err;
+
+	return (sess->action == HTTP_ACT_FILE_STREAM) ? 0 : 1;
+}
+
+#if 0
+/*
+ * Copy @_input (@inlen bytes) to @_output, replacing the characters
+ * '<', '>', '"' and '&' with their HTML entities.
+ *
+ * At most @outlen bytes are written, including the terminating NUL. A
+ * replacement is never split: copying stops before the first item that
+ * would not fit together with the terminator.
+ *
+ * Returns the number of bytes written, not counting the terminator
+ * (0 when @outlen is 0, in which case nothing is written).
+ */
+static size_t htmlspecialchars(char *_output, size_t outlen, const char *_input,
+			       size_t inlen)
+{
+	struct html_char_map {
+		const char	to[8];
+		const uint8_t	len;
+	};
+
+	static const struct html_char_map html_map[0x100u] = {
+		['<'] = {"&lt;",	4},
+		['>'] = {"&gt;",	4},
+		['"'] = {"&quot;",	6},
+		['&'] = {"&amp;",	5},
+	};
+
+	size_t j = 0;
+	unsigned char *output = (unsigned char *)_output;
+	const unsigned char *input = (const unsigned char *)_input;
+	const unsigned char *in_end = input + inlen;
+
+	if (unlikely(outlen == 0))
+		return 0;
+
+	while (likely(input < in_end)) {
+		const struct html_char_map *map_to = &html_map[(size_t)*input];
+		const unsigned char *cp;
+		uint8_t len;
+
+		if (likely(*map_to->to == '\0')) {
+			/* No replacement: copy the byte as is. */
+			cp = input;
+			len = 1;
+		} else {
+			cp = (const unsigned char *)map_to->to;
+			len = map_to->len;
+		}
+
+		/* Always keep one byte free for the NUL terminator. */
+		if (unlikely(j + len >= outlen))
+			break;
+
+		memcpy(&output[j], cp, len);
+		j += len;
+		input++;
+	}
+
+	output[j] = '\0';
+	return j;
+}
+#endif
+
+/*
+ * Redirect the client to "@path/" (keeping the query string of @sess)
+ * when @path does not end with a slash.
+ *
+ * Returns 0 when @path already ends with '/', 1 when a redirect was
+ * issued (the result of sending it is not checked) and -ENETDOWN when
+ * the temporary buffer cannot be allocated.
+ */
+static int redirect_on_no_trailing_slash(const char *path,
+					 struct client_sess *sess)
+{
+	size_t len;
+	char *buf;
+	char *qs;
+
+	/* An empty path has no last character to look at. */
+	len = strlen(path);
+	if (len > 0 && path[len - 1] == '/')
+		return 0;
+
+	qs = sess->qs;
+	if (qs)
+		len += strlen(qs);
+
+	/* Head-room for the added "/?" and the terminator. */
+	len += 16;
+	buf = new char[len];
+	if (unlikely(!buf))
+		return -ENETDOWN;
+
+	if (qs)
+		snprintf(buf, len, "%s/?%s", path, qs);
+	else
+		snprintf(buf, len, "%s/", path);
+
+	http_redirect(sess, buf);
+	delete[] buf;
+	return 1;
+}
+
+/*
+ * Open the directory @path for listing.
+ *
+ * On success the stream is stored in *@dir_p and 0 is returned. On
+ * failure an HTTP error is sent to @sess (404 for ENOENT, 403 for
+ * anything else) and -EBADMSG is returned.
+ */
+static int stream_dir_list_open(const char *path, struct client_sess *sess,
+				DIR **dir_p)
+{
+	DIR *dir;
+
+	dir = opendir(path);
+	if (unlikely(!dir)) {
+		/* Same errno-to-status mapping as the file streaming path. */
+		not_found_or_forbidden(errno, sess);
+		return -EBADMSG;
+	}
+
+	*dir_p = dir;
+	return 0;
+}
+
+/*
+ * Send the response header and the opening part of the HTML page used
+ * for a directory listing (up to and including the table header row).
+ *
+ * Returns 0 on success or -EBADMSG when sending fails.
+ */
+static int send_init_payload_for_stream_dir_list(struct client_sess *sess)
+{
+	/* The connection option defined by HTTP is "close". */
+	constexpr static const char buf[] =
+		"HTTP/1.1 200\r\n"
+		"Content-Type: text/html\r\n"
+		"Connection: close\r\n\r\n"
+		"<!DOCTYPE html>\n"
+		"<html>\n"
+		"<style type=\"text/css\">"
+			"td {padding: 10px;}"
+			"a {color: blue; text-decoration: none}"
+			"a:hover {text-decoration: underline}"
+		"</style>"
+		"<body>\n"
+		"\t<h1>GNU/Weeb HTTP Server</h1>\n"
+		"\t<table border=\"1\">\n"
+		"\t\t<tr>"
+			"<th>Filename</th>"
+			"<th>Type</th>"
+			"<th>Mode</th>"
+		"</tr>\n";
+
+	ssize_t ret;
+
+	/* sizeof(buf) - 1: do not send the terminating NUL. */
+	ret = send_to_sess(sess, buf, sizeof(buf) - 1);
+	if (unlikely(ret < 0))
+		return -EBADMSG;
+
+	return 0;
+}
+
+/*
+ * Append one HTML table row describing @path/@file to the buffer at
+ * *@pbuf_p, which has *@capacity_p bytes available.
+ *
+ * On success *@pbuf_p is advanced past the row and *@capacity_p is
+ * reduced accordingly. On failure both are left unchanged so that the
+ * caller can simply skip the entry.
+ *
+ * Returns 0 on success, -ENAMETOOLONG when the full path does not fit,
+ * a negative errno from stat(), -ENOTSUP for an unknown file type and
+ * -ENOBUFS when the row does not fit in the remaining capacity.
+ *
+ * NOTE(review): @file is written into the HTML without escaping; a
+ * file name containing markup ends up verbatim in the page.
+ */
+static int construct_file_row(const char *path, const char *file, char **pbuf_p,
+			      size_t *capacity_p)
+{
+	char fpath[4096 + 256];
+	const char *type;
+	struct stat st;
+	int ret;
+
+	ret = snprintf(fpath, sizeof(fpath), "%s/%s", path, file);
+	if (unlikely(ret < 0 || (size_t)ret >= sizeof(fpath)))
+		return -ENAMETOOLONG;
+
+	/*
+	 * stat() follows symbolic links, so the S_ISLNK() branch below is
+	 * not expected to match; it is kept for completeness.
+	 */
+	ret = stat(fpath, &st);
+	if (unlikely(ret < 0)) {
+		ret = errno;
+		fprintf(stderr, "Can't stat \"%s\": %s\n", fpath, strerror(ret));
+		return -ret;
+	}
+
+	if (S_ISDIR(st.st_mode))
+		type = "Directory";
+	else if (S_ISREG(st.st_mode))
+		type = "Regular File";
+	else if (S_ISCHR(st.st_mode))
+		type = "Char dev";
+	else if (S_ISBLK(st.st_mode))
+		type = "Block dev";
+	else if (S_ISFIFO(st.st_mode))
+		type = "FIFO";
+	else if (S_ISLNK(st.st_mode))
+		type = "Symlink";
+	else if (S_ISSOCK(st.st_mode))
+		type = "Socket";
+	else
+		return -ENOTSUP;
+
+	ret = snprintf(*pbuf_p, *capacity_p,
+		"\t\t<tr>"
+			"<td><a href=\"%s%s\">%s</a></td>"
+			"<td>%s</td>"
+			"<td>%d%d%d%d</td>"
+		"</tr>\n",
+		file, S_ISDIR(st.st_mode) ? "/" : "", file,
+		type,
+		(int)((st.st_mode & 07000) >> 9),
+		(int)((st.st_mode & 00700) >> 6),
+		(int)((st.st_mode & 00070) >> 3),
+		(int)(st.st_mode & 00007)
+	);
+
+	/*
+	 * snprintf() returns the length the row would have had. If it was
+	 * truncated, advancing by that amount would move the cursor past
+	 * the buffer and wrap the unsigned capacity, so drop the row.
+	 */
+	if (unlikely(ret < 0 || (size_t)ret >= *capacity_p))
+		return -ENOBUFS;
+
+	*pbuf_p += (size_t)ret;
+	*capacity_p -= (size_t)ret;
+	return 0;
+}
+
+/*
+ * Consume one entry from @sdld->dir and, unless it is "." or "..", try to
+ * append its table row to the output buffer via construct_file_row().
+ *
+ * Return -1 when readdir() yields no entry, 0 otherwise. A failure to
+ * build the row is not reported to the caller; that entry is simply left
+ * out of the listing.
+ */
+static int _stream_dir_list_loop(struct stream_dir_list_data *sdld,
+				 char **pbuf_p, size_t *capacity_p)
+{
+	const struct dirent *ent = readdir(sdld->dir);
+	if (unlikely(!ent))
+		return -1;
+
+	const char *name = ent->d_name;
+	const bool is_dot_entry = !strcmp(name, ".") || !strcmp(name, "..");
+
+	if (likely(!is_dot_entry))
+		(void)construct_file_row(sdld->path, name, pbuf_p, capacity_p);
+
+	return 0;
+}
+
+/*
+ * Schedule @sess for another directory-listing pass on @worker.
+ *
+ * If the session has no private data yet, a heap copy of @sdld_s is made
+ * and stored in sess->priv_data. The session action is then set to
+ * HTTP_ACT_DIRLIST and the session index is pushed onto the worker's
+ * buffer queue.
+ *
+ * Return 0 on success, -EBADMSG if the copy could not be allocated.
+ */
+static int stream_dir_list_queue(struct stream_dir_list_data *sdld_s,
+				 struct client_sess *sess,
+				 struct worker *worker)
+{
+	struct stream_dir_list_data *copy;
+
+	if (likely(sess->priv_data))
+		goto enqueue;
+
+	copy = new struct stream_dir_list_data;
+	if (unlikely(!copy))
+		return -EBADMSG;
+
+	*copy = *sdld_s;
+	sess->priv_data = (void *)copy;
+
+enqueue:
+	sess->action = HTTP_ACT_DIRLIST;
+	worker->buf_queue->push(sess->idx);
+	return 0;
+}
+
+/* Closing markup of the directory listing page. */
+constexpr static const char dir_list_foot[] =
+	"\t</table>\n</body>\n</html>\n";
+
+/*
+ * Send the closing markup of the directory listing to @sess.
+ *
+ * Return 0 on success, or the negative value reported by send_to_sess().
+ */
+static int send_footer_dir_list(struct client_sess *sess)
+{
+	/* The terminating NUL byte of the literal is not sent. */
+	const size_t foot_len = sizeof(dir_list_foot) - 1;
+	const int sent = send_to_sess(sess, dir_list_foot, foot_len);
+
+	return unlikely(sent < 0) ? sent : 0;
+}
+
+/*
+ * Run one pass of the directory listing for @sess.
+ *
+ * Rows are accumulated in a local buffer. When the buffer is nearly full,
+ * it is sent, the session is re-queued on @worker through
+ * stream_dir_list_queue() and the function returns 0 with the directory
+ * still open; a later pass continues from where readdir() stopped.
+ *
+ * When _stream_dir_list_loop() reports that there is nothing more to read,
+ * the remaining rows and the page footer are sent.
+ *
+ * Return 0 if the session was re-queued, 1 once the listing is complete,
+ * or the non-zero error from send_to_sess(), send_footer_dir_list() or
+ * stream_dir_list_queue(). On every non-zero return the directory is
+ * closed and sess->priv_data, if set, is freed.
+ */
+static int stream_dir_list_loop(struct stream_dir_list_data *sdld,
+ struct client_sess *sess,
+ struct worker *worker)
+{
+ bool need_send_footer;
+ char buf[1024 * 128];
+ /* One byte is held back from the usable capacity. */
+ size_t capacity = sizeof(buf) - 1;
+ size_t send_len;
+ char *pbuf = buf;
+ int ret;
+
+ while (1) {
+
+ /*
+ * Less than 8192 bytes left: flush what has been built so far
+ * and hand the rest of the work to a later pass.
+ */
+ if (unlikely(capacity < 8192)) {
+ send_len = sizeof(buf) - 1 - capacity;
+ ret = send_to_sess(sess, buf, send_len);
+ if (unlikely(ret < 0))
+ goto out_close;
+
+ ret = stream_dir_list_queue(sdld, sess, worker);
+ if (unlikely(ret))
+ goto out_close;
+
+ /*
+ * We are still in the queue loop.
+ */
+ return 0;
+ }
+
+ /* Non-zero means readdir() returned no further entry. */
+ ret = _stream_dir_list_loop(sdld, &pbuf, &capacity);
+ if (unlikely(ret))
+ break;
+ }
+
+ /*
+ * Append the footer to the buffer when it fits, so that it goes out
+ * with the last rows; otherwise send it on its own below.
+ */
+ if (capacity > sizeof(dir_list_foot) - 1) {
+ capacity -= sizeof(dir_list_foot) - 1;
+ memcpy(pbuf, dir_list_foot, sizeof(dir_list_foot) - 1);
+ need_send_footer = false;
+ } else {
+ need_send_footer = true;
+ }
+
+ send_len = sizeof(buf) - 1 - capacity;
+ if (send_len > 0) {
+ ret = send_to_sess(sess, buf, send_len);
+ if (unlikely(ret < 0))
+ goto out_close;
+ }
+
+ if (need_send_footer) {
+ ret = send_footer_dir_list(sess);
+ if (unlikely(ret))
+ goto out_close;
+ }
+
+ /* Listing complete. */
+ ret = 1;
+
+out_close:
+ /* Close the directory before freeing the data that may hold it. */
+ closedir(sdld->dir);
+ if (sess->priv_data) {
+ delete (struct stream_dir_list_data *)sess->priv_data;
+ sess->priv_data = NULL;
+ }
+
+ return ret;
+}
+
+/*
+ * Start streaming an HTML listing of the directory @path to @sess.
+ *
+ * Return the non-zero value of redirect_on_no_trailing_slash() or
+ * stream_dir_list_open() if either one reports something other than 0,
+ * the error from send_init_payload_for_stream_dir_list() if the response
+ * head could not be sent, and otherwise whatever stream_dir_list_loop()
+ * returns.
+ */
+static int stream_dir_list(const char *path, struct client_sess *sess,
+			   struct worker *worker)
+{
+	struct stream_dir_list_data sdld;
+	int ret;
+
+	/*
+	 * If we are going to list the directory, the URI path must end
+	 * with a trailing slash. If it does not, add the trailing slash
+	 * by redirecting the client.
+	 */
+	ret = redirect_on_no_trailing_slash(path, sess);
+	if (ret)
+		return ret;
+
+	ret = stream_dir_list_open(path, sess, &sdld.dir);
+	if (unlikely(ret))
+		return ret;
+
+	ret = send_init_payload_for_stream_dir_list(sess);
+	if (unlikely(ret)) {
+		/*
+		 * The directory is already open at this point and nothing
+		 * else holds a reference to it; close it here so that it
+		 * does not leak.
+		 */
+		closedir(sdld.dir);
+		return ret;
+	}
+
+	snprintf(sdld.path, sizeof(sdld.path), "%s", path);
+	return stream_dir_list_loop(&sdld, sess, worker);
+}
+
+/*
+ * Serve a GET request: map the request URI onto a path below the current
+ * working directory and stream either a directory listing or the file.
+ *
+ * Every rejection sends an HTTP error to the client before returning
+ * -EBADMSG:
+ *   414 - the URI does not fit in the local path buffer,
+ *   404 - the path contains "/.." or does not exist,
+ *   403 - stat() fails for another reason, or the path is neither a
+ *         directory nor a regular file.
+ *
+ * Otherwise return the result of stream_dir_list() or stream_file().
+ */
+static int handle_route_get(struct client_sess *sess, struct worker *worker)
+{
+	char path[4096 + 128];
+	struct stat st;
+	int ret;
+
+	if (likely(sess->need_epl_del)) {
+		sess->need_epl_del = false;
+		uninstall_fd_from_worker(sess->fd, worker);
+	}
+
+	/* A truncated path would name a different file; reject it. */
+	ret = snprintf(path, sizeof(path), "./%s", sess->uri);
+	if (unlikely(ret < 0 || (size_t)ret >= sizeof(path))) {
+		send_http_error(sess, 414, "URI Too Long");
+		return -EBADMSG;
+	}
+
+	/*
+	 * Don't allow to step up to the parent directory.
+	 */
+	if (unlikely(strstr(path, "/.."))) {
+		send_http_error(sess, 404, "Not Found");
+		return -EBADMSG;
+	}
+
+	ret = stat(path, &st);
+	if (unlikely(ret < 0)) {
+		ret = errno;
+		fprintf(stderr, "Can't stat \"%s\": %s\n", path, strerror(ret));
+
+		if (ret == ENOENT)
+			send_http_error(sess, 404, "Not Found");
+		else
+			send_http_error(sess, 403, "Forbidden");
+
+		return -EBADMSG;
+	}
+
+	if (S_ISDIR(st.st_mode))
+		return stream_dir_list(path, sess, worker);
+
+	if (S_ISREG(st.st_mode))
+		return stream_file(path, sess, worker);
+
+	/*
+	 * Devices, FIFOs, sockets, ...: answer the client instead of
+	 * dropping the connection without any response.
+	 */
+	send_http_error(sess, 403, "Forbidden");
+	return -EBADMSG;
+}
+
+static int handle_route(struct client_sess *sess, struct worker *worker)
+{
+ int ret;
+
+ switch (sess->method) {
case HTTP_GET:
- ret = handle_route_get(sess, state);
+ ret = handle_route_get(sess, worker);
break;
case HTTP_POST:
- ret = handle_route_post(sess, state);
- break;
+ case HTTP_DELETE:
+ case HTTP_PATCH:
+ case HTTP_PUT:
default:
- send_http_error(405, sess);
- close_sess(sess, state);
- return 0;
+ send_http_error(sess, 405, "Method not allowed");
+ return -EBADMSG;
}
return ret;
}
-static int _handle_client(struct client_sess *sess, struct server_state *state)
+static int _handle_client(struct client_sess *sess, struct worker *worker)
{
int ret = 0;
- sess->buf[sess->buf_size] = '\0';
if (!sess->got_http_header) {
+ sess->recv_buf[sess->rbuf_len] = '\0';
ret = parse_http_header(sess);
- if (ret)
+ if (unlikely(ret))
+ goto out;
+ if (unlikely(!sess->got_http_header))
goto out;
- if (!sess->got_http_header)
- return 0;
}
#if 0
@@ -595,68 +1521,107 @@ static int _handle_client(struct client_sess *sess, struct server_state *state)
printf("HTTP version: %s\n", sess->http_ver);
#endif
- ret = handle_route(sess, state);
+ ret = handle_route(sess, worker);
out:
if (ret) {
- close_sess(sess, state);
- if (likely(ret == -EBADMSG || ret == -ENETDOWN))
+ close_sess(sess, worker);
+ if (likely(ret == -EBADMSG || ret == -ENETDOWN ||
+ ret == -ECONNRESET || ret == -EPIPE ||
+ ret == 1))
ret = 0;
}
return ret;
}
-static int handle_client(struct client_sess *sess, struct server_state *state)
+static int handle_client(struct client_sess *sess, struct worker *worker)
{
ssize_t recv_ret;
size_t len;
char *buf;
int ret;
- len = sizeof(sess->buf) - sess->buf_size - 1;
- buf = &sess->buf[sess->buf_size];
+ len = sizeof(sess->recv_buf) - 1 - sess->rbuf_len;
+ buf = &sess->recv_buf[sess->rbuf_len];
recv_ret = recv(sess->fd, buf, len, 0);
- if (unlikely(recv_ret < 0)) {
+ if (unlikely(recv_ret <= 0)) {
+
+ if (recv_ret == 0) {
+ close_sess(sess, worker);
+ return 0;
+ }
+
ret = errno;
if (ret == EAGAIN)
return 0;
- perror("recv");
- return -ret;
- }
- if (unlikely(recv_ret == 0 && len != 0)) {
- close_sess(sess, state);
+ close_sess(sess, worker);
+ perror("recv");
return 0;
}
+ sess->rbuf_len += (size_t)recv_ret;
+ return _handle_client(sess, worker);
+}
+
+static __hot int handle_event(struct epoll_event *event,
+ struct worker *worker)
+{
+ struct client_sess *sess;
- sess->buf_size += (size_t)recv_ret;
- return _handle_client(sess, state);
+ if (!event->data.ptr)
+ return handle_new_client(worker);
+
+ sess = (struct client_sess *)event->data.ptr;
+ return handle_client(sess, worker);
}
-static int handle_event(struct epoll_event *event, struct server_state *state)
+static __hot int handle_buf_queue(uint32_t idx, struct worker *worker)
{
+ struct server_state *state = worker->state;
+ struct client_sess *sess = &state->sess[idx];
int ret = 0;
- if (event->data.fd == 0) {
- ret = handle_new_client(state);
- } else {
- struct client_sess *sess =
- (struct client_sess *)event->data.ptr;
+ switch (sess->action) {
+ case HTTP_ACT_NONE:
+ ret = -ENETDOWN;
+ break;
+ case HTTP_ACT_DIRLIST: {
+ struct stream_dir_list_data *sdld;
- ret = handle_client(sess, state);
+ sdld = (struct stream_dir_list_data *)sess->priv_data;
+ ret = stream_dir_list_loop(sdld, sess, worker);
+ break;
}
+ case HTTP_ACT_FILE_STREAM: {
+ struct stream_file_data *sfd;
- return ret;
+ sfd = (struct stream_file_data *)sess->priv_data;
+ ret = stream_file_loop(sfd, sess, worker);
+ break;
+ }
+ }
+
+ if (ret)
+ close_sess(sess, worker);
+
+ return 0;
}
-static int _run_server(int epl_fd, struct server_state *state)
+static __hot int run_worker(int epl_fd, struct epoll_event *events,
+ struct worker *worker)
{
- struct epoll_event *events = state->events;
+ int timeout = 1000;
+ size_t qlen;
+ int nr_evt;
int ret;
int i;
- ret = epoll_wait(epl_fd, events, NR_EPOLL_EVT, 1000);
- if (unlikely(ret < 0)) {
+ qlen = worker->buf_queue->size();
+ if (qlen)
+ timeout = 0;
+
+ nr_evt = epoll_wait(epl_fd, events, NR_EPOLL_EVT, timeout);
+ if (unlikely(nr_evt < 0)) {
ret = errno;
perror("epoll_wait");
if (ret == EINTR)
@@ -664,106 +1629,199 @@ static int _run_server(int epl_fd, struct server_state *state)
return -ret;
}
- for (i = 0; i < ret; i++) {
- ret = handle_event(&events[i], state);
+ for (i = 0; i < nr_evt; i++) {
+ ret = handle_event(&events[i], worker);
if (unlikely(ret))
return ret;
}
- return ret;
+ while (qlen--) {
+ uint32_t idx;
+
+ idx = worker->buf_queue->pop();
+ ret = handle_buf_queue(idx, worker);
+ if (unlikely(ret))
+ return ret;
+ }
+
+ return 0;
}
-static int run_server(struct server_state *state)
+static __hot int _worker_func(struct worker *worker)
{
- int epl_fd = state->epl_fd;
+ struct epoll_event *events = worker->events;
+ struct server_state *state = worker->state;
+ int epl_fd = worker->epl_fd;
int ret = 0;
while (likely(!state->stop)) {
- ret = _run_server(epl_fd, state);
- if (unlikely(ret))
- break;
+ ret = run_worker(epl_fd, events, worker);
+ if (likely(!ret))
+ continue;
+
+ state->stop = true;
+ break;
}
return ret;
}
-static void destroy_state(struct server_state *state)
+/*
+ * Entry point of a worker. It is used both as the body of the spawned
+ * threads and, for workers[0], called directly by run_workers().
+ *
+ * Return the value of _worker_func().
+ */
+static noinline int worker_func(struct worker *worker)
+{
+ int ret;
+
+ /*
+ * Only workers with idx > 0 are marked as needing a join;
+ * wait_for_worker_online() polls this flag to learn that the
+ * thread has started.
+ */
+ if (worker->idx > 0)
+ worker->need_join = true;
+
+ /* nr_on_thread is raised for the duration of the worker loop. */
+ atomic_fetch_add(&worker->state->nr_on_thread, 1u);
+ wait_for_ready_state(worker);
+ ret = _worker_func(worker);
+ atomic_fetch_sub(&worker->state->nr_on_thread, 1u);
+ return ret;
+}
+
+/*
+ * Create the epoll instance of @worker and store its descriptor in
+ * worker->epl_fd.
+ *
+ * Return 0 on success, or -errno if epoll_create() fails (the error is
+ * also printed with perror()).
+ */
+static int init_epoll_for_worker(struct worker *worker)
+{
+	const int fd = epoll_create(255);
+
+	if (likely(fd >= 0)) {
+		worker->epl_fd = fd;
+		return 0;
+	}
+
+	/* Save errno first: perror() may overwrite it. */
+	const int err = errno;
+	perror("epoll_create");
+	return -err;
+}
+
+/*
+ * Wait until the thread of @worker has set worker->need_join, sleeping
+ * 1000 microseconds between checks.
+ *
+ * Once the retry budget is used up, the server is told to stop, the thread
+ * is joined and need_join is cleared again.
+ *
+ * Return 0 once the flag is seen, -ETIMEDOUT otherwise.
+ *
+ * NOTE(review): need_join is written by the worker thread and read here;
+ * confirm that its type makes this cross-thread access well-defined.
+ */
+static __cold int wait_for_worker_online(struct worker *worker)
+{
+	uint32_t nr_sleeps = 0;
+
+	while (!worker->need_join) {
+		usleep(1000);
+
+		if (nr_sleeps++ >= 10000) {
+			fprintf(stderr, "Timedout while waiting for thread %u\n",
+				worker->idx);
+			worker->state->stop = true;
+			worker->thread.join();
+			worker->need_join = false;
+			return -ETIMEDOUT;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * Bring up all workers and run workers[0] on the calling thread.
+ *
+ * Steps:
+ *   1. create an epoll instance for every worker,
+ *   2. spawn a thread for each worker except workers[0] and wait for it
+ *      to come online,
+ *   3. register the listening TCP socket with workers[0],
+ *   4. run worker_func() for workers[0] on this thread.
+ *
+ * Return the first non-zero error of any step, otherwise the value of
+ * worker_func() for workers[0].
+ */
+static __cold int run_workers(struct server_state *state)
+{
+	struct worker *const workers = state->workers;
+	epoll_data_t listener_data;
+	uint32_t i;
+	int ret;
+
+	for (i = NR_WORKERS; i-- > 0; ) {
+		ret = init_epoll_for_worker(&workers[i]);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	/*
+	 * Index 0 is deliberately left out: that worker runs on the
+	 * main thread further down.
+	 */
+	for (i = NR_WORKERS; --i > 0; ) {
+		workers[i].thread = std::thread(worker_func, &workers[i]);
+		ret = wait_for_worker_online(&workers[i]);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	/*
+	 * The listening socket is registered with a NULL data pointer;
+	 * handle_event() treats a NULL pointer as "new client".
+	 */
+	listener_data.ptr = NULL;
+	ret = install_infd_to_worker(state->tcp_fd, &workers[0], listener_data);
+	if (unlikely(ret))
+		return ret;
+
+	return worker_func(&workers[0]);
+}
+
+static __cold void destroy_state(struct server_state *state)
{
uint32_t i;
+ int fd;
- if (state->tcp_fd != -1)
- close(state->tcp_fd);
- if (state->epl_fd != -1)
- close(state->epl_fd);
+ if (!state)
+ return;
+
+ fd = state->tcp_fd;
+ if (fd != -1)
+ close(fd);
+
+ /*
+ * Do not join @workers[0].thread, it doesn't
+ * have an LWP!
+ */
+ for (i = 1; i < NR_WORKERS; i++) {
+ if (state->workers[i].need_join)
+ state->workers[i].thread.join();
+ }
+
+ for (i = 0; i < NR_WORKERS; i++) {
+ fd = state->workers[i].epl_fd;
+ if (fd != -1)
+ close(fd);
+ }
for (i = 0; i < NR_MAX_CLIENTS; i++) {
- int fd = state->sess[i].fd;
+ fd = state->sess[i].fd;
if (fd != -1)
close(fd);
}
- delete state->sess_free_idx;
- delete state->sfi_lock;
-#ifdef USE_ASAN
- delete state;
-#else
- munmap(state, sizeof(*state));
-#endif
- g_state = NULL;
+ i = NR_WORKERS;
+ while (i--) {
+ wq_queue<uint32_t> *p = state->workers[i].buf_queue;
+ if (p)
+ delete p;
+ }
+
+ if (state->workers)
+ delete[] state->workers;
+ if (state->sess_free_lock)
+ delete state->sess_free_lock;
+ if (state->sess_free)
+ delete state->sess_free;
+
+ free_state(state);
}
-/**
- *
- * ./gwhttpd 0.0.0.0 8000
- *
- */
int main(int argc, char *argv[])
{
- struct server_state *state;
+ struct server_state *state = NULL;
int ret;
setvbuf(stdout, NULL, _IOLBF, 4096);
-
if (argc != 3) {
printf("Usage: %s [bind_address] [bind_port]\n", argv[0]);
return 0;
}
-#ifdef USE_ASAN
- state = new struct server_state;
- if (!state) {
- perror("malloc");
- return ret;
- }
-#else
- state = (struct server_state *)mmap(NULL, sizeof(*state),
- PROT_READ|PROT_WRITE,
- MAP_ANONYMOUS|MAP_PRIVATE, -1, 0);
- if (state == MAP_FAILED) {
- ret = errno;
- perror("mmap");
+ ret = init_state(&state);
+ if (unlikely(ret))
return ret;
- }
- mlock(state, sizeof(*state));
-#endif
+ state->bind_addr = argv[1];
+ state->bind_port = (uint16_t)atoi(argv[2]);
- g_state = state;
- ret = init_state(state);
- if (ret)
- return ret;
- ret = init_socket(state, argv[1], (uint16_t)atoi(argv[2]));
- if (ret)
- goto out;
- ret = init_epoll(state);
- if (ret)
+ ret = init_socket(state);
+ if (unlikely(ret))
goto out;
-
- ret = run_server(state);
-
+ ret = run_workers(state);
out:
destroy_state(state);
- if (ret < 0)
- ret = -ret;
return ret;
}
--
Ammar Faizi
next prev parent reply other threads:[~2022-07-08 12:10 UTC|newest]
Thread overview: 15+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-07-08 12:10 [PATCH gwhttpd 00/14] gwhttpd updates Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 01/14] gwhttpd: Do an early return when `parse_http_header()` fails Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 02/14] gwhttpd: Don't print any error when mlock fails Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 03/14] gwhttpd: Replace `send_error_and_close()` with `send_http_error()` Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 04/14] gwhttpd: Add log in the interrupt handler Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 05/14] gwhttpd: Refactor HTTP header parser Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 06/14] gwhttpd: Avoid endless busy spinning on `send()` Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 07/14] Makefile: Add "make clean" command Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 08/14] gwhttpd: Skip interrupt error from `epoll_wait()` Ammar Faizi
2022-07-08 12:10 ` Ammar Faizi [this message]
2022-07-08 12:10 ` [PATCH gwhttpd 10/14] gwhttpd: Add command line options Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 11/14] gwhttpd: Add SLC support Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 12/14] gwhttpd: slc: Shut the SLC log up Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 13/14] gwhttpd: Fix 403 HTTP error when accessing an empty file Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 14/14] gwhttpd: Add connecting log for SLC Ammar Faizi
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox