public inbox for [email protected]
 help / color / mirror / Atom feed
From: Ammar Faizi <[email protected]>
To: Alviro Iskandar Setiawan <[email protected]>
Cc: Ammar Faizi <[email protected]>,
	Muhammad Rizki <[email protected]>,
	Kanna Scarlet <[email protected]>,
	GNU/Weeb Mailing List <[email protected]>
Subject: [PATCH v1 17/22] chnet: Initial chunked request body support
Date: Sun, 21 Aug 2022 18:24:48 +0700	[thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>

Signed-off-by: Ammar Faizi <[email protected]>
---
 chnet/chnet.cc    | 155 +++++++++++++++++++++++++++++++++++++++++++++-
 chnet/chnet.h     | 106 ++++++++++++++++++++++++++++++-
 tests/cpp/ring.cc |  47 ++++++++++++--
 3 files changed, 302 insertions(+), 6 deletions(-)

diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 915dfea..1ba4d32 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -50,6 +50,116 @@ void CHNetDataStream::ResetInternal()
 {
 }
 
+CHNetDataStreamChunked::CHNetDataStreamChunked(CHNetDelegate *ch):
+	UploadDataStream(true, 0),
+	ch_(ch),
+	buf_queue_(1024)
+{
+	cur_pos_ = 0;
+	is_eof_ = false;
+	being_waited_.store(false, std::memory_order_relaxed);
+	ch_->chuncked_body_.store(this, std::memory_order_release);
+}
+
+CHNetDataStreamChunked::~CHNetDataStreamChunked(void)
+{
+	struct net::CHNetDataStreamChunked::buf *arr;
+	uint32_t i, len;
+
+	ch_->chuncked_body_.store(nullptr, std::memory_order_release);
+	arr = buf_queue_.arr_;
+	len = buf_queue_.mask_ + 1;
+
+	for (i = 0; i < len; i++) {
+		char *b = arr[i].buf_;
+
+		if (b)
+			delete[] b;
+	}
+}
+
+void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
+{
+	struct net::CHNetDataStreamChunked::buf b;
+
+	b.buf_ = new char[len];
+	b.len_ = len;
+	memcpy(b.buf_, buf, len);
+
+	std::unique_lock<std::mutex> lk(buf_lock_);
+	buf_queue_.push(b);
+	if (being_waited_.load(std::memory_order_acquire))
+		buf_cond_.notify_one();
+}
+
+int CHNetDataStreamChunked::InitInternal(const net::NetLogWithSource& net_log)
+{
+	return net::OK;
+}
+
+int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg)
+{
+	size_t read_size;
+	char *ptr;
+	int ret;
+
+	if (unlikely(read_size_arg < 0))
+		return net::ERR_IO_PENDING;
+
+	if (unlikely(is_eof_ || !read_size_arg))
+		return 0;
+
+	ret       = 0;
+	ptr       = buf->data();
+	read_size = (size_t)read_size_arg;
+
+	std::unique_lock<std::mutex> lk(buf_lock_);
+	while (read_size) {
+		struct net::CHNetDataStreamChunked::buf *b;
+		size_t copy_size;
+
+		if (unlikely(!buf_queue_.size())) {
+			/*
+			 * At this point, the buffer is empty.
+			 *
+			 * Wait for the buffer queue to be
+			 * filled in.
+			 */
+			being_waited_.store(true, std::memory_order_release);
+			buf_cond_.wait(lk, [this]{ return buf_queue_.size(); });
+			being_waited_.store(false, std::memory_order_release);
+		}
+
+		b = buf_queue_.front();
+		if (b->len_ == -(size_t)1UL) {
+			is_eof_ = true;
+			break;
+		}
+
+		copy_size = b->len_ - cur_pos_;
+		if (copy_size > read_size)
+			copy_size = read_size;
+
+		memcpy(ptr, &b->buf_[cur_pos_], copy_size);
+		ptr += copy_size;
+		ret += copy_size;
+		read_size -= copy_size;
+		buf_queue_.head_++;
+	}
+
+	if (unlikely(is_eof_))
+		SetIsFinalChunk();
+
+	if (unlikely(!ret && !is_eof_))
+		return net::ERR_IO_PENDING;
+
+	return ret;
+}
+
+void CHNetDataStreamChunked::ResetInternal()
+{
+}
+
 CHNCallback::CHNCallback(void):
 	response_started_data_(nullptr),
 	response_started_(nullptr),
@@ -82,6 +192,7 @@ CHNetDelegate::CHNetDelegate(void):
 	base::Thread::Options options(base::MessagePumpType::IO, 0);
 	CHECK(thread_.StartWithOptions(std::move(options)));
 	read_ret_.store(0, std::memory_order_relaxed);
+	chuncked_body_.store(nullptr, std::memory_order_relaxed);
 	payload_ = nullptr;
 }
 
@@ -140,7 +251,9 @@ void CHNetDelegate::_Start(Waiter *sig)
 	url_req_->set_method(method_);
 	sig->Signal();
 
-	if (payload_)
+	if (chuncked_body_up_)
+		url_req_->set_upload(std::move(chuncked_body_up_));
+	else if (payload_)
 		url_req_->set_upload(std::make_unique<CHNetDataStream>(
 					this, payload_, payload_len_));
 
@@ -218,6 +331,17 @@ int CHNetDelegate::Read(int size)
 	return read_ret_.load();
 }
 
+void CHNetDelegate::Write(const char *buf, size_t len)
+{
+	CHNetDataStreamChunked *p;
+
+	p = chuncked_body_.load(std::memory_order_acquire);
+	if (unlikely(!p))
+		return;
+
+	p->WriteBuf(buf, len);
+}
+
 void CHNetDelegate::OnResponseStarted(URLRequest *url_req, int net_err)
 {
 	if (unlikely(net_err < 0))
@@ -243,6 +367,20 @@ void CHNetDelegate::SetError(int err_code)
 	err_ = "chromium_net_err:" + ErrorToString(err_code);
 }
 
+void CHNetDelegate::StartChunkedBody(void)
+{
+	chuncked_body_up_ = std::make_unique<CHNetDataStreamChunked>(this);
+}
+
+void CHNetDelegate::StopChunkedBody(void)
+{
+	CHNetDataStreamChunked *p;
+
+	p = chuncked_body_.load(std::memory_order_acquire);
+	if (likely(p))
+		p->StopWriting();
+}
+
 } /* namespace net */
 
 CHNet::CHNet(void)
@@ -293,6 +431,21 @@ void CHNet::SetRequestHeader(const char *key, const char *val, bool overwrite)
 	ch_->SetRequestHeader(std::string(key), std::string(val), overwrite);
 }
 
+void CHNet::Write(const char *buf, size_t len)
+{
+	ch_->Write(buf, len);
+}
+
+void CHNet::StartChunkedBody(void)
+{
+	ch_->StartChunkedBody();
+}
+
+void CHNet::StopChunkedBody(void)
+{
+	ch_->StopChunkedBody();
+}
+
 int CHNet::read_ret(void)
 {
 	return ch_->read_ret();
diff --git a/chnet/chnet.h b/chnet/chnet.h
index 8e995a1..d92ba6d 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -70,6 +70,102 @@ private:
 	void ResetInternal() override;
 };
 
+class CHNetDataStreamChunked: public UploadDataStream {
+
+private:
+	struct buf {
+		char	*buf_;
+		size_t	len_;
+	};
+
+	template<typename T>
+	struct queue {
+		struct buf		*arr_;
+		std::atomic<uint32_t>	head_;
+		std::atomic<uint32_t>	tail_;
+		uint32_t		mask_;
+
+		inline queue(size_t want_max)
+		{
+			uint32_t max = 1;
+			uint32_t i;
+
+			head_.store(0, std::memory_order_relaxed);
+			tail_.store(0, std::memory_order_relaxed);
+
+			if (want_max < 4)
+				want_max = 4;
+
+			while (max < want_max)
+				max *= 2;
+
+			mask_ = max - 1;
+			arr_ = new T[max];
+			for (i = 0; i < max; i++) {
+				arr_[i].buf_ = nullptr;
+				arr_[i].len_ = 0;
+			}
+		}
+
+		inline ~queue(void)
+		{
+			delete[] arr_;
+		}
+
+		inline size_t size(void)
+		{
+			if (likely(tail_ > head_))
+				return tail_ - head_;
+			else
+				return head_ - tail_;
+		}
+
+		inline void push(T val)
+		{
+			arr_[tail_++ & mask_] = val;
+		}
+
+		inline struct buf *front(void)
+		{
+			return &arr_[head_.load() & mask_];
+		}
+
+		inline struct buf *pop(void)
+		{
+			return &arr_[head_++ & mask_];
+		}
+	};
+
+public:
+	CHNetDataStreamChunked(CHNetDelegate *ch);
+	~CHNetDataStreamChunked(void) override;
+	void WriteBuf(const char *buf, size_t len);
+	inline void StopWriting(void)
+	{
+		struct buf b;
+
+		b.buf_ = nullptr;
+		b.len_ = -(size_t)1UL;
+		buf_lock_.lock();
+		buf_queue_.push(b);
+		buf_lock_.unlock();
+	}
+
+private:
+	CHNetDelegate			*ch_;
+	size_t				cur_pos_;
+	std::mutex 			buf_lock_;
+	std::condition_variable		buf_cond_;
+	struct queue<struct buf>	buf_queue_;
+	std::atomic<bool>		being_waited_;
+	bool				is_eof_;
+
+	int InitInternal(const net::NetLogWithSource& net_log) override;
+	int ReadInternal(net::IOBuffer *buf, int buf_len) override;
+	void ResetInternal() override;
+
+};
+
 struct CHNCallback {
 	CHNCallback(void);
 	~CHNCallback(void);
@@ -97,6 +193,10 @@ public:
 
 	void OnResponseStarted(URLRequest *url_req, int net_error) override;
 	void OnReadCompleted(URLRequest *url_req, int bytes_read) override;
+	void StartChunkedBody(void);
+	void Write(const char *buf, size_t len);
+	void StopChunkedBody(void);
+
 	inline int read_ret(void) { return read_ret_.load(); }
 	inline const char *read_buf(void) { return read_buf_->data(); }
 	inline bool started(void) { return started_; }
@@ -108,7 +208,8 @@ public:
 	}
 
 	struct CHNCallback cb;
-
+	std::atomic<CHNetDataStreamChunked *>	chuncked_body_;
+	std::unique_ptr<CHNetDataStreamChunked>	chuncked_body_up_;
 private:
 	void _Start(Waiter *sig);
 	void ConstructURLRequest(Waiter *sig);
@@ -151,11 +252,14 @@ public:
 	const char *GetErrorStr(void);
 	void SetMethod(const char *method);
 	void SetPayload(const char *buf, size_t len);
+	void StartChunkedBody(void);
+	void StopChunkedBody(void);
 	void SetRequestHeader(const char *key, const char *val,
 			      bool overwrite = true);
 
 	int read_ret(void);
 	const char *read_buf(void);
+	void Write(const char *buf, size_t len);
 
 #ifdef __FOR_CHROMIUM_INTERNAL
 	inline net::CHNetDelegate *ch(void) { return ch_; }
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index 8bedef5..a65457d 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -247,15 +247,54 @@ static void test_chnet_ring_multiple(bool start_before_read)
 		_test_chnet_ring_multiple(start_before_read, &ring);
 }
 
+static void test_chnet_chunked_body(void)
+{
+	constexpr static const char buf[] = "AAAA Hello World!\n";
+	CNRingCtx ring(4096);
+	CNRingSQE *sqe;
+	CNRingCQE *cqe;
+	CHNet *ch;
+	int i;
+
+	ch = new CHNet;
+	ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
+	ch->SetMethod("POST");
+	ch->StartChunkedBody();
+	for (i = 0; i < 3; i++)
+		ch->Write(buf, strlen(buf));
+
+	sqe = ring.GetSQE();
+	assert(sqe);
+	sqe->PrepRead(ch, 1024);
+	sqe->SetUserData(9999);
+	assert(ring.SubmitSQE() == 1);
+
+	for (i = 0; i < 3; i++)
+		ch->Write(buf, strlen(buf));
+	ch->StopChunkedBody();
+
+	ring.WaitCQE(1);
+	cqe = ring.HeadCQE();
+	assert(cqe->op == CNRING_OP_READ);
+	// assert(cqe->res == sizeof(buf));
+	assert(cqe->user_data == 9999);
+	// assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
+	write(1, ch->read_buf(), cqe->res);
+	ring.CQAdvance(1);
+
+	delete ch;
+}
+
 int main(void)
 {
 	test_nop();
 	chnet_global_init();
-	// test_chnet_ring_single();
+	test_chnet_ring_single();
 	test_chnet_ring_set_header();
-	// test_chnet_ring_single_post();
-	// test_chnet_ring_multiple(false);
-	// test_chnet_ring_multiple(true);
+	test_chnet_ring_single_post();
+	test_chnet_ring_multiple(false);
+	test_chnet_ring_multiple(true);
+	test_chnet_chunked_body();
 	chnet_global_stop();
 	return 0;
 }
-- 
Ammar Faizi


  parent reply	other threads:[~2022-08-21 11:25 UTC|newest]

Thread overview: 28+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 01/22] chnet: Add initial request body support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 07/22] chnet: node: Add get_error function to return the error string Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 10/22] chnet: Split construct URL req creation into a new function Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 11/22] chnet: Add set request header support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 12/22] chnet: node: Fix unused variable warning Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 14/22] tests/js/ring: Add more set header function test Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 15/22] chnet: node: Don't use static counter for data ID Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example Ammar Faizi
2022-08-21 11:24 ` Ammar Faizi [this message]
2022-08-21 22:13   ` [PATCH v1 17/22] chnet: Initial chunked request body support Alviro Iskandar Setiawan
2022-08-22  3:08     ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 18/22] chnet: Rework the chunked request body interface Ammar Faizi
2022-08-21 22:20   ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 19/22] chnet: ring: Refactor the ring completely Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
2022-08-21 22:29   ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant Ammar Faizi
2022-08-21 22:21   ` Alviro Iskandar Setiawan

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox