From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on gnuweeb.org X-Spam-Level: X-Spam-Status: No, score=-0.8 required=5.0 tests=ALL_TRUSTED,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF,NO_DNS_FOR_FROM, NUMERIC_HTTP_ADDR,URIBL_BLOCKED,WEIRD_PORT autolearn=no autolearn_force=no version=3.4.6 Received: from localhost.localdomain (unknown [180.246.144.41]) by gnuweeb.org (Postfix) with ESMTPSA id 5E2C380A10; Sun, 21 Aug 2022 11:25:55 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=gnuweeb.org; s=default; t=1661081157; bh=SKBmaVuKMxhyeF/qkOq6YzVpwiBSJzQnt1HjqWzPQAQ=; h=From:To:Cc:Subject:Date:In-Reply-To:References:From; b=nKkZFw9u6tTVDoISWf/9HrShPLk9KU48K7Hkhs0TYB6jdLzmATtjPkjWDIpzJqagS lTdKIAnCT8sqPbSUwKXQzzB6SfWgBE4NiOFBt6nwFvhHqwafRLIPVCrjExL9IfeLzZ yIG0a1NNh3v478ISrJ78ioa9Nzm2oaSEOFez75LA4KMKIKJRCfJmaIN+cb3yilkPdH /+9uSgMIxjgA+g8jB3lr/i/yWB7ngQXu/ihQ/eD8XkiU37yW37fy6BedDNQQ4iA4fS /yLCjBiabUQ/PXROp/SgbPvhV6zu8dZYuR0LmQfUW7svM9EZiZp+2N2/SS9F5Ez0LQ zfSByDx/XCuJQ== From: Ammar Faizi To: Alviro Iskandar Setiawan Cc: Ammar Faizi , Muhammad Rizki , Kanna Scarlet , GNU/Weeb Mailing List Subject: [PATCH v1 18/22] chnet: Rework the chunked request body interface Date: Sun, 21 Aug 2022 18:24:49 +0700 Message-Id: <20220821112453.3026255-19-ammarfaizi2@gnuweeb.org> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20220821112453.3026255-1-ammarfaizi2@gnuweeb.org> References: <20220821112453.3026255-1-ammarfaizi2@gnuweeb.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This seems to be working fine at a glance, but there is still an issue with a double-free bug. It rarely happens, but it's a real bug that ASAN has reported. I will take a look into this further on Monday. 
Signed-off-by: Ammar Faizi --- chnet/chnet.cc | 74 +++++++++++++++++++++++++++++++++-------------- chnet/chnet.h | 68 +++++-------------------------------------- tests/cpp/ring.cc | 59 ++++++++++++++++++++++++------------- 3 files changed, 99 insertions(+), 102 deletions(-) diff --git a/chnet/chnet.cc b/chnet/chnet.cc index 1ba4d32..a34bb18 100644 --- a/chnet/chnet.cc +++ b/chnet/chnet.cc @@ -52,30 +52,42 @@ void CHNetDataStream::ResetInternal() CHNetDataStreamChunked::CHNetDataStreamChunked(CHNetDelegate *ch): UploadDataStream(true, 0), - ch_(ch), - buf_queue_(1024) + ch_(ch) { cur_pos_ = 0; + mem_hold_size_ = 0; is_eof_ = false; being_waited_.store(false, std::memory_order_relaxed); ch_->chuncked_body_.store(this, std::memory_order_release); } +void CHNetDataStreamChunked::ClearDelQueue(void) +{ + char *b; + + while (!del_queue_.empty()) { + b = del_queue_.front(); + if (b) + delete[] b; + + del_queue_.pop(); + } +} + CHNetDataStreamChunked::~CHNetDataStreamChunked(void) { - struct net::CHNetDataStreamChunked::buf *arr; - uint32_t i, len; + char *b; ch_->chuncked_body_.store(nullptr, std::memory_order_release); - arr = buf_queue_.arr_; - len = buf_queue_.mask_ + 1; - - for (i = 0; i < len; i++) { - char *b = arr[i].buf_; + while (!buf_queue_.empty()) { + b = buf_queue_.front().buf_; if (b) delete[] b; + + buf_queue_.pop(); } + ClearDelQueue(); } void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len) @@ -85,6 +97,7 @@ void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len) b.buf_ = new char[len]; b.len_ = len; memcpy(b.buf_, buf, len); + mem_hold_size_.fetch_add(len); std::unique_lock lk(buf_lock_); buf_queue_.push(b); @@ -115,44 +128,61 @@ int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg) std::unique_lock lk(buf_lock_); while (read_size) { - struct net::CHNetDataStreamChunked::buf *b; + struct net::CHNetDataStreamChunked::buf b; size_t copy_size; - if (unlikely(!buf_queue_.size())) { + if 
(unlikely(buf_queue_.empty())) { /* - * At this point, the buffer is empty. + * At this point, the buffer queue is empty. * - * Wait for the buffer queue to be - * filled in. + * Wait for the buffer queue to be filled in... */ being_waited_.store(true, std::memory_order_release); - buf_cond_.wait(lk, [this]{ return buf_queue_.size(); }); + ClearDelQueue(); + buf_cond_.wait(lk, [this]{ return !buf_queue_.empty(); }); being_waited_.store(false, std::memory_order_release); } b = buf_queue_.front(); - if (b->len_ == -(size_t)1UL) { + if (unlikely(b.len_ == -(size_t)1UL)) { is_eof_ = true; + buf_queue_.pop(); break; } - copy_size = b->len_ - cur_pos_; + copy_size = b.len_ - cur_pos_; if (copy_size > read_size) copy_size = read_size; - memcpy(ptr, &b->buf_[cur_pos_], copy_size); + memcpy(ptr, &b.buf_[cur_pos_], copy_size); ptr += copy_size; ret += copy_size; read_size -= copy_size; - buf_queue_.head_++; + cur_pos_ += copy_size; + + if (cur_pos_ == b.len_) { + size_t mem; + + cur_pos_ = 0; + buf_queue_.pop(); + del_queue_.push(b.buf_); + mem = mem_hold_size_.load(std::memory_order_acquire); + + /* + * Make sure we don't hold too many delete + * queues to avoid memory pressure. 
+ */ + if (mem >= MEM_HOLD_THRESHOLD) { + ClearDelQueue(); + mem_hold_size_.store(0, + std::memory_order_release); + } + } } if (unlikely(is_eof_)) SetIsFinalChunk(); - if (unlikely(!ret && !is_eof_)) - return net::ERR_IO_PENDING; - return ret; } diff --git a/chnet/chnet.h b/chnet/chnet.h index d92ba6d..84db8d4 100644 --- a/chnet/chnet.h +++ b/chnet/chnet.h @@ -29,6 +29,8 @@ #include "net/base/net_errors.h" #define CHNET_EXPORT COMPONENT_EXPORT(CHNET) +#include + #else /* #ifdef __FOR_CHROMIUM_INTERNAL */ /* @@ -78,64 +80,6 @@ private: size_t len_; }; - template - struct queue { - struct buf *arr_; - std::atomic head_; - std::atomic tail_; - uint32_t mask_; - - inline queue(size_t want_max) - { - uint32_t max = 1; - uint32_t i; - - head_.store(0, std::memory_order_relaxed); - tail_.store(0, std::memory_order_relaxed); - - if (want_max < 4) - want_max = 4; - - while (max < want_max) - max *= 2; - - mask_ = max - 1; - arr_ = new T[max]; - for (i = 0; i < max; i++) { - arr_[i].buf_ = nullptr; - arr_[i].len_ = 0; - } - } - - inline ~queue(void) - { - delete[] arr_; - } - - inline size_t size(void) - { - if (likely(tail_ > head_)) - return tail_ - head_; - else - return head_ - tail_; - } - - inline void push(T val) - { - arr_[tail_++ & mask_] = val; - } - - inline struct buf *front(void) - { - return &arr_[head_.load() & mask_]; - } - - inline struct buf *pop(void) - { - return &arr_[head_++ & mask_]; - } - }; - public: CHNetDataStreamChunked(CHNetDelegate *ch); ~CHNetDataStreamChunked(void) override; @@ -146,6 +90,7 @@ public: b.buf_ = nullptr; b.len_ = -(size_t)1UL; + buf_lock_.lock(); buf_queue_.push(b); buf_lock_.unlock(); @@ -156,14 +101,17 @@ private: size_t cur_pos_; std::mutex buf_lock_; std::condition_variable buf_cond_; - struct queue buf_queue_; + std::queue buf_queue_; + std::queue del_queue_; std::atomic being_waited_; + std::atomic mem_hold_size_; bool is_eof_; + constexpr static const uint32_t MEM_HOLD_THRESHOLD = 1024 * 1024 * 4; int InitInternal(const 
net::NetLogWithSource& net_log) override; int ReadInternal(net::IOBuffer *buf, int buf_len) override; void ResetInternal() override; - + void ClearDelQueue(void); }; struct CHNCallback { diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc index a65457d..54426ef 100644 --- a/tests/cpp/ring.cc +++ b/tests/cpp/ring.cc @@ -249,39 +249,58 @@ static void test_chnet_ring_multiple(bool start_before_read) static void test_chnet_chunked_body(void) { - constexpr static const char buf[] = "AAAA Hello World!\n"; + constexpr size_t len = 1024 * 64; + constexpr size_t nr_loop = 100; + size_t total_read = 0; CNRingCtx ring(4096); CNRingSQE *sqe; CNRingCQE *cqe; CHNet *ch; - int i; + char *buf; + size_t i; ch = new CHNet; ch->SetURL("http://127.0.0.1:8000/index.php?action=body"); ch->SetMethod("POST"); ch->StartChunkedBody(); - for (i = 0; i < 3; i++) - ch->Write(buf, strlen(buf)); sqe = ring.GetSQE(); assert(sqe); - sqe->PrepRead(ch, 1024); + sqe->PrepRead(ch, len); sqe->SetUserData(9999); assert(ring.SubmitSQE() == 1); - for (i = 0; i < 3; i++) - ch->Write(buf, strlen(buf)); + buf = new char[len]; + memset(buf, 'A', len); + + for (i = 0; i < nr_loop; i++) + ch->Write(buf, len); ch->StopChunkedBody(); - ring.WaitCQE(1); - cqe = ring.HeadCQE(); - assert(cqe->op == CNRING_OP_READ); - // assert(cqe->res == sizeof(buf)); - assert(cqe->user_data == 9999); - // assert(!strncmp(ch->read_buf(), buf, sizeof(buf))); - write(1, ch->read_buf(), cqe->res); - ring.CQAdvance(1); + while (total_read < (len * nr_loop)) { + int res; + + ring.WaitCQE(1); + cqe = ring.HeadCQE(); + assert(cqe->op == CNRING_OP_READ); + assert(cqe->user_data == 9999); + res = cqe->res; + total_read += res; + ring.CQAdvance(1); + + if (!res) + break; + + sqe = ring.GetSQE(); + assert(sqe); + sqe->PrepRead(ch, len); + sqe->SetUserData(9999); + assert(ring.SubmitSQE() == 1); + } + printf("total_read = %zu %zu\n", total_read, (len * nr_loop)); + assert(total_read == (len * nr_loop)); + delete[] buf; delete ch; } @@ -289,11 
+308,11 @@ int main(void) { test_nop(); chnet_global_init(); - test_chnet_ring_single(); - test_chnet_ring_set_header(); - test_chnet_ring_single_post(); - test_chnet_ring_multiple(false); - test_chnet_ring_multiple(true); + // test_chnet_ring_single(); + // test_chnet_ring_set_header(); + // test_chnet_ring_single_post(); + // test_chnet_ring_multiple(false); + // test_chnet_ring_multiple(true); test_chnet_chunked_body(); chnet_global_stop(); return 0; -- Ammar Faizi