From: Ammar Faizi <[email protected]>
To: Alviro Iskandar Setiawan <[email protected]>
Cc: Ammar Faizi <[email protected]>,
Muhammad Rizki <[email protected]>,
Kanna Scarlet <[email protected]>,
GNU/Weeb Mailing List <[email protected]>
Subject: [PATCH v1 18/22] chnet: Rework the chunked request body interface
Date: Sun, 21 Aug 2022 18:24:49 +0700 [thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
This seems to be working fine at a glance, but there are still an issue
with double-free bug. It rarely happens, but it's a real bug that the
ASAN has reported. I will take a look into this further on Monday.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 74 +++++++++++++++++++++++++++++++++--------------
chnet/chnet.h | 68 +++++--------------------------------------
tests/cpp/ring.cc | 59 ++++++++++++++++++++++++-------------
3 files changed, 99 insertions(+), 102 deletions(-)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 1ba4d32..a34bb18 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -52,30 +52,42 @@ void CHNetDataStream::ResetInternal()
CHNetDataStreamChunked::CHNetDataStreamChunked(CHNetDelegate *ch):
UploadDataStream(true, 0),
- ch_(ch),
- buf_queue_(1024)
+ ch_(ch)
{
cur_pos_ = 0;
+ mem_hold_size_ = 0;
is_eof_ = false;
being_waited_.store(false, std::memory_order_relaxed);
ch_->chuncked_body_.store(this, std::memory_order_release);
}
+void CHNetDataStreamChunked::ClearDelQueue(void)
+{
+ char *b;
+
+ while (!del_queue_.empty()) {
+ b = del_queue_.front();
+ if (b)
+ delete[] b;
+
+ del_queue_.pop();
+ }
+}
+
CHNetDataStreamChunked::~CHNetDataStreamChunked(void)
{
- struct net::CHNetDataStreamChunked::buf *arr;
- uint32_t i, len;
+ char *b;
ch_->chuncked_body_.store(nullptr, std::memory_order_release);
- arr = buf_queue_.arr_;
- len = buf_queue_.mask_ + 1;
-
- for (i = 0; i < len; i++) {
- char *b = arr[i].buf_;
+ while (!buf_queue_.empty()) {
+ b = buf_queue_.front().buf_;
if (b)
delete[] b;
+
+ buf_queue_.pop();
}
+ ClearDelQueue();
}
void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
@@ -85,6 +97,7 @@ void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
b.buf_ = new char[len];
b.len_ = len;
memcpy(b.buf_, buf, len);
+ mem_hold_size_.fetch_add(len);
std::unique_lock<std::mutex> lk(buf_lock_);
buf_queue_.push(b);
@@ -115,44 +128,61 @@ int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg)
std::unique_lock<std::mutex> lk(buf_lock_);
while (read_size) {
- struct net::CHNetDataStreamChunked::buf *b;
+ struct net::CHNetDataStreamChunked::buf b;
size_t copy_size;
- if (unlikely(!buf_queue_.size())) {
+ if (unlikely(buf_queue_.empty())) {
/*
- * At this point, the buffer is empty.
+ * At this point, the buffer queue is empty.
*
- * Wait for the buffer queue to be
- * filled in.
+ * Wait for the buffer queue to be filled in...
*/
being_waited_.store(true, std::memory_order_release);
- buf_cond_.wait(lk, [this]{ return buf_queue_.size(); });
+ ClearDelQueue();
+ buf_cond_.wait(lk, [this]{ return !buf_queue_.empty(); });
being_waited_.store(false, std::memory_order_release);
}
b = buf_queue_.front();
- if (b->len_ == -(size_t)1UL) {
+ if (unlikely(b.len_ == -(size_t)1UL)) {
is_eof_ = true;
+ buf_queue_.pop();
break;
}
- copy_size = b->len_ - cur_pos_;
+ copy_size = b.len_ - cur_pos_;
if (copy_size > read_size)
copy_size = read_size;
- memcpy(ptr, &b->buf_[cur_pos_], copy_size);
+ memcpy(ptr, &b.buf_[cur_pos_], copy_size);
ptr += copy_size;
ret += copy_size;
read_size -= copy_size;
- buf_queue_.head_++;
+ cur_pos_ += copy_size;
+
+ if (cur_pos_ == b.len_) {
+ size_t mem;
+
+ cur_pos_ = 0;
+ buf_queue_.pop();
+ del_queue_.push(b.buf_);
+ mem = mem_hold_size_.load(std::memory_order_acquire);
+
+ /*
+ * Make sure we don't hold too many delete
+ * queues to avoid memory pressure.
+ */
+ if (mem >= MEM_HOLD_THRESHOLD) {
+ ClearDelQueue();
+ mem_hold_size_.store(0,
+ std::memory_order_release);
+ }
+ }
}
if (unlikely(is_eof_))
SetIsFinalChunk();
- if (unlikely(!ret && !is_eof_))
- return net::ERR_IO_PENDING;
-
return ret;
}
diff --git a/chnet/chnet.h b/chnet/chnet.h
index d92ba6d..84db8d4 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -29,6 +29,8 @@
#include "net/base/net_errors.h"
#define CHNET_EXPORT COMPONENT_EXPORT(CHNET)
+#include <queue>
+
#else /* #ifdef __FOR_CHROMIUM_INTERNAL */
/*
@@ -78,64 +80,6 @@ private:
size_t len_;
};
- template<typename T>
- struct queue {
- struct buf *arr_;
- std::atomic<uint32_t> head_;
- std::atomic<uint32_t> tail_;
- uint32_t mask_;
-
- inline queue(size_t want_max)
- {
- uint32_t max = 1;
- uint32_t i;
-
- head_.store(0, std::memory_order_relaxed);
- tail_.store(0, std::memory_order_relaxed);
-
- if (want_max < 4)
- want_max = 4;
-
- while (max < want_max)
- max *= 2;
-
- mask_ = max - 1;
- arr_ = new T[max];
- for (i = 0; i < max; i++) {
- arr_[i].buf_ = nullptr;
- arr_[i].len_ = 0;
- }
- }
-
- inline ~queue(void)
- {
- delete[] arr_;
- }
-
- inline size_t size(void)
- {
- if (likely(tail_ > head_))
- return tail_ - head_;
- else
- return head_ - tail_;
- }
-
- inline void push(T val)
- {
- arr_[tail_++ & mask_] = val;
- }
-
- inline struct buf *front(void)
- {
- return &arr_[head_.load() & mask_];
- }
-
- inline struct buf *pop(void)
- {
- return &arr_[head_++ & mask_];
- }
- };
-
public:
CHNetDataStreamChunked(CHNetDelegate *ch);
~CHNetDataStreamChunked(void) override;
@@ -146,6 +90,7 @@ public:
b.buf_ = nullptr;
b.len_ = -(size_t)1UL;
+
buf_lock_.lock();
buf_queue_.push(b);
buf_lock_.unlock();
@@ -156,14 +101,17 @@ private:
size_t cur_pos_;
std::mutex buf_lock_;
std::condition_variable buf_cond_;
- struct queue<struct buf> buf_queue_;
+ std::queue<struct buf> buf_queue_;
+ std::queue<char *> del_queue_;
std::atomic<bool> being_waited_;
+ std::atomic<size_t> mem_hold_size_;
bool is_eof_;
+ constexpr static const uint32_t MEM_HOLD_THRESHOLD = 1024 * 1024 * 4;
int InitInternal(const net::NetLogWithSource& net_log) override;
int ReadInternal(net::IOBuffer *buf, int buf_len) override;
void ResetInternal() override;
-
+ void ClearDelQueue(void);
};
struct CHNCallback {
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index a65457d..54426ef 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -249,39 +249,58 @@ static void test_chnet_ring_multiple(bool start_before_read)
static void test_chnet_chunked_body(void)
{
- constexpr static const char buf[] = "AAAA Hello World!\n";
+ constexpr size_t len = 1024 * 64;
+ constexpr size_t nr_loop = 100;
+ size_t total_read = 0;
CNRingCtx ring(4096);
CNRingSQE *sqe;
CNRingCQE *cqe;
CHNet *ch;
- int i;
+ char *buf;
+ size_t i;
ch = new CHNet;
ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
ch->SetMethod("POST");
ch->StartChunkedBody();
- for (i = 0; i < 3; i++)
- ch->Write(buf, strlen(buf));
sqe = ring.GetSQE();
assert(sqe);
- sqe->PrepRead(ch, 1024);
+ sqe->PrepRead(ch, len);
sqe->SetUserData(9999);
assert(ring.SubmitSQE() == 1);
- for (i = 0; i < 3; i++)
- ch->Write(buf, strlen(buf));
+ buf = new char[len];
+ memset(buf, 'A', len);
+
+ for (i = 0; i < nr_loop; i++)
+ ch->Write(buf, len);
ch->StopChunkedBody();
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- // assert(cqe->res == sizeof(buf));
- assert(cqe->user_data == 9999);
- // assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
- write(1, ch->read_buf(), cqe->res);
- ring.CQAdvance(1);
+ while (total_read < (len * nr_loop)) {
+ int res;
+
+ ring.WaitCQE(1);
+ cqe = ring.HeadCQE();
+ assert(cqe->op == CNRING_OP_READ);
+ assert(cqe->user_data == 9999);
+ res = cqe->res;
+ total_read += res;
+ ring.CQAdvance(1);
+
+ if (!res)
+ break;
+
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch, len);
+ sqe->SetUserData(9999);
+ assert(ring.SubmitSQE() == 1);
+ }
+ printf("total_read = %zu %zu\n", total_read, (len * nr_loop));
+ assert(total_read == (len * nr_loop));
+ delete[] buf;
delete ch;
}
@@ -289,11 +308,11 @@ int main(void)
{
test_nop();
chnet_global_init();
- test_chnet_ring_single();
- test_chnet_ring_set_header();
- test_chnet_ring_single_post();
- test_chnet_ring_multiple(false);
- test_chnet_ring_multiple(true);
+ // test_chnet_ring_single();
+ // test_chnet_ring_set_header();
+ // test_chnet_ring_single_post();
+ // test_chnet_ring_multiple(false);
+ // test_chnet_ring_multiple(true);
test_chnet_chunked_body();
chnet_global_stop();
return 0;
--
Ammar Faizi
next prev parent reply other threads:[~2022-08-21 11:25 UTC|newest]
Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 01/22] chnet: Add initial request body support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 07/22] chnet: node: Add get_error function to return the error string Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 10/22] chnet: Split construct URL req creation into a new function Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 11/22] chnet: Add set request header support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 12/22] chnet: node: Fix unused variable warning Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 14/22] tests/js/ring: Add more set header function test Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 15/22] chnet: node: Don't use static counter for data ID Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 17/22] chnet: Initial chunked request body support Ammar Faizi
2022-08-21 22:13 ` Alviro Iskandar Setiawan
2022-08-22 3:08 ` Ammar Faizi
2022-08-21 11:24 ` Ammar Faizi [this message]
2022-08-21 22:20 ` [PATCH v1 18/22] chnet: Rework the chunked request body interface Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 19/22] chnet: ring: Refactor the ring completely Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
2022-08-21 22:29 ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant Ammar Faizi
2022-08-21 22:21 ` Alviro Iskandar Setiawan
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
[email protected] \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox