public inbox for [email protected]
 help / color / mirror / Atom feed
From: Ammar Faizi <[email protected]>
To: Alviro Iskandar Setiawan <[email protected]>
Cc: Ammar Faizi <[email protected]>,
	Muhammad Rizki <[email protected]>,
	Kanna Scarlet <[email protected]>,
	GNU/Weeb Mailing List <[email protected]>
Subject: [PATCH v1 18/22] chnet: Rework the chunked request body interface
Date: Sun, 21 Aug 2022 18:24:49 +0700	[thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>

This seems to be working fine at a glance, but there is still an issue
with a double-free bug. It rarely happens, but it is a real bug that
ASAN has reported. I will look into this further on Monday.

Signed-off-by: Ammar Faizi <[email protected]>
---
 chnet/chnet.cc    | 74 +++++++++++++++++++++++++++++++++--------------
 chnet/chnet.h     | 68 +++++--------------------------------------
 tests/cpp/ring.cc | 59 ++++++++++++++++++++++++-------------
 3 files changed, 99 insertions(+), 102 deletions(-)

diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 1ba4d32..a34bb18 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -52,30 +52,42 @@ void CHNetDataStream::ResetInternal()
 
 CHNetDataStreamChunked::CHNetDataStreamChunked(CHNetDelegate *ch):
 	UploadDataStream(true, 0),
-	ch_(ch),
-	buf_queue_(1024)
+	ch_(ch)
 {
 	cur_pos_ = 0;
+	mem_hold_size_ = 0;
 	is_eof_ = false;
 	being_waited_.store(false, std::memory_order_relaxed);
 	ch_->chuncked_body_.store(this, std::memory_order_release);
 }
 
+void CHNetDataStreamChunked::ClearDelQueue(void)
+{
+	char *b;
+
+	while (!del_queue_.empty()) {
+		b = del_queue_.front();
+		if (b)
+			delete[] b;
+
+		del_queue_.pop();
+	}
+}
+
 CHNetDataStreamChunked::~CHNetDataStreamChunked(void)
 {
-	struct net::CHNetDataStreamChunked::buf *arr;
-	uint32_t i, len;
+	char *b;
 
 	ch_->chuncked_body_.store(nullptr, std::memory_order_release);
-	arr = buf_queue_.arr_;
-	len = buf_queue_.mask_ + 1;
-
-	for (i = 0; i < len; i++) {
-		char *b = arr[i].buf_;
 
+	while (!buf_queue_.empty()) {
+		b = buf_queue_.front().buf_;
 		if (b)
 			delete[] b;
+
+		buf_queue_.pop();
 	}
+	ClearDelQueue();
 }
 
 void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
@@ -85,6 +97,7 @@ void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
 	b.buf_ = new char[len];
 	b.len_ = len;
 	memcpy(b.buf_, buf, len);
+	mem_hold_size_.fetch_add(len);
 
 	std::unique_lock<std::mutex> lk(buf_lock_);
 	buf_queue_.push(b);
@@ -115,44 +128,61 @@ int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg)
 
 	std::unique_lock<std::mutex> lk(buf_lock_);
 	while (read_size) {
-		struct net::CHNetDataStreamChunked::buf *b;
+		struct net::CHNetDataStreamChunked::buf b;
 		size_t copy_size;
 
-		if (unlikely(!buf_queue_.size())) {
+		if (unlikely(buf_queue_.empty())) {
 			/*
-			 * At this point, the buffer is empty.
+			 * At this point, the buffer queue is empty.
 			 *
-			 * Wait for the buffer queue to be
-			 * filled in.
+			 * Wait for the buffer queue to be filled in...
 			 */
 			being_waited_.store(true, std::memory_order_release);
-			buf_cond_.wait(lk, [this]{ return buf_queue_.size(); });
+			ClearDelQueue();
+			buf_cond_.wait(lk, [this]{ return !buf_queue_.empty(); });
 			being_waited_.store(false, std::memory_order_release);
 		}
 
 		b = buf_queue_.front();
-		if (b->len_ == -(size_t)1UL) {
+		if (unlikely(b.len_ == -(size_t)1UL)) {
 			is_eof_ = true;
+			buf_queue_.pop();
 			break;
 		}
 
-		copy_size = b->len_ - cur_pos_;
+		copy_size = b.len_ - cur_pos_;
 		if (copy_size > read_size)
 			copy_size = read_size;
 
-		memcpy(ptr, &b->buf_[cur_pos_], copy_size);
+		memcpy(ptr, &b.buf_[cur_pos_], copy_size);
 		ptr += copy_size;
 		ret += copy_size;
 		read_size -= copy_size;
-		buf_queue_.head_++;
+		cur_pos_  += copy_size;
+
+		if (cur_pos_ == b.len_) {
+			size_t mem;
+
+			cur_pos_ = 0;
+			buf_queue_.pop();
+			del_queue_.push(b.buf_);
+			mem = mem_hold_size_.load(std::memory_order_acquire);
+
+			/*
+			 * Make sure we don't hold too many delete
+			 * queues to avoid memory pressure.
+			 */
+			if (mem >= MEM_HOLD_THRESHOLD) {
+				ClearDelQueue();
+				mem_hold_size_.store(0,
+						     std::memory_order_release);
+			}
+		}
 	}
 
 	if (unlikely(is_eof_))
 		SetIsFinalChunk();
 
-	if (unlikely(!ret && !is_eof_))
-		return net::ERR_IO_PENDING;
-
 	return ret;
 }
 
diff --git a/chnet/chnet.h b/chnet/chnet.h
index d92ba6d..84db8d4 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -29,6 +29,8 @@
 #include "net/base/net_errors.h"
 #define CHNET_EXPORT COMPONENT_EXPORT(CHNET)
 
+#include <queue>
+
 #else /* #ifdef __FOR_CHROMIUM_INTERNAL */
 
 /*
@@ -78,64 +80,6 @@ private:
 		size_t	len_;
 	};
 
-	template<typename T>
-	struct queue {
-		struct buf		*arr_;
-		std::atomic<uint32_t>	head_;
-		std::atomic<uint32_t>	tail_;
-		uint32_t		mask_;
-
-		inline queue(size_t want_max)
-		{
-			uint32_t max = 1;
-			uint32_t i;
-
-			head_.store(0, std::memory_order_relaxed);
-			tail_.store(0, std::memory_order_relaxed);
-
-			if (want_max < 4)
-				want_max = 4;
-
-			while (max < want_max)
-				max *= 2;
-
-			mask_ = max - 1;
-			arr_ = new T[max];
-			for (i = 0; i < max; i++) {
-				arr_[i].buf_ = nullptr;
-				arr_[i].len_ = 0;
-			}
-		}
-
-		inline ~queue(void)
-		{
-			delete[] arr_;
-		}
-
-		inline size_t size(void)
-		{
-			if (likely(tail_ > head_))
-				return tail_ - head_;
-			else
-				return head_ - tail_;
-		}
-
-		inline void push(T val)
-		{
-			arr_[tail_++ & mask_] = val;
-		}
-
-		inline struct buf *front(void)
-		{
-			return &arr_[head_.load() & mask_];
-		}
-
-		inline struct buf *pop(void)
-		{
-			return &arr_[head_++ & mask_];
-		}
-	};
-
 public:
 	CHNetDataStreamChunked(CHNetDelegate *ch);
 	~CHNetDataStreamChunked(void) override;
@@ -146,6 +90,7 @@ public:
 
 		b.buf_ = nullptr;
 		b.len_ = -(size_t)1UL;
+
 		buf_lock_.lock();
 		buf_queue_.push(b);
 		buf_lock_.unlock();
@@ -156,14 +101,17 @@ private:
 	size_t				cur_pos_;
 	std::mutex 			buf_lock_;
 	std::condition_variable		buf_cond_;
-	struct queue<struct buf>	buf_queue_;
+	std::queue<struct buf>		buf_queue_;
+	std::queue<char *>		del_queue_;
 	std::atomic<bool>		being_waited_;
+	std::atomic<size_t>		mem_hold_size_;
 	bool				is_eof_;
+	constexpr static const uint32_t MEM_HOLD_THRESHOLD = 1024 * 1024 * 4;
 
 	int InitInternal(const net::NetLogWithSource& net_log) override;
 	int ReadInternal(net::IOBuffer *buf, int buf_len) override;
 	void ResetInternal() override;
-
+	void ClearDelQueue(void);
 };
 
 struct CHNCallback {
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index a65457d..54426ef 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -249,39 +249,58 @@ static void test_chnet_ring_multiple(bool start_before_read)
 
 static void test_chnet_chunked_body(void)
 {
-	constexpr static const char buf[] = "AAAA Hello World!\n";
+	constexpr size_t len = 1024 * 64;
+	constexpr size_t nr_loop = 100;
+	size_t total_read = 0;
 	CNRingCtx ring(4096);
 	CNRingSQE *sqe;
 	CNRingCQE *cqe;
 	CHNet *ch;
-	int i;
+	char *buf;
+	size_t i;
 
 	ch = new CHNet;
 	ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
 	ch->SetMethod("POST");
 	ch->StartChunkedBody();
-	for (i = 0; i < 3; i++)
-		ch->Write(buf, strlen(buf));
 
 	sqe = ring.GetSQE();
 	assert(sqe);
-	sqe->PrepRead(ch, 1024);
+	sqe->PrepRead(ch, len);
 	sqe->SetUserData(9999);
 	assert(ring.SubmitSQE() == 1);
 
-	for (i = 0; i < 3; i++)
-		ch->Write(buf, strlen(buf));
+	buf = new char[len];
+	memset(buf, 'A', len);
+
+	for (i = 0; i < nr_loop; i++)
+		ch->Write(buf, len);
 	ch->StopChunkedBody();
 
-	ring.WaitCQE(1);
-	cqe = ring.HeadCQE();
-	assert(cqe->op == CNRING_OP_READ);
-	// assert(cqe->res == sizeof(buf));
-	assert(cqe->user_data == 9999);
-	// assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
-	write(1, ch->read_buf(), cqe->res);
-	ring.CQAdvance(1);
+	while (total_read < (len * nr_loop)) {
+		int res;
+
+		ring.WaitCQE(1);
+		cqe = ring.HeadCQE();
+		assert(cqe->op == CNRING_OP_READ);
+		assert(cqe->user_data == 9999);
+		res = cqe->res;
+		total_read += res;
+		ring.CQAdvance(1);
+
+		if (!res)
+			break;
+
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe->PrepRead(ch, len);
+		sqe->SetUserData(9999);
+		assert(ring.SubmitSQE() == 1);
+	}
+	printf("total_read = %zu %zu\n", total_read, (len * nr_loop));
+	assert(total_read == (len * nr_loop));
 
+	delete[] buf;
 	delete ch;
 }
 
@@ -289,11 +308,11 @@ int main(void)
 {
 	test_nop();
 	chnet_global_init();
-	test_chnet_ring_single();
-	test_chnet_ring_set_header();
-	test_chnet_ring_single_post();
-	test_chnet_ring_multiple(false);
-	test_chnet_ring_multiple(true);
+	// test_chnet_ring_single();
+	// test_chnet_ring_set_header();
+	// test_chnet_ring_single_post();
+	// test_chnet_ring_multiple(false);
+	// test_chnet_ring_multiple(true);
 	test_chnet_chunked_body();
 	chnet_global_stop();
 	return 0;
-- 
Ammar Faizi


  parent reply	other threads:[~2022-08-21 11:25 UTC|newest]

Thread overview: 28+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 01/22] chnet: Add initial request body support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 07/22] chnet: node: Add get_error function to return the error string Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 10/22] chnet: Split construct URL req creation into a new function Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 11/22] chnet: Add set request header support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 12/22] chnet: node: Fix unused variable warning Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 14/22] tests/js/ring: Add more set header function test Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 15/22] chnet: node: Don't use static counter for data ID Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 17/22] chnet: Initial chunked request body support Ammar Faizi
2022-08-21 22:13   ` Alviro Iskandar Setiawan
2022-08-22  3:08     ` Ammar Faizi
2022-08-21 11:24 ` Ammar Faizi [this message]
2022-08-21 22:20   ` [PATCH v1 18/22] chnet: Rework the chunked request body interface Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 19/22] chnet: ring: Refactor the ring completely Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
2022-08-21 22:29   ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant Ammar Faizi
2022-08-21 22:21   ` Alviro Iskandar Setiawan

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox