From: Ammar Faizi <[email protected]>
To: Alviro Iskandar Setiawan <[email protected]>
Cc: Ammar Faizi <[email protected]>,
Muhammad Rizki <[email protected]>,
Kanna Scarlet <[email protected]>,
GNU/Weeb Mailing List <[email protected]>
Subject: [PATCH v1 17/22] chnet: Initial chunked request body support
Date: Sun, 21 Aug 2022 18:24:48 +0700 [thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 155 +++++++++++++++++++++++++++++++++++++++++++++-
chnet/chnet.h | 106 ++++++++++++++++++++++++++++++-
tests/cpp/ring.cc | 47 ++++++++++++--
3 files changed, 302 insertions(+), 6 deletions(-)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 915dfea..1ba4d32 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -50,6 +50,116 @@ void CHNetDataStream::ResetInternal()
{
}
+/*
+ * Build a chunked upload stream bound to delegate @ch.
+ *
+ * The base class is constructed with (true, 0), the buffer queue is
+ * asked for 1024 slots, and the new object publishes itself through
+ * ch->chuncked_body_ so that CHNetDelegate::Write() can reach it.
+ */
+CHNetDataStreamChunked::CHNetDataStreamChunked(CHNetDelegate *ch):
+	UploadDataStream(true, 0),
+	ch_(ch),
+	cur_pos_(0),
+	buf_queue_(1024),
+	being_waited_(false),
+	is_eof_(false)
+{
+	ch_->chuncked_body_.store(this, std::memory_order_release);
+}
+
+/*
+ * Unpublish this stream from the delegate, then release every payload
+ * buffer that a queue slot still points to. The slot array itself is
+ * freed by the queue's own destructor.
+ */
+CHNetDataStreamChunked::~CHNetDataStreamChunked(void)
+{
+	uint32_t nr_slots;
+	uint32_t idx;
+
+	ch_->chuncked_body_.store(nullptr, std::memory_order_release);
+	nr_slots = buf_queue_.mask_ + 1;
+
+	/*
+	 * Slots that hold no payload carry a null buf_; delete[] on a
+	 * null pointer is a no-op, so every slot can be visited blindly.
+	 */
+	for (idx = 0; idx < nr_slots; idx++)
+		delete[] buf_queue_.arr_[idx].buf_;
+}
+
+/*
+ * Copy @len bytes from @buf into a freshly allocated buffer and append
+ * it to the queue.
+ *
+ * The copy is made before buf_lock_ is taken; the push and the wakeup
+ * both happen with the lock held. The queue does not check for
+ * overflow.
+ */
+void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
+{
+	struct net::CHNetDataStreamChunked::buf entry;
+	char *copy = new char[len];
+
+	memcpy(copy, buf, len);
+	entry.buf_ = copy;
+	entry.len_ = len;
+
+	std::unique_lock<std::mutex> guard(buf_lock_);
+	buf_queue_.push(entry);
+
+	/* Signal only when ReadInternal() has flagged that it is waiting. */
+	if (!being_waited_.load(std::memory_order_acquire))
+		return;
+
+	buf_cond_.notify_one();
+}
+
+/*
+ * UploadDataStream initialization hook. Nothing is set up here, so it
+ * always returns net::OK; @net_log is unused.
+ */
+int CHNetDataStreamChunked::InitInternal(const net::NetLogWithSource& net_log)
+{
+ return net::OK;
+}
+
+/*
+ * Fill @buf with up to @read_size_arg bytes taken from the queued
+ * write buffers.
+ *
+ * The call blocks on buf_cond_ while the queue is empty, so it only
+ * returns once the request has been satisfied completely or the EOF
+ * marker (len_ == (size_t)-1, queued by StopWriting()) is at the front
+ * of the queue. A buffer that is only partly consumed stays at the
+ * front; cur_pos_ remembers how far into it the previous call got.
+ *
+ * Return: the number of bytes copied (0 once EOF has been seen, or
+ * when @read_size_arg is 0), or net::ERR_INVALID_ARGUMENT when
+ * @read_size_arg is negative.
+ */
+int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg)
+{
+	size_t read_size;
+	char *ptr;
+	int ret;
+
+	/*
+	 * ERR_IO_PENDING would promise a later completion that nothing
+	 * here ever delivers, so a bad length is reported as an error.
+	 */
+	if (unlikely(read_size_arg < 0))
+		return net::ERR_INVALID_ARGUMENT;
+
+	if (unlikely(is_eof_ || !read_size_arg))
+		return 0;
+
+	ret = 0;
+	ptr = buf->data();
+	read_size = (size_t)read_size_arg;
+
+	std::unique_lock<std::mutex> lk(buf_lock_);
+	while (read_size) {
+		struct net::CHNetDataStreamChunked::buf *b;
+		size_t copy_size;
+
+		if (unlikely(!buf_queue_.size())) {
+			/*
+			 * At this point, the buffer queue is empty.
+			 *
+			 * Wait for a writer to fill it in. The flag
+			 * tells writers that a wakeup is needed.
+			 */
+			being_waited_.store(true, std::memory_order_release);
+			buf_cond_.wait(lk, [this]{ return buf_queue_.size() != 0; });
+			being_waited_.store(false, std::memory_order_release);
+		}
+
+		b = buf_queue_.front();
+		if (b->len_ == -(size_t)1UL) {
+			/* EOF marker: leave it queued, just remember it. */
+			is_eof_ = true;
+			break;
+		}
+
+		copy_size = b->len_ - cur_pos_;
+		if (copy_size > read_size)
+			copy_size = read_size;
+
+		memcpy(ptr, &b->buf_[cur_pos_], copy_size);
+		ptr += copy_size;
+		ret += (int)copy_size;
+		read_size -= copy_size;
+		cur_pos_ += copy_size;
+
+		/*
+		 * Drop the buffer only once it is fully consumed. If the
+		 * caller's space ran out first, read_size is now 0 and
+		 * the remainder is served by the next call.
+		 */
+		if (cur_pos_ == b->len_) {
+			delete[] b->buf_;
+			/* Cleared so the destructor does not free it again. */
+			b->buf_ = nullptr;
+			b->len_ = 0;
+			cur_pos_ = 0;
+			buf_queue_.pop();
+		}
+	}
+
+	if (unlikely(is_eof_))
+		SetIsFinalChunk();
+
+	/*
+	 * The loop ends either with the request completely filled
+	 * (ret > 0) or at EOF, so there is no "pending" outcome here.
+	 */
+	return ret;
+}
+
+/*
+ * UploadDataStream reset hook. Currently empty: cur_pos_, is_eof_ and
+ * the buffer queue are left untouched.
+ * NOTE(review): presumably a restarted upload would need that state
+ * cleared -- confirm before relying on a reset of this stream.
+ */
+void CHNetDataStreamChunked::ResetInternal()
+{
+}
+
CHNCallback::CHNCallback(void):
response_started_data_(nullptr),
response_started_(nullptr),
@@ -82,6 +192,7 @@ CHNetDelegate::CHNetDelegate(void):
base::Thread::Options options(base::MessagePumpType::IO, 0);
CHECK(thread_.StartWithOptions(std::move(options)));
read_ret_.store(0, std::memory_order_relaxed);
+ chuncked_body_.store(nullptr, std::memory_order_relaxed);
payload_ = nullptr;
}
@@ -140,7 +251,9 @@ void CHNetDelegate::_Start(Waiter *sig)
url_req_->set_method(method_);
sig->Signal();
- if (payload_)
+ if (chuncked_body_up_)
+ url_req_->set_upload(std::move(chuncked_body_up_));
+ else if (payload_)
url_req_->set_upload(std::make_unique<CHNetDataStream>(
this, payload_, payload_len_));
@@ -218,6 +331,17 @@ int CHNetDelegate::Read(int size)
return read_ret_.load();
}
+/*
+ * Queue @len bytes from @buf on the chunked body stream currently
+ * published in chuncked_body_. Does nothing when no stream is
+ * published.
+ */
+void CHNetDelegate::Write(const char *buf, size_t len)
+{
+	CHNetDataStreamChunked *stream =
+		chuncked_body_.load(std::memory_order_acquire);
+
+	if (likely(stream))
+		stream->WriteBuf(buf, len);
+}
+
void CHNetDelegate::OnResponseStarted(URLRequest *url_req, int net_err)
{
if (unlikely(net_err < 0))
@@ -243,6 +367,20 @@ void CHNetDelegate::SetError(int err_code)
err_ = "chromium_net_err:" + ErrorToString(err_code);
}
+/*
+ * Allocate the chunked upload stream and hold it in chuncked_body_up_.
+ * The stream's constructor also publishes the new object in
+ * chuncked_body_.
+ */
+void CHNetDelegate::StartChunkedBody(void)
+{
+ chuncked_body_up_ = std::make_unique<CHNetDataStreamChunked>(this);
+}
+
+/*
+ * Tell the published chunked body stream that no more data will be
+ * written. Does nothing when no stream is published in chuncked_body_.
+ */
+void CHNetDelegate::StopChunkedBody(void)
+{
+	CHNetDataStreamChunked *stream =
+		chuncked_body_.load(std::memory_order_acquire);
+
+	if (unlikely(!stream))
+		return;
+
+	stream->StopWriting();
+}
+
} /* namespace net */
CHNet::CHNet(void)
@@ -293,6 +431,21 @@ void CHNet::SetRequestHeader(const char *key, const char *val, bool overwrite)
ch_->SetRequestHeader(std::string(key), std::string(val), overwrite);
}
+/* Forward @len bytes at @buf to the delegate's Write(). */
+void CHNet::Write(const char *buf, size_t len)
+{
+ ch_->Write(buf, len);
+}
+
+/* Forward to the delegate's StartChunkedBody(). */
+void CHNet::StartChunkedBody(void)
+{
+ ch_->StartChunkedBody();
+}
+
+/* Forward to the delegate's StopChunkedBody(). */
+void CHNet::StopChunkedBody(void)
+{
+ ch_->StopChunkedBody();
+}
+
int CHNet::read_ret(void)
{
return ch_->read_ret();
diff --git a/chnet/chnet.h b/chnet/chnet.h
index 8e995a1..d92ba6d 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -70,6 +70,102 @@ private:
void ResetInternal() override;
};
+/*
+ * Upload stream whose body is supplied piece by piece.
+ *
+ * WriteBuf() appends payload buffers, StopWriting() appends an EOF
+ * marker, and ReadInternal() consumes the queue. buf_lock_ guards the
+ * queue; buf_cond_ wakes a reader that is waiting on an empty queue.
+ */
+class CHNetDataStreamChunked: public UploadDataStream {
+
+private:
+	/*
+	 * One queued payload. A null buf_ together with
+	 * len_ == (size_t)-1 is the EOF marker.
+	 */
+	struct buf {
+		char	*buf_;
+		size_t	len_;
+	};
+
+	/*
+	 * Ring of T addressed by free-running head_/tail_ counters that
+	 * are masked on every access. The capacity is @want_max rounded
+	 * up to a power of two, with a minimum of 4. T must provide
+	 * buf_ and len_ members. There is no overflow check: pushing
+	 * into a full ring overwrites the oldest slot.
+	 */
+	template<typename T>
+	struct queue {
+		T			*arr_;
+		std::atomic<uint32_t>	head_;
+		std::atomic<uint32_t>	tail_;
+		uint32_t		mask_;
+
+		inline queue(size_t want_max)
+		{
+			uint32_t max = 1;
+			uint32_t i;
+
+			head_.store(0, std::memory_order_relaxed);
+			tail_.store(0, std::memory_order_relaxed);
+
+			if (want_max < 4)
+				want_max = 4;
+
+			while (max < want_max)
+				max *= 2;
+
+			mask_ = max - 1;
+			arr_ = new T[max];
+			for (i = 0; i < max; i++) {
+				arr_[i].buf_ = nullptr;
+				arr_[i].len_ = 0;
+			}
+		}
+
+		inline ~queue(void)
+		{
+			delete[] arr_;
+		}
+
+		/* Number of queued entries. */
+		inline size_t size(void)
+		{
+			/*
+			 * Unsigned subtraction stays correct even after
+			 * the 32-bit counters have wrapped around.
+			 */
+			return (uint32_t)(tail_.load() - head_.load());
+		}
+
+		inline void push(T val)
+		{
+			arr_[tail_++ & mask_] = val;
+		}
+
+		/* Oldest entry, left in the queue. */
+		inline T *front(void)
+		{
+			return &arr_[head_.load() & mask_];
+		}
+
+		/* Oldest entry, removed from the queue. */
+		inline T *pop(void)
+		{
+			return &arr_[head_++ & mask_];
+		}
+	};
+
+public:
+	CHNetDataStreamChunked(CHNetDelegate *ch);
+	~CHNetDataStreamChunked(void) override;
+	void WriteBuf(const char *buf, size_t len);
+
+	/*
+	 * Queue the EOF marker. A reader that is blocked on an empty
+	 * queue must be woken here as well, otherwise it would wait
+	 * forever for data that is never going to arrive.
+	 */
+	inline void StopWriting(void)
+	{
+		struct buf b;
+
+		b.buf_ = nullptr;
+		b.len_ = -(size_t)1UL;
+
+		std::lock_guard<std::mutex> lk(buf_lock_);
+		buf_queue_.push(b);
+		if (being_waited_.load(std::memory_order_acquire))
+			buf_cond_.notify_one();
+	}
+
+private:
+	CHNetDelegate *ch_;
+	/* Read offset into the buffer at the front of the queue. */
+	size_t cur_pos_;
+	std::mutex buf_lock_;
+	std::condition_variable buf_cond_;
+	struct queue<struct buf> buf_queue_;
+	/* True while ReadInternal() is blocked on buf_cond_. */
+	std::atomic<bool> being_waited_;
+	/* Set once ReadInternal() has reached the EOF marker. */
+	bool is_eof_;
+
+	int InitInternal(const net::NetLogWithSource& net_log) override;
+	int ReadInternal(net::IOBuffer *buf, int buf_len) override;
+	void ResetInternal() override;
+
+};
+
struct CHNCallback {
CHNCallback(void);
~CHNCallback(void);
@@ -97,6 +193,10 @@ public:
void OnResponseStarted(URLRequest *url_req, int net_error) override;
void OnReadCompleted(URLRequest *url_req, int bytes_read) override;
+ void StartChunkedBody(void);
+ void Write(const char *buf, size_t len);
+ void StopChunkedBody(void);
+
inline int read_ret(void) { return read_ret_.load(); }
inline const char *read_buf(void) { return read_buf_->data(); }
inline bool started(void) { return started_; }
@@ -108,7 +208,8 @@ public:
}
struct CHNCallback cb;
-
+ std::atomic<CHNetDataStreamChunked *> chuncked_body_;
+ std::unique_ptr<CHNetDataStreamChunked> chuncked_body_up_;
private:
void _Start(Waiter *sig);
void ConstructURLRequest(Waiter *sig);
@@ -151,11 +252,14 @@ public:
const char *GetErrorStr(void);
void SetMethod(const char *method);
void SetPayload(const char *buf, size_t len);
+ void StartChunkedBody(void);
+ void StopChunkedBody(void);
void SetRequestHeader(const char *key, const char *val,
bool overwrite = true);
int read_ret(void);
const char *read_buf(void);
+ void Write(const char *buf, size_t len);
#ifdef __FOR_CHROMIUM_INTERNAL
inline net::CHNetDelegate *ch(void) { return ch_; }
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index 8bedef5..a65457d 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -247,15 +247,54 @@ static void test_chnet_ring_multiple(bool start_before_read)
_test_chnet_ring_multiple(start_before_read, &ring);
}
+/*
+ * Chunked request body smoke test.
+ *
+ * Three chunks are written before the read SQE is submitted and three
+ * more afterwards, followed by StopChunkedBody(). The single
+ * completion is checked for its opcode and user data, and cqe->res
+ * bytes of the read buffer are written to fd 1. The stricter checks
+ * on the response length and content are still commented out.
+ */
+static void test_chnet_chunked_body(void)
+{
+	constexpr static const char buf[] = "AAAA Hello World!\n";
+	CNRingCtx ring(4096);
+	CNRingSQE *sqe;
+	CNRingCQE *cqe;
+	CHNet *ch;
+	int i;
+
+	ch = new CHNet;
+	ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
+	ch->SetMethod("POST");
+	ch->StartChunkedBody();
+	for (i = 0; i < 3; i++)
+		ch->Write(buf, strlen(buf));
+
+	sqe = ring.GetSQE();
+	assert(sqe);
+	sqe->PrepRead(ch, 1024);
+	sqe->SetUserData(9999);
+
+	/*
+	 * Keep the submission outside assert(): with NDEBUG the whole
+	 * assert expression vanishes and nothing would be submitted.
+	 */
+	auto nr_submitted = ring.SubmitSQE();
+	assert(nr_submitted == 1);
+	(void)nr_submitted;
+
+	for (i = 0; i < 3; i++)
+		ch->Write(buf, strlen(buf));
+	ch->StopChunkedBody();
+
+	ring.WaitCQE(1);
+	cqe = ring.HeadCQE();
+	assert(cqe->op == CNRING_OP_READ);
+	// assert(cqe->res == sizeof(buf));
+	assert(cqe->user_data == 9999);
+	// assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
+
+	/* A non-positive result must not be turned into a write length. */
+	if (cqe->res > 0)
+		write(1, ch->read_buf(), cqe->res);
+	ring.CQAdvance(1);
+
+	delete ch;
+}
+
int main(void)
{
test_nop();
chnet_global_init();
- // test_chnet_ring_single();
+ test_chnet_ring_single();
test_chnet_ring_set_header();
- // test_chnet_ring_single_post();
- // test_chnet_ring_multiple(false);
- // test_chnet_ring_multiple(true);
+ test_chnet_ring_single_post();
+ test_chnet_ring_multiple(false);
+ test_chnet_ring_multiple(true);
+ /* New in this patch: exercise the chunked request body path. */
+ test_chnet_chunked_body();
chnet_global_stop();
return 0;
}
--
Ammar Faizi
next prev parent reply other threads:[~2022-08-21 11:25 UTC|newest]
Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 01/22] chnet: Add initial request body support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 07/22] chnet: node: Add get_error function to return the error string Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 10/22] chnet: Split construct URL req creation into a new function Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 11/22] chnet: Add set request header support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 12/22] chnet: node: Fix unused variable warning Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 14/22] tests/js/ring: Add more set header function test Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 15/22] chnet: node: Don't use static counter for data ID Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example Ammar Faizi
2022-08-21 11:24 ` Ammar Faizi [this message]
2022-08-21 22:13 ` [PATCH v1 17/22] chnet: Initial chunked request body support Alviro Iskandar Setiawan
2022-08-22 3:08 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 18/22] chnet: Rework the chunked request body interface Ammar Faizi
2022-08-21 22:20 ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 19/22] chnet: ring: Refactor the ring completely Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
2022-08-21 22:29 ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant Ammar Faizi
2022-08-21 22:21 ` Alviro Iskandar Setiawan
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox