* [PATCH ncns v1 1/4] chnet: ring: Refactor SQE handling
From: Ammar Faizi @ 2022-08-16 16:53 UTC (permalink / raw)
To: Alviro Iskandar Setiawan; +Cc: Ammar Faizi, GNU/Weeb Mailing List
Simplify the SQE handling code and rename the related functions so
that their names better reflect what each of them does.
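For illustration, the non-blocking path keeps a try-then-defer shape
after the rename: CallPostCQE() first attempts a non-blocking post and
only falls back to a deferred, possibly-blocking post when the CQE
ring is full. A minimal, self-contained sketch of that shape follows;
FakeSQE, FakeRing and the detached std::thread are illustration-only
stand-ins, not the real chnet ring or workqueue API:

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>

struct FakeSQE {
	int op;
	int64_t user_data;
};

class FakeRing {
public:
	explicit FakeRing(uint32_t cq_max): cq_free_(cq_max) {}

	/* Non-blocking: fails when no CQE slot is free. */
	bool TryPostCQE(const FakeSQE *sqe, int64_t res)
	{
		uint32_t free_slots = cq_free_.load();

		do {
			if (!free_slots)
				return false;
		} while (!cq_free_.compare_exchange_weak(free_slots,
							 free_slots - 1));

		std::printf("CQE posted: op=%d res=%lld\n", sqe->op,
			    (long long)res);
		return true;
	}

	/* May block (here: polls) until a slot becomes free. */
	void PostCQE(const FakeSQE *sqe, int64_t res)
	{
		while (!TryPostCQE(sqe, res))
			std::this_thread::sleep_for(std::chrono::seconds(1));
	}

	/* Never blocks the caller. */
	void CallPostCQE(const FakeSQE *sqe, int64_t res)
	{
		if (TryPostCQE(sqe, res))
			return;

		/*
		 * The CQE ring is full and the caller's SQE storage may
		 * be reused, so defer the blocking post on a private copy.
		 */
		FakeSQE *tmp = new FakeSQE(*sqe);
		std::thread([this, tmp, res] {
			PostCQE(tmp, res);
			delete tmp;
		}).detach();
	}

	/* Called by the CQE consumer to return n slots to the ring. */
	void CQAdvance(uint32_t n) { cq_free_ += n; }

private:
	std::atomic<uint32_t> cq_free_;
};

The real code defers the post to the ring's workqueue thread via
state_->wq_.schedule_work() instead of spawning a thread, but the
caller-visible guarantee is the same: CallPostCQE() never sleeps.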
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_ring.cc | 146 +++++++++++++++++++++++++-------------------
chnet/chnet_ring.h | 11 ++--
2 files changed, 88 insertions(+), 69 deletions(-)
diff --git a/chnet/chnet_ring.cc b/chnet/chnet_ring.cc
index 13a7009..966e60e 100644
--- a/chnet/chnet_ring.cc
+++ b/chnet/chnet_ring.cc
@@ -113,7 +113,6 @@ bool CNRingCtx::TryPostCQE(CNRingSQE *sqe, int64_t res)
state_->cqe_lock_.lock();
cqe = GetCQENoTailIncrement();
-
if (unlikely(!cqe)) {
NotifyCQEWaiter();
state_->cqe_lock_.unlock();
@@ -129,45 +128,94 @@ bool CNRingCtx::TryPostCQE(CNRingSQE *sqe, int64_t res)
return true;
}
-void CNRingCtx::PostCQENoFail(CNRingSQE *sqe, int64_t res)
+void CNRingCtx::PostCQE(CNRingSQE *sqe, int64_t res)
{
while (1) {
if (TryPostCQE(sqe, res))
- return;
+ break;
+
sleep(1);
}
}
-void CNRingCtx::PostCQE(CNRingSQE *sqe, int64_t res)
+/*
+ * Use this when the caller is not allowed to block.
+ */
+void CNRingCtx::CallPostCQE(CNRingSQE *sqe, int64_t res)
{
- CNRingSQE *sqe_l;
+ CNRingSQE *tmp;
- if (TryPostCQE(sqe, res))
+ if (likely(TryPostCQE(sqe, res)))
return;
- sqe_l = new CNRingSQE;
- *sqe_l = *sqe;
- state_->wq_.schedule_work([=](struct wq_job_data *data){
- CNRingSQE *sqe_tmp = (CNRingSQE *)data->data;
- this->PostCQENoFail(sqe_tmp, res);
- delete sqe_tmp;
- }, sqe_l);
+ /*
+ * The CQE slot is full, but the caller is not allowed to
+ * block, let's schedule the post in the workqueue thread.
+ */
+ tmp = new CNRingSQE;
+ *tmp = *sqe;
+ state_->wq_.schedule_work([this, res](struct wq_job_data *data){
+ CNRingSQE *sqe = (CNRingSQE *)data->data;
+
+ this->PostCQE(sqe, res);
+ delete sqe;
+ }, tmp);
+}
+
+void CNRingCtx::IssueNopSQE(CNRingSQE *sqe)
+{
+ CallPostCQE(sqe, 0);
+}
+
+static void issue_start_sqe(void *udata, net::URLRequest *url_req, int net_err)
+{
+ CNRingOpStart *sop;
+ CNRingSQE *sqe;
+
+ sqe = (CNRingSQE *)udata;
+ sop = (CNRingOpStart *)sqe->sq_data;
+ sop->ring->PostCQE(sqe, net_err);
+ delete sop;
+ delete sqe;
+}
+
+void CNRingCtx::IssueStartSQE(CNRingSQE *sqe)
+{
+ struct net::CHNCallback *cb;
+ CNRingOpStart *sop;
+ CNRingSQE *tmp;
+ int ret;
+
+ tmp = new CNRingSQE;
+ *tmp = *sqe;
+
+ sop = (CNRingOpStart *)tmp->sq_data;
+ sop->ring = this;
+
+ cb = &sop->chnet->ch()->cb;
+ cb->response_started_ = issue_start_sqe;
+ cb->response_started_data_ = tmp;
+ ret = sop->chnet->Start();
+ if (unlikely(ret)) {
+ CallPostCQE(sqe, ret);
+ delete tmp;
+ }
}
-static void handle_read_sqe(void *udata, net::URLRequest *url_req, int bytes_read)
+static void issue_read_sqe(void *udata, net::URLRequest *url_req, int read_ret)
{
CNRingOpStart *sop;
CNRingSQE *sqe;
sqe = (CNRingSQE *)udata;
sop = (CNRingOpStart *)sqe->sq_data;
- sop->ring->PostCQENoFail(sqe, bytes_read);
+ sop->ring->PostCQE(sqe, read_ret);
delete sop;
delete sqe;
}
-static void handle_read_sqe_need_start(void *udata, net::URLRequest *url_req,
- int net_err)
+static void issue_read_sqe_need_start(void *udata, net::URLRequest *url_req,
+ int net_err)
{
struct net::CHNCallback *cb;
CNRingOpRead *sop;
@@ -177,22 +225,27 @@ static void handle_read_sqe_need_start(void *udata, net::URLRequest *url_req,
sop = (CNRingOpRead *)sqe->sq_data;
if (likely(net_err == net::OK)) {
+ /*
+ * The start operation succeeds, now we can do
+ * the read operation directly without issuing
+ * extra SQE.
+ */
cb = &sop->chnet->ch()->cb;
- cb->read_completed_ = handle_read_sqe;
+ cb->read_completed_ = issue_read_sqe;
cb->read_completed_data_ = sqe;
sop->chnet->ch()->_Read(nullptr, sop->read_size);
return;
}
/*
- * Operation fails!
+ * The start operation fails, just post the CQE directly.
*/
- sop->ring->PostCQENoFail(sqe, net_err);
+ sop->ring->PostCQE(sqe, net_err);
delete sop;
delete sqe;
}
-void CNRingCtx::HandleReadSQE(CNRingSQE *sqe)
+void CNRingCtx::IssueReadSQE(CNRingSQE *sqe)
{
struct net::CHNCallback *cb;
net::CHNetDelegate *ch;
@@ -214,14 +267,14 @@ void CNRingCtx::HandleReadSQE(CNRingSQE *sqe)
* a read operation at this point, do the start
* operation first!
*/
- cb->response_started_ = handle_read_sqe_need_start;
+ cb->response_started_ = issue_read_sqe_need_start;
cb->response_started_data_ = tmp;
ret = sop->chnet->Start();
} else {
/*
* Normal read operation here.
*/
- cb->read_completed_ = handle_read_sqe;
+ cb->read_completed_ = issue_read_sqe;
cb->read_completed_data_ = tmp;
ret = sop->chnet->Read(sop->read_size);
}
@@ -232,56 +285,21 @@ void CNRingCtx::HandleReadSQE(CNRingSQE *sqe)
/*
* Aiee, the operation fails!
*/
- PostCQE(sqe, ret);
+ CallPostCQE(sqe, ret);
delete tmp;
}
-static void handle_start_sqe(void *udata, net::URLRequest *url_req, int net_err)
-{
- CNRingOpStart *sop;
- CNRingSQE *sqe;
-
- sqe = (CNRingSQE *)udata;
- sop = (CNRingOpStart *)sqe->sq_data;
- sop->ring->PostCQENoFail(sqe, net_err);
- delete sop;
- delete sqe;
-}
-
-void CNRingCtx::HandleStartSQE(CNRingSQE *sqe)
-{
- struct net::CHNCallback *cb;
- CNRingOpStart *sop;
- CNRingSQE *tmp;
- int ret;
-
- tmp = new CNRingSQE;
- *tmp = *sqe;
-
- sop = (CNRingOpStart *)tmp->sq_data;
- sop->ring = this;
-
- cb = &sop->chnet->ch()->cb;
- cb->response_started_ = handle_start_sqe;
- cb->response_started_data_ = tmp;
- ret = sop->chnet->Start();
- if (unlikely(ret)) {
- PostCQE(sqe, ret);
- delete tmp;
- }
-}
-
-void CNRingCtx::ProcessSQE(CNRingSQE *sqe)
+void CNRingCtx::IssueSQE(CNRingSQE *sqe)
{
switch (sqe->op) {
case CNRING_OP_NOP:
- PostCQE(sqe, 0);
+ IssueNopSQE(sqe);
break;
case CNRING_OP_START:
- HandleStartSQE(sqe);
+ IssueStartSQE(sqe);
break;
case CNRING_OP_READ:
- HandleReadSQE(sqe);
+ IssueReadSQE(sqe);
break;
}
}
@@ -296,7 +314,7 @@ uint32_t CNRingCtx::SubmitSQE(uint32_t to_submit)
if (unlikely(head == tail))
break;
- ProcessSQE(&sqes_[head++ & sq_mask_]);
+ IssueSQE(&sqes_[head++ & sq_mask_]);
ret++;
}
diff --git a/chnet/chnet_ring.h b/chnet/chnet_ring.h
index e4574df..b0cfc50 100644
--- a/chnet/chnet_ring.h
+++ b/chnet/chnet_ring.h
@@ -123,7 +123,7 @@ public:
uint32_t cqe_size(void);
void PostCQE(CNRingSQE *sqe, int64_t res);
- void PostCQENoFail(CNRingSQE *sqe, int64_t res);
+ void CallPostCQE(CNRingSQE *sqe, int64_t res);
inline void CQAdvance(uint32_t n)
{
@@ -155,13 +155,14 @@ public:
private:
void HandleSQE(CNRingSQE *sqe);
CNRingSQE *ConsumeSQE(void);
- void ProcessSQEWQ(CNRingSQE *sqe);
- void ProcessSQE(CNRingSQE *sqe);
bool TryPostCQE(CNRingSQE *sqe, int64_t res);
CNRingCQE *GetCQENoTailIncrement(void);
void NotifyCQEWaiter(void);
- void HandleReadSQE(CNRingSQE *sqe);
- void HandleStartSQE(CNRingSQE *sqe);
+
+ void IssueSQE(CNRingSQE *sqe);
+ void IssueNopSQE(CNRingSQE *sqe);
+ void IssueStartSQE(CNRingSQE *sqe);
+ void IssueReadSQE(CNRingSQE *sqe);
};
#define chnring_for_each_cqe(ring, head, cqe) \
--
Ammar Faizi
* [PATCH ncns v1 2/4] chnet: ring: Improve `PostCQE()` waiting mechanism
From: Ammar Faizi @ 2022-08-16 16:53 UTC (permalink / raw)
To: Alviro Iskandar Setiawan; +Cc: Ammar Faizi, GNU/Weeb Mailing List
Don't use sleep(1) in a loop to wait for a CQE slot to become free.
Instead, use a condition variable. This avoids needless loop cycles
and reduces latency, because the waiter no longer has to sleep for a
full second; it is woken up as soon as a CQE slot becomes available
again.
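The waiting shape is roughly the following (a minimal sketch only;
the member names below are simplified stand-ins, not the exact
CNRingState fields):

#include <condition_variable>
#include <cstdint>
#include <mutex>

struct FreeSlotWaiter {
	std::mutex lock_;
	std::condition_variable cond_;
	uint32_t nr_free_ = 0;

	/* Poster side: block until at least one CQE slot is free. */
	void WaitForFreeSlot(void)
	{
		std::unique_lock<std::mutex> lock(lock_);

		/*
		 * wait() atomically releases the lock and sleeps; the
		 * predicate guards against spurious wakeups. The caller
		 * retries its post after this returns.
		 */
		cond_.wait(lock, [this] { return nr_free_ > 0; });
	}

	/* Reaper side: the CQ head moved forward, slots are free again. */
	void AddFreeSlots(uint32_t n)
	{
		{
			std::lock_guard<std::mutex> lock(lock_);
			nr_free_ += n;
		}
		cond_.notify_all();
	}
};

The actual patch evaluates the predicate by peeking at the CQ ring
under cqe_lock_ and only signals when someone is recorded as waiting
(nr_post_cqe_wait_), but the wake-up semantics are the same.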
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_ring.cc | 42 +++++++++++++++++++++++++++++++++++++++++-
chnet/chnet_ring.h | 12 +++++++-----
2 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/chnet/chnet_ring.cc b/chnet/chnet_ring.cc
index 966e60e..c9a3da5 100644
--- a/chnet/chnet_ring.cc
+++ b/chnet/chnet_ring.cc
@@ -34,6 +34,7 @@ CNRingCtx::CNRingCtx(uint32_t entry)
state_ = new CNRingState(sq_max, cq_max, 2);
state_->cqe_to_wait_.store(0, std::memory_order_relaxed);
+ state_->nr_post_cqe_wait_.store(0, std::memory_order_relaxed);
sq_head_.store(0, std::memory_order_relaxed);
sq_tail_.store(0, std::memory_order_relaxed);
cq_head_.store(0, std::memory_order_relaxed);
@@ -128,16 +129,54 @@ bool CNRingCtx::TryPostCQE(CNRingSQE *sqe, int64_t res)
return true;
}
+void CNRingCtx::WaitForCQEFreeSlot(void)
+{
+ std::unique_lock<std::mutex> lock(state_->post_cqe_lock_);
+
+ state_->nr_post_cqe_wait_++;
+ state_->post_cqe_cond_.wait(lock, [this]{
+ bool ret;
+
+ state_->cqe_lock_.lock();
+ ret = !!GetCQENoTailIncrement();
+ state_->cqe_lock_.unlock();
+ return ret;
+ });
+ state_->nr_post_cqe_wait_--;
+}
+
void CNRingCtx::PostCQE(CNRingSQE *sqe, int64_t res)
{
while (1) {
if (TryPostCQE(sqe, res))
break;
- sleep(1);
+ WaitForCQEFreeSlot();
+ }
+}
+
+void CNRingCtx::NotifyWaitCQEFreeSlot(void)
+{
+ uint32_t nr_wait = state_->nr_post_cqe_wait_.load();
+
+ if (unlikely(nr_wait)) {
+ state_->post_cqe_lock_.lock();
+ if (nr_wait == 1)
+ state_->post_cqe_cond_.notify_one();
+ else
+ state_->post_cqe_cond_.notify_all();
+ state_->post_cqe_lock_.unlock();
}
}
+void CNRingCtx::CQAdvance(uint32_t n)
+{
+ state_->cqe_lock_.lock();
+ cq_head_ += n;
+ state_->cqe_lock_.unlock();
+ NotifyWaitCQEFreeSlot();
+}
+
/*
* Use this when the caller is not allowed to block.
*/
@@ -335,6 +374,7 @@ void CNRingCtx::WaitCQE(uint32_t to_wait)
if (to_wait > max_to_wait)
to_wait = max_to_wait;
+ NotifyWaitCQEFreeSlot();
if (to_wait <= cqe_size())
return;
diff --git a/chnet/chnet_ring.h b/chnet/chnet_ring.h
index b0cfc50..f41d75b 100644
--- a/chnet/chnet_ring.h
+++ b/chnet/chnet_ring.h
@@ -109,6 +109,10 @@ public:
std::mutex cqe_lock_;
std::condition_variable cqe_cond_;
std::atomic<uint32_t> cqe_to_wait_;
+
+ std::mutex post_cqe_lock_;
+ std::condition_variable post_cqe_cond_;
+ std::atomic<uint32_t> nr_post_cqe_wait_;
};
#endif
@@ -124,11 +128,7 @@ public:
void PostCQE(CNRingSQE *sqe, int64_t res);
void CallPostCQE(CNRingSQE *sqe, int64_t res);
-
- inline void CQAdvance(uint32_t n)
- {
- cq_head_ += n;
- }
+ void CQAdvance(uint32_t n);
inline CNRingCQE *HeadCQE(void)
{
@@ -158,6 +158,8 @@ private:
bool TryPostCQE(CNRingSQE *sqe, int64_t res);
CNRingCQE *GetCQENoTailIncrement(void);
void NotifyCQEWaiter(void);
+ void NotifyWaitCQEFreeSlot(void);
+ void WaitForCQEFreeSlot(void);
void IssueSQE(CNRingSQE *sqe);
void IssueNopSQE(CNRingSQE *sqe);
--
Ammar Faizi