From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on gnuweeb.org X-Spam-Level: X-Spam-Status: No, score=-0.8 required=5.0 tests=ALL_TRUSTED,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF,NO_DNS_FOR_FROM, NUMERIC_HTTP_ADDR,URIBL_BLOCKED,WEIRD_PORT autolearn=no autolearn_force=no version=3.4.6 Received: from localhost.localdomain (unknown [180.246.144.41]) by gnuweeb.org (Postfix) with ESMTPSA id 3AD6F80A2B; Sun, 21 Aug 2022 11:25:53 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=gnuweeb.org; s=default; t=1661081154; bh=OQu5EZsGjjfuaEIzBV4GnDu6kQfVn8km55BgROlDLv0=; h=From:To:Cc:Subject:Date:In-Reply-To:References:From; b=e998kiS6ll8fZ856EFDNZh9fgGsS9SxlZ1C5/+fAgORP4zsJfHsyWuhPFBsqqhgOC eRbpXLuFCPUqrpZpNBSTKlqhxWNi8U01iyImXdTxNFJIKNl+e8XJEarUYu/W0/dFSw I40tknMQBwt6IiipZYT+O5tPDIAXyZ6nGX4tYvFcbtzN4h8PadSTBb9bmOWVmKAFiD 0uLhdN6q41WTO2KG+4Nqf4TcU8CGZ/+kxlbEopLZAF3uRqsQkDuL+QC6QIOl1qF1u8 n/k2dadQQOV6/WpQwOTrwV05WV01j/JlxA4bEJXsFj4ZB/84JOIXEypSPmcR7RggEs qbJuRhoGvXThA== From: Ammar Faizi To: Alviro Iskandar Setiawan Cc: Ammar Faizi , Muhammad Rizki , Kanna Scarlet , GNU/Weeb Mailing List Subject: [PATCH v1 17/22] chnet: Initial chunked request body support Date: Sun, 21 Aug 2022 18:24:48 +0700 Message-Id: <20220821112453.3026255-18-ammarfaizi2@gnuweeb.org> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20220821112453.3026255-1-ammarfaizi2@gnuweeb.org> References: <20220821112453.3026255-1-ammarfaizi2@gnuweeb.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Signed-off-by: Ammar Faizi --- chnet/chnet.cc | 155 +++++++++++++++++++++++++++++++++++++++++++++- chnet/chnet.h | 106 ++++++++++++++++++++++++++++++- tests/cpp/ring.cc | 47 ++++++++++++-- 3 files changed, 302 insertions(+), 6 deletions(-) diff --git a/chnet/chnet.cc b/chnet/chnet.cc index 915dfea..1ba4d32 100644 --- a/chnet/chnet.cc +++ b/chnet/chnet.cc @@ -50,6 +50,116 @@ void CHNetDataStream::ResetInternal() { } +CHNetDataStreamChunked::CHNetDataStreamChunked(CHNetDelegate *ch): + UploadDataStream(true, 0), + ch_(ch), + buf_queue_(1024) +{ + cur_pos_ = 0; + is_eof_ = false; + being_waited_.store(false, std::memory_order_relaxed); + ch_->chuncked_body_.store(this, std::memory_order_release); +} + +CHNetDataStreamChunked::~CHNetDataStreamChunked(void) +{ + struct net::CHNetDataStreamChunked::buf *arr; + uint32_t i, len; + + ch_->chuncked_body_.store(nullptr, std::memory_order_release); + arr = buf_queue_.arr_; + len = buf_queue_.mask_ + 1; + + for (i = 0; i < len; i++) { + char *b = arr[i].buf_; + + if (b) + delete[] b; + } +} + +void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len) +{ + struct net::CHNetDataStreamChunked::buf b; + + b.buf_ = new char[len]; + b.len_ = len; + memcpy(b.buf_, buf, len); + + std::unique_lock lk(buf_lock_); + buf_queue_.push(b); + if (being_waited_.load(std::memory_order_acquire)) + buf_cond_.notify_one(); +} + +int CHNetDataStreamChunked::InitInternal(const net::NetLogWithSource& net_log) +{ + return net::OK; +} + +int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg) +{ + size_t read_size; + char *ptr; + int ret; + + if (unlikely(read_size_arg < 0)) + return net::ERR_IO_PENDING; + + if (unlikely(is_eof_ || !read_size_arg)) + return 0; + + ret = 0; + ptr = buf->data(); + read_size = (size_t)read_size_arg; + + std::unique_lock lk(buf_lock_); + while (read_size) { + struct net::CHNetDataStreamChunked::buf *b; + size_t copy_size; + + if (unlikely(!buf_queue_.size())) { + /* + * At this point, the buffer is empty. + * + * Wait for the buffer queue to be + * filled in. + */ + being_waited_.store(true, std::memory_order_release); + buf_cond_.wait(lk, [this]{ return buf_queue_.size(); }); + being_waited_.store(false, std::memory_order_release); + } + + b = buf_queue_.front(); + if (b->len_ == -(size_t)1UL) { + is_eof_ = true; + break; + } + + copy_size = b->len_ - cur_pos_; + if (copy_size > read_size) + copy_size = read_size; + + memcpy(ptr, &b->buf_[cur_pos_], copy_size); + ptr += copy_size; + ret += copy_size; + read_size -= copy_size; + buf_queue_.head_++; + } + + if (unlikely(is_eof_)) + SetIsFinalChunk(); + + if (unlikely(!ret && !is_eof_)) + return net::ERR_IO_PENDING; + + return ret; +} + +void CHNetDataStreamChunked::ResetInternal() +{ +} + CHNCallback::CHNCallback(void): response_started_data_(nullptr), response_started_(nullptr), @@ -82,6 +192,7 @@ CHNetDelegate::CHNetDelegate(void): base::Thread::Options options(base::MessagePumpType::IO, 0); CHECK(thread_.StartWithOptions(std::move(options))); read_ret_.store(0, std::memory_order_relaxed); + chuncked_body_.store(nullptr, std::memory_order_relaxed); payload_ = nullptr; } @@ -140,7 +251,9 @@ void CHNetDelegate::_Start(Waiter *sig) url_req_->set_method(method_); sig->Signal(); - if (payload_) + if (chuncked_body_up_) + url_req_->set_upload(std::move(chuncked_body_up_)); + else if (payload_) url_req_->set_upload(std::make_unique( this, payload_, payload_len_)); @@ -218,6 +331,17 @@ int CHNetDelegate::Read(int size) return read_ret_.load(); } +void CHNetDelegate::Write(const char *buf, size_t len) +{ + CHNetDataStreamChunked *p; + + p = chuncked_body_.load(std::memory_order_acquire); + if (unlikely(!p)) + return; + + p->WriteBuf(buf, len); +} + void CHNetDelegate::OnResponseStarted(URLRequest *url_req, int net_err) { if (unlikely(net_err < 0)) @@ -243,6 +367,20 @@ void CHNetDelegate::SetError(int err_code) err_ = "chromium_net_err:" + ErrorToString(err_code); } +void CHNetDelegate::StartChunkedBody(void) +{ + chuncked_body_up_ = std::make_unique(this); +} + +void CHNetDelegate::StopChunkedBody(void) +{ + CHNetDataStreamChunked *p; + + p = chuncked_body_.load(std::memory_order_acquire); + if (likely(p)) + p->StopWriting(); +} + } /* namespace net */ CHNet::CHNet(void) @@ -293,6 +431,21 @@ void CHNet::SetRequestHeader(const char *key, const char *val, bool overwrite) ch_->SetRequestHeader(std::string(key), std::string(val), overwrite); } +void CHNet::Write(const char *buf, size_t len) +{ + ch_->Write(buf, len); +} + +void CHNet::StartChunkedBody(void) +{ + ch_->StartChunkedBody(); +} + +void CHNet::StopChunkedBody(void) +{ + ch_->StopChunkedBody(); +} + int CHNet::read_ret(void) { return ch_->read_ret(); diff --git a/chnet/chnet.h b/chnet/chnet.h index 8e995a1..d92ba6d 100644 --- a/chnet/chnet.h +++ b/chnet/chnet.h @@ -70,6 +70,102 @@ private: void ResetInternal() override; }; +class CHNetDataStreamChunked: public UploadDataStream { + +private: + struct buf { + char *buf_; + size_t len_; + }; + + template + struct queue { + struct buf *arr_; + std::atomic head_; + std::atomic tail_; + uint32_t mask_; + + inline queue(size_t want_max) + { + uint32_t max = 1; + uint32_t i; + + head_.store(0, std::memory_order_relaxed); + tail_.store(0, std::memory_order_relaxed); + + if (want_max < 4) + want_max = 4; + + while (max < want_max) + max *= 2; + + mask_ = max - 1; + arr_ = new T[max]; + for (i = 0; i < max; i++) { + arr_[i].buf_ = nullptr; + arr_[i].len_ = 0; + } + } + + inline ~queue(void) + { + delete[] arr_; + } + + inline size_t size(void) + { + if (likely(tail_ > head_)) + return tail_ - head_; + else + return head_ - tail_; + } + + inline void push(T val) + { + arr_[tail_++ & mask_] = val; + } + + inline struct buf *front(void) + { + return &arr_[head_.load() & mask_]; + } + + inline struct buf *pop(void) + { + return &arr_[head_++ & mask_]; + } + }; + +public: + CHNetDataStreamChunked(CHNetDelegate *ch); + ~CHNetDataStreamChunked(void) override; + void WriteBuf(const char *buf, size_t len); + inline void StopWriting(void) + { + struct buf b; + + b.buf_ = nullptr; + b.len_ = -(size_t)1UL; + buf_lock_.lock(); + buf_queue_.push(b); + buf_lock_.unlock(); + } + +private: + CHNetDelegate *ch_; + size_t cur_pos_; + std::mutex buf_lock_; + std::condition_variable buf_cond_; + struct queue buf_queue_; + std::atomic being_waited_; + bool is_eof_; + + int InitInternal(const net::NetLogWithSource& net_log) override; + int ReadInternal(net::IOBuffer *buf, int buf_len) override; + void ResetInternal() override; + +}; + struct CHNCallback { CHNCallback(void); ~CHNCallback(void); @@ -97,6 +193,10 @@ public: void OnResponseStarted(URLRequest *url_req, int net_error) override; void OnReadCompleted(URLRequest *url_req, int bytes_read) override; + void StartChunkedBody(void); + void Write(const char *buf, size_t len); + void StopChunkedBody(void); + inline int read_ret(void) { return read_ret_.load(); } inline const char *read_buf(void) { return read_buf_->data(); } inline bool started(void) { return started_; } @@ -108,7 +208,8 @@ public: } struct CHNCallback cb; - + std::atomic chuncked_body_; + std::unique_ptr chuncked_body_up_; private: void _Start(Waiter *sig); void ConstructURLRequest(Waiter *sig); @@ -151,11 +252,14 @@ public: const char *GetErrorStr(void); void SetMethod(const char *method); void SetPayload(const char *buf, size_t len); + void StartChunkedBody(void); + void StopChunkedBody(void); void SetRequestHeader(const char *key, const char *val, bool overwrite = true); int read_ret(void); const char *read_buf(void); + void Write(const char *buf, size_t len); #ifdef __FOR_CHROMIUM_INTERNAL inline net::CHNetDelegate *ch(void) { return ch_; } diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc index 8bedef5..a65457d 100644 --- a/tests/cpp/ring.cc +++ b/tests/cpp/ring.cc @@ -247,15 +247,54 @@ static void test_chnet_ring_multiple(bool start_before_read) _test_chnet_ring_multiple(start_before_read, &ring); } +static void test_chnet_chunked_body(void) +{ + constexpr static const char buf[] = "AAAA Hello World!\n"; + CNRingCtx ring(4096); + CNRingSQE *sqe; + CNRingCQE *cqe; + CHNet *ch; + int i; + + ch = new CHNet; + ch->SetURL("http://127.0.0.1:8000/index.php?action=body"); + ch->SetMethod("POST"); + ch->StartChunkedBody(); + for (i = 0; i < 3; i++) + ch->Write(buf, strlen(buf)); + + sqe = ring.GetSQE(); + assert(sqe); + sqe->PrepRead(ch, 1024); + sqe->SetUserData(9999); + assert(ring.SubmitSQE() == 1); + + for (i = 0; i < 3; i++) + ch->Write(buf, strlen(buf)); + ch->StopChunkedBody(); + + ring.WaitCQE(1); + cqe = ring.HeadCQE(); + assert(cqe->op == CNRING_OP_READ); + // assert(cqe->res == sizeof(buf)); + assert(cqe->user_data == 9999); + // assert(!strncmp(ch->read_buf(), buf, sizeof(buf))); + write(1, ch->read_buf(), cqe->res); + ring.CQAdvance(1); + + delete ch; +} + int main(void) { test_nop(); chnet_global_init(); - // test_chnet_ring_single(); + test_chnet_ring_single(); test_chnet_ring_set_header(); - // test_chnet_ring_single_post(); - // test_chnet_ring_multiple(false); - // test_chnet_ring_multiple(true); + test_chnet_ring_single_post(); + test_chnet_ring_multiple(false); + test_chnet_ring_multiple(true); + test_chnet_chunked_body(); chnet_global_stop(); return 0; } -- Ammar Faizi