From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on gnuweeb.org X-Spam-Level: X-Spam-Status: No, score=-0.8 required=5.0 tests=ALL_TRUSTED,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF,NO_DNS_FOR_FROM, NUMERIC_HTTP_ADDR,URIBL_BLOCKED,WEIRD_PORT autolearn=no autolearn_force=no version=3.4.6 Received: from localhost.localdomain (unknown [180.246.144.41]) by gnuweeb.org (Postfix) with ESMTPSA id 810F380A2B; Sun, 21 Aug 2022 11:25:57 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=gnuweeb.org; s=default; t=1661081159; bh=7Rb75LFlKJMh9utd1lcgyQy5z1bHg1cnEGKo3qHAflM=; h=From:To:Cc:Subject:Date:In-Reply-To:References:From; b=sfooAf9oYLerIAfhDKNnK0Y5/aG5GFIycZ/p7YWDw8VAAftXLKswFBYp36oh6H4tR Ta2ty5kBxN+RBvyYSa4qHJEAogULvv/thuT7VLMdozgFYMRsnUw/4Z95NsFhdjaNY4 ZTPPLDzIXNGb/wILgSw6zTPs/LyYCuLN3trqoR2QR82QP+r93rRV8uawheorHh5hAf ORYs7NqPCGRao7a/9GWeRJsLmwa0Wf+rVSzBlyozoeiRUh58Ea9ICevdrr6QTOy/QO b9GaloTUjjC5QZkuk5dLofsX3BBV2Z+RsTduyPG8u+ir6VQuJX2PPA3tNAYIvyE2Gg uyn5kicPlzn1g== From: Ammar Faizi To: Alviro Iskandar Setiawan Cc: Ammar Faizi , Muhammad Rizki , Kanna Scarlet , GNU/Weeb Mailing List Subject: [PATCH v1 19/22] chnet: ring: Refactor the ring completely Date: Sun, 21 Aug 2022 18:24:50 +0700 Message-Id: <20220821112453.3026255-20-ammarfaizi2@gnuweeb.org> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20220821112453.3026255-1-ammarfaizi2@gnuweeb.org> References: <20220821112453.3026255-1-ammarfaizi2@gnuweeb.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This commit was multiple commits squashed into one, some interesting things were: - chnet: ring: Support read operation again - chnet: ring: Support start operation again - tests/cpp/ring: Add more tests and delete old tests - chnet: node: Refactor the NodeJS interface In short, rewriting them to make it cleaner and better... Signed-off-by: Ammar Faizi --- chnet/WorkQueue.cc | 17 +- chnet/WorkQueue.h | 5 + chnet/chnet_node.cc | 453 ++++++++++++++++++++++++-------------- chnet/chnet_ring.cc | 460 +++++++++++++++++++-------------------- chnet/chnet_ring.h | 200 +++++++++-------- tests/cpp/ring.cc | 504 +++++++++++++++++++++++++------------------ tests/js/ring.js | 516 +++++++++++++++++++++++++++----------------- 7 files changed, 1260 insertions(+), 895 deletions(-) diff --git a/chnet/WorkQueue.cc b/chnet/WorkQueue.cc index 30163bc..7411123 100644 --- a/chnet/WorkQueue.cc +++ b/chnet/WorkQueue.cc @@ -33,6 +33,12 @@ WorkQueue::~WorkQueue(void) stop_ = true; + if (helper_thread_) { + jobs_cond_.notify_all(); + helper_thread_->join(); + delete helper_thread_; + } + i = nr_threads_; while (i--) { std::thread *t; @@ -44,11 +50,6 @@ WorkQueue::~WorkQueue(void) } } - if (helper_thread_) { - helper_thread_->join(); - delete helper_thread_; - } - delete[] threads_; delete[] jobs_; } @@ -99,7 +100,7 @@ void WorkQueue::_wq_thread_worker(struct wq_thread *t) noexcept t->set_task_state(WQ_INTERRUPTIBLE); while (!stop_) { __wq_thread_worker(t, lk); - auto cv_ret = jobs_cond_.wait_for(lk, 1s); + auto cv_ret = jobs_cond_.wait_for(lk, 200ms); if (unlikely(cv_ret == std::cv_status::timeout)) { if (t->idx < nr_idle_thread_) continue; @@ -174,7 +175,7 @@ void WorkQueue::wq_helper(void) _wq_helper(njob); lk.lock(); } - jobs_cond_.wait_for(lk, 10s, [this]{ return stop_; }); + jobs_cond_.wait_for(lk, 200ms, [this]{ return stop_; }); lk.unlock(); } } @@ -284,7 +285,7 @@ int64_t WorkQueue::schedule_work(wq_job_callback_t cb, void *data) noexcept } lk.lock(); - sched_idle_cond_.wait_for(lk, 10s); + sched_idle_cond_.wait_for(lk, 200ms); lk.unlock(); } diff --git a/chnet/WorkQueue.h b/chnet/WorkQueue.h index ddb3bf5..ad4f831 100644 --- a/chnet/WorkQueue.h +++ b/chnet/WorkQueue.h @@ -234,6 +234,11 @@ public: { jobs_cond_.notify_one(); } + + inline bool should_stop(void) + { + return stop_; + } }; #endif /* #ifndef CHNET_WORKQUEUE_H */ diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc index 621d7aa..cea4ad6 100644 --- a/chnet/chnet_node.cc +++ b/chnet/chnet_node.cc @@ -4,22 +4,36 @@ class NodeCHNetRing { public: inline NodeCHNetRing(uint32_t entry): - ring_(entry) + ring_(new CNRing(entry)) { - ucount_.store(0, std::memory_order_relaxed); + id_seq_.store(0, std::memory_order_relaxed); } - CNRingCtx ring_; + inline ~NodeCHNetRing(void) + { + if (ring_) + delete ring_; + } + + inline void Close(void) + { + if (ring_) { + delete ring_; + ring_ = nullptr; + } + } + + CNRing *ring_; + std::atomic id_seq_; Napi::ObjectReference ref_; - std::atomic ucount_; }; class NodeCNRingSQE { public: - inline NodeCNRingSQE(NodeCHNetRing *ring, CNRingSQE *sqe) + inline NodeCNRingSQE(CNRingSQE *sqe, NodeCHNetRing *ring): + sqe_(sqe), + ring_(ring) { - sqe_ = sqe; - ring_ = ring; } CNRingSQE *sqe_; @@ -33,7 +47,26 @@ struct NodeSQData { class NodeCHNet { public: - CHNet ch_; + inline NodeCHNet(void): + ch_(new CHNet) + { + } + + inline ~NodeCHNet(void) + { + if (ch_) + delete ch_; + } + + inline void Close(void) + { + if (ch_) { + delete ch_; + ch_ = nullptr; + } + } + + CHNet *ch_; std::string payload_; }; @@ -51,8 +84,9 @@ static inline void obj_add_func(Napi::Env &env, Napi::Object &obj, static void CHN_SQEPrepNop(const Napi::CallbackInfo &info) { - NodeCNRingSQE *nsqe = (NodeCNRingSQE *)info.Data(); + NodeCNRingSQE *nsqe; + nsqe = (NodeCNRingSQE *)info.Data(); nsqe->sqe_->PrepNop(); } @@ -77,27 +111,6 @@ static NodeCHNet *CHN_PrepGetCHObj(Napi::Env &env, const Napi::Value &arg) ch_in_obj.As().Int64Value(); } -static void CHN_SQEPrepStart(const Napi::CallbackInfo &info) -{ - Napi::Env env = info.Env(); - NodeCNRingSQE *nsqe; - NodeCHNet *ch; - CHNet *chn; - - if (unlikely(info.Length() != 1 || !info[0].IsObject())) { - throw_js_exception(env, "sqe.prep_start must be given a chnet object argument"); - return; - } - - ch = CHN_PrepGetCHObj(env, info[0]); - if (unlikely(!ch)) - return; - - chn = &ch->ch_; - nsqe = (NodeCNRingSQE *)info.Data(); - nsqe->sqe_->PrepStart(chn); -} - static void CHN_SQEPrepRead(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); @@ -108,7 +121,13 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info) if (unlikely(info.Length() != 2 || !info[0].IsObject() || !info[1].IsNumber())) { - throw_js_exception(env, "sqe.prep_read must be given a chnet object argument and a read size"); + throw_js_exception(env, "sqe.PrepRead must be given a chnet object argument and a read size"); + return; + } + + nsqe = (NodeCNRingSQE *)info.Data(); + if (unlikely(!nsqe->ring_->ring_)) { + throw_js_exception(env, "ring has been closed!"); return; } @@ -117,8 +136,7 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info) return; read_size = info[1].As().Int32Value(); - chn = &ch->ch_; - nsqe = (NodeCNRingSQE *)info.Data(); + chn = ch->ch_; nsqe->sqe_->PrepRead(chn, read_size); } @@ -129,77 +147,135 @@ static Napi::Value CHN_SQESetUserData(const Napi::CallbackInfo &info) NodeCNRingSQE *nsqe; if (unlikely(info.Length() != 1)) { - throw_js_exception(env, "sqe.set_user_data must be given exactly one argument"); + throw_js_exception(env, "sqe.SetUserData must be given exactly one argument"); return env.Null(); } nsqe = (NodeCNRingSQE *)info.Data(); + if (unlikely(!nsqe->ring_->ring_)) { + throw_js_exception(env, "ring has been closed!"); + return env.Null(); + } + data = new struct NodeSQData; data->ring_ = nsqe->ring_; - data->id_ = nsqe->ring_->ucount_++ & nsqe->ring_->ring_.cq_mask_; + data->id_ = nsqe->ring_->id_seq_++ & nsqe->ring_->ring_->cq_mask_; nsqe->ring_->ref_.Get("__udata") .As()[data->id_] = info[0]; nsqe->sqe_->SetUserDataPtr(data); return info[0]; } -static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *sqe) +static void CHN_SQEPrepStart(const Napi::CallbackInfo &info) { - delete sqe; - (void)env; + Napi::Env env = info.Env(); + NodeCNRingSQE *nsqe; + NodeCHNet *ch; + CHNet *chn; + + if (unlikely(info.Length() != 1 || !info[0].IsObject())) { + throw_js_exception(env, "sqe.PrepStart must be given a chnet object argument"); + return; + } + + ch = CHN_PrepGetCHObj(env, info[0]); + if (unlikely(!ch)) + return; + + chn = ch->ch_; + nsqe = (NodeCNRingSQE *)info.Data(); + if (unlikely(!nsqe->ring_->ring_)) { + throw_js_exception(env, "ring has been closed!"); + return; + } + + nsqe->sqe_->PrepStart(chn); } -static Napi::Object BuildSQEObject(Napi::Env &env, NodeCHNetRing *ring, - CNRingSQE *sqe) +static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *nsqe) { - Napi::Object obj = Napi::Object::New(env); - NodeCNRingSQE *nsqe; - - sqe->SetUserData(0); - nsqe = new NodeCNRingSQE(ring, sqe); - obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "prep_nop"); - obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "prep_start"); - obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "prep_read"); - obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "set_user_data"); - obj.AddFinalizer(CHN_SQEDestruct, nsqe); - return obj; + delete nsqe; } static Napi::Value CHN_RingGetSQE(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); NodeCHNetRing *ring; + NodeCNRingSQE *nsqe; + Napi::Object obj; CNRingSQE *sqe; ring = (NodeCHNetRing *)info.Data(); - sqe = ring->ring_.GetSQE(); + if (unlikely(!ring->ring_)) { + throw_js_exception(env, "ring has been closed!"); + return env.Null(); + } + + sqe = ring->ring_->GetSQE(); if (unlikely(!sqe)) return env.Null(); - return BuildSQEObject(env, ring, sqe); + sqe->SetUserData(0); + nsqe = new NodeCNRingSQE(sqe, ring); + obj = Napi::Object::New(env); + obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "PrepNop"); + obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "PrepRead"); + obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "PrepStart"); + obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "SetUserData"); + obj.AddFinalizer(CHN_SQEDestruct, nsqe); + return obj; } -static Napi::Value CHN_RingSubmitSQE(const Napi::CallbackInfo &info) +static Napi::Value CHN_RingSubmit(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); NodeCHNetRing *ring; + uint32_t to_submit; + + if (info.Length() > 0) { + if (unlikely(!info[0].IsNumber())) { + throw_js_exception(env, "ring.Submit the first argument must be a positive integer"); + return env.Null(); + } + + to_submit = info[0].ToNumber().Uint32Value(); + } else { + to_submit = -1U; + } ring = (NodeCHNetRing *)info.Data(); - return Napi::Number::New(env, ring->ring_.SubmitSQE()); + if (unlikely(!ring->ring_)) { + throw_js_exception(env, "ring has been closed!"); + return env.Null(); + } + + return Napi::Number::New(env, ring->ring_->Submit(to_submit)); } -static void CHN_RingWaitCQE(const Napi::CallbackInfo &info) +static Napi::Value CHN_RingWaitCQE(const Napi::CallbackInfo &info) { - uint32_t to_wait; + Napi::Env env = info.Env(); NodeCHNetRing *ring; + uint32_t to_wait; + + if (info.Length() > 0) { + if (unlikely(!info[0].IsNumber())) { + throw_js_exception(env, "ring.WaitCQE the first argument must be a positive integer"); + return env.Null(); + } - if (unlikely(info.Length() < 1 || !info[0].IsNumber())) - to_wait = 1; - else to_wait = info[0].ToNumber().Uint32Value(); + } else { + to_wait = 1; + } ring = (NodeCHNetRing *)info.Data(); - ring->ring_.WaitCQE(to_wait); + if (unlikely(!ring->ring_)) { + throw_js_exception(env, "ring has been closed!"); + return env.Null(); + } + + return Napi::Number::New(env, ring->ring_->WaitCQE(to_wait)); } static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data) @@ -209,49 +285,56 @@ static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data) * when it's no longer used. */ - // data->ring_->ref_.Get("__udata").As() - // .Delete(data->id_); +#if 0 + data->ring_->ref_.Get("__udata").As() + .Delete(data->id_); +#endif + delete data; (void)env; } static Napi::Object BuildCQEObject(Napi::Env &env, CNRingCQE *cqe) { - Napi::Object obj = Napi::Object::New(env); NodeSQData *data = (NodeSQData *) cqe->GetUserDataPtr(); + Napi::Object obj = Napi::Object::New(env); - obj["res"] = Napi::Number::New(env, cqe->res); + obj["res"] = Napi::Number::New(env, cqe->res_); + if (!data) + return obj; - if (data) { - obj["user_data"] = data->ring_->ref_.Get("__udata") - .As().Get(data->id_); - obj.AddFinalizer(CHN_CQEDestruct, data); - } + obj["user_data"] = data->ring_->ref_.Get("__udata").As() + .Get(data->id_); + obj.AddFinalizer(CHN_CQEDestruct, data); return obj; } + static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info) { - constexpr static const char err_msg[] = - "ring.for_each_cqe must be given exactly a callback function argument"; - Napi::Env env = info.Env(); Napi::Function callback; - uint32_t head, i; NodeCHNetRing *ring; CNRingCQE *cqe; + uint32_t head; + uint32_t tail; + uint32_t i; if (unlikely(info.Length() != 1 || !info[0].IsFunction())) { - throw_js_exception(env, err_msg); + throw_js_exception(env, "ring.ForEachCQE must be given exactly one callback function argument"); return env.Null(); } - callback = info[0].As(); + ring = (NodeCHNetRing *)info.Data(); + if (unlikely(!ring->ring_)) { + throw_js_exception(env, "ring has been closed!"); + return env.Null(); + } i = 0; - ring = (NodeCHNetRing *)info.Data(); - chnring_for_each_cqe(&ring->ring_, head, cqe) { - Napi::Object cqe_obj = BuildCQEObject(env, cqe); + callback = info[0].As(); + cnring_for_each_cqe(ring->ring_, head, tail, cqe) { + Napi::Value cqe_obj = BuildCQEObject(env, cqe); callback.Call({cqe_obj}); i++; @@ -262,104 +345,93 @@ static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info) static void CHN_RingCQAdvance(const Napi::CallbackInfo &info) { - constexpr static const char err_msg[] = - "ring.cq_advance must be given exactly an integer argument"; - Napi::Env env = info.Env(); NodeCHNetRing *ring; if (unlikely(info.Length() < 1 || !info[0].IsNumber())) { - throw_js_exception(env, err_msg); + throw_js_exception(env, "ring.CQAdvance must be given exactly one positive integer argument"); return; } ring = (NodeCHNetRing *)info.Data(); - ring->ring_.CQAdvance(info[0].ToNumber().Uint32Value()); -} + if (unlikely(!ring->ring_)) { + throw_js_exception(env, "ring has been closed!"); + return; + } -static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring) -{ - delete ring; - (void)env; + ring->ring_->CQAdvance(info[0].ToNumber().Uint32Value()); } -static inline Napi::Value _CHN_CreateRing(Napi::Env &env, NodeCHNetRing *ring) +static Napi::Value CHN_RingGetCQEHead(const Napi::CallbackInfo &info) { - Napi::Object obj = Napi::Object::New(env); + Napi::Env env = info.Env(); + NodeCHNetRing *ring; - obj["__udata"] = Napi::Object::New(env); - obj_add_func(env, obj, ring, CHN_RingGetSQE, "get_sqe"); - obj_add_func(env, obj, ring, CHN_RingSubmitSQE, "submit_sqe"); - obj_add_func(env, obj, ring, CHN_RingWaitCQE, "wait_cqe"); - obj_add_func(env, obj, ring, CHN_RingForEachCQE, "for_each_cqe"); - obj_add_func(env, obj, ring, CHN_RingCQAdvance, "cq_advance"); - obj.AddFinalizer(CHN_RingDestruct, ring); - ring->ref_ = Napi::Persistent(obj); - return obj; + ring = (NodeCHNetRing *)info.Data(); + if (unlikely(!ring->ring_)) { + throw_js_exception(env, "ring has been closed!"); + return env.Null(); + } + + return BuildCQEObject(env, ring->ring_->GetCQEHead()); } -static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info) +static void CHN_RingClose(const Napi::CallbackInfo &info) { - constexpr static const char err_msg[] = - "chnet.create_ring must be given exactly a positive integer argument"; - - Napi::Env env = info.Env(); NodeCHNetRing *ring; - uint32_t entry; - if (unlikely(info.Length() < 1 || !info[0].IsNumber())) { - throw_js_exception(env, err_msg); - return env.Null(); - } + ring = (NodeCHNetRing *)info.Data(); + ring->Close(); +} - entry = info[0].ToNumber().Uint32Value(); - ring = new NodeCHNetRing(entry); - return _CHN_CreateRing(env, ring); +static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring) +{ + delete ring; } static void CHN_NetSetURL(const Napi::CallbackInfo &info) { - constexpr static const char err_msg[] = - "chnet.set_url must be given exactly 1 string argument"; - Napi::Env env = info.Env(); NodeCHNet *ch; if (unlikely(info.Length() != 1 || !info[0].IsString())) { - throw_js_exception(env, err_msg); + throw_js_exception(env, "net.SetURL must be given exactly 1 string argument"); return; } ch = (NodeCHNet *)info.Data(); + if (unlikely(!ch->ch_)) { + throw_js_exception(env, "chnet has been closed!"); + return; + } + const std::string &url = info[0].ToString().Utf8Value(); - ch->ch_.SetURL(url.c_str()); + ch->ch_->SetURL(url.c_str()); } static void CHN_NetSetMethod(const Napi::CallbackInfo &info) { - constexpr static const char err_msg[] = - "chnet.set_method must be given exactly 1 string argument"; - Napi::Env env = info.Env(); NodeCHNet *ch; if (unlikely(info.Length() != 1 || !info[0].IsString())) { - throw_js_exception(env, err_msg); + throw_js_exception(env, "net.SetMethod must be given exactly 1 string argument"); return; } ch = (NodeCHNet *)info.Data(); - const std::string &method =info[0].ToString().Utf8Value(); - ch->ch_.SetMethod(method.c_str()); + if (unlikely(!ch->ch_)) { + throw_js_exception(env, "chnet has been closed!"); + return; + } + + const std::string &method = info[0].ToString().Utf8Value(); + ch->ch_->SetMethod(method.c_str()); } static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info) { - constexpr static const char err_msg[] = - "chnet.set_request_header must be at least given 2 string arguments"; - constexpr static const char err_msg2[] = - "chnet.set_request_header can only receive 3 arguments with the third argument being a boolean"; - + constexpr static const char err_arg3[] = "chnet.SetRequestHeader can only receive 3 arguments with the third argument being a boolean"; Napi::Env env = info.Env(); bool overwrite = true; NodeCHNet *ch; @@ -369,61 +441,80 @@ static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info) nr_arg = info.Length(); if (unlikely(nr_arg < 2 || !info[0].IsString() || !info[1].IsString())) { - throw_js_exception(env, err_msg); + throw_js_exception(env, "chnet.SetRequestHeader must be at least given 2 string arguments"); return; } if (unlikely(nr_arg > 3)) { - throw_js_exception(env, err_msg2); + throw_js_exception(env, err_arg3); return; } if (nr_arg == 3) { if (unlikely(!info[2].IsBoolean())) { - throw_js_exception(env, err_msg2); + throw_js_exception(env, err_arg3); return; } overwrite = info[2].As(); } ch = (NodeCHNet *)info.Data(); + if (unlikely(!ch->ch_)) { + throw_js_exception(env, "chnet has been closed!"); + return; + } + const std::string &key = info[0].ToString().Utf8Value(); const std::string &val = info[1].ToString().Utf8Value(); - ch->ch_.SetRequestHeader(key.c_str(), val.c_str(), overwrite); + ch->ch_->SetRequestHeader(key.c_str(), val.c_str(), overwrite); } - -static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info) +static Napi::Value CHN_NetReadRet(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); NodeCHNet *ch; - int ret; ch = (NodeCHNet *)info.Data(); - ret = ch->ch_.read_ret(); - if (ret > 0) - return Napi::String::New(env, ch->ch_.read_buf(), (size_t)ret); + if (unlikely(!ch->ch_)) { + throw_js_exception(env, "chnet has been closed!"); + return env.Null(); + } - return env.Null(); + return Napi::Number::New(env, ch->ch_->read_ret()); } -static Napi::Number CHN_NetReadRet(const Napi::CallbackInfo &info) +static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); NodeCHNet *ch; + int ret; ch = (NodeCHNet *)info.Data(); - return Napi::Number::New(env, ch->ch_.read_ret()); + if (unlikely(!ch->ch_)) { + throw_js_exception(env, "chnet has been closed!"); + return env.Null(); + } + + ret = ch->ch_->read_ret(); + if (ret > 0) + return Napi::String::New(env, ch->ch_->read_buf(), (size_t)ret); + + return env.Null(); } -static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info) +static Napi::Value CHN_NetGetErrorStr(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); const char *err; NodeCHNet *ch; ch = (NodeCHNet *)info.Data(); - err = ch->ch_.GetErrorStr(); + if (unlikely(!ch->ch_)) { + throw_js_exception(env, "chnet has been closed!"); + return env.Null(); + } + + err = ch->ch_->GetErrorStr(); if (err) return Napi::String::New(env, err); @@ -432,20 +523,30 @@ static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info) static void CHN_NetSetPayload(const Napi::CallbackInfo &info) { - constexpr static const char err_msg[] = - "chnet.set_payload must be given exactly 1 string argument"; - Napi::Env env = info.Env(); NodeCHNet *ch; if (unlikely(info.Length() != 1 || !info[0].IsString())) { - throw_js_exception(env, err_msg); + throw_js_exception(env, "chnet.SetPayload must be given exactly 1 string argument"); return; } ch = (NodeCHNet *)info.Data(); + if (unlikely(!ch->ch_)) { + throw_js_exception(env, "chnet has been closed!"); + return; + } + ch->payload_ = info[0].ToString().Utf8Value(); - ch->ch_.SetPayload(ch->payload_.c_str(), ch->payload_.size()); + ch->ch_->SetPayload(ch->payload_.c_str(), ch->payload_.size()); +} + +static void CHN_NetClose(const Napi::CallbackInfo &info) +{ + NodeCHNet *ch; + + ch = (NodeCHNet *)info.Data(); + ch->Close(); } static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch) @@ -454,6 +555,42 @@ static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch) (void)env; } +static Napi::Value _CHN_CreateRing(Napi::Env env, uint32_t entry) +{ + NodeCHNetRing *ring; + Napi::Object obj; + + obj = Napi::Object::New(env); + ring = new NodeCHNetRing(entry); + obj["__udata"] = Napi::Object::New(env); + obj_add_func(env, obj, ring, CHN_RingGetSQE, "GetSQE"); + obj_add_func(env, obj, ring, CHN_RingSubmit, "Submit"); + obj_add_func(env, obj, ring, CHN_RingWaitCQE, "WaitCQE"); + obj_add_func(env, obj, ring, CHN_RingForEachCQE, "ForEachCQE"); + obj_add_func(env, obj, ring, CHN_RingCQAdvance, "CQAdvance"); + obj_add_func(env, obj, ring, CHN_RingGetCQEHead, "GetCQEHead"); + obj_add_func(env, obj, ring, CHN_RingClose, "Close"); + obj.AddFinalizer(CHN_RingDestruct, ring); + obj.Seal(); + obj.Freeze(); + ring->ref_ = Napi::Persistent(obj); + return obj; +} + +static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info) +{ + Napi::Env env = info.Env(); + uint32_t entry; + + if (unlikely(info.Length() < 1 || !info[0].IsNumber())) { + throw_js_exception(env, "chnet.CreateRing must be given exactly one positive integer argument"); + return env.Null(); + } + + entry = info[0].ToNumber().Uint32Value(); + return _CHN_CreateRing(env, entry); +} + static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); @@ -461,14 +598,14 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info) NodeCHNet *ch = new NodeCHNet; int64_t ch_ptr; - obj_add_func(env, obj, ch, CHN_NetSetURL, "set_url"); - obj_add_func(env, obj, ch, CHN_NetSetMethod, "set_method"); - obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "set_request_header"); + obj_add_func(env, obj, ch, CHN_NetSetURL, "SetURL"); + obj_add_func(env, obj, ch, CHN_NetSetMethod, "SetMethod"); + obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "SetRequestHeader"); obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret"); obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf"); - obj_add_func(env, obj, ch, CHN_NetGetError, "get_error"); - obj_add_func(env, obj, ch, CHN_NetSetPayload, "set_payload"); - + obj_add_func(env, obj, ch, CHN_NetGetErrorStr, "GetErrorStr"); + obj_add_func(env, obj, ch, CHN_NetSetPayload, "SetPayload"); + obj_add_func(env, obj, ch, CHN_NetClose, "Close"); ch_ptr = (int64_t) (intptr_t) ch; obj["__ch"] = Napi::Number::New(env, ch_ptr); obj.AddFinalizer(CHN_NetDestruct, ch); @@ -480,8 +617,10 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info) static Napi::Object CHN_GlobalInit(Napi::Env env, Napi::Object exports) { chnet_global_init(); - obj_add_func(env, exports, NULL, CHN_CreateRing, "create_ring"); - obj_add_func(env, exports, NULL, CHN_CreateNet, "create_net"); + obj_add_func(env, exports, NULL, CHN_CreateRing, "CreateRing"); + obj_add_func(env, exports, NULL, CHN_CreateNet, "CreateNet"); + exports.Seal(); + exports.Freeze(); return exports; } NODE_API_MODULE(chnet, CHN_GlobalInit); diff --git a/chnet/chnet_ring.cc b/chnet/chnet_ring.cc index a5f3094..b858421 100644 --- a/chnet/chnet_ring.cc +++ b/chnet/chnet_ring.cc @@ -8,15 +8,14 @@ #include using namespace std::chrono_literals; -#include -CNRingCtx::CNRingCtx(uint32_t entry) +CNRing::CNRing(uint32_t entry) { uint32_t sq_max; uint32_t cq_max; - if (entry < 16) - entry = 16; + if (entry < 2) + entry = 2; if (entry > 1048576) entry = 1048576; @@ -28,372 +27,353 @@ CNRingCtx::CNRingCtx(uint32_t entry) sq_mask_ = sq_max - 1; cq_mask_ = cq_max - 1; - - sqes_ = new CNRingSQE[sq_max + 1]; - cqes_ = new CNRingCQE[cq_max + 1]; - state_ = new CNRingState(sq_max, cq_max, 2); - - state_->cqe_to_wait_.store(0, std::memory_order_relaxed); - state_->nr_post_cqe_wait_.store(0, std::memory_order_relaxed); sq_head_.store(0, std::memory_order_relaxed); sq_tail_.store(0, std::memory_order_relaxed); cq_head_.store(0, std::memory_order_relaxed); cq_tail_.store(0, std::memory_order_relaxed); + cq_max_to_wait_.store(0, std::memory_order_relaxed); + sqes_ = new CNRingSQE[sq_max]; + cqes_ = new CNRingCQE[cq_max]; + state_ = new CNRingState(4096, 8192, 1); } -CNRingCtx::~CNRingCtx(void) +CNRing::~CNRing(void) { + state_->stop_ = true; + NotifyFreeCQEWaiter(true); delete state_; delete[] sqes_; delete[] cqes_; } -static inline uint32_t big_min_small(uint32_t a, uint32_t b) -{ - if (a < b) - return b - a; - else - return a - b; -} - -uint32_t CNRingCtx::sqe_size(void) -{ - return big_min_small(sq_head_.load(), sq_tail_.load()); -} - -uint32_t CNRingCtx::cqe_size(void) -{ - return big_min_small(cq_head_.load(), cq_tail_.load()); -} - -CNRingSQE *CNRingCtx::GetSQE(void) -{ - uint32_t head = sq_head_.load(); - uint32_t tail = sq_tail_.load(); - uint32_t mask = sq_mask_; - CNRingSQE *sqe; - - if (unlikely(big_min_small(head, tail) > mask)) - return nullptr; - - sqe = &sqes_[tail & mask]; - sq_tail_++; - return sqe; -} - -CNRingCQE *CNRingCtx::GetCQENoTailIncrement(void) - __must_hold(&state_->cqe_lock_) +CNRingState::CNRingState(uint32_t nr_thread, uint32_t nr_jobs, + uint32_t nr_idle_thread): + wq_(nr_thread, nr_jobs, nr_idle_thread) { - uint32_t head = cq_head_.load(); - uint32_t tail = cq_tail_.load(); - uint32_t mask = cq_mask_; - - if (unlikely(big_min_small(head, tail) > mask)) - return nullptr; - - return &cqes_[tail & mask]; + cqe_to_wait_.store(0, std::memory_order_relaxed); + nr_free_cqe_slot_waiter_.store(0, std::memory_order_relaxed); + stop_ = false; + wq_.run(); } -void CNRingCtx::NotifyCQEWaiter(void) +void CNRing::NotifyCQEWaiter(void) __must_hold(&state_->cqe_lock_) { - uint32_t to_wait = state_->cqe_to_wait_.load(); + uint32_t to_wait = state_->cqe_to_wait_.load(std::memory_order_acquire); if (!to_wait) return; - if (to_wait > cqe_size()) + if (to_wait > CQESize()) return; - state_->cqe_cond_.notify_one(); + state_->cqe_to_wait_cond_.notify_one(); } -bool CNRingCtx::TryPostCQE(CNRingSQE *sqe, int64_t res) +void CNRing::NotifyFreeCQEWaiter(bool force) { - CNRingCQE *cqe; - - state_->cqe_lock_.lock(); - cqe = GetCQENoTailIncrement(); - if (unlikely(!cqe)) { - NotifyCQEWaiter(); - state_->cqe_lock_.unlock(); - return false; + uint32_t n; + + n = state_->nr_free_cqe_slot_waiter_.load(std::memory_order_acquire); + if (unlikely(n > 0 || force)) { + std::condition_variable *cond; + std::mutex *mutex; + + mutex = &state_->wait_for_a_free_cqe_slot_lock_; + cond = &state_->wait_for_a_free_cqe_slot_cond_; + mutex->lock(); + if (n == 1) + cond->notify_one(); + else + cond->notify_all(); + mutex->unlock(); } - - cqe->op = sqe->op; - cqe->user_data = sqe->user_data; - cqe->res = res; - cq_tail_++; - NotifyCQEWaiter(); - state_->cqe_lock_.unlock(); - return true; } -void CNRingCtx::WaitForCQEFreeSlot(void) +void CNRing::SleepToWaitForAFreeCQE(void) { - std::unique_lock lock(state_->post_cqe_lock_); + std::unique_lock lk(state_->wait_for_a_free_cqe_slot_lock_); + std::atomic *nr_waiter = &state_->nr_free_cqe_slot_waiter_; - state_->nr_post_cqe_wait_++; - state_->post_cqe_cond_.wait(lock, [this]{ - bool ret; + (*nr_waiter)++; + state_->wait_for_a_free_cqe_slot_cond_.wait(lk, [this]{ + uint32_t ret; state_->cqe_lock_.lock(); - ret = !!GetCQENoTailIncrement(); + ret = CQESize(); state_->cqe_lock_.unlock(); - return ret; + return ret < (cq_mask_ + 1) || state_->should_stop(); }); - state_->nr_post_cqe_wait_--; + (*nr_waiter)--; } -void CNRingCtx::PostCQE(CNRingSQE *sqe, int64_t res) +void CNRing::PostCQE(const CNRingSQE *sqe, int64_t res) { while (1) { + if (unlikely(state_->should_stop())) + return; + if (TryPostCQE(sqe, res)) - break; + return; - WaitForCQEFreeSlot(); + SleepToWaitForAFreeCQE(); } } -void CNRingCtx::NotifyWaitCQEFreeSlot(void) +bool CNRing::TryPostCQE(const CNRingSQE *sqe, int64_t res) { - uint32_t nr_wait = state_->nr_post_cqe_wait_.load(); + bool ret = false; + CNRingCQE *cqe; + uint32_t mask; - if (unlikely(nr_wait)) { - state_->post_cqe_lock_.lock(); - if (nr_wait == 1) - state_->post_cqe_cond_.notify_one(); - else - state_->post_cqe_cond_.notify_all(); - state_->post_cqe_lock_.unlock(); + state_->cqe_lock_.lock(); + mask = cq_mask_; + + if (likely(CQESize() < (mask + 1))) { + cqe = &cqes_[cq_tail_++ & mask]; + ret = true; + cqe->op_ = sqe->op_; + cqe->res_ = res; + cqe->user_data_ = sqe->user_data_; + NotifyCQEWaiter(); } -} -void CNRingCtx::CQAdvance(uint32_t n) -{ - state_->cqe_lock_.lock(); - cq_head_ += n; state_->cqe_lock_.unlock(); - NotifyWaitCQEFreeSlot(); + return ret; } -/* - * Use this when the caller is not allowed to block. - */ -void CNRingCtx::CallPostCQE(CNRingSQE *sqe, int64_t res) +bool CNRing::PostCQENoSleep(const CNRingSQE *sqe, int64_t res) { - CNRingSQE *tmp; - if (likely(TryPostCQE(sqe, res))) - return; + return true; - /* - * The CQE slot is full, but the caller is not allowed to - * block, let's schedule the post in the workqueue thread. - */ - tmp = new CNRingSQE; - *tmp = *sqe; - state_->wq_.schedule_work([this, res](struct wq_job_data *data){ - CNRingSQE *sqe = (CNRingSQE *)data->data; - - this->PostCQE(sqe, res); - delete sqe; - }, tmp); + auto func = [this, res, copy_sqe = *sqe](auto *x){ + this->PostCQE(©_sqe, res); + }; + return state_->wq_.try_schedule_work(std::move(func), nullptr) >= 0; } -void CNRingCtx::IssueNopSQE(CNRingSQE *sqe) +bool CNRing::IssueSQENop(const CNRingSQE *sqe) { - CallPostCQE(sqe, 0); + return PostCQENoSleep(sqe, 0); } -static void issue_start_sqe(void *udata, net::URLRequest *url_req, int net_err) +static inline void post_sqe_read(CNRing *ring, const CNRingSQE *sqe, int size) { - CNRingOpStart *sop; - CNRingSQE *sqe; - - sqe = (CNRingSQE *)udata; - sop = (CNRingOpStart *)sqe->sq_data; - sop->ring->PostCQE(sqe, net_err); - delete sop; - delete sqe; + ring->PostCQE(sqe, size); } -void CNRingCtx::IssueStartSQE(CNRingSQE *sqe) +static int issue_sqe_read(CNRing *ring, const CNRingSQE *sqe) { + CHNet *ch = sqe->sq_start_.ch_; struct net::CHNCallback *cb; - CNRingOpStart *sop; - CNRingSQE *tmp; - int ret; - tmp = new CNRingSQE; - *tmp = *sqe; + auto f = [ring, sqe_cp = *sqe](void *u, net::URLRequest *ur, int size){ + post_sqe_read(ring, &sqe_cp, size); + }; - sop = (CNRingOpStart *)tmp->sq_data; - sop->ring = this; - - cb = &sop->chnet->ch()->cb; - cb->response_started_ = issue_start_sqe; - cb->response_started_data_ = tmp; - ret = sop->chnet->Start(); - if (unlikely(ret)) { - CallPostCQE(sqe, ret); - delete tmp; - } -} - -static void issue_read_sqe(void *udata, net::URLRequest *url_req, int read_ret) -{ - CNRingOpStart *sop; - CNRingSQE *sqe; - - sqe = (CNRingSQE *)udata; - sop = (CNRingOpStart *)sqe->sq_data; - sop->ring->PostCQE(sqe, read_ret); - delete sop; - delete sqe; + cb = &ch->ch()->cb; + cb->read_completed_ = std::move(f); + return ch->Read(sqe->sq_read_.size_); } -static void issue_read_sqe_need_start(void *udata, net::URLRequest *url_req, - int net_err) +static void _issue_sqe_read_need_start(CNRing *ring, const CNRingSQE *sqe, + int net_err) { + CHNet *ch = sqe->sq_start_.ch_; struct net::CHNCallback *cb; - CNRingOpRead *sop; - CNRingSQE *sqe; - sqe = (CNRingSQE *)udata; - sop = (CNRingOpRead *)sqe->sq_data; - - if (likely(net_err == net::OK)) { - /* - * The start operation succeeds, now we can do - * the read operation directly without issuing - * extra SQE. - */ - cb = &sop->chnet->ch()->cb; - cb->read_completed_ = issue_read_sqe; - cb->read_completed_data_ = sqe; - sop->chnet->ch()->_Read(nullptr, sop->read_size); + if (unlikely(net_err != net::OK)) { + ring->PostCQE(sqe, net_err); return; } + auto f = [ring, sqe_cp = *sqe](void *u, net::URLRequest *ur, int size){ + post_sqe_read(ring, &sqe_cp, size); + }; + /* - * The start operation fails, just post the CQE directly. + * The start operation succeeds, now we can do + * the read operation directly without issuing + * extra SQE. */ - sop->ring->PostCQE(sqe, net_err); - delete sop; - delete sqe; + cb = &ch->ch()->cb; + cb->read_completed_ = std::move(f); + ch->ch()->_Read(nullptr, sqe->sq_read_.size_); } -void CNRingCtx::IssueReadSQE(CNRingSQE *sqe) +static int issue_sqe_read_need_start(CNRing *ring, const CNRingSQE *sqe) { + CHNet *ch = sqe->sq_start_.ch_; struct net::CHNCallback *cb; - net::CHNetDelegate *ch; - CNRingOpRead *sop; - CNRingSQE *tmp; - int ret; - tmp = new CNRingSQE; - *tmp = *sqe; + auto f = [ring, sqe_cp = *sqe](void *u, auto *ur, int net_err){ + _issue_sqe_read_need_start(ring, &sqe_cp, net_err); + }; + + cb = &ch->ch()->cb; + cb->response_started_ = std::move(f); + return ch->Start(); +} - sop = (CNRingOpRead *)tmp->sq_data; - sop->ring = this; +bool CNRing::IssueSQERead(const CNRingSQE *sqe) +{ + int ret; - ch = sop->chnet->ch(); - cb = &ch->cb; - if (unlikely(!ch->started())) { + if (!sqe->sq_start_.ch_->ch()->started()) { /* * The request hasn't been started, we can't perform * a read operation at this point, do the start * operation first! */ - cb->response_started_ = issue_read_sqe_need_start; - cb->response_started_data_ = tmp; - ret = sop->chnet->Start(); + ret = issue_sqe_read_need_start(this, sqe); } else { /* * Normal read operation here. */ - cb->read_completed_ = issue_read_sqe; - cb->read_completed_data_ = tmp; - ret = sop->chnet->Read(sop->read_size); + ret = issue_sqe_read(this, sqe); } if (likely(ret >= 0)) - return; + return true; - /* - * Aiee, the operation fails! - */ - CallPostCQE(sqe, ret); - delete tmp; + return PostCQENoSleep(sqe, ret); +} + +bool CNRing::IssueSQEStart(const CNRingSQE *sqe) +{ + CHNet *ch = sqe->sq_start_.ch_; + struct net::CHNCallback *cb; + int ret; + + auto f = [this, sqe_cp = *sqe](void *u, auto *ur, int net_err){ + this->PostCQE(&sqe_cp, net_err); + }; + + cb = &ch->ch()->cb; + cb->response_started_ = std::move(f); + ret = ch->Start(); + + if (likely(ret >= 0)) + return true; + + return PostCQENoSleep(sqe, ret); } -void CNRingCtx::IssueSQE(CNRingSQE *sqe) +void CNRing::IssueSQE(const CNRingSQE *sqe) { - switch (sqe->op) { + bool ok = false; + + switch (sqe->op_) { case CNRING_OP_NOP: - IssueNopSQE(sqe); - break; - case CNRING_OP_START: - IssueStartSQE(sqe); + ok = IssueSQENop(sqe); break; case CNRING_OP_READ: - IssueReadSQE(sqe); + ok = IssueSQERead(sqe); + break; + case CNRING_OP_START: + ok = IssueSQEStart(sqe); break; } + + if (likely(ok)) + cq_max_to_wait_++; } -uint32_t CNRingCtx::SubmitSQE(uint32_t to_submit) +uint32_t CNRing::Submit(uint32_t to_submit) { - uint32_t head = sq_head_.load(); - uint32_t tail = sq_tail_.load(); - uint32_t ret = 0; + uint32_t head; + uint32_t tail; + uint32_t mask; + uint32_t ret; + + if (unlikely(!to_submit)) + return 0; + + state_->sqe_lock_.lock(); + head = sq_head_.load(std::memory_order_acquire); + tail = sq_tail_.load(std::memory_order_acquire); + mask = sq_mask_; + ret = 0; while (to_submit--) { if (unlikely(head == tail)) break; - IssueSQE(&sqes_[head++ & sq_mask_]); + IssueSQE(&sqes_[head++ & mask]); ret++; } - if (ret) - sq_head_.fetch_add(ret, std::memory_order_release); + sq_head_.store(head, std::memory_order_release); + state_->sqe_lock_.unlock(); + return ret; +} + +CNRingSQE *CNRing::GetSQE(void) +{ + CNRingSQE *sqe = nullptr; + + state_->sqe_lock_.lock(); + if (likely(SQESize() < (sq_mask_ + 1))) + sqe = &sqes_[sq_tail_++ & sq_mask_]; + + state_->sqe_lock_.unlock(); + return sqe; +} + +uint32_t CNRing::_WaitCQE(uint32_t to_wait, uint32_t timeout_ms, + std::unique_lock &lock) + __must_hold(&state_->cqe_lock_) +{ + std::atomic *cqe_to_wait = &state_->cqe_to_wait_; + std::condition_variable *cond = &state_->cqe_to_wait_cond_; + uint32_t ret; + + cqe_to_wait->store(to_wait, std::memory_order_release); + auto callback = [this, to_wait, &ret]{ + ret = CQESize(); + return ret >= to_wait; + }; + + if (timeout_ms == -1U) + cond->wait(lock, std::move(callback)); + else + cond->wait_for(lock, timeout_ms*1ms, std::move(callback)); + + cqe_to_wait->store(0, std::memory_order_release); return ret; } -void CNRingCtx::WaitCQE(uint32_t to_wait) +uint32_t CNRing::WaitCQE(uint32_t to_wait, uint32_t timeout_ms) { uint32_t max_to_wait; - uint32_t cq_size; + uint32_t ret; if (unlikely(!to_wait)) - return; + return 0; + + max_to_wait = cq_max_to_wait_.load(std::memory_order_acquire); + if (unlikely(!max_to_wait)) + return 0; - max_to_wait = cq_mask_; if (to_wait > max_to_wait) to_wait = max_to_wait; - std::unique_lock cqe_lock(state_->cqe_lock_); - cq_size = cqe_size(); - - if (cq_size < cq_mask_ + 1) - NotifyWaitCQEFreeSlot(); + std::unique_lock lock(state_->cqe_lock_); - if (to_wait <= cq_size) - return; + ret = CQESize(); + if (unlikely(ret >= to_wait)) + return ret; - state_->cqe_to_wait_.store(to_wait); - state_->cqe_cond_.wait(cqe_lock, [this, to_wait]{ - return to_wait <= cqe_size(); - }); - state_->cqe_to_wait_.store(0); + return _WaitCQE(to_wait, timeout_ms, lock); } -CNRingState::CNRingState(uint32_t nr_thread, uint32_t nr_jobs, - uint32_t nr_idle_thread): - wq_(nr_thread, nr_jobs, nr_idle_thread) +void CNRing::CQAdvance(uint32_t n) { - wq_.run(); + state_->cqe_lock_.lock(); + cq_head_ += n; + cq_max_to_wait_ -= n; + state_->cqe_lock_.unlock(); + NotifyFreeCQEWaiter(); } diff --git a/chnet/chnet_ring.h b/chnet/chnet_ring.h index f41d75b..0630050 100644 --- a/chnet/chnet_ring.h +++ b/chnet/chnet_ring.h @@ -8,169 +8,193 @@ #include "common.h" #ifdef __FOR_CHROMIUM_INTERNAL -#include - /* * Chromium internal includes. */ #include "base/component_export.h" #include "WorkQueue.h" +#include #define CHNET_EXPORT COMPONENT_EXPORT(CHNET) + #else /* #ifdef __FOR_CHROMIUM_INTERNAL */ + #define CHNET_EXPORT -#endif + +#endif /* #ifdef __FOR_CHROMIUM_INTERNAL */ + enum CNRingOp { CNRING_OP_NOP = 0, - CNRING_OP_START = 1, - CNRING_OP_READ = 2, + CNRING_OP_READ = 1, + CNRING_OP_START = 2, + CNRING_OP_WRITE = 3, }; -class CNRingCtx; - -struct CHNET_EXPORT CNRingOpStart { - CHNet *chnet; - CNRingCtx *ring; +struct CHNET_EXPORT CNRingOpRead { + CHNet *ch_; + uint32_t size_; }; -struct CHNET_EXPORT CNRingOpRead { - CHNet *chnet; - CNRingCtx *ring; - int read_size; +struct CHNET_EXPORT CNRingOpStart { + CHNet *ch_; }; class CHNET_EXPORT CNRingSQE { public: - uint8_t op; - void *sq_data; - uint64_t user_data; + uint8_t op_; + uint64_t user_data_; + union { + struct CNRingOpStart sq_start_; + struct CNRingOpRead sq_read_; + }; inline void PrepNop(void) { - op = CNRING_OP_NOP; + op_ = CNRING_OP_NOP; } - inline void PrepStart(CHNet *chnet) + inline void PrepRead(CHNet *ch, uint32_t size) { - CNRingOpStart *d = new CNRingOpStart; - - d->chnet = chnet; - d->ring = nullptr; - sq_data = (void *)d; - op = CNRING_OP_START; + op_ = CNRING_OP_READ; + sq_read_.ch_ = ch; + sq_read_.size_ = size; } - inline void PrepRead(CHNet *chnet, int read_size) + inline void PrepStart(CHNet *ch) { - CNRingOpRead *d = new CNRingOpRead; - - d->chnet = chnet; - d->read_size = read_size; - sq_data = (void *)d; - op = CNRING_OP_READ; + op_ = CNRING_OP_START; + sq_start_.ch_ = ch; } - inline void SetUserDataPtr(void *ptr) + inline void SetUserData(uint64_t data) { - user_data = (uint64_t) ptr; + user_data_ = data; } - inline void SetUserData(uint64_t ptr) + inline void SetUserDataPtr(void *data) { - user_data = ptr; + user_data_ = (uint64_t) (uintptr_t) data; } }; class CHNET_EXPORT CNRingCQE { public: - uint8_t op; - uint64_t user_data; - union { - int64_t res; - }; + uint8_t op_; + int64_t res_; + uint64_t user_data_; inline uint64_t GetUserData(void) { - return user_data; + return user_data_; } inline void *GetUserDataPtr(void) { - return (void *) (uintptr_t) user_data; + return (void *) (uintptr_t) user_data_; } }; #ifdef __FOR_CHROMIUM_INTERNAL class CNRingState { public: - CNRingState(uint32_t nr_thread = 64, uint32_t nr_jobs = 4096, - uint32_t nr_idle_thread = -1U); - WorkQueue wq_; - std::mutex cqe_lock_; - std::condition_variable cqe_cond_; - std::atomic cqe_to_wait_; - - std::mutex post_cqe_lock_; - std::condition_variable post_cqe_cond_; - std::atomic nr_post_cqe_wait_; + CNRingState(uint32_t nr_thread, uint32_t nr_jobs, + uint32_t nr_idle_thread); + + std::atomic cqe_to_wait_; + std::atomic nr_free_cqe_slot_waiter_; + + std::condition_variable cqe_to_wait_cond_; + std::condition_variable wait_for_a_free_cqe_slot_cond_; + + std::mutex wait_for_a_free_cqe_slot_lock_; + std::mutex sqe_lock_; + std::mutex cqe_lock_; + WorkQueue wq_; + volatile bool stop_; + + inline bool should_stop(void) + { + return stop_ || wq_.should_stop(); + } }; -#endif +#endif /* #ifdef __FOR_CHROMIUM_INTERNAL */ -class CHNET_EXPORT CNRingCtx { + +static inline uint32_t u32_diff(uint32_t a, uint32_t b) +{ + if (b > a) + return b - a; + else + return a - b; +} + +class CHNET_EXPORT CNRing { public: - CNRingCtx(uint32_t entry); - ~CNRingCtx(void); - uint32_t SubmitSQE(uint32_t to_submit = -1U); + CNRing(uint32_t entry); + ~CNRing(void); + uint32_t Submit(uint32_t to_submit = -1U); CNRingSQE *GetSQE(void); - void WaitCQE(uint32_t to_wait); - uint32_t sqe_size(void); - uint32_t cqe_size(void); - - void PostCQE(CNRingSQE *sqe, int64_t res); - void CallPostCQE(CNRingSQE *sqe, int64_t res); + uint32_t WaitCQE(uint32_t to_wait = 1, uint32_t timeout_ms = -1U); void CQAdvance(uint32_t n); + bool TryPostCQE(const CNRingSQE *sqe, int64_t res); + void PostCQE(const CNRingSQE *sqe, int64_t res); + bool PostCQENoSleep(const CNRingSQE *sqe, int64_t res); - inline CNRingCQE *HeadCQE(void) + inline CNRingCQE *GetCQEHead(void) { - return &cqes_[cq_head_.load()]; + return &cqes_[cq_head_.load(std::memory_order_acquire) & + cq_mask_]; } + CNRingSQE *sqes_; + CNRingCQE *cqes_; + #ifdef __FOR_CHROMIUM_INTERNAL - CNRingState *state_; + CNRingState *state_; #else - void *state_; + void *state_; #endif - CNRingSQE *sqes_; - CNRingCQE *cqes_; - - uint32_t sq_mask_; - std::atomic sq_head_; std::atomic sq_tail_; + std::atomic sq_head_; + uint32_t sq_mask_; - uint32_t cq_mask_; - std::atomic cq_head_; std::atomic cq_tail_; + std::atomic cq_head_; + uint32_t cq_mask_; + std::atomic cq_max_to_wait_; private: - void HandleSQE(CNRingSQE *sqe); - CNRingSQE *ConsumeSQE(void); - bool TryPostCQE(CNRingSQE *sqe, int64_t res); - CNRingCQE *GetCQENoTailIncrement(void); + uint32_t _WaitCQE(uint32_t to_wait, uint32_t timeout_ms, + std::unique_lock &lock); + void IssueSQE(const CNRingSQE *sqe); + bool IssueSQENop(const CNRingSQE *sqe); + bool IssueSQERead(const CNRingSQE *sqe); + bool IssueSQEStart(const CNRingSQE *sqe); void NotifyCQEWaiter(void); - void NotifyWaitCQEFreeSlot(void); - void WaitForCQEFreeSlot(void); + void NotifyFreeCQEWaiter(bool force = false); + void SleepToWaitForAFreeCQE(void); + + int _IssueSQEReadNeedStart(const CNRingSQE *sqe); - void IssueSQE(CNRingSQE *sqe); - void IssueNopSQE(CNRingSQE *sqe); - void IssueStartSQE(CNRingSQE *sqe); - void IssueReadSQE(CNRingSQE *sqe); + inline uint32_t CQESize(void) + { + return u32_diff(cq_head_.load(std::memory_order_acquire), + cq_tail_.load(std::memory_order_acquire)); + } + + inline uint32_t SQESize(void) + { + return u32_diff(sq_head_.load(std::memory_order_acquire), + sq_tail_.load(std::memory_order_acquire)); + } }; -#define chnring_for_each_cqe(ring, head, cqe) \ +#define cnring_for_each_cqe(ring, head, tail, cqe) \ for ( \ - head = (ring)->cq_head_; \ - (cqe = (head != (ring)->cq_tail_) ? \ + head = (ring)->cq_head_.load(std::memory_order_acquire),\ + tail = (ring)->cq_tail_.load(std::memory_order_acquire);\ + (cqe = (head != tail) ? \ &(ring)->cqes_[head & (ring)->cq_mask_] : \ NULL); \ head++ \ diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc index 54426ef..22da7ea 100644 --- a/tests/cpp/ring.cc +++ b/tests/cpp/ring.cc @@ -6,231 +6,246 @@ static void test_nop(void) { - /* - * Simple nop test. - * - * This shows that CQE size is double of SQE size. - */ - - static constexpr uint32_t entry = 4096; - unsigned head, i, j, a, b; - CNRingCtx ring(entry); + constexpr static uint32_t nr_loop = 1024 * 8; + constexpr static uint32_t entry = 4096; + CNRing ring(entry); CNRingSQE *sqe; CNRingCQE *cqe; + uint32_t total; + uint32_t head; + uint32_t tail; + uint32_t ret; + uint32_t i; - i = 0; - for (j = 0; j < 3; j++) { - while (1) { + for (i = 0; i < nr_loop; i++) { + sqe = ring.GetSQE(); + if (!sqe) { + assert(ring.Submit() == entry); sqe = ring.GetSQE(); - if (!sqe) - break; - sqe->PrepNop(); - sqe->SetUserData(i++); + assert(sqe); } - ring.SubmitSQE(); + sqe->PrepNop(); + sqe->SetUserData(1); } - ring.WaitCQE(entry * 2); - - a = i; - i = 0; - chnring_for_each_cqe(&ring, head, cqe) - i++; - b = i; - ring.CQAdvance(i); - assert(b == entry * 2); - - ring.WaitCQE(entry); - i = 0; - chnring_for_each_cqe(&ring, head, cqe) - i++; - ring.CQAdvance(i); - b += i; - assert(a == b); + assert(ring.Submit() == entry); + + total = 0; + while (total < nr_loop) { + ret = ring.WaitCQE(1); + i = 0; + cnring_for_each_cqe(&ring, head, tail, cqe) { + assert(cqe->op_ == CNRING_OP_NOP); + assert(cqe->res_ == 0); + assert(cqe->user_data_ == 1); + i++; + } + ring.CQAdvance(i); + assert(ret >= 1); + assert(i >= ret); + total += ret; + } + assert(total == nr_loop); } -static void test_chnet_ring_single(void) +static void test_chnet_ring_read(bool do_sq_start = false) { - CNRingCtx ring(4096); + CNRing ring(1); CNRingSQE *sqe; CNRingCQE *cqe; + uint32_t ret; CHNet *ch; ch = new CHNet; ch->SetURL("http://127.0.0.1:8000/index.php?action=hello"); + ch->SetMethod("GET"); - sqe = ring.GetSQE(); - assert(sqe); - sqe->PrepStart(ch); - sqe->SetUserData(9999); - assert(ring.SubmitSQE() == 1); - - ring.WaitCQE(1); - cqe = ring.HeadCQE(); - assert(cqe->op == CNRING_OP_START); - assert(cqe->res == 0); - assert(cqe->user_data == 9999); - ring.CQAdvance(1); + if (do_sq_start) { + sqe = ring.GetSQE(); + assert(sqe); + sqe->PrepStart(ch); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + assert(ret == 1); + assert(cqe->op_ == CNRING_OP_START); + assert(cqe->res_ == 0); + ring.CQAdvance(1); + } sqe = ring.GetSQE(); - assert(sqe); sqe->PrepRead(ch, 1024); - sqe->SetUserData(9999); - assert(ring.SubmitSQE() == 1); - - ring.WaitCQE(1); - cqe = ring.HeadCQE(); - assert(cqe->op == CNRING_OP_READ); - assert(cqe->res == 13); - assert(cqe->user_data == 9999); + ring.Submit(); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe->op_ == CNRING_OP_READ); + assert(cqe->res_ == 13); + assert(ch->read_ret() == cqe->res_); + assert(!strncmp("Hello World!\n", ch->read_buf(), cqe->res_)); ring.CQAdvance(1); delete ch; } -static void test_chnet_ring_set_header(void) +static void test_chnet_ring_partial_read(bool do_sq_start = false) { - constexpr static const char ua[] = "This is just a test user agent!!!"; - CNRingCtx ring(4096); + CNRing ring(1); CNRingSQE *sqe; CNRingCQE *cqe; + uint32_t ret; CHNet *ch; ch = new CHNet; - ch->SetURL("http://127.0.0.1:8000/index.php?action=user_agent"); - ch->SetRequestHeader("User-agent", ua); + ch->SetURL("http://127.0.0.1:8000/index.php?action=hello"); + ch->SetMethod("GET"); + if (do_sq_start) { + sqe = ring.GetSQE(); + sqe->PrepStart(ch); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + assert(ret == 1); + assert(cqe->op_ == CNRING_OP_START); + assert(cqe->res_ == 0); + ring.CQAdvance(1); + } + + /* + * The first read. + */ sqe = ring.GetSQE(); - assert(sqe); - sqe->PrepStart(ch); - sqe->SetUserData(9999); - assert(ring.SubmitSQE() == 1); - - ring.WaitCQE(1); - cqe = ring.HeadCQE(); - assert(cqe->op == CNRING_OP_START); - assert(cqe->res == 0); - assert(cqe->user_data == 9999); + sqe->PrepRead(ch, 4); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe->op_ == CNRING_OP_READ); + assert(cqe->res_ == 4); + assert(ch->read_ret() == cqe->res_); + assert(!strncmp("Hell", ch->read_buf(), cqe->res_)); ring.CQAdvance(1); + + /* + * The second read. + */ sqe = ring.GetSQE(); - assert(sqe); - sqe->PrepRead(ch, 1024); - sqe->SetUserData(9999); - assert(ring.SubmitSQE() == 1); - - ring.WaitCQE(1); - cqe = ring.HeadCQE(); - assert(cqe->op == CNRING_OP_READ); - assert(cqe->res == strlen(ua)); - assert(cqe->user_data == 9999); - assert(!strncmp(ua, ch->read_buf(), strlen(ua))); - ring.CQAdvance(1); + sqe->PrepRead(ch, 4); + assert(ring.Submit() == 1); - delete ch; -} + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); -static void test_chnet_ring_single_post(void) -{ - constexpr static const char buf[] = "AAAA Hello World!\n"; - CNRingCtx ring(4096); - CNRingSQE *sqe; - CNRingCQE *cqe; - CHNet *ch; + assert(ret == 1); + assert(cqe->op_ == CNRING_OP_READ); + assert(cqe->res_ == 4); + assert(ch->read_ret() == cqe->res_); + assert(!strncmp("o Wo", ch->read_buf(), cqe->res_)); + ring.CQAdvance(1); - ch = new CHNet; - ch->SetURL("http://127.0.0.1:8000/index.php?action=body"); - ch->SetMethod("POST"); - ch->SetPayload(buf, sizeof(buf)); + /* + * The third read. + */ sqe = ring.GetSQE(); - assert(sqe); - sqe->PrepStart(ch); - sqe->SetUserData(9999); - assert(ring.SubmitSQE() == 1); - - ring.WaitCQE(1); - cqe = ring.HeadCQE(); - assert(cqe->op == CNRING_OP_START); - assert(cqe->res == 0); - assert(cqe->user_data == 9999); + sqe->PrepRead(ch, 1024); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe->op_ == CNRING_OP_READ); + assert(cqe->res_ == 5); + assert(ch->read_ret() == cqe->res_); + assert(!strncmp("rld!\n", ch->read_buf(), cqe->res_)); ring.CQAdvance(1); + + /* + * The 4-th read. + */ sqe = ring.GetSQE(); - assert(sqe); sqe->PrepRead(ch, 1024); - sqe->SetUserData(9999); - assert(ring.SubmitSQE() == 1); - - ring.WaitCQE(1); - cqe = ring.HeadCQE(); - assert(cqe->op == CNRING_OP_READ); - assert(cqe->res == sizeof(buf)); - assert(cqe->user_data == 9999); - assert(!strncmp(ch->read_buf(), buf, sizeof(buf))); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe->op_ == CNRING_OP_READ); + assert(cqe->res_ == 0); + assert(ch->read_ret() == cqe->res_); ring.CQAdvance(1); delete ch; } -#define NR_REQ 1024 - -static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring) +#define NR_REQ 4096 +static void test_chnet_ring_read_parallel(bool do_sq_start = false) { - unsigned i, j, head; + CNRing ring(NR_REQ); CNRingSQE *sqe; CNRingCQE *cqe; + uint32_t head; + uint32_t tail; + uint32_t i; CHNet **ch; - ch = new CHNet*[NR_REQ]; + ch = new CHNet *[NR_REQ]; for (i = 0; i < NR_REQ; i++) { ch[i] = new CHNet; ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello"); } - if (start_before_read) { + if (do_sq_start) { for (i = 0; i < NR_REQ; i++) { - sqe = ring->GetSQE(); + sqe = ring.GetSQE(); assert(sqe); sqe->PrepStart(ch[i]); sqe->SetUserDataPtr(ch[i]); } - assert(ring->SubmitSQE() == i); - ring->WaitCQE(i); - - j = 0; - chnring_for_each_cqe(ring, head, cqe) { - assert(cqe->op == CNRING_OP_START); - assert(!cqe->res); - j++; + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); + + cnring_for_each_cqe(&ring, head, tail, cqe) { + CHNet *ch = (CHNet *) cqe->GetUserDataPtr(); + + assert(ch); + assert(cqe->res_ == 0); + assert(cqe->op_ == CNRING_OP_START); } - assert(j == i); - ring->CQAdvance(j); + ring.CQAdvance(NR_REQ); } + for (i = 0; i < NR_REQ; i++) { - sqe = ring->GetSQE(); + sqe = ring.GetSQE(); assert(sqe); sqe->PrepRead(ch[i], 1024); sqe->SetUserDataPtr(ch[i]); } - assert(ring->SubmitSQE() == i); - ring->WaitCQE(i); + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); - j = 0; - chnring_for_each_cqe(ring, head, cqe) { - CHNet *ch; + cnring_for_each_cqe(&ring, head, tail, cqe) { + CHNet *ch = (CHNet *) cqe->GetUserDataPtr(); - ch = (CHNet *)cqe->GetUserDataPtr(); assert(ch); - assert(cqe->op == CNRING_OP_READ); - assert(cqe->res == 13); - assert(ch->read_ret() == cqe->res); - assert(!strncmp(ch->read_buf(), "Hello World!\n", 13)); - j++; + assert(cqe->res_ == 13); + assert(cqe->op_ == CNRING_OP_READ); + assert(ch->read_ret() == cqe->res_); + assert(!strncmp("Hello World!\n", ch->read_buf(), cqe->res_)); } - assert(j == i); - ring->CQAdvance(j); + ring.CQAdvance(NR_REQ); for (i = 0; i < NR_REQ; i++) delete ch[i]; @@ -238,82 +253,157 @@ static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring) delete[] ch; } -static void test_chnet_ring_multiple(bool start_before_read) -{ - CNRingCtx ring(NR_REQ); - unsigned i; - - for (i = 0; i < 3; i++) - _test_chnet_ring_multiple(start_before_read, &ring); -} - -static void test_chnet_chunked_body(void) +static void test_chnet_ring_partial_read_parallel(bool do_sq_start = false) { - constexpr size_t len = 1024 * 64; - constexpr size_t nr_loop = 100; - size_t total_read = 0; - CNRingCtx ring(4096); + CNRing ring(NR_REQ); CNRingSQE *sqe; CNRingCQE *cqe; - CHNet *ch; - char *buf; - size_t i; + uint32_t head; + uint32_t tail; + uint32_t i; + CHNet **ch; - ch = new CHNet; - ch->SetURL("http://127.0.0.1:8000/index.php?action=body"); - ch->SetMethod("POST"); - ch->StartChunkedBody(); + ch = new CHNet *[NR_REQ]; - sqe = ring.GetSQE(); - assert(sqe); - sqe->PrepRead(ch, len); - sqe->SetUserData(9999); - assert(ring.SubmitSQE() == 1); - - buf = new char[len]; - memset(buf, 'A', len); - - for (i = 0; i < nr_loop; i++) - ch->Write(buf, len); - ch->StopChunkedBody(); - - while (total_read < (len * nr_loop)) { - int res; - - ring.WaitCQE(1); - cqe = ring.HeadCQE(); - assert(cqe->op == CNRING_OP_READ); - assert(cqe->user_data == 9999); - res = cqe->res; - total_read += res; - ring.CQAdvance(1); + for (i = 0; i < NR_REQ; i++) { + ch[i] = new CHNet; + ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello"); + ch[i]->SetMethod("GET"); + } - if (!res) - break; + if (do_sq_start) { + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe->PrepStart(ch[i]); + sqe->SetUserDataPtr(ch[i]); + } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); + + cnring_for_each_cqe(&ring, head, tail, cqe) { + CHNet *ch = (CHNet *) cqe->GetUserDataPtr(); + assert(ch); + assert(cqe->res_ == 0); + assert(cqe->op_ == CNRING_OP_START); + } + ring.CQAdvance(NR_REQ); + } + + + /* + * The first read. + */ + for (i = 0; i < NR_REQ; i++) { sqe = ring.GetSQE(); assert(sqe); - sqe->PrepRead(ch, len); - sqe->SetUserData(9999); - assert(ring.SubmitSQE() == 1); + sqe->PrepRead(ch[i], 4); + sqe->SetUserDataPtr(ch[i]); } - printf("total_read = %zu %zu\n", total_read, (len * nr_loop)); - assert(total_read == (len * nr_loop)); + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); - delete[] buf; - delete ch; + cnring_for_each_cqe(&ring, head, tail, cqe) { + CHNet *ch = (CHNet *) cqe->GetUserDataPtr(); + + assert(ch); + assert(cqe->res_ == 4); + assert(cqe->op_ == CNRING_OP_READ); + assert(ch->read_ret() == cqe->res_); + assert(!strncmp("Hell", ch->read_buf(), cqe->res_)); + } + ring.CQAdvance(NR_REQ); + + + /* + * The second read. + */ + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe->PrepRead(ch[i], 4); + sqe->SetUserDataPtr(ch[i]); + } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); + + cnring_for_each_cqe(&ring, head, tail, cqe) { + CHNet *ch = (CHNet *) cqe->GetUserDataPtr(); + + assert(ch); + assert(cqe->res_ == 4); + assert(cqe->op_ == CNRING_OP_READ); + assert(ch->read_ret() == cqe->res_); + assert(!strncmp("o Wo", ch->read_buf(), cqe->res_)); + } + ring.CQAdvance(NR_REQ); + + + /* + * The third read. + */ + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe->PrepRead(ch[i], 100); + sqe->SetUserDataPtr(ch[i]); + } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); + + cnring_for_each_cqe(&ring, head, tail, cqe) { + CHNet *ch = (CHNet *) cqe->GetUserDataPtr(); + + assert(ch); + assert(cqe->res_ == 5); + assert(cqe->op_ == CNRING_OP_READ); + assert(ch->read_ret() == cqe->res_); + assert(!strncmp("rld!\n", ch->read_buf(), cqe->res_)); + } + ring.CQAdvance(NR_REQ); + + + /* + * The 4-th read. + */ + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe->PrepRead(ch[i], 100); + sqe->SetUserDataPtr(ch[i]); + } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); + + cnring_for_each_cqe(&ring, head, tail, cqe) { + CHNet *ch = (CHNet *) cqe->GetUserDataPtr(); + + assert(ch); + assert(cqe->res_ == 0); + assert(cqe->op_ == CNRING_OP_READ); + assert(ch->read_ret() == 0); + } + ring.CQAdvance(NR_REQ); + + for (i = 0; i < NR_REQ; i++) + delete ch[i]; + + delete[] ch; } int main(void) { + int i; + test_nop(); chnet_global_init(); - // test_chnet_ring_single(); - // test_chnet_ring_set_header(); - // test_chnet_ring_single_post(); - // test_chnet_ring_multiple(false); - // test_chnet_ring_multiple(true); - test_chnet_chunked_body(); + for (i = 0; i < 2; i++) { + test_chnet_ring_read(!!i); + test_chnet_ring_partial_read(!!i); + test_chnet_ring_read_parallel(!!i); + test_chnet_ring_partial_read_parallel(!!i); + } chnet_global_stop(); return 0; } diff --git a/tests/js/ring.js b/tests/js/ring.js index 372c6e6..2f3fe1c 100644 --- a/tests/js/ring.js +++ b/tests/js/ring.js @@ -3,263 +3,389 @@ const chnet = require("bindings")("chnet"); function assert(cond) { - if (!cond) { - console.log("Assertion failed!"); - process.exit(1); - } + if (!cond) + throw new Error("Assertion failed!"); } function test_nop() { - const entry = 16; - let ring; - let cqe; + const nr_loop = 1024 * 8; + const entry = 4096; + let ring = chnet.CreateRing(entry); let sqe; - let a, b, i, j; - - i = 0; - ring = chnet.create_ring(entry); - for (j = 0; j < 3; j++) { - while (1) { - sqe = ring.get_sqe(); - if (!sqe) - break; - sqe.prep_nop(); - i++; + let cqe; + let total; + let ret; + let i; + + for (i = 0; i < nr_loop; i++) { + sqe = ring.GetSQE(); + if (!sqe) { + assert(ring.Submit() == entry); + sqe = ring.GetSQE(); + assert(sqe); } - ring.submit_sqe(); + sqe.PrepNop(); + sqe.SetUserData(1); + } + assert(ring.Submit() == entry); + assert(ring.WaitCQE(1)); + + total = 0; + while (total < nr_loop) { + ret = ring.WaitCQE(1); + i = ring.ForEachCQE(function (cqe) { + assert(!cqe.res); + assert(cqe.user_data == 1); + }); + ring.CQAdvance(i); + assert(ret >= 1); + assert(i >= ret); + total += ret; } - ring.wait_cqe(entry * 2); - - a = i; - i = ring.for_each_cqe(function (cqe) { }); - b = i; - ring.cq_advance(i); - assert(b == entry * 2); - - ring.wait_cqe(entry); - i = ring.for_each_cqe(function (cqe) { }); - b += i; - ring.cq_advance(i); - assert(a == b); + assert(total == nr_loop); + ring.Close(); } -function test_chnet_ring_multiple() +function test_chnet_ring_read(do_sq_start = false) { - const NR_REQ = 10; - let ring; + let ring = chnet.CreateRing(1); let sqe; let cqe; + let ret; let ch; - let i; - ring = chnet.create_ring(NR_REQ); - ch = []; + ch = chnet.CreateNet(); + ch.SetURL("http://127.0.0.1:8000/index.php?action=hello"); + ch.SetMethod("GET"); - for (i = 0; i < NR_REQ; i++) { - ch[i] = chnet.create_net(); - ch[i].set_url("http://127.0.0.1:8000/index.php?action=hello"); - } + if (do_sq_start) { + sqe = ring.GetSQE(); + sqe.PrepStart(ch); + assert(ring.Submit() == 1); - for (i = 0; i < NR_REQ; i++) { - sqe = ring.get_sqe(); - assert(sqe); - sqe.prep_start(ch[i]); - sqe.set_user_data(ch[i]); + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + assert(ret == 1); + assert(cqe.res == 0); + ring.CQAdvance(1); } - assert(ring.submit_sqe() == i); - ring.wait_cqe(i); - j = ring.for_each_cqe(function (cqe) { - assert(cqe.res == 0); - }); - assert(j == i); - ring.cq_advance(j); + sqe = ring.GetSQE(); + sqe.PrepRead(ch, 1024); + assert(ring.Submit() == 1); - for (i = 0; i < NR_REQ; i++) { - sqe = ring.get_sqe(); - assert(sqe); - sqe.prep_read(ch[i], 1024); - sqe.set_user_data(ch[i]); - } - assert(ring.submit_sqe() == i); - ring.wait_cqe(i); - j = ring.for_each_cqe(function (cqe) { - let ch = cqe.user_data; - assert(cqe.res == 13); - assert(ch.read_buf() === "Hello World!\n"); - }); - assert(j == i); - ring.cq_advance(j); + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe.res == 13); + assert(ch.read_ret() == cqe.res); + assert("Hello World!\n" == ch.read_buf()); + ring.CQAdvance(1); + ch.Close(); + ring.Close(); } -class SimpleHttp { - constructor(url, method = "GET") { - this.ring = chnet.create_ring(1); - this.ch = chnet.create_net(); - this.ch.set_url(url); - this.ch.set_method(method); - this.res = -1; - } +function test_chnet_ring_partial_read(do_sq_start = false) +{ + let ring = chnet.CreateRing(1); + let sqe; + let cqe; + let ret; + let ch; - prep_read(read_size) { - let sqe = this.ring.get_sqe(); - sqe.prep_read(this.ch, read_size); - sqe.set_user_data(this); + ch = chnet.CreateNet(); + ch.SetURL("http://127.0.0.1:8000/index.php?action=hello"); + ch.SetMethod("GET"); + + if (do_sq_start) { + sqe = ring.GetSQE(); + sqe.PrepStart(ch); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + assert(ret == 1); + assert(cqe.res == 0); + ring.CQAdvance(1); } - set_payload(payload) { - this.ch.set_payload(payload); + /* + * The first read. + */ + sqe = ring.GetSQE(); + sqe.PrepRead(ch, 4); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe.res == 4); + assert(ch.read_ret() == cqe.res); + assert("Hell" == ch.read_buf()); + ring.CQAdvance(1); + + + /* + * The second read. + */ + sqe = ring.GetSQE(); + sqe.PrepRead(ch, 4); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe.res == 4); + assert(ch.read_ret() == cqe.res); + assert("o Wo" == ch.read_buf()); + ring.CQAdvance(1); + + + /* + * The third read. + */ + sqe = ring.GetSQE(); + sqe.PrepRead(ch, 1024); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe.res == 5); + assert(ch.read_ret() == cqe.res); + assert("rld!\n" == ch.read_buf()); + ring.CQAdvance(1); + + + /* + * The 4-th read. + */ + sqe = ring.GetSQE(); + sqe.PrepRead(ch, 4); + assert(ring.Submit() == 1); + + ret = ring.WaitCQE(1); + cqe = ring.GetCQEHead(); + + assert(ret == 1); + assert(cqe.res == 0); + assert(ch.read_ret() == cqe.res); + assert(!ch.read_buf()); + ring.CQAdvance(1); + ch.Close(); + ring.Close(); +} + +const NR_REQ = 4096; +function test_chnet_ring_read_parallel(do_sq_start = false) +{ + let ring = chnet.CreateRing(NR_REQ); + let sqe; + let cqe; + let i; + let ch_arr; + + ch_arr = []; + + for (i = 0; i < NR_REQ; i++) { + ch_arr[i] = chnet.CreateNet(); + ch_arr[i].SetURL("http://127.0.0.1:8000/index.php?action=hello"); + ch_arr[i].SetMethod("GET"); } - run() { - this.ring.submit_sqe(); - this.ring.wait_cqe(1); - this.ring.for_each_cqe(function (cqe) { - let that = cqe.user_data; - that.res = cqe.res; - if (cqe.res < 0) - console.log("Error: "+that.ch.get_error()); + if (do_sq_start) { + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe.PrepStart(ch_arr[i]); + sqe.SetUserData(ch_arr[i]); + } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); + + ring.ForEachCQE(function (cqe) { + let ch = cqe.user_data; + + assert(ch); + assert(cqe.res == 0); }); - this.ring.cq_advance(1); + ring.CQAdvance(NR_REQ); } - get_buffer() { - return this.ch.read_buf(); + + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe.PrepRead(ch_arr[i], 1024); + sqe.SetUserData(ch_arr[i]); } -} + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); -function test_simple_http() -{ - const payload = "Hello World! AAAAAAAAAAAAAAAAAAAAAAAAA\n"; - let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=body", "POST"); - h.set_payload(payload); - h.prep_read(1024); - h.run(); - assert(h.get_buffer() === payload); - assert(h.ch.read_ret() === payload.length); -} + ring.ForEachCQE(function (cqe) { + let ch = cqe.user_data; -function test_simple_http_set_header() -{ - const ua = "This is just a test user agent!"; - let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=user_agent", "GET"); - h.ch.set_request_header("User-Agent", ua); - h.prep_read(1024); - h.run(); - assert(h.get_buffer() === ua); - assert(h.ch.read_ret() === ua.length); + assert(ch); + assert(cqe.res == 13); + assert(ch.read_ret() == cqe.res); + assert("Hello World!\n" == ch.read_buf()); + }); + ring.CQAdvance(NR_REQ); + + for (i = 0; i < NR_REQ; i++) + ch_arr[i].Close(); + + ring.Close(); } -function test_simple_http_post_set_header() +function test_chnet_ring_partial_read_parallel(do_sq_start = false) { - const ss = "This is just a test string!"; - let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=print_post&key=test", "POST"); - h.ch.set_request_header("User-Agent", ss); - h.ch.set_request_header("Content-type", "application/x-www-form-urlencoded"); - h.ch.set_payload("test="+encodeURIComponent(ss)); - h.prep_read(1024); - h.run(); - assert(h.get_buffer() === ss); - assert(h.ch.read_ret() === ss.length); -} + let ring = chnet.CreateRing(NR_REQ); + let sqe; + let cqe; + let i; + let ch_arr; -class CHNet { - constructor(cr, url, method) { - this.ch = chnet.create_net(); - this.ch.set_url(url); - this.ch.set_method(method); - this.cr = cr; - } + ch_arr = []; - prep_read(read_size) { - let ring = this.cr.ring; - let sqe = ring.get_sqe(); - if (!sqe) - return false; - sqe.prep_read(this.ch, read_size); - sqe.set_user_data(this); - return true; + for (i = 0; i < NR_REQ; i++) { + ch_arr[i] = chnet.CreateNet(); + ch_arr[i].SetURL("http://127.0.0.1:8000/index.php?action=hello"); + ch_arr[i].SetMethod("GET"); } - set_user_data(data) { - this.udata = data; - } + if (do_sq_start) { + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe.PrepStart(ch_arr[i]); + sqe.SetUserData(ch_arr[i]); + } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); - get_user_data() { - return this.udata; - } + ring.ForEachCQE(function (cqe) { + let ch = cqe.user_data; - set_header(key, val, overwrite = true) { - this.ch.set_request_header(key, val, overwrite); + assert(ch); + assert(cqe.res == 0); + }); + ring.CQAdvance(NR_REQ); } -}; -class ChromiumNet { - constructor() { - this.ring = chnet.create_ring(4096); + + /* + * The first read. + */ + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe.PrepRead(ch_arr[i], 4); + sqe.SetUserData(ch_arr[i]); } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); + + ring.ForEachCQE(function (cqe) { + let ch = cqe.user_data; + + assert(ch); + assert(cqe.res == 4); + assert(ch.read_ret() == cqe.res); + assert("Hell" == ch.read_buf()); + }); + ring.CQAdvance(NR_REQ); + - create(url, method = "GET") { - return new CHNet(this, url, method); + /* + * The second read. + */ + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe.PrepRead(ch_arr[i], 4); + sqe.SetUserData(ch_arr[i]); } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); - for_each_cqe(cb = null) { - let ret = this.ring.for_each_cqe(function (cqe) { + ring.ForEachCQE(function (cqe) { + let ch = cqe.user_data; - if (cb) - cb(cqe); + assert(ch); + assert(cqe.res == 4); + assert(ch.read_ret() == cqe.res); + assert("o Wo" == ch.read_buf()); + }); + ring.CQAdvance(NR_REQ); - let ch = cqe.user_data; - let ret = cqe.res; - let udata = ch.udata; - if (ret < 0 && ch.onerror) - ch.onerror(ch, ret, udata); - else - ch.onreadcomplete(ch, ret, udata); - }); - this.ring.cq_advance(ret); - return ret; + /* + * The third read. + */ + for (i = 0; i < NR_REQ; i++) { + sqe = ring.GetSQE(); + assert(sqe); + sqe.PrepRead(ch_arr[i], 1024); + sqe.SetUserData(ch_arr[i]); } -}; + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); -function test_classes() -{ - const UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"; - const NR_REQ = 10; + ring.ForEachCQE(function (cqe) { + let ch = cqe.user_data; - let cr = new ChromiumNet; - let ch_arr = []; - let i, j, ch; + assert(ch); + assert(cqe.res == 5); + assert(ch.read_ret() == cqe.res); + assert("rld!\n" == ch.read_buf()); + }); + ring.CQAdvance(NR_REQ); + + /* + * The 4-th read. + */ for (i = 0; i < NR_REQ; i++) { - ch = cr.create("http://127.0.0.1:8000/index.php?action=hello"); - ch.set_header("User-Agent", UA); - ch.prep_read(4096); - ch.set_user_data(i); - ch.onerror = function (cr, ret, udata) { - console.log(`Req: ${udata} returned error: ${cr.ch.get_error()}`); - }; - ch.onreadcomplete = function (cr, ret, udata) { - console.log(`Req: ${udata} returned ${ret} bytes: ${cr.ch.read_buf()}`); - }; - ch_arr[i] = ch; + sqe = ring.GetSQE(); + assert(sqe); + sqe.PrepRead(ch_arr[i], 1024); + sqe.SetUserData(ch_arr[i]); } + assert(ring.Submit() == NR_REQ); + assert(ring.WaitCQE(NR_REQ) == NR_REQ); + + ring.ForEachCQE(function (cqe) { + let ch = cqe.user_data; - cr.ring.submit_sqe(); - cr.ring.wait_cqe(1); - cr.for_each_cqe(); + assert(ch); + assert(cqe.res == 0); + assert(ch.read_ret() == cqe.res); + }); + ring.CQAdvance(NR_REQ); + + for (i = 0; i < NR_REQ; i++) + ch_arr[i].Close(); + + ring.Close(); } function main() { + let i; + test_nop(); - test_chnet_ring_multiple(); - test_simple_http(); - test_simple_http_set_header(); - test_simple_http_post_set_header(); - test_classes(); + for (i = 0; i < 2; i++) { + test_chnet_ring_read(!!i); + test_chnet_ring_partial_read(!!i); + test_chnet_ring_read_parallel(!!i); + test_chnet_ring_partial_read_parallel(!!i); + } } main(); -- Ammar Faizi