public inbox for [email protected]
 help / color / mirror / Atom feed
From: Ammar Faizi <[email protected]>
To: GNU/Weeb Mailing List <[email protected]>
Cc: Ammar Faizi <[email protected]>,
	Alviro Iskandar Setiawan <[email protected]>,
	Arthur Lapz <[email protected]>,
	Fernanda Ma'rouf <[email protected]>,
	Sprite <[email protected]>, Yonle <[email protected]>
Subject: [PATCH gwhttpd 09/14] gwhttpd: Add directory listing and download file support
Date: Fri,  8 Jul 2022 19:10:20 +0700	[thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>

A huge change: a full refactoring. Let's start again from this point.

Signed-off-by: Ammar Faizi <[email protected]>
---
 Makefile    |    4 +-
 gwhttpd.cpp | 1680 +++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 1371 insertions(+), 313 deletions(-)

diff --git a/Makefile b/Makefile
index 28cc0c8..cfbd377 100644
--- a/Makefile
+++ b/Makefile
@@ -2,11 +2,11 @@
 CXXFLAGS = -ggdb3 -Wall -Wextra -O2 -fno-exceptions
 
 ifeq ($(SANITIZE_BUILD),1)
-	CXXFLAGS += "-fsanitize=address -DUSE_ASAN=1"
+	CXXFLAGS += -fsanitize=address -DUSE_ASAN=1
 endif
 
 gwhttpd: gwhttpd.cpp
-	$(CXX) $(CXXFLAGS) -o $(@) $(^)
+	$(CXX) $(CXXFLAGS) -o $(@) $(^) -lpthread
 
 clean:
 	rm -vf gwhttpd
diff --git a/gwhttpd.cpp b/gwhttpd.cpp
index bb8dc93..0e4c558 100644
--- a/gwhttpd.cpp
+++ b/gwhttpd.cpp
@@ -6,6 +6,8 @@
 #define _GNU_SOURCE
 #endif
 
+#include <poll.h>
+#include <fcntl.h>
 #include <cstdio>
 #include <cstdint>
 #include <cstdlib>
@@ -13,22 +15,130 @@
 #include <unistd.h>
 #include <cerrno>
 #include <stack>
+#include <queue>
+#include <atomic>
 #include <mutex>
+#include <dirent.h>
 #include <signal.h>
+#include <thread>
+#include <unordered_map>
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/epoll.h>
 #include <arpa/inet.h>
+#include <sys/stat.h>
+#include <sys/types.h>
 
+#define noinline		__attribute__((__noinline__))
+#define __hot			__attribute__((__hot__))
+#define __cold			__attribute__((__cold__))
 #define likely(COND)		__builtin_expect(!!(COND), 1)
 #define unlikely(COND)		__builtin_expect(!!(COND), 0)
-#define NR_EPOLL_EVT		32
-#define NR_MAX_CLIENTS		4096
+#define NR_WORKERS		16
+#define NR_EPOLL_EVT		512
+#define NR_MAX_CLIENTS		10240
 #define IPV4_LEN		(sizeof("xxx.xxx.xxx.xxx"))
 #define FCIP			"%s:%u"
-#define FCIP_ARG(P)		(P)->src_addr, (P)->src_port
+#define FCIP_ARG(P)		((P)->src_addr), ((P)->src_port)
+
+#if defined(__x86_64__)
+#define	__page_size_aligned_in_smp	__attribute__((__aligned__(4096)))
+#else
+#define	__page_size_aligned_in_smp
+#endif
+
+template<typename T>
+struct wq_stack {
+	T	*arr_;
+	size_t	pos_;
+	size_t	max_;
+
+	inline wq_stack(size_t max):
+		pos_(max),
+		max_(max)
+	{
+		arr_ = new T[max];
+	}
+
+	inline ~wq_stack(void)
+	{
+		delete[] arr_;
+	}
+
+	inline int64_t push(T val)
+	{
+		arr_[--pos_] = val;
+		return pos_;
+	}
+
+	inline T top(void)
+	{
+		return arr_[pos_];
+	}
+
+	inline T pop(void)
+	{
+		return arr_[pos_++];
+	}
+
+	inline bool empty(void)
+	{
+		return max_ == pos_;
+	}
+};
+
+
+template<typename T>
+struct wq_queue {
+	T	*arr_;
+	size_t	front_;
+	size_t	rear_;
+	size_t	and_;
+
+	inline wq_queue(size_t want_max):
+		front_(0),
+		rear_(0)
+	{
+		size_t max = 1;
+		while (max < want_max)
+			max *= 2;
+
+		and_ = max - 1;
+		arr_ = new T[max];
+	}
+
+	inline ~wq_queue(void)
+	{
+		delete[] arr_;
+	}
+
+	inline size_t size(void)
+	{
+		if (rear_ >= front_)
+			return rear_ - front_;
+		else
+			return front_ - rear_;
+	}
+
+	inline int64_t push(T val)
+	{
+		arr_[rear_ & and_] = val;
+		return rear_++;
+	}
+
+	inline T front(void)
+	{
+		return arr_[front_ & and_];
+	}
+
+	inline T pop(void)
+	{
+		return arr_[front_++ & and_];
+	}
+};
 
 enum http_method {
+	HTTP_NOP = 0,
 	HTTP_GET,
 	HTTP_POST,
 	HTTP_PATCH,
@@ -36,43 +146,84 @@ enum http_method {
 	HTTP_DELETE,
 };
 
+enum http_action {
+	HTTP_ACT_NONE = 0,
+	HTTP_ACT_DIRLIST,
+	HTTP_ACT_FILE_STREAM,
+};
+
+struct stream_dir_list_data {
+	DIR			*dir;
+	char			path[4096];
+};
+
+struct stream_file_data {
+	char			*map;
+	off_t			size;
+	off_t			cur_off;
+};
+
 struct client_sess {
 	int			fd;
-	size_t			buf_size;
-	char			buf[4096];
+	uint32_t		idx;
+	void			*priv_data;
+
+	enum http_action	action;
 	enum http_method	method;
 	char			*uri;
 	char			*qs;
 	char			*http_ver;
+	char			*header;
 	char			*body;
 	bool			got_http_header;
-	struct sockaddr_in	addr;
-	uint16_t		src_port;
 	char			src_addr[IPV4_LEN];
+	uint16_t		src_port;
+	char			recv_buf[4096];
+	size_t			rbuf_len;
+	bool			need_epl_del;
+	std::unordered_map<std::string, std::string>	*http_headers;
+} __page_size_aligned_in_smp;
+
+struct server_state;
+struct worker {
+	int			epl_fd;
+	struct epoll_event	events[NR_EPOLL_EVT];
 	uint32_t		idx;
-};
+	struct server_state	*state;
+	wq_queue<uint32_t>	*buf_queue;
+	std::thread		thread;
+	volatile bool		need_join;
+} __page_size_aligned_in_smp;
 
 struct server_state {
 	volatile bool		stop;
+	std::atomic<uint32_t>	wrk_idx_use;
 	int			tcp_fd;
-	int			epl_fd;
-	struct epoll_event	events[NR_EPOLL_EVT];
-	std::mutex		*sfi_lock;
-	std::stack<uint32_t>	*sess_free_idx;
-	struct client_sess	sess[NR_MAX_CLIENTS];
+	int			sig;
 
 	/*
-	 * Signal caught by the interrupt handler.
+	 * Stack of free session indexes.
 	 */
-	int			sig;
+	wq_stack<uint32_t>	*sess_free;
+	std::mutex		*sess_free_lock;
+
+	/*
+	 * Array of client sessions.
+	 */
+	struct client_sess	sess[NR_MAX_CLIENTS];
+	std::atomic<uint32_t>	nr_on_thread;
+
+	struct worker		*workers;
+
+	const char		*bind_addr;
+	uint16_t		bind_port;
 };
 
-static struct server_state *g_state;
+static struct server_state *g_state = NULL;
 
-static void interrupt_handler(int sig)
+static __cold void signal_handler_func(int sig)
 {
-	putchar('\n');
-	printf("Got signal: %d\n", sig);
+	printf("\nGot signal: %d\n", sig);
 	if (!g_state)
 		return;
 
@@ -80,94 +231,223 @@ static void interrupt_handler(int sig)
 	g_state->sig = sig;
 }
 
-static int init_state(struct server_state *state)
+static __cold struct server_state *alloc_state(void)
+{
+	struct server_state *state;
+
+#ifdef USE_ASAN
+	state = new struct server_state;
+	if (!state) {
+		errno = ENOMEM;
+		perror("new()");
+		return NULL;
+	}
+#else /* #ifdef USE_ASAN */
+	state = (struct server_state *)mmap(NULL, sizeof(*state),
+					    PROT_READ|PROT_WRITE,
+					    MAP_ANONYMOUS|MAP_PRIVATE, -1, 0);
+	if (state == MAP_FAILED) {
+		perror("mmap");
+		return NULL;
+	}
+	mlock(state, sizeof(*state));
+#endif /* #ifdef USE_ASAN */
+
+	return state;
+}
+
+static __cold void free_state(struct server_state *state)
+{
+#ifdef USE_ASAN
+	delete state;
+#else
+	munlock(state, sizeof(*state));
+	munmap(state, sizeof(*state));
+#endif
+}
+
+static __cold int setup_signal_handler(void)
 {
 	struct sigaction act;
-	uint32_t i;
 	int ret;
 
-	memset(state, 0, sizeof(*state));
-	state->tcp_fd = -1;
-	state->epl_fd = -1;
-	state->sig = -1;
-
 	memset(&act, 0, sizeof(act));
-	act.sa_handler = interrupt_handler;
+	act.sa_handler = signal_handler_func;
 	ret = sigaction(SIGINT, &act, NULL);
-	ret |= sigaction(SIGHUP, &act, NULL);
-	ret |= sigaction(SIGTERM, &act, NULL);
+	if (unlikely(ret))
+		goto err;
+	ret = sigaction(SIGHUP, &act, NULL);
+	if (unlikely(ret))
+		goto err;
+	ret = sigaction(SIGTERM, &act, NULL);
+	if (unlikely(ret))
+		goto err;
+
 	act.sa_handler = SIG_IGN;
-	ret |= sigaction(SIGPIPE, &act, NULL);
-	if (ret) {
-		fprintf(stderr, "Failed to set up the interrupt handler.\n");
+	ret = sigaction(SIGPIPE, &act, NULL);
+	if (unlikely(ret))
+		goto err;
+
+	return 0;
+err:
+	perror("sigaction");
+	return -errno;
+}
+
+static __cold int init_state(struct server_state **state_p)
+{
+	struct server_state *state;
+	uint32_t i;
+	int ret;
+
+	ret = setup_signal_handler();
+	if (unlikely(ret))
 		return ret;
-	}
 
-	state->sfi_lock = new std::mutex;
-	if (!state->sfi_lock) {
-		fprintf(stderr, "Cannot allocate mutex\n");
+	state = alloc_state();
+	if (unlikely(!state))
 		return -ENOMEM;
-	}
 
-	state->sess_free_idx = new __typeof__(*state->sess_free_idx);
-	if (!state->sess_free_idx) {
-		fprintf(stderr, "Cannot allocate sess_free_idx\n");
-		delete state->sfi_lock;
+	state->tcp_fd = -1;
+
+	state->sess_free = new wq_stack<uint32_t>(NR_MAX_CLIENTS);
+	if (unlikely(!state->sess_free)) {
+		errno = ENOMEM;
+		perror("state->sess_free = new()");
 		return -ENOMEM;
 	}
 
-	for (i = NR_MAX_CLIENTS - 1; i--; ) {
+	state->sess_free_lock = new std::mutex;
+	if (unlikely(!state->sess_free_lock)) {
+		errno = ENOMEM;
+		perror("state->sess_free_lock = new()");
+		goto out_err_sess_free_lock;
+	}
+
+	state->workers = new struct worker[NR_WORKERS];
+	if (unlikely(!state->workers)) {
+		ret = -ENOMEM;
+		errno = -ret;
+		perror("state->workers = new()");
+		goto out_err_workers;
+	}
+
+	i = NR_MAX_CLIENTS;
+	while (i--) {
 		state->sess[i].fd = -1;
 		state->sess[i].idx = i;
-		state->sess_free_idx->push(i);
+		state->sess_free->push(i);
+	}
+
+	i = NR_WORKERS;
+	while (i--)
+		state->workers[i].buf_queue = NULL;
+
+	i = NR_WORKERS;
+	while (i--) {
+		wq_queue<uint32_t> *p = new wq_queue<uint32_t>(NR_MAX_CLIENTS);
+		if (unlikely(!p))
+			goto out_err_wq_queue;
+		state->workers[i].idx = i;
+		state->workers[i].epl_fd = -1;
+		state->workers[i].state = state;
+		state->workers[i].need_join = false;
+		state->workers[i].buf_queue = p;
+	}
+
+	state->stop = false;
+	atomic_store(&state->nr_on_thread, 0u);
+	atomic_store(&state->wrk_idx_use, 0u);
+	*state_p = state;
+	g_state = state;
+	return 0;
+
+out_err_wq_queue:
+	while (i < NR_WORKERS) {
+		delete state->workers[i].buf_queue;
+		state->workers[i].buf_queue = NULL;
+		i++;
+	}
+out_err_workers:
+	delete state->sess_free_lock;
+	state->sess_free_lock = NULL;
+out_err_sess_free_lock:
+	delete state->sess_free;
+	state->sess_free = NULL;
+	return ret;
+}
+
+static __cold int set_socket_options(int tcp_fd)
+{
+	int val;
+	const void *y = (const void *)&val;
+	const char *on, *ov;
+	int ret;
+	/* Hand setsockopt() @y (-> @val); &y would pass the pointer's own bytes. */
+	val = 1;
+	ret = setsockopt(tcp_fd, SOL_SOCKET, SO_REUSEADDR, y, sizeof(val));
+	if (ret < 0) {
+		on = "SOL_SOCKET";
+		ov = "SO_REUSEADDR";
+		goto out_err;
 	}
 
 	return 0;
+
+out_err:
+	ret = errno;
+	fprintf(stderr, "setsockopt(%d, %s, %s, ...): %s\n", tcp_fd, on, ov,
+		strerror(ret));
+	return -ret;
 }
 
-static int init_socket(struct server_state *state, const char *addr,
-		       uint16_t port)
+static __cold int init_socket(struct server_state *state)
 {
 	struct sockaddr_in saddr;
 	int tcp_fd;
 	int ret;
-	int y;
+
+	if (unlikely(!state->bind_addr)) {
+		fprintf(stderr, "Error: state->bind_addr is empty!\n");
+		return -EINVAL;
+	}
+
+	if (unlikely(!state->bind_port)) {
+		fprintf(stderr, "Error: state->bind_port is empty!\n");
+		return -EINVAL;
+	}
 
 	tcp_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
-	if (tcp_fd < 0) {
+	if (unlikely(tcp_fd < 0)) {
 		ret = -errno;
 		perror("socket");
 		return ret;
 	}
 
-	y = 1;
-	ret = setsockopt(tcp_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&y, sizeof(y));
-	if (ret < 0) {
-		ret = -errno;
-		perror("setsockopt");
+	ret = set_socket_options(tcp_fd);
+	if (unlikely(ret))
 		goto out_err;
-	}
 
 	memset(&saddr, 0, sizeof(saddr));
 	saddr.sin_family = AF_INET;
-	saddr.sin_port = htons(port);
-	saddr.sin_addr.s_addr = inet_addr(addr);
+	saddr.sin_addr.s_addr = inet_addr(state->bind_addr);
+	saddr.sin_port = htons(state->bind_port);
 
 	ret = bind(tcp_fd, (struct sockaddr *)&saddr, sizeof(saddr));
-	if (ret < 0) {
+	if (unlikely(ret < 0)) {
 		ret = -errno;
 		perror("bind");
 		goto out_err;
 	}
 
-	ret = listen(tcp_fd, 1000);
-	if (ret < 0) {
+	ret = listen(tcp_fd, 4096);
+	if (unlikely(ret < 0)) {
 		ret = -errno;
 		perror("listen");
-		goto out_err;	
+		goto out_err;
 	}
 
-	printf("Listening on %s:%u...\n", addr, port);
+	printf("Listening on %s:%u...\n", state->bind_addr, state->bind_port);
 	state->tcp_fd = tcp_fd;
 	return 0;
 
@@ -176,178 +456,244 @@ out_err:
 	return ret;
 }
 
-static int init_epoll(struct server_state *state)
+static __cold void wait_for_ready_state(struct worker *worker)
 {
-	struct epoll_event evt;
-	int epl_fd;
-	int ret;
+	uint32_t i = 0;
+	uint32_t on;
 
-	epl_fd = epoll_create(255);
-	if (epl_fd < 0) {
-		ret = -errno;
-		perror("epoll_create");
-		return ret;
+	while (true) {
+		on = atomic_load(&worker->state->nr_on_thread);
+		if (on >= NR_WORKERS || worker->state->stop)
+			break;
+
+		usleep(1000);
+		if (i++ < 10000)
+			continue;
+
+		fprintf(stderr, "Timedout while waiting for ready state");
+		worker->state->stop = true;
+		return;
 	}
+}
+
+static int install_infd_to_worker(int fd, struct worker *worker,
+				  epoll_data_t data)
+{
+	struct epoll_event evt;
+	int ret;
 
 	memset(&evt, 0, sizeof(evt));
 	evt.events = EPOLLIN | EPOLLPRI;
-	ret = epoll_ctl(epl_fd, EPOLL_CTL_ADD, state->tcp_fd, &evt);
+	evt.data = data;
+	ret = epoll_ctl(worker->epl_fd, EPOLL_CTL_ADD, fd, &evt);
 	if (ret < 0) {
 		ret = -errno;
 		perror("epoll_ctl");
 		return ret;
 	}
-
-	state->epl_fd = epl_fd;
-	return 0;
-}
-
-static void put_sess_idx(uint32_t idx, struct server_state *state)
-{
-	state->sfi_lock->lock();
-	state->sess_free_idx->push(idx);
-	state->sfi_lock->unlock();
+	return ret;
 }
 
 static int64_t get_sess_idx(struct server_state *state)
 {
-	int64_t ret;
-	state->sfi_lock->lock();
-	if (unlikely(state->sess_free_idx->empty())) {
-		state->sfi_lock->unlock();
+	int64_t idx;
+
+	state->sess_free_lock->lock();
+	if (unlikely(state->sess_free->empty())) {
+		state->sess_free_lock->unlock();
 		return -EAGAIN;
 	}
-	ret = state->sess_free_idx->top();
-	state->sess_free_idx->pop();
-	state->sfi_lock->unlock();
-	return ret;
+	idx = (int64_t)state->sess_free->pop();
+	state->sess_free_lock->unlock();
+	return idx;
 }
 
-static int delete_client(int epl_fd, struct client_sess *sess)
+static void put_sess_idx(uint32_t idx, struct server_state *state)
+{
+	state->sess_free_lock->lock();
+	state->sess_free->push(idx);
+	state->sess_free_lock->unlock();
+}
+
+static int uninstall_fd_from_worker(int fd, struct worker *worker)
 {
 	int ret;
 
-	ret = epoll_ctl(epl_fd, EPOLL_CTL_DEL, sess->fd, NULL);
+	ret = epoll_ctl(worker->epl_fd, EPOLL_CTL_DEL, fd, NULL);
 	if (ret < 0) {
 		ret = -errno;
 		perror("epoll_ctl");
 		return ret;
 	}
-	return 0;
+	return ret;
 }
 
-static int register_new_client(int epl_fd, struct client_sess *sess)
+static void close_sess(struct client_sess *sess, struct worker *worker)
 {
-	struct epoll_event evt;
-	int ret;
+	printf("Closing session " FCIP " (idx = %u)\n", FCIP_ARG(sess),
+		sess->idx);
 
-	memset(&evt, 0, sizeof(evt));
-	evt.events = EPOLLIN | EPOLLPRI;
-	evt.data.ptr = (void *)sess;
-	ret = epoll_ctl(epl_fd, EPOLL_CTL_ADD, sess->fd, &evt);
-	if (ret < 0) {
-		ret = -errno;
-		perror("epoll_ctl");
-		return ret;
+	if (sess->need_epl_del)
+		uninstall_fd_from_worker(sess->fd, worker);
+
+	close(sess->fd);
+	sess->fd = -1;
+	sess->action = HTTP_ACT_NONE;
+	if (sess->http_headers) {
+		delete sess->http_headers;
+		sess->http_headers = NULL;
 	}
-	return 0;
+	put_sess_idx(sess->idx, worker->state);
 }
 
-static int handle_new_client(struct server_state *state)
-{	
+static int _handle_new_client(int tcp_fd, struct worker *worker)
+{
+	struct server_state *state = worker->state;
 	struct sockaddr_in caddr;
 	socklen_t addrlen = sizeof(caddr);
 	struct client_sess *sess;
+	epoll_data_t epld;
+	uint32_t wrk_idx;
 	int64_t idx;
 	int cli_fd;
 	int ret;
 
 	memset(&caddr, 0, sizeof(caddr));
-	cli_fd = accept(state->tcp_fd, (struct sockaddr *)&caddr, &addrlen);
+	cli_fd = accept(tcp_fd, (struct sockaddr *)&caddr, &addrlen);
 	if (unlikely(cli_fd < 0)) {
 		ret = errno;
-		perror("accept");
-		if (ret == EAGAIN || ret == EMFILE)
-			return 0;
+		if (ret != EAGAIN)
+			perror("accept");
 		return -ret;
 	}
 
 	idx = get_sess_idx(state);
 	if (unlikely(idx == -EAGAIN)) {
 		close(cli_fd);
-		fprintf(stderr, "Cannot handle a new client\n");
-		return 0;
+		fprintf(stderr, "Client session is full, cannot handle a new "
+			"client\n");
+		return -EAGAIN;
 	}
 
 	sess = &state->sess[idx];
 	sess->fd = cli_fd;
-	sess->buf_size = 0;
+	sess->rbuf_len = 0;
 	sess->got_http_header = false;
-	sess->addr = caddr;
+	sess->action = HTTP_ACT_NONE;
+	sess->method = HTTP_NOP;
 
-	sess->src_addr[0] = '\0';
-	sess->src_port = 0;
 	if (addrlen <= sizeof(caddr)) {
 		const char *p;
-		sess->src_port = ntohs(caddr.sin_port);
 		p = inet_ntop(AF_INET, &caddr.sin_addr.s_addr, sess->src_addr,
 			      sizeof(sess->src_addr));
-		if (!p) {
+		if (p)
+			sess->src_port = ntohs(caddr.sin_port);
+		else
 			perror("inet_ntop");
-			fprintf(stderr, "inet_ntop() error?\n");
-		}
 	} else {
+		sess->src_addr[0] = '\0';
+		sess->src_port = 0;
 		fprintf(stderr, "Warning, accept overflow!\n");
 	}
 
-	ret = register_new_client(state->epl_fd, sess);
+	epld.ptr = (void *)sess;
+	wrk_idx = atomic_fetch_add(&state->wrk_idx_use, 1u) % NR_WORKERS;
+	ret = install_infd_to_worker(cli_fd, &state->workers[wrk_idx], epld);
 	if (unlikely(ret < 0))
 		return ret;
 
+	sess->need_epl_del = true;
 	printf("Got a new client (idx = %u) " FCIP "\n", sess->idx,
 		FCIP_ARG(sess));
 	return 0;
 }
 
-static ssize_t send_to_client(struct client_sess *sess, const char *buf,
-			      size_t len)
+static __hot int handle_new_client(struct worker *worker)
+{
+	int tcp_fd = worker->state->tcp_fd;
+	uint32_t acc_iter = 512;
+	int ret;
+
+	while (acc_iter--) {
+		ret = _handle_new_client(tcp_fd, worker);
+		if (likely(!ret))
+			continue;
+
+		if (likely(ret == -EAGAIN) || unlikely(ret == -EMFILE))
+			return 0;
+
+		return ret;
+	}
+	return 0;
+}
+
+static int poll_wait_for_fd_be_writable(int fd)
+{
+	struct pollfd fds[1];
+	int ret;
+
+	fds[0].fd = fd;
+	fds[0].events = POLLOUT;
+	fds[0].revents = 0;
+
+	ret = poll(fds, 1, -1);
+	if (unlikely(ret < 0)) {
+		ret = errno;
+		if (ret != EINTR)
+			return -ret;
+	}
+	return 0;
+}
+
+static ssize_t send_to_sess(struct client_sess *sess, const char *buf,
+			    size_t len)
 {
-	constexpr uint32_t max_try = 10;
-	uint32_t try_count = 0;
+	const char *bptr = buf;
+	ssize_t total_sent = 0;
+	size_t blen = len;
+	int fd = sess->fd;
 	ssize_t ret;
-	int tmp;
 
 repeat:
-	if (unlikely(try_count++ >= max_try))
-		return -ENETDOWN;
-	ret = send(sess->fd, buf, len, MSG_DONTWAIT);
+	ret = send(fd, bptr, blen, MSG_DONTWAIT);
 	if (unlikely(ret < 0)) {
-		tmp = errno;
-		if (tmp == EAGAIN)
-			goto repeat;
-		perror("send");
-		return -tmp;
-	} else if (unlikely(ret == 0)) {
-		return -ENETDOWN;
-	} else if (unlikely((size_t)ret < len)) {
-		buf = &buf[len];
-		len -= (size_t)ret;
+		int ret = errno;
+		if (unlikely(ret != EAGAIN)) {
+			perror("send");
+			return -ret;
+		}
+
+		ret = poll_wait_for_fd_be_writable(fd);
+		if (unlikely(ret))
+			return ret;
+
 		goto repeat;
 	}
-	return ret;
+
+	total_sent += ret;
+	if (unlikely((size_t)total_sent < len)) {
+		bptr = &buf[total_sent];
+		blen = len - (size_t)total_sent;
+		goto repeat;
+	}
+
+	return total_sent;
 }
 
-static void send_http_error(int code, struct client_sess *sess)
+static ssize_t send_http_error(struct client_sess *sess, int code,
+			       const char *errstr)
 {
 	char buf[128];
 	int tmp;
 
 	tmp = snprintf(buf, sizeof(buf),
-			"HTTP/1.1 %d\r\n"
+			"HTTP/1.1 %d%s%s\r\n"
+			"Connection: closed\r\n"
 			"Content-Type: text/plain\r\n\r\n"
 			"HTTP Error %d",
-			code, code);
-	send_to_client(sess, buf, (size_t)tmp);
+			code, (errstr ? " " : ""), (errstr ? errstr : ""),
+			code);
+	return send_to_sess(sess, buf, (size_t)tmp);
 }
 
 static char *parse_http_method(char *buf, enum http_method *method_p)
@@ -385,20 +731,21 @@ static char *parse_query_string(char *uri, char *end_uri)
 	while (uri < end_uri) {
 		if (*uri++ != '?')
 			continue;
+		uri[-1] = '\0';
 		if (uri == end_uri)
 			/*
 			 * We got an empty query string:
 			 *  "http://somehwere.com/path?"
 			 */
 			return NULL;
-		return uri;
+		return uri[0] ? uri : NULL;
 	}
 	return NULL;
 }
 
 static int parse_http_header(struct client_sess *sess)
 {
-	char *buf = sess->buf;
+	char *buf = sess->recv_buf;
 	char *end;
 	char *ret;
 
@@ -412,7 +759,7 @@ static int parse_http_header(struct client_sess *sess)
 		 * Don't fail here if still have enough buffer,
 		 * we will wait for the next recv() iteration.
 		 */
-		if (sess->buf_size >= sizeof(sess->buf) - 1)
+		if (sess->rbuf_len >= sizeof(sess->recv_buf) - 1)
 			goto bad_req;
 
 		return 0;
@@ -435,6 +782,8 @@ static int parse_http_header(struct client_sess *sess)
 	 *       ^
 	 *     @ret
 	 */
+	if (unlikely(ret[0] != '/'))
+		goto bad_req;
 	sess->uri = ret;
 	ret = strstr(sess->uri, " ");
 	if (unlikely(!ret))
@@ -455,138 +804,715 @@ static int parse_http_header(struct client_sess *sess)
 		goto bad_req;
 
 	ret[0] = '\0';
+	sess->header = &ret[1];
 	sess->got_http_header = true;
 	return 0;
 
 bad_req:
-	send_http_error(400, sess);
+	send_http_error(sess, 400, "Bad Request");
 	return -EBADMSG;
 }
 
-static void close_sess(struct client_sess *sess, struct server_state *state)
+static ssize_t http_redirect(struct client_sess *sess, const char *location)
 {
-	printf("Closing session " FCIP " (idx = %u)\n", FCIP_ARG(sess),
-		sess->idx);
-	delete_client(state->epl_fd, sess);
-	close(sess->fd);
-	put_sess_idx(sess->idx, state);
-}
+	size_t need_len;
+	ssize_t ret;
+	char *buf;
+	int len;
 
-#define HTTP_200_HTML "HTTP/1.1 200\r\nContent-Type: text/html\r\n\r\n"
-#define HTTP_200_TEXT "HTTP/1.1 200\r\nContent-Type: text/plain\r\n\r\n"
+	need_len = strlen(location) * 2 + 256;
+	buf = new char[need_len];
+	if (unlikely(!buf))
+		return -ENOMEM;
 
-static int route_show_index(struct client_sess *sess,
-			    struct server_state *state)
-{
-	static const char buf[] =
-		HTTP_200_HTML
-		"<!DOCTYPE html>"
-		"<html>"
-			"<body>"
-				"<h1>This is the index!</h1>"
-			"</body>"
-		"</html>";
+	len = snprintf(buf, need_len,
+			"HTTP/1.1 302\r\n"
+			"Connection: closed\r\n"
+			"Location: %s\r\n\r\nYou are redirected to %s\n\n",
+			location, location);
 
-	send_to_client(sess, buf, sizeof(buf) - 1);
-	close_sess(sess, state);
-	return 0;
+	ret = send_to_sess(sess, buf, (size_t)len);
+	delete[] buf;
+	return ret;
 }
 
-static int route_show_hello(struct client_sess *sess,
-			    struct server_state *state)
+static void not_found_or_forbidden(int e, struct client_sess *sess)
 {
-	static const char buf[] =
-		HTTP_200_HTML
-		"<!DOCTYPE html>"
-		"<html>"
-			"<body>"
-				"<h1>Hello World!</h1>"
-			"</body>"
-		"</html>";
-
-	send_to_client(sess, buf, sizeof(buf) - 1);
-	close_sess(sess, state);
-	return 0;
+	if (e == ENOENT)
+		send_http_error(sess, 404, "Not Found");
+	else
+		send_http_error(sess, 403, "Forbidden");
 }
 
-static int handle_route_get(struct client_sess *sess,
-			    struct server_state *state)
+static int parse_http_header2(struct client_sess *sess)
 {
-	const char *uri = sess->uri;
+	char *hdr = sess->header + 1;
+
+	if (!sess->http_headers)
+		sess->http_headers = new __typeof__(*sess->http_headers);
+
+	while (1) {
+		char *keyval, *key, *val, *it;
+
+		key = hdr;
+		keyval = strstr(key, "\r\n");
+		if (!keyval)
+			break;
+		keyval[0] = '\0';
+
+		hdr = &keyval[2];
+		val = strstr(key, ":");
+		if (!val)
+			continue;
+		val[0] = '\0';
+		val++;
+
+		while (*val == ' ')
+			val++;
+
+		it = key;
+		while (*it) {
+			*it = tolower((unsigned char)*it);
+			it++;
+		}
 
-	if (!strcmp(uri, "/"))
-		return route_show_index(sess, state);
-	if (!strcmp(uri, "/hello"))
-		return route_show_hello(sess, state);
+		sess->http_headers->emplace(key, val);
+	}
 
-	send_http_error(404, sess);
-	close_sess(sess, state);
 	return 0;
 }
 
-static int route_show_echo(struct client_sess *sess, struct server_state *state)
+static char *open_file_for_stream(const char *file, struct stat *st,
+				  struct client_sess *sess,
+				  struct stream_file_data *sfd)
 {
-	char buf[4096] = HTTP_200_TEXT;
-	constexpr size_t max_body_len = sizeof(buf) - sizeof(HTTP_200_TEXT);
+	char *map;
+	int ret;
+	int fd;
+
+	fd = open(file, O_RDONLY);
+	if (unlikely(fd < 0)) {
+		fd = errno;
+		not_found_or_forbidden(fd, sess);
+		fprintf(stderr, "Cannot open file: \"%s\": %s\n", file,
+			strerror(fd));
+		return NULL;
+	}
 
-	size_t header_size = (size_t)(sess->body - sess->buf);
-	size_t body_len = sess->buf_size - header_size;
-	size_t send_len;
+	ret = fstat(fd, st);
+	if (unlikely(ret < 0)) {
+		ret = errno;
+		not_found_or_forbidden(ret, sess);
+		close(fd);
+		fprintf(stderr, "Cannot stat file: \"%s\": %s\n", file,
+			strerror(ret));
+		return NULL;
+	}
 
-	if (body_len > max_body_len)
-		body_len = max_body_len;
+	map = (char *)mmap(NULL, st->st_size, PROT_READ, MAP_SHARED, fd, 0);
+	close(fd);
+	if (unlikely(map == MAP_FAILED)) {
+		ret = errno;
+		not_found_or_forbidden(ret, sess);
+		fprintf(stderr, "Cannot map file: \"%s\": %s\n", file,
+			strerror(ret));
+		return NULL;
+	}
 
-	send_len = body_len + sizeof(HTTP_200_TEXT) - 1;
-	memcpy(&buf[sizeof(HTTP_200_TEXT) - 1], sess->body, body_len);
-	send_to_client(sess, buf, send_len);
-	close_sess(sess, state);
-	return 0;
+	sfd->size = st->st_size;
+	sfd->map = map;
+	sfd->cur_off = 0;
+	return map;
 }
 
-static int handle_route_post(struct client_sess *sess,
-			     struct server_state *state)
+static int send_http_header_for_stream_file(struct client_sess *sess,
+					    off_t start_offset,
+					    off_t content_length)
 {
-	const char *uri = sess->uri;
+	char buf[1024];
+	int ret;
 
-	if (!strcmp(uri, "/echo"))
-		return route_show_echo(sess, state);
+	ret = snprintf(buf, sizeof(buf),
+			"HTTP/1.1 %s\r\n"
+			"Content-Type: text/plain\r\n"
+			"Content-Range: bytes %lld-%lld/%lld\r\n"
+			"Content-Length: %lld\r\n\r\n",	/* body bytes, not file size */
+			start_offset ? "206 Partial Content" : "200 OK",
+			(long long)start_offset, (long long)content_length - 1,
+			(long long)content_length,
+			(long long)(content_length - start_offset));
+	ret = send_to_sess(sess, buf, (size_t)ret);
+	if (unlikely(ret < 0))
+		return -EBADMSG;
 
-	send_http_error(404, sess);
-	close_sess(sess, state);
 	return 0;
 }
 
-static int handle_route(struct client_sess *sess, struct server_state *state)
+static int stream_file_once(struct stream_file_data *sfd,
+			    struct client_sess *sess)
 {
+	size_t send_len;
 	int ret;
 
-	switch (sess->method) {
+	if (unlikely(sfd->cur_off >= sfd->size))
+		return 1;
+
+	send_len = (size_t)(sfd->size - sfd->cur_off);
+	ret = send_to_sess(sess, &sfd->map[sfd->cur_off], send_len);
+	if (unlikely(ret < 0))
+		return -EBADMSG;
+
+	return 1;
+}
+
+constexpr off_t max_send_len_file = 1024 * 128;
+
+static int stream_file_loop(struct stream_file_data *sfd,
+			    struct client_sess *sess,
+			    struct worker *worker)
+{
+	size_t send_len;
+	int ret;
+	/* Sends one chunk. Returns 0 if re-queued for more, 1 if done, <0 on error. */
+	if (unlikely(sfd->cur_off >= sfd->size)) {
+		ret = 1;
+		goto out;
+	}
+
+	if (unlikely(!sess->priv_data)) {
+		struct stream_file_data *sfd_h;
+
+		sfd_h = new struct stream_file_data;
+		if (unlikely(!sfd_h))
+			return -ENOMEM;
+
+		*sfd_h = *sfd;
+		sfd = sfd_h;
+		sess->priv_data = (void *)sfd_h;
+		madvise(sfd->map, sfd->size, MADV_SEQUENTIAL);
+	}
+
+	/* Clamp every chunk (the first one too) to what is left in the mapping. */
+	send_len = (size_t)(sfd->size - sfd->cur_off);
+	if (send_len > max_send_len_file)
+		send_len = max_send_len_file;
+
+	ret = send_to_sess(sess, &sfd->map[sfd->cur_off], send_len);
+	if (unlikely(ret < 0)) {
+		ret = -EBADMSG;
+		goto out;
+	}
+	sfd->cur_off += (off_t)ret;
+	ret = 1;
+
+	if (sfd->cur_off < sfd->size) {
+		sess->action = HTTP_ACT_FILE_STREAM;
+		worker->buf_queue->push(sess->idx);
+		return 0;
+	}
+
+out:
+	munmap(sfd->map, sfd->size);
+	if (likely(sess->priv_data)) {
+		delete (struct stream_file_data *)sess->priv_data;
+		sess->priv_data = NULL;
+	}
+
+	return ret;
+}
+
+static void handle_range_header(struct client_sess *sess, off_t *start_offset_p)
+{
+	parse_http_header2(sess);
+
+	const auto &http_headers = *sess->http_headers;
+	auto it = http_headers.find("range");
+	if (it != http_headers.end()) {
+		const char *val = it->second.c_str();
+		const char *start_bytes;
+
+		start_bytes = strstr(val, "bytes=");
+		if (!start_bytes)
+			return;
+
+		start_bytes += 6;
+		*start_offset_p = strtoll(start_bytes, NULL, 10);
+	}
+}
+
+static void stream_file_bad_req(struct client_sess *sess, const char *msg)
+{
+	char buf[512];
+	int ret;
+
+	ret = snprintf(buf, sizeof(buf),
+			"HTTP/1.1 400 Bad Request\r\n"
+			"Connection: closed\r\n"
+			"Content-Type: text/plain\r\n\r\n"
+			"%s\n\n", msg);
+
+	send_to_sess(sess, buf, (size_t)ret);
+}
+
+/*
+ * Map @file, apply the optional Range start offset and begin streaming it.
+ * On error the mapping is released here; stream_file_loop() unmaps on its own.
+ */
+static int start_stream_file(const char *file, struct client_sess *sess,
+			     struct worker *worker)
+{
+	struct stream_file_data sfd;
+	off_t start_offset = 0;
+	struct stat st;
+	int ret = -EBADMSG;
+
+	if (unlikely(!open_file_for_stream(file, &st, sess, &sfd)))
+		return -EBADMSG;
+
+	handle_range_header(sess, &start_offset);
+	if (start_offset >= sfd.size || start_offset < 0) {
+		stream_file_bad_req(sess, "Bad range offset!");
+		goto out_unmap;
+	}
+
+	sfd.cur_off = start_offset;
+	if (send_http_header_for_stream_file(sess, start_offset, st.st_size))
+		goto out_unmap;
+
+	/* Large files are streamed in chunks; the loop owns the mapping. */
+	if (st.st_size > max_send_len_file)
+		return stream_file_loop(&sfd, sess, worker);
+	ret = stream_file_once(&sfd, sess);
+out_unmap:
+	munmap(sfd.map, sfd.size);
+	return ret;
+}
+
+static int stream_file(const char *file, struct client_sess *sess,
+		       struct worker *worker)
+{
+	int ret;
+
+	if (sess->action == HTTP_ACT_NONE) {
+		ret = start_stream_file(file, sess, worker);
+		if (unlikely(ret))
+			return ret;
+		if (sess->action == HTTP_ACT_FILE_STREAM)
+			return 0;
+	}
+	return 1;
+}
+
+#if 0
+static size_t htmlspecialchars(char *_output, size_t outlen, const char *_input,
+			       size_t inlen)
+{
+	struct html_char_map {
+		const char	to[8];
+		const uint8_t	len;
+	};
+
+	static const struct html_char_map html_map[0x100u] = {
+		['<'] = {"&lt;",	4},
+		['>'] = {"&gt;",	4},
+		['"'] = {"&quot;",	6},
+		['&'] = {"&amp;",	5},
+	};
+
+
+	size_t j = 0;
+	uint8_t len = 0;
+	unsigned char *output = (unsigned char *)_output;
+	const unsigned char *input  = (const unsigned char *)_input;
+	const unsigned char *in_end = input + inlen;
+
+	while (likely(input < in_end)) {
+		const unsigned char *cp;
+		const struct html_char_map *map_to = &html_map[(size_t)*input];
+
+		if (likely(*map_to->to == '\0')) {
+			cp  = input;
+			len = 1;
+		} else {
+			cp  = (const unsigned char *)map_to->to;
+			len = map_to->len;
+		}
+
+		if (unlikely((j + len - 1) >= outlen))
+			break;
+
+		memcpy(&output[j], cp, len);
+		j += len;
+		input++;
+	}
+
+	if (likely(outlen > 0)) {
+		if (unlikely((j + 1) > outlen))
+			j -= len;
+		output[++j] = '\0';
+	}
+
+	return j;
+}
+#endif
+
+static int redirect_on_no_trailing_slash(const char *path,
+					 struct client_sess *sess)
+{
+	size_t len;
+	char *buf;
+	char *qs;
+
+	len = strlen(path);
+	if (path[len - 1] == '/')
+		return 0;
+
+	qs = sess->qs;
+	if (qs)
+		len += strlen(qs);
+
+	len += 16;
+	buf = new char[len];
+	if (unlikely(!buf))
+		return -ENETDOWN;
+
+	if (qs)
+		snprintf(buf, len, "%s/?%s", path, qs);
+	else
+		snprintf(buf, len, "%s/", path);
+
+	http_redirect(sess, buf);
+	delete[] buf;
+	return 1;
+}
+
+static int stream_dir_list_open(const char *path, struct client_sess *sess,
+				DIR **dir_p)
+{
+	DIR *dir;
+	int ret;
+
+	dir = opendir(path);
+	if (unlikely(!dir)) {
+		ret = errno;
+		if (ret == ENOENT)
+			send_http_error(sess, 404, "Not Found");
+		else
+			send_http_error(sess, 403, "Forbidden");
+
+		return -EBADMSG;
+	}
+	*dir_p = dir;
+	return 0;
+}
+
+/*
+ * Send the HTTP response head and the opening HTML of the directory
+ * listing page (up to and including the table header row) to @sess.
+ *
+ * Return 0 on success, or -EBADMSG when send_to_sess() fails.
+ */
+static int send_init_payload_for_stream_dir_list(struct client_sess *sess)
+{
+	/*
+	 * The status line needs a space after the status code even when
+	 * the reason phrase is omitted, and the connection option that
+	 * announces the end of the connection is spelled "close".
+	 */
+	constexpr static const char buf[] =
+		"HTTP/1.1 200 OK\r\n"
+		"Content-Type: text/html\r\n"
+		"Connection: close\r\n\r\n"
+		"<!DOCTYPE html>\n"
+		"<html>\n"
+		"<style type=\"text/css\">"
+			"td {padding: 10px;}"
+			"a {color: blue; text-decoration: none}"
+			"a:hover {text-decoration: underline}"
+		"</style>"
+		"<body>\n"
+		"\t<h1>GNU/Weeb HTTP Server</h1>\n"
+		"\t<table border=\"1\">\n"
+		"\t\t<tr>"
+			"<th>Filename</th>"
+			"<th>Type</th>"
+			"<th>Mode</th>"
+		"</tr>\n";
+
+	int ret;
+
+	/* sizeof(buf) - 1: do not send the NUL terminator. */
+	ret = send_to_sess(sess, buf, sizeof(buf) - 1);
+	if (unlikely(ret < 0))
+		return -EBADMSG;
+
+	return 0;
+}
+
+/*
+ * Copy the NUL-terminated string @in to @out, replacing the characters
+ * that are special in HTML text and attribute values (&, <, >, ", ')
+ * with their entities.
+ *
+ * At most @outlen bytes are written, including the NUL terminator.
+ * Input that does not fit is cut off, never in the middle of an entity.
+ */
+static void dir_list_html_escape(char *out, size_t outlen, const char *in)
+{
+	size_t j = 0;
+
+	if (unlikely(outlen == 0))
+		return;
+
+	for (; *in != '\0'; in++) {
+		const char *rep = in;
+		size_t rlen = 1;
+
+		switch (*in) {
+		case '&':
+			rep = "&amp;";
+			rlen = 5;
+			break;
+		case '<':
+			rep = "&lt;";
+			rlen = 4;
+			break;
+		case '>':
+			rep = "&gt;";
+			rlen = 4;
+			break;
+		case '"':
+			rep = "&quot;";
+			rlen = 6;
+			break;
+		case '\'':
+			rep = "&#39;";
+			rlen = 5;
+			break;
+		default:
+			break;
+		}
+
+		/* Always keep one byte for the NUL terminator. */
+		if (unlikely(j + rlen >= outlen))
+			break;
+
+		memcpy(&out[j], rep, rlen);
+		j += rlen;
+	}
+
+	out[j] = '\0';
+}
+
+/*
+ * Append one HTML table row describing the entry @file of the directory
+ * @path to the output buffer.
+ *
+ * @pbuf_p points to the write cursor and @capacity_p to the number of
+ * bytes left behind it; both are advanced only when the whole row has
+ * been written.
+ *
+ * The file name is HTML-escaped before it is placed in the link target
+ * and in the link text, so a crafted name cannot inject markup.
+ *
+ * Return 0 on success, or a negative errno value: the stat() error,
+ * -ENAMETOOLONG when "@path/@file" does not fit the local path buffer,
+ * -ENOTSUP for an unknown file type, or -ENOBUFS when the row does not
+ * fit in the remaining capacity.
+ */
+static int construct_file_row(const char *path, const char *file, char **pbuf_p,
+			      size_t *capacity_p)
+{
+	/* Sized for a 255-byte name in which every byte needs "&quot;". */
+	char efile[255 * 6 + 1];
+	char fpath[4096 + 256];
+	const char *type;
+	struct stat st;
+	int ret;
+
+	ret = snprintf(fpath, sizeof(fpath), "%s/%s", path, file);
+	if (unlikely(ret < 0 || (size_t)ret >= sizeof(fpath)))
+		return -ENAMETOOLONG;
+
+	ret = stat(fpath, &st);
+	if (unlikely(ret < 0)) {
+		ret = errno;
+		fprintf(stderr, "Can't stat \"%s\": %s\n", fpath, strerror(ret));
+		return -ret;
+	}
+
+	/*
+	 * stat() follows symbolic links, so the S_ISLNK() case below is
+	 * only reached if this is ever changed to lstat().
+	 */
+	if (S_ISDIR(st.st_mode))
+		type = "Directory";
+	else if (S_ISREG(st.st_mode))
+		type = "Regular File";
+	else if (S_ISCHR(st.st_mode))
+		type = "Char dev";
+	else if (S_ISBLK(st.st_mode))
+		type = "Block dev";
+	else if (S_ISFIFO(st.st_mode))
+		type = "FIFO";
+	else if (S_ISLNK(st.st_mode))
+		type = "Symlink";
+	else if (S_ISSOCK(st.st_mode))
+		type = "Socket";
+	else
+		return -ENOTSUP;
+
+	dir_list_html_escape(efile, sizeof(efile), file);
+
+	/* Directories get a trailing slash in the link target. */
+	ret = snprintf(*pbuf_p, *capacity_p,
+		"\t\t<tr>"
+			"<td><a href=\"%s%s\">%s</a></td>"
+			"<td>%s</td>"
+			"<td>%u%u%u%u</td>"
+		"</tr>\n",
+		efile, S_ISDIR(st.st_mode) ? "/" : "", efile,
+		type,
+		(unsigned)((st.st_mode >> 9) & 07),
+		(unsigned)((st.st_mode >> 6) & 07),
+		(unsigned)((st.st_mode >> 3) & 07),
+		(unsigned)(st.st_mode & 07)
+	);
+
+	/*
+	 * snprintf() returns the length the full row would have had.
+	 * Advancing by that value after a truncation would move the
+	 * cursor past the buffer and wrap the capacity around, so leave
+	 * both untouched and report the row as not written.
+	 */
+	if (unlikely(ret < 0 || (size_t)ret >= *capacity_p))
+		return -ENOBUFS;
+
+	*pbuf_p += (size_t)ret;
+	*capacity_p -= (size_t)ret;
+	return 0;
+}
+
+/*
+ * Read one entry from sdld->dir and append its table row to the output
+ * buffer through @pbuf_p / @capacity_p.
+ *
+ * The "." and ".." entries are skipped. An entry whose row cannot be
+ * built is left out of the listing; that is not treated as an error.
+ *
+ * Return 0 to keep iterating, or -1 once readdir() returns NULL.
+ */
+static int _stream_dir_list_loop(struct stream_dir_list_data *sdld,
+				 char **pbuf_p, size_t *capacity_p)
+{
+	struct dirent *de;
+	const char *f;
+
+	de = readdir(sdld->dir);
+	if (unlikely(!de))
+		return -1;
+
+	f = de->d_name;
+	if (unlikely(!strcmp(f, ".") || !strcmp(f, "..")))
+		return 0;
+
+	/*
+	 * The result is ignored on purpose: whatever happens to this
+	 * entry, the listing continues with the next one.
+	 */
+	(void)construct_file_row(sdld->path, f, pbuf_p, capacity_p);
+	return 0;
+}
+
+/*
+ * Schedule @sess for another directory listing round.
+ *
+ * If the session has no private data yet, a heap copy of *@sdld_s is
+ * made and attached to sess->priv_data. The session action is set to
+ * HTTP_ACT_DIRLIST and the session index is pushed to the worker's
+ * buffer queue.
+ *
+ * Return 0 on success, or -EBADMSG when the copy cannot be allocated.
+ */
+static int stream_dir_list_queue(struct stream_dir_list_data *sdld_s,
+				 struct client_sess *sess,
+				 struct worker *worker)
+{
+	if (unlikely(sess->priv_data == NULL)) {
+		struct stream_dir_list_data *copy;
+
+		copy = new struct stream_dir_list_data;
+		if (unlikely(copy == NULL))
+			return -EBADMSG;
+
+		*copy = *sdld_s;
+		sess->priv_data = (void *)copy;
+	}
+
+	sess->action = HTTP_ACT_DIRLIST;
+	worker->buf_queue->push(sess->idx);
+
+	return 0;
+}
+
+/*
+ * Closing markup of the directory listing page: it ends the table, body
+ * and html elements. Users send sizeof(dir_list_foot) - 1 bytes so that
+ * the NUL terminator is left out.
+ */
+constexpr static const char dir_list_foot[] =
+	"\t</table>\n</body>\n</html>\n";
+
+/*
+ * Send the closing markup of the directory listing (dir_list_foot,
+ * without its NUL terminator) to @sess.
+ *
+ * Return 0 on success, or the negative value from send_to_sess().
+ */
+static int send_footer_dir_list(struct client_sess *sess)
+{
+	const int ret = send_to_sess(sess, dir_list_foot,
+				     sizeof(dir_list_foot) - 1);
+
+	return unlikely(ret < 0) ? ret : 0;
+}
+
+/*
+ * Produce directory listing rows into a stack buffer and send them to
+ * @sess.
+ *
+ * Rows are appended while at least 8192 bytes of the buffer are free.
+ * Once less is left, the filled part is sent, the session is put back
+ * on the worker's queue with stream_dir_list_queue() and 0 is returned;
+ * the directory stays open for the next round.
+ *
+ * When the directory has no more entries, the footer is sent as well
+ * (in the same send if it fits behind the rows) and 1 is returned.
+ *
+ * On completion and on any send or queue failure (negative return) the
+ * directory is closed and sess->priv_data, if set, is freed.
+ */
+static int stream_dir_list_loop(struct stream_dir_list_data *sdld,
+				struct client_sess *sess,
+				struct worker *worker)
+{
+	constexpr size_t foot_len = sizeof(dir_list_foot) - 1;
+	char buf[1024 * 128];
+	constexpr size_t buf_max = sizeof(buf) - 1;
+	size_t room = buf_max;
+	char *cursor = buf;
+	bool footer_pending;
+	int ret;
+
+	for (;;) {
+		if (likely(room >= 8192)) {
+			if (unlikely(_stream_dir_list_loop(sdld, &cursor,
+							   &room)))
+				break;
+			continue;
+		}
+
+		/* The buffer is nearly full: flush it and come back later. */
+		ret = send_to_sess(sess, buf, buf_max - room);
+		if (unlikely(ret < 0))
+			goto out_close;
+
+		ret = stream_dir_list_queue(sdld, sess, worker);
+		if (unlikely(ret))
+			goto out_close;
+
+		/*
+		 * We are still in the queue loop.
+		 */
+		return 0;
+	}
+
+	/* Piggyback the footer on the last chunk when there is room. */
+	footer_pending = !(room > foot_len);
+	if (!footer_pending) {
+		memcpy(cursor, dir_list_foot, foot_len);
+		room -= foot_len;
+	}
+
+	if (buf_max - room > 0) {
+		ret = send_to_sess(sess, buf, buf_max - room);
+		if (unlikely(ret < 0))
+			goto out_close;
+	}
+
+	if (footer_pending) {
+		ret = send_footer_dir_list(sess);
+		if (unlikely(ret))
+			goto out_close;
+	}
+
+	ret = 1;
+
+out_close:
+	closedir(sdld->dir);
+	if (sess->priv_data) {
+		delete (struct stream_dir_list_data *)sess->priv_data;
+		sess->priv_data = NULL;
+	}
+
+	return ret;
+}
+
+/*
+ * Start streaming an HTML listing of the directory @path to @sess.
+ *
+ * Return whatever the step that ended the request returned: 1 or
+ * -ENETDOWN from the trailing-slash redirect, -EBADMSG when the
+ * directory cannot be opened or the initial payload cannot be sent,
+ * otherwise the result of stream_dir_list_loop().
+ */
+static int stream_dir_list(const char *path, struct client_sess *sess,
+			   struct worker *worker)
+{
+	struct stream_dir_list_data sdld;
+	int ret;
+
+	/*
+	 * If we are going to list the directory, the URI path must end
+	 * with a trailing slash. If it does not, add the trailing slash
+	 * by redirecting the client.
+	 */
+	ret = redirect_on_no_trailing_slash(path, sess);
+	if (ret)
+		return ret;
+
+	ret = stream_dir_list_open(path, sess, &sdld.dir);
+	if (unlikely(ret))
+		return ret;
+
+	ret = send_init_payload_for_stream_dir_list(sess);
+	if (unlikely(ret)) {
+		/*
+		 * The listing loop, which normally closes the directory,
+		 * is never reached on this path: close it here so the
+		 * handle is not leaked.
+		 */
+		closedir(sdld.dir);
+		return ret;
+	}
+
+	snprintf(sdld.path, sizeof(sdld.path), "%s", path);
+	return stream_dir_list_loop(&sdld, sess, worker);
+}
+
+/*
+ * Serve a GET request: map sess->uri to a path below the current
+ * working directory and stream either a directory listing or the file
+ * content.
+ *
+ * A path that contains "/.." is answered with 404. A stat() failure is
+ * answered with 404 (ENOENT) or 403 (anything else). A path that is
+ * neither a directory nor a regular file is answered with 403.
+ *
+ * Return -EBADMSG after an error response has been sent, otherwise the
+ * result of stream_dir_list() or stream_file().
+ */
+static int handle_route_get(struct client_sess *sess, struct worker *worker)
+{
+	char path[4096 + 128];
+	struct stat st;
+	int ret;
+
+	if (likely(sess->need_epl_del)) {
+		sess->need_epl_del = false;
+		uninstall_fd_from_worker(sess->fd, worker);
+	}
+
+	snprintf(path, sizeof(path), "./%s", sess->uri);
+
+	/*
+	 * Don't allow to step up to the parent directory.
+	 */
+	if (unlikely(strstr(path, "/.."))) {
+		send_http_error(sess, 404, "Not Found");
+		return -EBADMSG;
+	}
+
+	ret = stat(path, &st);
+	if (unlikely(ret < 0)) {
+		ret = errno;
+		fprintf(stderr, "Can't stat \"%s\": %s\n", path, strerror(ret));
+
+		if (ret == ENOENT)
+			send_http_error(sess, 404, "Not Found");
+		else
+			send_http_error(sess, 403, "Forbidden");
+
+		return -EBADMSG;
+	}
+
+	if (S_ISDIR(st.st_mode))
+		return stream_dir_list(path, sess, worker);
+
+	if (S_ISREG(st.st_mode))
+		return stream_file(path, sess, worker);
+
+	/*
+	 * Devices, FIFOs, sockets and the like are not served. Tell the
+	 * client so, like every other failure path here does, rather
+	 * than ending the request without any response.
+	 */
+	send_http_error(sess, 403, "Forbidden");
+	return -EBADMSG;
+}
+
+/*
+ * Dispatch the request according to sess->method.
+ *
+ * Only HTTP_GET is routed, to handle_route_get(). Every other method
+ * is answered with a 405 response and -EBADMSG is returned.
+ */
+static int handle_route(struct client_sess *sess, struct worker *worker)
+{
+	int ret;
+
+	switch (sess->method) {
 	case HTTP_GET:
-		ret = handle_route_get(sess, state);
+		ret = handle_route_get(sess, worker);
 		break;
 	case HTTP_POST:
-		ret = handle_route_post(sess, state);
-		break;
+	case HTTP_DELETE:
+	case HTTP_PATCH:
+	case HTTP_PUT:
 	default:
-		send_http_error(405, sess);
-		close_sess(sess, state);
-		return 0;
+		send_http_error(sess, 405, "Method not allowed");
+		return -EBADMSG;
 	}
 
 	return ret;
 }
 
-static int _handle_client(struct client_sess *sess, struct server_state *state)
+static int _handle_client(struct client_sess *sess, struct worker *worker)
 {
 	int ret = 0;
 
-	sess->buf[sess->buf_size] = '\0';
 	if (!sess->got_http_header) {
+		sess->recv_buf[sess->rbuf_len] = '\0';
 		ret = parse_http_header(sess);
-		if (ret)
+		if (unlikely(ret))
+			goto out;
+		if (unlikely(!sess->got_http_header))
 			goto out;
-		if (!sess->got_http_header)
-			return 0;
 	}
 
 #if 0
@@ -595,68 +1521,107 @@ static int _handle_client(struct client_sess *sess, struct server_state *state)
 	printf("HTTP version: %s\n", sess->http_ver);
 #endif
 
-	ret = handle_route(sess, state);
+	ret = handle_route(sess, worker);
 out:
 	if (ret) {
-		close_sess(sess, state);
-		if (likely(ret == -EBADMSG || ret == -ENETDOWN))
+		close_sess(sess, worker);
+		if (likely(ret == -EBADMSG || ret == -ENETDOWN ||
+			   ret == -ECONNRESET || ret == -EPIPE ||
+			   ret == 1))
 			ret = 0;
 	}
 	return ret;
 }
 
-static int handle_client(struct client_sess *sess, struct server_state *state)
+/*
+ * Receive more request bytes for @sess and hand them to
+ * _handle_client().
+ *
+ * The data is appended to sess->recv_buf behind the sess->rbuf_len
+ * bytes already stored; one byte is kept free at the end of the buffer.
+ *
+ * Return 0 when the session has been closed here (recv() returned 0 or
+ * failed) or when recv() failed with EAGAIN, otherwise the result of
+ * _handle_client().
+ *
+ * NOTE(review): perror() runs after close_sess(), so errno may no
+ * longer hold the recv() error by then -- confirm.
+ */
+static int handle_client(struct client_sess *sess, struct worker *worker)
 {
 	ssize_t recv_ret;
 	size_t len;
 	char *buf;
 	int ret;
 
-	len = sizeof(sess->buf) - sess->buf_size - 1;
-	buf = &sess->buf[sess->buf_size];
+	len = sizeof(sess->recv_buf) - 1 - sess->rbuf_len;
+	buf = &sess->recv_buf[sess->rbuf_len];
 
 	recv_ret = recv(sess->fd, buf, len, 0);
-	if (unlikely(recv_ret < 0)) {
+	if (unlikely(recv_ret <= 0)) {
+
+		if (recv_ret == 0) {
+			close_sess(sess, worker);
+			return 0;
+		}
+
 		ret = errno;
 		if (ret == EAGAIN)
 			return 0;
-		perror("recv");
-		return -ret;
-	}
 
-	if (unlikely(recv_ret == 0 && len != 0)) {
-		close_sess(sess, state);
+		close_sess(sess, worker);
+		perror("recv");
 		return 0;
 	}
+	sess->rbuf_len += (size_t)recv_ret;
+	return _handle_client(sess, worker);
+}
+
+/*
+ * Handle one epoll event.
+ *
+ * An event whose data pointer is NULL is passed to handle_new_client();
+ * any other event carries a pointer to the client session and is passed
+ * to handle_client().
+ */
+static __hot int handle_event(struct epoll_event *event,
+			      struct worker *worker)
+{
+	struct client_sess *sess;
 
-	sess->buf_size += (size_t)recv_ret;
-	return _handle_client(sess, state);
+	if (!event->data.ptr)
+		return handle_new_client(worker);
+
+	sess = (struct client_sess *)event->data.ptr;
+	return handle_client(sess, worker);
 }
 
-static int handle_event(struct epoll_event *event, struct server_state *state)
+/*
+ * Continue the queued work of the session at index @idx.
+ *
+ * sess->action selects the continuation: the directory listing loop,
+ * the file streaming loop, or -ENETDOWN when no action is set. Any
+ * non-zero result closes the session.
+ *
+ * Always returns 0.
+ */
+static __hot int handle_buf_queue(uint32_t idx, struct worker *worker)
 {
+	struct server_state *state = worker->state;
+	struct client_sess *sess = &state->sess[idx];
 	int ret = 0;
 
-	if (event->data.fd == 0) {
-		ret = handle_new_client(state);
-	} else {
-		struct client_sess *sess =
-			(struct client_sess *)event->data.ptr;
+	switch (sess->action) {
+	case HTTP_ACT_NONE:
+		ret = -ENETDOWN;
+		break;
+	case HTTP_ACT_DIRLIST: {
+		struct stream_dir_list_data *sdld;
 
-		ret = handle_client(sess, state);
+		sdld = (struct stream_dir_list_data *)sess->priv_data;
+		ret = stream_dir_list_loop(sdld, sess, worker);
+		break;
 	}
+	case HTTP_ACT_FILE_STREAM: {
+		struct stream_file_data *sfd;
 
-	return ret;
+		sfd = (struct stream_file_data *)sess->priv_data;
+		ret = stream_file_loop(sfd, sess, worker);
+		break;
+	}
+	}
+
+	if (ret)
+		close_sess(sess, worker);
+
+	return 0;
 }
 
-static int _run_server(int epl_fd, struct server_state *state)
+static __hot int run_worker(int epl_fd, struct epoll_event *events,
+			    struct worker *worker)
 {
-	struct epoll_event *events = state->events;
+	int timeout = 1000;
+	size_t qlen;
+	int nr_evt;
 	int ret;
 	int i;
 
-	ret = epoll_wait(epl_fd, events, NR_EPOLL_EVT, 1000);
-	if (unlikely(ret < 0)) {
+	qlen = worker->buf_queue->size();
+	if (qlen)
+		timeout = 0;
+
+	nr_evt = epoll_wait(epl_fd, events, NR_EPOLL_EVT, timeout);
+	if (unlikely(nr_evt < 0)) {
 		ret = errno;
 		perror("epoll_wait");
 		if (ret == EINTR)
@@ -664,106 +1629,199 @@ static int _run_server(int epl_fd, struct server_state *state)
 		return -ret;
 	}
 
-	for (i = 0; i < ret; i++) {
-		ret = handle_event(&events[i], state);
+	for (i = 0; i < nr_evt; i++) {
+		ret = handle_event(&events[i], worker);
 		if (unlikely(ret))
 			return ret;
 	}
 
-	return ret;
+	while (qlen--) {
+		uint32_t idx;
+
+		idx = worker->buf_queue->pop();
+		ret = handle_buf_queue(idx, worker);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	return 0;
 }
 
-static int run_server(struct server_state *state)
+/*
+ * Main loop of a worker: call run_worker() until state->stop is set.
+ *
+ * A non-zero result from run_worker() sets state->stop itself and ends
+ * the loop. Returns the last run_worker() result, or 0 when the loop
+ * never ran.
+ */
+static __hot int _worker_func(struct worker *worker)
 {
-	int epl_fd = state->epl_fd;
+	struct epoll_event *events = worker->events;
+	struct server_state *state = worker->state;
+	int epl_fd = worker->epl_fd;
 	int ret = 0;
 
 	while (likely(!state->stop)) {
-		ret = _run_server(epl_fd, state);
-		if (unlikely(ret))
-			break;
+		ret = run_worker(epl_fd, events, worker);
+		if (likely(!ret))
+			continue;
+
+		state->stop = true;
+		break;
 	}
 
 	return ret;
 }
 
-static void destroy_state(struct server_state *state)
+/*
+ * Entry point of a worker, used both for the spawned threads and for
+ * worker 0 on the calling thread.
+ *
+ * Workers with a non-zero index flag themselves as joinable. The
+ * state's nr_on_thread counter is incremented for the time the worker
+ * runs, and the main loop starts once wait_for_ready_state() returns.
+ *
+ * Returns the result of _worker_func().
+ */
+static noinline int worker_func(struct worker *worker)
+{
+	if (worker->idx > 0)
+		worker->need_join = true;
+
+	atomic_fetch_add(&worker->state->nr_on_thread, 1u);
+	wait_for_ready_state(worker);
+
+	const int result = _worker_func(worker);
+
+	atomic_fetch_sub(&worker->state->nr_on_thread, 1u);
+	return result;
+}
+
+/*
+ * Create the epoll instance of @worker and store its file descriptor
+ * in worker->epl_fd.
+ *
+ * Return 0 on success, or the negated errno of epoll_create() after
+ * reporting it with perror().
+ */
+static int init_epoll_for_worker(struct worker *worker)
+{
+	const int fd = epoll_create(255);
+
+	if (likely(fd >= 0)) {
+		worker->epl_fd = fd;
+		return 0;
+	}
+
+	/* Save errno first: perror() is allowed to change it. */
+	const int err = errno;
+
+	perror("epoll_create");
+	return -err;
+}
+
+/*
+ * Wait until the thread of @worker has set worker->need_join.
+ *
+ * The flag is polled every 1000 microseconds. When it is still unset
+ * after 10001 sleeps, the server is told to stop, the thread is joined
+ * and -ETIMEDOUT is returned. Otherwise 0 is returned.
+ */
+static __cold int wait_for_worker_online(struct worker *worker)
+{
+	uint32_t spins;
+
+	for (spins = 0; !worker->need_join; spins++) {
+		usleep(1000);
+		if (spins < 10000)
+			continue;
+
+		fprintf(stderr, "Timedout while waiting for thread %u\n",
+			worker->idx);
+		worker->state->stop = true;
+		worker->thread.join();
+		worker->need_join = false;
+		return -ETIMEDOUT;
+	}
+
+	return 0;
+}
+
+/*
+ * Bring up all workers and run worker 0 on the calling thread.
+ *
+ * Every worker gets its epoll instance first. Workers NR_WORKERS - 1
+ * down to 1 are then started on their own threads, each one being
+ * waited for before the next is spawned. Finally the listening socket
+ * is installed on worker 0, which runs here.
+ *
+ * Return the first non-zero result of a setup step, otherwise the
+ * result of worker_func() for worker 0.
+ */
+static __cold int run_workers(struct server_state *state)
+{
+	struct worker *workers = state->workers;
+	epoll_data_t edt;
+	uint32_t i;
+	int ret = 0;
+
+	for (i = NR_WORKERS; i-- > 0; ) {
+		ret = init_epoll_for_worker(&workers[i]);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	/*
+	 * Skip i == 0, we will run the worker
+	 * on the main thread.
+	 */
+	for (i = NR_WORKERS - 1; i > 0; i--) {
+		struct worker *w = &workers[i];
+
+		w->thread = std::thread(worker_func, w);
+		ret = wait_for_worker_online(w);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	/* A NULL data pointer is what marks the listening socket. */
+	edt.ptr = NULL;
+	ret = install_infd_to_worker(state->tcp_fd, &workers[0], edt);
+	if (unlikely(ret))
+		return ret;
+
+	return worker_func(&workers[0]);
+}
+
+/*
+ * Release everything owned by @state; a NULL @state is a no-op.
+ *
+ * Order: close the listening socket, join the worker threads that
+ * flagged need_join, close the epoll and client file descriptors,
+ * delete the per-worker queues, the worker array and the free-session
+ * bookkeeping, then hand the state itself to free_state().
+ */
+static __cold void destroy_state(struct server_state *state)
 {
 	uint32_t i;
+	int fd;
 
-	if (state->tcp_fd != -1)
-		close(state->tcp_fd);
-	if (state->epl_fd != -1)
-		close(state->epl_fd);
+	if (!state)
+		return;
+
+	fd = state->tcp_fd;
+	if (fd != -1)
+		close(fd);
+
+	/*
+	 * Do not join @workers[0].thread, it doesn't
+	 * have an LWP!
+	 */
+	for (i = 1; i < NR_WORKERS; i++) {
+		if (state->workers[i].need_join)
+			state->workers[i].thread.join();
+	}
+
+	for (i = 0; i < NR_WORKERS; i++) {
+		fd = state->workers[i].epl_fd;
+		if (fd != -1)
+			close(fd);
+	}
 
 	for (i = 0; i < NR_MAX_CLIENTS; i++) {
-		int fd = state->sess[i].fd;
+		fd = state->sess[i].fd;
 		if (fd != -1)
 			close(fd);
 	}
 
-	delete state->sess_free_idx;
-	delete state->sfi_lock;
-#ifdef USE_ASAN
-	delete state;
-#else
-	munmap(state, sizeof(*state));
-#endif
-	g_state = NULL;
+	i = NR_WORKERS;
+	while (i--) {
+		wq_queue<uint32_t> *p = state->workers[i].buf_queue;
+		if (p)
+			delete p;
+	}
+
+	/*
+	 * NOTE(review): state->workers has already been dereferenced by
+	 * the loops above, so this NULL check comes too late to protect
+	 * them -- confirm whether workers can be NULL here.
+	 */
+	if (state->workers)
+		delete[] state->workers;
+	if (state->sess_free_lock)
+		delete state->sess_free_lock;
+	if (state->sess_free)
+		delete state->sess_free;
+
+	free_state(state);
 }
 
-/**
- *
- * ./gwhttpd 0.0.0.0 8000
- *
- */
+/*
+ * Program entry point.
+ *
+ * Expects exactly two arguments, the bind address and the bind port;
+ * otherwise the usage line is printed and 0 is returned. The state is
+ * initialized, the listening socket is set up and the workers are run;
+ * the state is destroyed before returning the result of the last step.
+ */
 int main(int argc, char *argv[])
 {
-	struct server_state *state;
+	struct server_state *state = NULL;
 	int ret;
 
 	setvbuf(stdout, NULL, _IOLBF, 4096);
-
 	if (argc != 3) {
 		printf("Usage: %s [bind_address] [bind_port]\n", argv[0]);
 		return 0;
 	}
 
-#ifdef USE_ASAN
-	state = new struct server_state;
-	if (!state) {
-		perror("malloc");
-		return ret;
-	}
-#else
-	state = (struct server_state *)mmap(NULL, sizeof(*state),
-					    PROT_READ|PROT_WRITE,
-					    MAP_ANONYMOUS|MAP_PRIVATE, -1, 0);
-	if (state == MAP_FAILED) {
-		ret = errno;
-		perror("mmap");
+	ret = init_state(&state);
+	if (unlikely(ret))
 		return ret;
-	}
 
-	mlock(state, sizeof(*state));
-#endif
+	state->bind_addr = argv[1];
+	state->bind_port = (uint16_t)atoi(argv[2]);
 
-	g_state = state;
-	ret = init_state(state);
-	if (ret)
-		return ret;
-	ret = init_socket(state, argv[1], (uint16_t)atoi(argv[2]));
-	if (ret)
-		goto out;
-	ret = init_epoll(state);
-	if (ret)
+	ret = init_socket(state);
+	if (unlikely(ret))
 		goto out;
-
-	ret = run_server(state);
-
+	ret = run_workers(state);
 out:
 	destroy_state(state);
-	if (ret < 0)
-		ret = -ret;
 	return ret;
 }
-- 
Ammar Faizi


  parent reply	other threads:[~2022-07-08 12:10 UTC|newest]

Thread overview: 15+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-07-08 12:10 [PATCH gwhttpd 00/14] gwhttpd updates Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 01/14] gwhttpd: Do an early return when `parse_http_header()` fails Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 02/14] gwhttpd: Don't print any error when mlock fails Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 03/14] gwhttpd: Replace `send_error_and_close()` with `send_http_error()` Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 04/14] gwhttpd: Add log in the interrupt handler Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 05/14] gwhttpd: Refactor HTTP header parser Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 06/14] gwhttpd: Avoid endless busy spinning on `send()` Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 07/14] Makefile: Add "make clean" command Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 08/14] gwhttpd: Skip interrupt error from `epoll_wait()` Ammar Faizi
2022-07-08 12:10 ` Ammar Faizi [this message]
2022-07-08 12:10 ` [PATCH gwhttpd 10/14] gwhttpd: Add command line options Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 11/14] gwhttpd: Add SLC support Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 12/14] gwhttpd: slc: Shut the SLC log up Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 13/14] gwhttpd: Fix 403 HTTP error when accessing an empty file Ammar Faizi
2022-07-08 12:10 ` [PATCH gwhttpd 14/14] gwhttpd: Add connecting log for SLC Ammar Faizi

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox