From: Ammar Faizi <[email protected]>
To: Alviro Iskandar Setiawan <[email protected]>
Cc: Ammar Faizi <[email protected]>,
Muhammad Rizki <[email protected]>,
Kanna Scarlet <[email protected]>,
GNU/Weeb Mailing List <[email protected]>
Subject: [PATCH v1 19/22] chnet: ring: Refactor the ring completely
Date: Sun, 21 Aug 2022 18:24:50 +0700 [thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
This commit squashes multiple commits into one. The notable changes
are:
- chnet: ring: Support read operation again
- chnet: ring: Support start operation again
- tests/cpp/ring: Add more tests and delete old tests
- chnet: node: Refactor the NodeJS interface
In short, this rewrites them to make the code cleaner and better.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/WorkQueue.cc | 17 +-
chnet/WorkQueue.h | 5 +
chnet/chnet_node.cc | 453 ++++++++++++++++++++++++--------------
chnet/chnet_ring.cc | 460 +++++++++++++++++++--------------------
chnet/chnet_ring.h | 200 +++++++++--------
tests/cpp/ring.cc | 504 +++++++++++++++++++++++++------------------
tests/js/ring.js | 516 +++++++++++++++++++++++++++-----------------
7 files changed, 1260 insertions(+), 895 deletions(-)
diff --git a/chnet/WorkQueue.cc b/chnet/WorkQueue.cc
index 30163bc..7411123 100644
--- a/chnet/WorkQueue.cc
+++ b/chnet/WorkQueue.cc
@@ -33,6 +33,12 @@ WorkQueue::~WorkQueue(void)
stop_ = true;
+ if (helper_thread_) {
+ jobs_cond_.notify_all();
+ helper_thread_->join();
+ delete helper_thread_;
+ }
+
i = nr_threads_;
while (i--) {
std::thread *t;
@@ -44,11 +50,6 @@ WorkQueue::~WorkQueue(void)
}
}
- if (helper_thread_) {
- helper_thread_->join();
- delete helper_thread_;
- }
-
delete[] threads_;
delete[] jobs_;
}
@@ -99,7 +100,7 @@ void WorkQueue::_wq_thread_worker(struct wq_thread *t) noexcept
t->set_task_state(WQ_INTERRUPTIBLE);
while (!stop_) {
__wq_thread_worker(t, lk);
- auto cv_ret = jobs_cond_.wait_for(lk, 1s);
+ auto cv_ret = jobs_cond_.wait_for(lk, 200ms);
if (unlikely(cv_ret == std::cv_status::timeout)) {
if (t->idx < nr_idle_thread_)
continue;
@@ -174,7 +175,7 @@ void WorkQueue::wq_helper(void)
_wq_helper(njob);
lk.lock();
}
- jobs_cond_.wait_for(lk, 10s, [this]{ return stop_; });
+ jobs_cond_.wait_for(lk, 200ms, [this]{ return stop_; });
lk.unlock();
}
}
@@ -284,7 +285,7 @@ int64_t WorkQueue::schedule_work(wq_job_callback_t cb, void *data) noexcept
}
lk.lock();
- sched_idle_cond_.wait_for(lk, 10s);
+ sched_idle_cond_.wait_for(lk, 200ms);
lk.unlock();
}
diff --git a/chnet/WorkQueue.h b/chnet/WorkQueue.h
index ddb3bf5..ad4f831 100644
--- a/chnet/WorkQueue.h
+++ b/chnet/WorkQueue.h
@@ -234,6 +234,11 @@ public:
{
jobs_cond_.notify_one();
}
+
+ inline bool should_stop(void)
+ {
+ return stop_;
+ }
};
#endif /* #ifndef CHNET_WORKQUEUE_H */
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index 621d7aa..cea4ad6 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -4,22 +4,36 @@
class NodeCHNetRing {
public:
inline NodeCHNetRing(uint32_t entry):
- ring_(entry)
+ ring_(new CNRing(entry))
{
- ucount_.store(0, std::memory_order_relaxed);
+ id_seq_.store(0, std::memory_order_relaxed);
}
- CNRingCtx ring_;
+ inline ~NodeCHNetRing(void)
+ {
+ if (ring_)
+ delete ring_;
+ }
+
+ inline void Close(void)
+ {
+ if (ring_) {
+ delete ring_;
+ ring_ = nullptr;
+ }
+ }
+
+ CNRing *ring_;
+ std::atomic<uint32_t> id_seq_;
Napi::ObjectReference ref_;
- std::atomic<uint32_t> ucount_;
};
class NodeCNRingSQE {
public:
- inline NodeCNRingSQE(NodeCHNetRing *ring, CNRingSQE *sqe)
+ inline NodeCNRingSQE(CNRingSQE *sqe, NodeCHNetRing *ring):
+ sqe_(sqe),
+ ring_(ring)
{
- sqe_ = sqe;
- ring_ = ring;
}
CNRingSQE *sqe_;
@@ -33,7 +47,26 @@ struct NodeSQData {
class NodeCHNet {
public:
- CHNet ch_;
+ inline NodeCHNet(void):
+ ch_(new CHNet)
+ {
+ }
+
+ inline ~NodeCHNet(void)
+ {
+ if (ch_)
+ delete ch_;
+ }
+
+ inline void Close(void)
+ {
+ if (ch_) {
+ delete ch_;
+ ch_ = nullptr;
+ }
+ }
+
+ CHNet *ch_;
std::string payload_;
};
@@ -51,8 +84,9 @@ static inline void obj_add_func(Napi::Env &env, Napi::Object &obj,
static void CHN_SQEPrepNop(const Napi::CallbackInfo &info)
{
- NodeCNRingSQE *nsqe = (NodeCNRingSQE *)info.Data();
+ NodeCNRingSQE *nsqe;
+ nsqe = (NodeCNRingSQE *)info.Data();
nsqe->sqe_->PrepNop();
}
@@ -77,27 +111,6 @@ static NodeCHNet *CHN_PrepGetCHObj(Napi::Env &env, const Napi::Value &arg)
ch_in_obj.As<Napi::Number>().Int64Value();
}
-static void CHN_SQEPrepStart(const Napi::CallbackInfo &info)
-{
- Napi::Env env = info.Env();
- NodeCNRingSQE *nsqe;
- NodeCHNet *ch;
- CHNet *chn;
-
- if (unlikely(info.Length() != 1 || !info[0].IsObject())) {
- throw_js_exception(env, "sqe.prep_start must be given a chnet object argument");
- return;
- }
-
- ch = CHN_PrepGetCHObj(env, info[0]);
- if (unlikely(!ch))
- return;
-
- chn = &ch->ch_;
- nsqe = (NodeCNRingSQE *)info.Data();
- nsqe->sqe_->PrepStart(chn);
-}
-
static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
@@ -108,7 +121,13 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
if (unlikely(info.Length() != 2 || !info[0].IsObject() ||
!info[1].IsNumber())) {
- throw_js_exception(env, "sqe.prep_read must be given a chnet object argument and a read size");
+ throw_js_exception(env, "sqe.PrepRead must be given a chnet object argument and a read size");
+ return;
+ }
+
+ nsqe = (NodeCNRingSQE *)info.Data();
+ if (unlikely(!nsqe->ring_->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
return;
}
@@ -117,8 +136,7 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
return;
read_size = info[1].As<Napi::Number>().Int32Value();
- chn = &ch->ch_;
- nsqe = (NodeCNRingSQE *)info.Data();
+ chn = ch->ch_;
nsqe->sqe_->PrepRead(chn, read_size);
}
@@ -129,77 +147,135 @@ static Napi::Value CHN_SQESetUserData(const Napi::CallbackInfo &info)
NodeCNRingSQE *nsqe;
if (unlikely(info.Length() != 1)) {
- throw_js_exception(env, "sqe.set_user_data must be given exactly one argument");
+ throw_js_exception(env, "sqe.SetUserData must be given exactly one argument");
return env.Null();
}
nsqe = (NodeCNRingSQE *)info.Data();
+ if (unlikely(!nsqe->ring_->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
data = new struct NodeSQData;
data->ring_ = nsqe->ring_;
- data->id_ = nsqe->ring_->ucount_++ & nsqe->ring_->ring_.cq_mask_;
+ data->id_ = nsqe->ring_->id_seq_++ & nsqe->ring_->ring_->cq_mask_;
nsqe->ring_->ref_.Get("__udata")
.As<Napi::Object>()[data->id_] = info[0];
nsqe->sqe_->SetUserDataPtr(data);
return info[0];
}
-static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *sqe)
+static void CHN_SQEPrepStart(const Napi::CallbackInfo &info)
{
- delete sqe;
- (void)env;
+ Napi::Env env = info.Env();
+ NodeCNRingSQE *nsqe;
+ NodeCHNet *ch;
+ CHNet *chn;
+
+ if (unlikely(info.Length() != 1 || !info[0].IsObject())) {
+ throw_js_exception(env, "sqe.PrepStart must be given a chnet object argument");
+ return;
+ }
+
+ ch = CHN_PrepGetCHObj(env, info[0]);
+ if (unlikely(!ch))
+ return;
+
+ chn = ch->ch_;
+ nsqe = (NodeCNRingSQE *)info.Data();
+ if (unlikely(!nsqe->ring_->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return;
+ }
+
+ nsqe->sqe_->PrepStart(chn);
}
-static Napi::Object BuildSQEObject(Napi::Env &env, NodeCHNetRing *ring,
- CNRingSQE *sqe)
+static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *nsqe)
{
- Napi::Object obj = Napi::Object::New(env);
- NodeCNRingSQE *nsqe;
-
- sqe->SetUserData(0);
- nsqe = new NodeCNRingSQE(ring, sqe);
- obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "prep_nop");
- obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "prep_start");
- obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "prep_read");
- obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "set_user_data");
- obj.AddFinalizer(CHN_SQEDestruct, nsqe);
- return obj;
+ delete nsqe;
}
static Napi::Value CHN_RingGetSQE(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
NodeCHNetRing *ring;
+ NodeCNRingSQE *nsqe;
+ Napi::Object obj;
CNRingSQE *sqe;
ring = (NodeCHNetRing *)info.Data();
- sqe = ring->ring_.GetSQE();
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
+ sqe = ring->ring_->GetSQE();
if (unlikely(!sqe))
return env.Null();
- return BuildSQEObject(env, ring, sqe);
+ sqe->SetUserData(0);
+ nsqe = new NodeCNRingSQE(sqe, ring);
+ obj = Napi::Object::New(env);
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "PrepNop");
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "PrepRead");
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "PrepStart");
+ obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "SetUserData");
+ obj.AddFinalizer(CHN_SQEDestruct, nsqe);
+ return obj;
}
-static Napi::Value CHN_RingSubmitSQE(const Napi::CallbackInfo &info)
+static Napi::Value CHN_RingSubmit(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
NodeCHNetRing *ring;
+ uint32_t to_submit;
+
+ if (info.Length() > 0) {
+ if (unlikely(!info[0].IsNumber())) {
+ throw_js_exception(env, "ring.Submit the first argument must be a positive integer");
+ return env.Null();
+ }
+
+ to_submit = info[0].ToNumber().Uint32Value();
+ } else {
+ to_submit = -1U;
+ }
ring = (NodeCHNetRing *)info.Data();
- return Napi::Number::New(env, ring->ring_.SubmitSQE());
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
+ return Napi::Number::New(env, ring->ring_->Submit(to_submit));
}
-static void CHN_RingWaitCQE(const Napi::CallbackInfo &info)
+static Napi::Value CHN_RingWaitCQE(const Napi::CallbackInfo &info)
{
- uint32_t to_wait;
+ Napi::Env env = info.Env();
NodeCHNetRing *ring;
+ uint32_t to_wait;
+
+ if (info.Length() > 0) {
+ if (unlikely(!info[0].IsNumber())) {
+ throw_js_exception(env, "ring.WaitCQE the first argument must be a positive integer");
+ return env.Null();
+ }
- if (unlikely(info.Length() < 1 || !info[0].IsNumber()))
- to_wait = 1;
- else
to_wait = info[0].ToNumber().Uint32Value();
+ } else {
+ to_wait = 1;
+ }
ring = (NodeCHNetRing *)info.Data();
- ring->ring_.WaitCQE(to_wait);
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
+ return Napi::Number::New(env, ring->ring_->WaitCQE(to_wait));
}
static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data)
@@ -209,49 +285,56 @@ static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data)
* when it's no longer used.
*/
- // data->ring_->ref_.Get("__udata").As<Napi::Object>()
- // .Delete(data->id_);
+#if 0
+ data->ring_->ref_.Get("__udata").As<Napi::Object>()
+ .Delete(data->id_);
+#endif
+
delete data;
(void)env;
}
static Napi::Object BuildCQEObject(Napi::Env &env, CNRingCQE *cqe)
{
- Napi::Object obj = Napi::Object::New(env);
NodeSQData *data = (NodeSQData *) cqe->GetUserDataPtr();
+ Napi::Object obj = Napi::Object::New(env);
- obj["res"] = Napi::Number::New(env, cqe->res);
+ obj["res"] = Napi::Number::New(env, cqe->res_);
+ if (!data)
+ return obj;
- if (data) {
- obj["user_data"] = data->ring_->ref_.Get("__udata")
- .As<Napi::Object>().Get(data->id_);
- obj.AddFinalizer(CHN_CQEDestruct, data);
- }
+ obj["user_data"] = data->ring_->ref_.Get("__udata").As<Napi::Object>()
+ .Get(data->id_);
+ obj.AddFinalizer(CHN_CQEDestruct, data);
return obj;
}
+
static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "ring.for_each_cqe must be given exactly a callback function argument";
-
Napi::Env env = info.Env();
Napi::Function callback;
- uint32_t head, i;
NodeCHNetRing *ring;
CNRingCQE *cqe;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t i;
if (unlikely(info.Length() != 1 || !info[0].IsFunction())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "ring.ForEachCQE must be given exactly one callback function argument");
return env.Null();
}
- callback = info[0].As<Napi::Function>();
+ ring = (NodeCHNetRing *)info.Data();
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
i = 0;
- ring = (NodeCHNetRing *)info.Data();
- chnring_for_each_cqe(&ring->ring_, head, cqe) {
- Napi::Object cqe_obj = BuildCQEObject(env, cqe);
+ callback = info[0].As<Napi::Function>();
+ cnring_for_each_cqe(ring->ring_, head, tail, cqe) {
+ Napi::Value cqe_obj = BuildCQEObject(env, cqe);
callback.Call({cqe_obj});
i++;
@@ -262,104 +345,93 @@ static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info)
static void CHN_RingCQAdvance(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "ring.cq_advance must be given exactly an integer argument";
-
Napi::Env env = info.Env();
NodeCHNetRing *ring;
if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "ring.CQAdvance must be given exactly one positive integer argument");
return;
}
ring = (NodeCHNetRing *)info.Data();
- ring->ring_.CQAdvance(info[0].ToNumber().Uint32Value());
-}
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return;
+ }
-static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring)
-{
- delete ring;
- (void)env;
+ ring->ring_->CQAdvance(info[0].ToNumber().Uint32Value());
}
-static inline Napi::Value _CHN_CreateRing(Napi::Env &env, NodeCHNetRing *ring)
+static Napi::Value CHN_RingGetCQEHead(const Napi::CallbackInfo &info)
{
- Napi::Object obj = Napi::Object::New(env);
+ Napi::Env env = info.Env();
+ NodeCHNetRing *ring;
- obj["__udata"] = Napi::Object::New(env);
- obj_add_func(env, obj, ring, CHN_RingGetSQE, "get_sqe");
- obj_add_func(env, obj, ring, CHN_RingSubmitSQE, "submit_sqe");
- obj_add_func(env, obj, ring, CHN_RingWaitCQE, "wait_cqe");
- obj_add_func(env, obj, ring, CHN_RingForEachCQE, "for_each_cqe");
- obj_add_func(env, obj, ring, CHN_RingCQAdvance, "cq_advance");
- obj.AddFinalizer(CHN_RingDestruct, ring);
- ring->ref_ = Napi::Persistent(obj);
- return obj;
+ ring = (NodeCHNetRing *)info.Data();
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
+ return BuildCQEObject(env, ring->ring_->GetCQEHead());
}
-static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info)
+static void CHN_RingClose(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.create_ring must be given exactly a positive integer argument";
-
- Napi::Env env = info.Env();
NodeCHNetRing *ring;
- uint32_t entry;
- if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
- throw_js_exception(env, err_msg);
- return env.Null();
- }
+ ring = (NodeCHNetRing *)info.Data();
+ ring->Close();
+}
- entry = info[0].ToNumber().Uint32Value();
- ring = new NodeCHNetRing(entry);
- return _CHN_CreateRing(env, ring);
+static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring)
+{
+ delete ring;
}
static void CHN_NetSetURL(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.set_url must be given exactly 1 string argument";
-
Napi::Env env = info.Env();
NodeCHNet *ch;
if (unlikely(info.Length() != 1 || !info[0].IsString())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "net.SetURL must be given exactly 1 string argument");
return;
}
ch = (NodeCHNet *)info.Data();
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return;
+ }
+
const std::string &url = info[0].ToString().Utf8Value();
- ch->ch_.SetURL(url.c_str());
+ ch->ch_->SetURL(url.c_str());
}
static void CHN_NetSetMethod(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.set_method must be given exactly 1 string argument";
-
Napi::Env env = info.Env();
NodeCHNet *ch;
if (unlikely(info.Length() != 1 || !info[0].IsString())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "net.SetMethod must be given exactly 1 string argument");
return;
}
ch = (NodeCHNet *)info.Data();
- const std::string &method =info[0].ToString().Utf8Value();
- ch->ch_.SetMethod(method.c_str());
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return;
+ }
+
+ const std::string &method = info[0].ToString().Utf8Value();
+ ch->ch_->SetMethod(method.c_str());
}
static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.set_request_header must be at least given 2 string arguments";
- constexpr static const char err_msg2[] =
- "chnet.set_request_header can only receive 3 arguments with the third argument being a boolean";
-
+ constexpr static const char err_arg3[] = "chnet.SetRequestHeader can only receive 3 arguments with the third argument being a boolean";
Napi::Env env = info.Env();
bool overwrite = true;
NodeCHNet *ch;
@@ -369,61 +441,80 @@ static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info)
nr_arg = info.Length();
if (unlikely(nr_arg < 2 || !info[0].IsString() ||
!info[1].IsString())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "chnet.SetRequestHeader must be at least given 2 string arguments");
return;
}
if (unlikely(nr_arg > 3)) {
- throw_js_exception(env, err_msg2);
+ throw_js_exception(env, err_arg3);
return;
}
if (nr_arg == 3) {
if (unlikely(!info[2].IsBoolean())) {
- throw_js_exception(env, err_msg2);
+ throw_js_exception(env, err_arg3);
return;
}
overwrite = info[2].As<Napi::Boolean>();
}
ch = (NodeCHNet *)info.Data();
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return;
+ }
+
const std::string &key = info[0].ToString().Utf8Value();
const std::string &val = info[1].ToString().Utf8Value();
- ch->ch_.SetRequestHeader(key.c_str(), val.c_str(), overwrite);
+ ch->ch_->SetRequestHeader(key.c_str(), val.c_str(), overwrite);
}
-
-static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetReadRet(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
NodeCHNet *ch;
- int ret;
ch = (NodeCHNet *)info.Data();
- ret = ch->ch_.read_ret();
- if (ret > 0)
- return Napi::String::New(env, ch->ch_.read_buf(), (size_t)ret);
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return env.Null();
+ }
- return env.Null();
+ return Napi::Number::New(env, ch->ch_->read_ret());
}
-static Napi::Number CHN_NetReadRet(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
NodeCHNet *ch;
+ int ret;
ch = (NodeCHNet *)info.Data();
- return Napi::Number::New(env, ch->ch_.read_ret());
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return env.Null();
+ }
+
+ ret = ch->ch_->read_ret();
+ if (ret > 0)
+ return Napi::String::New(env, ch->ch_->read_buf(), (size_t)ret);
+
+ return env.Null();
}
-static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetGetErrorStr(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
const char *err;
NodeCHNet *ch;
ch = (NodeCHNet *)info.Data();
- err = ch->ch_.GetErrorStr();
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return env.Null();
+ }
+
+ err = ch->ch_->GetErrorStr();
if (err)
return Napi::String::New(env, err);
@@ -432,20 +523,30 @@ static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info)
static void CHN_NetSetPayload(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.set_payload must be given exactly 1 string argument";
-
Napi::Env env = info.Env();
NodeCHNet *ch;
if (unlikely(info.Length() != 1 || !info[0].IsString())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "chnet.SetPayload must be given exactly 1 string argument");
return;
}
ch = (NodeCHNet *)info.Data();
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return;
+ }
+
ch->payload_ = info[0].ToString().Utf8Value();
- ch->ch_.SetPayload(ch->payload_.c_str(), ch->payload_.size());
+ ch->ch_->SetPayload(ch->payload_.c_str(), ch->payload_.size());
+}
+
+static void CHN_NetClose(const Napi::CallbackInfo &info)
+{
+ NodeCHNet *ch;
+
+ ch = (NodeCHNet *)info.Data();
+ ch->Close();
}
static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
@@ -454,6 +555,42 @@ static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
(void)env;
}
+static Napi::Value _CHN_CreateRing(Napi::Env env, uint32_t entry)
+{
+ NodeCHNetRing *ring;
+ Napi::Object obj;
+
+ obj = Napi::Object::New(env);
+ ring = new NodeCHNetRing(entry);
+ obj["__udata"] = Napi::Object::New(env);
+ obj_add_func(env, obj, ring, CHN_RingGetSQE, "GetSQE");
+ obj_add_func(env, obj, ring, CHN_RingSubmit, "Submit");
+ obj_add_func(env, obj, ring, CHN_RingWaitCQE, "WaitCQE");
+ obj_add_func(env, obj, ring, CHN_RingForEachCQE, "ForEachCQE");
+ obj_add_func(env, obj, ring, CHN_RingCQAdvance, "CQAdvance");
+ obj_add_func(env, obj, ring, CHN_RingGetCQEHead, "GetCQEHead");
+ obj_add_func(env, obj, ring, CHN_RingClose, "Close");
+ obj.AddFinalizer(CHN_RingDestruct, ring);
+ obj.Seal();
+ obj.Freeze();
+ ring->ref_ = Napi::Persistent(obj);
+ return obj;
+}
+
+static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info)
+{
+ Napi::Env env = info.Env();
+ uint32_t entry;
+
+ if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
+ throw_js_exception(env, "chnet.CreateRing must be given exactly one positive integer argument");
+ return env.Null();
+ }
+
+ entry = info[0].ToNumber().Uint32Value();
+ return _CHN_CreateRing(env, entry);
+}
+
static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
@@ -461,14 +598,14 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
NodeCHNet *ch = new NodeCHNet;
int64_t ch_ptr;
- obj_add_func(env, obj, ch, CHN_NetSetURL, "set_url");
- obj_add_func(env, obj, ch, CHN_NetSetMethod, "set_method");
- obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "set_request_header");
+ obj_add_func(env, obj, ch, CHN_NetSetURL, "SetURL");
+ obj_add_func(env, obj, ch, CHN_NetSetMethod, "SetMethod");
+ obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "SetRequestHeader");
obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret");
obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf");
- obj_add_func(env, obj, ch, CHN_NetGetError, "get_error");
- obj_add_func(env, obj, ch, CHN_NetSetPayload, "set_payload");
-
+ obj_add_func(env, obj, ch, CHN_NetGetErrorStr, "GetErrorStr");
+ obj_add_func(env, obj, ch, CHN_NetSetPayload, "SetPayload");
+ obj_add_func(env, obj, ch, CHN_NetClose, "Close");
ch_ptr = (int64_t) (intptr_t) ch;
obj["__ch"] = Napi::Number::New(env, ch_ptr);
obj.AddFinalizer(CHN_NetDestruct, ch);
@@ -480,8 +617,10 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
static Napi::Object CHN_GlobalInit(Napi::Env env, Napi::Object exports)
{
chnet_global_init();
- obj_add_func(env, exports, NULL, CHN_CreateRing, "create_ring");
- obj_add_func(env, exports, NULL, CHN_CreateNet, "create_net");
+ obj_add_func(env, exports, NULL, CHN_CreateRing, "CreateRing");
+ obj_add_func(env, exports, NULL, CHN_CreateNet, "CreateNet");
+ exports.Seal();
+ exports.Freeze();
return exports;
}
NODE_API_MODULE(chnet, CHN_GlobalInit);
diff --git a/chnet/chnet_ring.cc b/chnet/chnet_ring.cc
index a5f3094..b858421 100644
--- a/chnet/chnet_ring.cc
+++ b/chnet/chnet_ring.cc
@@ -8,15 +8,14 @@
#include <chrono>
using namespace std::chrono_literals;
-#include <unistd.h>
-CNRingCtx::CNRingCtx(uint32_t entry)
+CNRing::CNRing(uint32_t entry)
{
uint32_t sq_max;
uint32_t cq_max;
- if (entry < 16)
- entry = 16;
+ if (entry < 2)
+ entry = 2;
if (entry > 1048576)
entry = 1048576;
@@ -28,372 +27,353 @@ CNRingCtx::CNRingCtx(uint32_t entry)
sq_mask_ = sq_max - 1;
cq_mask_ = cq_max - 1;
-
- sqes_ = new CNRingSQE[sq_max + 1];
- cqes_ = new CNRingCQE[cq_max + 1];
- state_ = new CNRingState(sq_max, cq_max, 2);
-
- state_->cqe_to_wait_.store(0, std::memory_order_relaxed);
- state_->nr_post_cqe_wait_.store(0, std::memory_order_relaxed);
sq_head_.store(0, std::memory_order_relaxed);
sq_tail_.store(0, std::memory_order_relaxed);
cq_head_.store(0, std::memory_order_relaxed);
cq_tail_.store(0, std::memory_order_relaxed);
+ cq_max_to_wait_.store(0, std::memory_order_relaxed);
+ sqes_ = new CNRingSQE[sq_max];
+ cqes_ = new CNRingCQE[cq_max];
+ state_ = new CNRingState(4096, 8192, 1);
}
-CNRingCtx::~CNRingCtx(void)
+CNRing::~CNRing(void)
{
+ state_->stop_ = true;
+ NotifyFreeCQEWaiter(true);
delete state_;
delete[] sqes_;
delete[] cqes_;
}
-static inline uint32_t big_min_small(uint32_t a, uint32_t b)
-{
- if (a < b)
- return b - a;
- else
- return a - b;
-}
-
-uint32_t CNRingCtx::sqe_size(void)
-{
- return big_min_small(sq_head_.load(), sq_tail_.load());
-}
-
-uint32_t CNRingCtx::cqe_size(void)
-{
- return big_min_small(cq_head_.load(), cq_tail_.load());
-}
-
-CNRingSQE *CNRingCtx::GetSQE(void)
-{
- uint32_t head = sq_head_.load();
- uint32_t tail = sq_tail_.load();
- uint32_t mask = sq_mask_;
- CNRingSQE *sqe;
-
- if (unlikely(big_min_small(head, tail) > mask))
- return nullptr;
-
- sqe = &sqes_[tail & mask];
- sq_tail_++;
- return sqe;
-}
-
-CNRingCQE *CNRingCtx::GetCQENoTailIncrement(void)
- __must_hold(&state_->cqe_lock_)
+CNRingState::CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
+ uint32_t nr_idle_thread):
+ wq_(nr_thread, nr_jobs, nr_idle_thread)
{
- uint32_t head = cq_head_.load();
- uint32_t tail = cq_tail_.load();
- uint32_t mask = cq_mask_;
-
- if (unlikely(big_min_small(head, tail) > mask))
- return nullptr;
-
- return &cqes_[tail & mask];
+ cqe_to_wait_.store(0, std::memory_order_relaxed);
+ nr_free_cqe_slot_waiter_.store(0, std::memory_order_relaxed);
+ stop_ = false;
+ wq_.run();
}
-void CNRingCtx::NotifyCQEWaiter(void)
+void CNRing::NotifyCQEWaiter(void)
__must_hold(&state_->cqe_lock_)
{
- uint32_t to_wait = state_->cqe_to_wait_.load();
+ uint32_t to_wait = state_->cqe_to_wait_.load(std::memory_order_acquire);
if (!to_wait)
return;
- if (to_wait > cqe_size())
+ if (to_wait > CQESize())
return;
- state_->cqe_cond_.notify_one();
+ state_->cqe_to_wait_cond_.notify_one();
}
-bool CNRingCtx::TryPostCQE(CNRingSQE *sqe, int64_t res)
+void CNRing::NotifyFreeCQEWaiter(bool force)
{
- CNRingCQE *cqe;
-
- state_->cqe_lock_.lock();
- cqe = GetCQENoTailIncrement();
- if (unlikely(!cqe)) {
- NotifyCQEWaiter();
- state_->cqe_lock_.unlock();
- return false;
+ uint32_t n;
+
+ n = state_->nr_free_cqe_slot_waiter_.load(std::memory_order_acquire);
+ if (unlikely(n > 0 || force)) {
+ std::condition_variable *cond;
+ std::mutex *mutex;
+
+ mutex = &state_->wait_for_a_free_cqe_slot_lock_;
+ cond = &state_->wait_for_a_free_cqe_slot_cond_;
+ mutex->lock();
+ if (n == 1)
+ cond->notify_one();
+ else
+ cond->notify_all();
+ mutex->unlock();
}
-
- cqe->op = sqe->op;
- cqe->user_data = sqe->user_data;
- cqe->res = res;
- cq_tail_++;
- NotifyCQEWaiter();
- state_->cqe_lock_.unlock();
- return true;
}
-void CNRingCtx::WaitForCQEFreeSlot(void)
+void CNRing::SleepToWaitForAFreeCQE(void)
{
- std::unique_lock<std::mutex> lock(state_->post_cqe_lock_);
+ std::unique_lock<std::mutex> lk(state_->wait_for_a_free_cqe_slot_lock_);
+ std::atomic<uint32_t> *nr_waiter = &state_->nr_free_cqe_slot_waiter_;
- state_->nr_post_cqe_wait_++;
- state_->post_cqe_cond_.wait(lock, [this]{
- bool ret;
+ (*nr_waiter)++;
+ state_->wait_for_a_free_cqe_slot_cond_.wait(lk, [this]{
+ uint32_t ret;
state_->cqe_lock_.lock();
- ret = !!GetCQENoTailIncrement();
+ ret = CQESize();
state_->cqe_lock_.unlock();
- return ret;
+ return ret < (cq_mask_ + 1) || state_->should_stop();
});
- state_->nr_post_cqe_wait_--;
+ (*nr_waiter)--;
}
-void CNRingCtx::PostCQE(CNRingSQE *sqe, int64_t res)
+void CNRing::PostCQE(const CNRingSQE *sqe, int64_t res)
{
while (1) {
+ if (unlikely(state_->should_stop()))
+ return;
+
if (TryPostCQE(sqe, res))
- break;
+ return;
- WaitForCQEFreeSlot();
+ SleepToWaitForAFreeCQE();
}
}
-void CNRingCtx::NotifyWaitCQEFreeSlot(void)
+bool CNRing::TryPostCQE(const CNRingSQE *sqe, int64_t res)
{
- uint32_t nr_wait = state_->nr_post_cqe_wait_.load();
+ bool ret = false;
+ CNRingCQE *cqe;
+ uint32_t mask;
- if (unlikely(nr_wait)) {
- state_->post_cqe_lock_.lock();
- if (nr_wait == 1)
- state_->post_cqe_cond_.notify_one();
- else
- state_->post_cqe_cond_.notify_all();
- state_->post_cqe_lock_.unlock();
+ state_->cqe_lock_.lock();
+ mask = cq_mask_;
+
+ if (likely(CQESize() < (mask + 1))) {
+ cqe = &cqes_[cq_tail_++ & mask];
+ ret = true;
+ cqe->op_ = sqe->op_;
+ cqe->res_ = res;
+ cqe->user_data_ = sqe->user_data_;
+ NotifyCQEWaiter();
}
-}
-void CNRingCtx::CQAdvance(uint32_t n)
-{
- state_->cqe_lock_.lock();
- cq_head_ += n;
state_->cqe_lock_.unlock();
- NotifyWaitCQEFreeSlot();
+ return ret;
}
-/*
- * Use this when the caller is not allowed to block.
- */
-void CNRingCtx::CallPostCQE(CNRingSQE *sqe, int64_t res)
+bool CNRing::PostCQENoSleep(const CNRingSQE *sqe, int64_t res)
{
- CNRingSQE *tmp;
-
if (likely(TryPostCQE(sqe, res)))
- return;
+ return true;
- /*
- * The CQE slot is full, but the caller is not allowed to
- * block, let's schedule the post in the workqueue thread.
- */
- tmp = new CNRingSQE;
- *tmp = *sqe;
- state_->wq_.schedule_work([this, res](struct wq_job_data *data){
- CNRingSQE *sqe = (CNRingSQE *)data->data;
-
- this->PostCQE(sqe, res);
- delete sqe;
- }, tmp);
+ auto func = [this, res, copy_sqe = *sqe](auto *x){
+ this->PostCQE(&copy_sqe, res);
+ };
+ return state_->wq_.try_schedule_work(std::move(func), nullptr) >= 0;
}
-void CNRingCtx::IssueNopSQE(CNRingSQE *sqe)
+bool CNRing::IssueSQENop(const CNRingSQE *sqe)
{
- CallPostCQE(sqe, 0);
+ return PostCQENoSleep(sqe, 0);
}
-static void issue_start_sqe(void *udata, net::URLRequest *url_req, int net_err)
+static inline void post_sqe_read(CNRing *ring, const CNRingSQE *sqe, int size)
{
- CNRingOpStart *sop;
- CNRingSQE *sqe;
-
- sqe = (CNRingSQE *)udata;
- sop = (CNRingOpStart *)sqe->sq_data;
- sop->ring->PostCQE(sqe, net_err);
- delete sop;
- delete sqe;
+ ring->PostCQE(sqe, size);
}
-void CNRingCtx::IssueStartSQE(CNRingSQE *sqe)
+static int issue_sqe_read(CNRing *ring, const CNRingSQE *sqe)
{
+ CHNet *ch = sqe->sq_start_.ch_;
struct net::CHNCallback *cb;
- CNRingOpStart *sop;
- CNRingSQE *tmp;
- int ret;
- tmp = new CNRingSQE;
- *tmp = *sqe;
+ auto f = [ring, sqe_cp = *sqe](void *u, net::URLRequest *ur, int size){
+ post_sqe_read(ring, &sqe_cp, size);
+ };
- sop = (CNRingOpStart *)tmp->sq_data;
- sop->ring = this;
-
- cb = &sop->chnet->ch()->cb;
- cb->response_started_ = issue_start_sqe;
- cb->response_started_data_ = tmp;
- ret = sop->chnet->Start();
- if (unlikely(ret)) {
- CallPostCQE(sqe, ret);
- delete tmp;
- }
-}
-
-static void issue_read_sqe(void *udata, net::URLRequest *url_req, int read_ret)
-{
- CNRingOpStart *sop;
- CNRingSQE *sqe;
-
- sqe = (CNRingSQE *)udata;
- sop = (CNRingOpStart *)sqe->sq_data;
- sop->ring->PostCQE(sqe, read_ret);
- delete sop;
- delete sqe;
+ cb = &ch->ch()->cb;
+ cb->read_completed_ = std::move(f);
+ return ch->Read(sqe->sq_read_.size_);
}
-static void issue_read_sqe_need_start(void *udata, net::URLRequest *url_req,
- int net_err)
+static void _issue_sqe_read_need_start(CNRing *ring, const CNRingSQE *sqe,
+ int net_err)
{
+ CHNet *ch = sqe->sq_start_.ch_;
struct net::CHNCallback *cb;
- CNRingOpRead *sop;
- CNRingSQE *sqe;
- sqe = (CNRingSQE *)udata;
- sop = (CNRingOpRead *)sqe->sq_data;
-
- if (likely(net_err == net::OK)) {
- /*
- * The start operation succeeds, now we can do
- * the read operation directly without issuing
- * extra SQE.
- */
- cb = &sop->chnet->ch()->cb;
- cb->read_completed_ = issue_read_sqe;
- cb->read_completed_data_ = sqe;
- sop->chnet->ch()->_Read(nullptr, sop->read_size);
+ if (unlikely(net_err != net::OK)) {
+ ring->PostCQE(sqe, net_err);
return;
}
+ auto f = [ring, sqe_cp = *sqe](void *u, net::URLRequest *ur, int size){
+ post_sqe_read(ring, &sqe_cp, size);
+ };
+
/*
- * The start operation fails, just post the CQE directly.
+ * The start operation succeeds, now we can do
+ * the read operation directly without issuing
+ * extra SQE.
*/
- sop->ring->PostCQE(sqe, net_err);
- delete sop;
- delete sqe;
+ cb = &ch->ch()->cb;
+ cb->read_completed_ = std::move(f);
+ ch->ch()->_Read(nullptr, sqe->sq_read_.size_);
}
-void CNRingCtx::IssueReadSQE(CNRingSQE *sqe)
+static int issue_sqe_read_need_start(CNRing *ring, const CNRingSQE *sqe)
{
+ CHNet *ch = sqe->sq_start_.ch_;
struct net::CHNCallback *cb;
- net::CHNetDelegate *ch;
- CNRingOpRead *sop;
- CNRingSQE *tmp;
- int ret;
- tmp = new CNRingSQE;
- *tmp = *sqe;
+ auto f = [ring, sqe_cp = *sqe](void *u, auto *ur, int net_err){
+ _issue_sqe_read_need_start(ring, &sqe_cp, net_err);
+ };
+
+ cb = &ch->ch()->cb;
+ cb->response_started_ = std::move(f);
+ return ch->Start();
+}
- sop = (CNRingOpRead *)tmp->sq_data;
- sop->ring = this;
+bool CNRing::IssueSQERead(const CNRingSQE *sqe)
+{
+ int ret;
- ch = sop->chnet->ch();
- cb = &ch->cb;
- if (unlikely(!ch->started())) {
+ if (!sqe->sq_start_.ch_->ch()->started()) {
/*
* The request hasn't been started, we can't perform
* a read operation at this point, do the start
* operation first!
*/
- cb->response_started_ = issue_read_sqe_need_start;
- cb->response_started_data_ = tmp;
- ret = sop->chnet->Start();
+ ret = issue_sqe_read_need_start(this, sqe);
} else {
/*
* Normal read operation here.
*/
- cb->read_completed_ = issue_read_sqe;
- cb->read_completed_data_ = tmp;
- ret = sop->chnet->Read(sop->read_size);
+ ret = issue_sqe_read(this, sqe);
}
if (likely(ret >= 0))
- return;
+ return true;
- /*
- * Aiee, the operation fails!
- */
- CallPostCQE(sqe, ret);
- delete tmp;
+ return PostCQENoSleep(sqe, ret);
+}
+
+bool CNRing::IssueSQEStart(const CNRingSQE *sqe)
+{
+ CHNet *ch = sqe->sq_start_.ch_;
+ struct net::CHNCallback *cb;
+ int ret;
+
+ auto f = [this, sqe_cp = *sqe](void *u, auto *ur, int net_err){
+ this->PostCQE(&sqe_cp, net_err);
+ };
+
+ cb = &ch->ch()->cb;
+ cb->response_started_ = std::move(f);
+ ret = ch->Start();
+
+ if (likely(ret >= 0))
+ return true;
+
+ return PostCQENoSleep(sqe, ret);
}
-void CNRingCtx::IssueSQE(CNRingSQE *sqe)
+void CNRing::IssueSQE(const CNRingSQE *sqe)
{
- switch (sqe->op) {
+ bool ok = false;
+
+ switch (sqe->op_) {
case CNRING_OP_NOP:
- IssueNopSQE(sqe);
- break;
- case CNRING_OP_START:
- IssueStartSQE(sqe);
+ ok = IssueSQENop(sqe);
break;
case CNRING_OP_READ:
- IssueReadSQE(sqe);
+ ok = IssueSQERead(sqe);
+ break;
+ case CNRING_OP_START:
+ ok = IssueSQEStart(sqe);
break;
}
+
+ if (likely(ok))
+ cq_max_to_wait_++;
}
-uint32_t CNRingCtx::SubmitSQE(uint32_t to_submit)
+uint32_t CNRing::Submit(uint32_t to_submit)
{
- uint32_t head = sq_head_.load();
- uint32_t tail = sq_tail_.load();
- uint32_t ret = 0;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t mask;
+ uint32_t ret;
+
+ if (unlikely(!to_submit))
+ return 0;
+
+ state_->sqe_lock_.lock();
+ head = sq_head_.load(std::memory_order_acquire);
+ tail = sq_tail_.load(std::memory_order_acquire);
+ mask = sq_mask_;
+ ret = 0;
while (to_submit--) {
if (unlikely(head == tail))
break;
- IssueSQE(&sqes_[head++ & sq_mask_]);
+ IssueSQE(&sqes_[head++ & mask]);
ret++;
}
- if (ret)
- sq_head_.fetch_add(ret, std::memory_order_release);
+ sq_head_.store(head, std::memory_order_release);
+ state_->sqe_lock_.unlock();
+ return ret;
+}
+
+CNRingSQE *CNRing::GetSQE(void)
+{
+ CNRingSQE *sqe = nullptr;
+
+ state_->sqe_lock_.lock();
+ if (likely(SQESize() < (sq_mask_ + 1)))
+ sqe = &sqes_[sq_tail_++ & sq_mask_];
+
+ state_->sqe_lock_.unlock();
+ return sqe;
+}
+
+uint32_t CNRing::_WaitCQE(uint32_t to_wait, uint32_t timeout_ms,
+ std::unique_lock<std::mutex> &lock)
+ __must_hold(&state_->cqe_lock_)
+{
+ std::atomic<uint32_t> *cqe_to_wait = &state_->cqe_to_wait_;
+ std::condition_variable *cond = &state_->cqe_to_wait_cond_;
+ uint32_t ret;
+
+ cqe_to_wait->store(to_wait, std::memory_order_release);
+ auto callback = [this, to_wait, &ret]{
+ ret = CQESize();
+ return ret >= to_wait;
+ };
+
+ if (timeout_ms == -1U)
+ cond->wait(lock, std::move(callback));
+ else
+ cond->wait_for(lock, timeout_ms*1ms, std::move(callback));
+
+ cqe_to_wait->store(0, std::memory_order_release);
return ret;
}
-void CNRingCtx::WaitCQE(uint32_t to_wait)
+uint32_t CNRing::WaitCQE(uint32_t to_wait, uint32_t timeout_ms)
{
uint32_t max_to_wait;
- uint32_t cq_size;
+ uint32_t ret;
if (unlikely(!to_wait))
- return;
+ return 0;
+
+ max_to_wait = cq_max_to_wait_.load(std::memory_order_acquire);
+ if (unlikely(!max_to_wait))
+ return 0;
- max_to_wait = cq_mask_;
if (to_wait > max_to_wait)
to_wait = max_to_wait;
- std::unique_lock<std::mutex> cqe_lock(state_->cqe_lock_);
- cq_size = cqe_size();
-
- if (cq_size < cq_mask_ + 1)
- NotifyWaitCQEFreeSlot();
+ std::unique_lock<std::mutex> lock(state_->cqe_lock_);
- if (to_wait <= cq_size)
- return;
+ ret = CQESize();
+ if (unlikely(ret >= to_wait))
+ return ret;
- state_->cqe_to_wait_.store(to_wait);
- state_->cqe_cond_.wait(cqe_lock, [this, to_wait]{
- return to_wait <= cqe_size();
- });
- state_->cqe_to_wait_.store(0);
+ return _WaitCQE(to_wait, timeout_ms, lock);
}
-CNRingState::CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
- uint32_t nr_idle_thread):
- wq_(nr_thread, nr_jobs, nr_idle_thread)
+void CNRing::CQAdvance(uint32_t n)
{
- wq_.run();
+ state_->cqe_lock_.lock();
+ cq_head_ += n;
+ cq_max_to_wait_ -= n;
+ state_->cqe_lock_.unlock();
+ NotifyFreeCQEWaiter();
}
diff --git a/chnet/chnet_ring.h b/chnet/chnet_ring.h
index f41d75b..0630050 100644
--- a/chnet/chnet_ring.h
+++ b/chnet/chnet_ring.h
@@ -8,169 +8,193 @@
#include "common.h"
#ifdef __FOR_CHROMIUM_INTERNAL
-#include <condition_variable>
-
/*
* Chromium internal includes.
*/
#include "base/component_export.h"
#include "WorkQueue.h"
+#include <condition_variable>
#define CHNET_EXPORT COMPONENT_EXPORT(CHNET)
+
#else /* #ifdef __FOR_CHROMIUM_INTERNAL */
+
#define CHNET_EXPORT
-#endif
+
+#endif /* #ifdef __FOR_CHROMIUM_INTERNAL */
+
enum CNRingOp {
CNRING_OP_NOP = 0,
- CNRING_OP_START = 1,
- CNRING_OP_READ = 2,
+ CNRING_OP_READ = 1,
+ CNRING_OP_START = 2,
+ CNRING_OP_WRITE = 3,
};
-class CNRingCtx;
-
-struct CHNET_EXPORT CNRingOpStart {
- CHNet *chnet;
- CNRingCtx *ring;
+struct CHNET_EXPORT CNRingOpRead {
+ CHNet *ch_;
+ uint32_t size_;
};
-struct CHNET_EXPORT CNRingOpRead {
- CHNet *chnet;
- CNRingCtx *ring;
- int read_size;
+struct CHNET_EXPORT CNRingOpStart {
+ CHNet *ch_;
};
class CHNET_EXPORT CNRingSQE {
public:
- uint8_t op;
- void *sq_data;
- uint64_t user_data;
+ uint8_t op_;
+ uint64_t user_data_;
+ union {
+ struct CNRingOpStart sq_start_;
+ struct CNRingOpRead sq_read_;
+ };
inline void PrepNop(void)
{
- op = CNRING_OP_NOP;
+ op_ = CNRING_OP_NOP;
}
- inline void PrepStart(CHNet *chnet)
+ inline void PrepRead(CHNet *ch, uint32_t size)
{
- CNRingOpStart *d = new CNRingOpStart;
-
- d->chnet = chnet;
- d->ring = nullptr;
- sq_data = (void *)d;
- op = CNRING_OP_START;
+ op_ = CNRING_OP_READ;
+ sq_read_.ch_ = ch;
+ sq_read_.size_ = size;
}
- inline void PrepRead(CHNet *chnet, int read_size)
+ inline void PrepStart(CHNet *ch)
{
- CNRingOpRead *d = new CNRingOpRead;
-
- d->chnet = chnet;
- d->read_size = read_size;
- sq_data = (void *)d;
- op = CNRING_OP_READ;
+ op_ = CNRING_OP_START;
+ sq_start_.ch_ = ch;
}
- inline void SetUserDataPtr(void *ptr)
+ inline void SetUserData(uint64_t data)
{
- user_data = (uint64_t) ptr;
+ user_data_ = data;
}
- inline void SetUserData(uint64_t ptr)
+ inline void SetUserDataPtr(void *data)
{
- user_data = ptr;
+ user_data_ = (uint64_t) (uintptr_t) data;
}
};
class CHNET_EXPORT CNRingCQE {
public:
- uint8_t op;
- uint64_t user_data;
- union {
- int64_t res;
- };
+ uint8_t op_;
+ int64_t res_;
+ uint64_t user_data_;
inline uint64_t GetUserData(void)
{
- return user_data;
+ return user_data_;
}
inline void *GetUserDataPtr(void)
{
- return (void *) (uintptr_t) user_data;
+ return (void *) (uintptr_t) user_data_;
}
};
#ifdef __FOR_CHROMIUM_INTERNAL
class CNRingState {
public:
- CNRingState(uint32_t nr_thread = 64, uint32_t nr_jobs = 4096,
- uint32_t nr_idle_thread = -1U);
- WorkQueue wq_;
- std::mutex cqe_lock_;
- std::condition_variable cqe_cond_;
- std::atomic<uint32_t> cqe_to_wait_;
-
- std::mutex post_cqe_lock_;
- std::condition_variable post_cqe_cond_;
- std::atomic<uint32_t> nr_post_cqe_wait_;
+ CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
+ uint32_t nr_idle_thread);
+
+ std::atomic<uint32_t> cqe_to_wait_;
+ std::atomic<uint32_t> nr_free_cqe_slot_waiter_;
+
+ std::condition_variable cqe_to_wait_cond_;
+ std::condition_variable wait_for_a_free_cqe_slot_cond_;
+
+ std::mutex wait_for_a_free_cqe_slot_lock_;
+ std::mutex sqe_lock_;
+ std::mutex cqe_lock_;
+ WorkQueue wq_;
+ volatile bool stop_;
+
+ inline bool should_stop(void)
+ {
+ return stop_ || wq_.should_stop();
+ }
};
-#endif
+#endif /* #ifdef __FOR_CHROMIUM_INTERNAL */
-class CHNET_EXPORT CNRingCtx {
+
+static inline uint32_t u32_diff(uint32_t a, uint32_t b)
+{
+ if (b > a)
+ return b - a;
+ else
+ return a - b;
+}
+
+class CHNET_EXPORT CNRing {
public:
- CNRingCtx(uint32_t entry);
- ~CNRingCtx(void);
- uint32_t SubmitSQE(uint32_t to_submit = -1U);
+ CNRing(uint32_t entry);
+ ~CNRing(void);
+ uint32_t Submit(uint32_t to_submit = -1U);
CNRingSQE *GetSQE(void);
- void WaitCQE(uint32_t to_wait);
- uint32_t sqe_size(void);
- uint32_t cqe_size(void);
-
- void PostCQE(CNRingSQE *sqe, int64_t res);
- void CallPostCQE(CNRingSQE *sqe, int64_t res);
+ uint32_t WaitCQE(uint32_t to_wait = 1, uint32_t timeout_ms = -1U);
void CQAdvance(uint32_t n);
+ bool TryPostCQE(const CNRingSQE *sqe, int64_t res);
+ void PostCQE(const CNRingSQE *sqe, int64_t res);
+ bool PostCQENoSleep(const CNRingSQE *sqe, int64_t res);
- inline CNRingCQE *HeadCQE(void)
+ inline CNRingCQE *GetCQEHead(void)
{
- return &cqes_[cq_head_.load()];
+ return &cqes_[cq_head_.load(std::memory_order_acquire) &
+ cq_mask_];
}
+ CNRingSQE *sqes_;
+ CNRingCQE *cqes_;
+
#ifdef __FOR_CHROMIUM_INTERNAL
- CNRingState *state_;
+ CNRingState *state_;
#else
- void *state_;
+ void *state_;
#endif
- CNRingSQE *sqes_;
- CNRingCQE *cqes_;
-
- uint32_t sq_mask_;
- std::atomic<uint32_t> sq_head_;
std::atomic<uint32_t> sq_tail_;
+ std::atomic<uint32_t> sq_head_;
+ uint32_t sq_mask_;
- uint32_t cq_mask_;
- std::atomic<uint32_t> cq_head_;
std::atomic<uint32_t> cq_tail_;
+ std::atomic<uint32_t> cq_head_;
+ uint32_t cq_mask_;
+ std::atomic<uint32_t> cq_max_to_wait_;
private:
- void HandleSQE(CNRingSQE *sqe);
- CNRingSQE *ConsumeSQE(void);
- bool TryPostCQE(CNRingSQE *sqe, int64_t res);
- CNRingCQE *GetCQENoTailIncrement(void);
+ uint32_t _WaitCQE(uint32_t to_wait, uint32_t timeout_ms,
+ std::unique_lock<std::mutex> &lock);
+ void IssueSQE(const CNRingSQE *sqe);
+ bool IssueSQENop(const CNRingSQE *sqe);
+ bool IssueSQERead(const CNRingSQE *sqe);
+ bool IssueSQEStart(const CNRingSQE *sqe);
void NotifyCQEWaiter(void);
- void NotifyWaitCQEFreeSlot(void);
- void WaitForCQEFreeSlot(void);
+ void NotifyFreeCQEWaiter(bool force = false);
+ void SleepToWaitForAFreeCQE(void);
+
+ int _IssueSQEReadNeedStart(const CNRingSQE *sqe);
- void IssueSQE(CNRingSQE *sqe);
- void IssueNopSQE(CNRingSQE *sqe);
- void IssueStartSQE(CNRingSQE *sqe);
- void IssueReadSQE(CNRingSQE *sqe);
+ inline uint32_t CQESize(void)
+ {
+ return u32_diff(cq_head_.load(std::memory_order_acquire),
+ cq_tail_.load(std::memory_order_acquire));
+ }
+
+ inline uint32_t SQESize(void)
+ {
+ return u32_diff(sq_head_.load(std::memory_order_acquire),
+ sq_tail_.load(std::memory_order_acquire));
+ }
};
-#define chnring_for_each_cqe(ring, head, cqe) \
+#define cnring_for_each_cqe(ring, head, tail, cqe) \
for ( \
- head = (ring)->cq_head_; \
- (cqe = (head != (ring)->cq_tail_) ? \
+ head = (ring)->cq_head_.load(std::memory_order_acquire),\
+ tail = (ring)->cq_tail_.load(std::memory_order_acquire);\
+ (cqe = (head != tail) ? \
&(ring)->cqes_[head & (ring)->cq_mask_] : \
NULL); \
head++ \
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index 54426ef..22da7ea 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -6,231 +6,246 @@
static void test_nop(void)
{
- /*
- * Simple nop test.
- *
- * This shows that CQE size is double of SQE size.
- */
-
- static constexpr uint32_t entry = 4096;
- unsigned head, i, j, a, b;
- CNRingCtx ring(entry);
+ constexpr static uint32_t nr_loop = 1024 * 8;
+ constexpr static uint32_t entry = 4096;
+ CNRing ring(entry);
CNRingSQE *sqe;
CNRingCQE *cqe;
+ uint32_t total;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t ret;
+ uint32_t i;
- i = 0;
- for (j = 0; j < 3; j++) {
- while (1) {
+ for (i = 0; i < nr_loop; i++) {
+ sqe = ring.GetSQE();
+ if (!sqe) {
+ assert(ring.Submit() == entry);
sqe = ring.GetSQE();
- if (!sqe)
- break;
- sqe->PrepNop();
- sqe->SetUserData(i++);
+ assert(sqe);
}
- ring.SubmitSQE();
+ sqe->PrepNop();
+ sqe->SetUserData(1);
}
- ring.WaitCQE(entry * 2);
-
- a = i;
- i = 0;
- chnring_for_each_cqe(&ring, head, cqe)
- i++;
- b = i;
- ring.CQAdvance(i);
- assert(b == entry * 2);
-
- ring.WaitCQE(entry);
- i = 0;
- chnring_for_each_cqe(&ring, head, cqe)
- i++;
- ring.CQAdvance(i);
- b += i;
- assert(a == b);
+ assert(ring.Submit() == entry);
+
+ total = 0;
+ while (total < nr_loop) {
+ ret = ring.WaitCQE(1);
+ i = 0;
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ assert(cqe->op_ == CNRING_OP_NOP);
+ assert(cqe->res_ == 0);
+ assert(cqe->user_data_ == 1);
+ i++;
+ }
+ ring.CQAdvance(i);
+ assert(ret >= 1);
+ assert(i >= ret);
+ total += ret;
+ }
+ assert(total == nr_loop);
}
-static void test_chnet_ring_single(void)
+static void test_chnet_ring_read(bool do_sq_start = false)
{
- CNRingCtx ring(4096);
+ CNRing ring(1);
CNRingSQE *sqe;
CNRingCQE *cqe;
+ uint32_t ret;
CHNet *ch;
ch = new CHNet;
ch->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch->SetMethod("GET");
- sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepStart(ch);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_START);
- assert(cqe->res == 0);
- assert(cqe->user_data == 9999);
- ring.CQAdvance(1);
+ if (do_sq_start) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepStart(ch);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_START);
+ assert(cqe->res_ == 0);
+ ring.CQAdvance(1);
+ }
sqe = ring.GetSQE();
- assert(sqe);
sqe->PrepRead(ch, 1024);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->res == 13);
- assert(cqe->user_data == 9999);
+ ring.Submit();
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 13);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("Hello World!\n", ch->read_buf(), cqe->res_));
ring.CQAdvance(1);
delete ch;
}
-static void test_chnet_ring_set_header(void)
+static void test_chnet_ring_partial_read(bool do_sq_start = false)
{
- constexpr static const char ua[] = "This is just a test user agent!!!";
- CNRingCtx ring(4096);
+ CNRing ring(1);
CNRingSQE *sqe;
CNRingCQE *cqe;
+ uint32_t ret;
CHNet *ch;
ch = new CHNet;
- ch->SetURL("http://127.0.0.1:8000/index.php?action=user_agent");
- ch->SetRequestHeader("User-agent", ua);
+ ch->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch->SetMethod("GET");
+ if (do_sq_start) {
+ sqe = ring.GetSQE();
+ sqe->PrepStart(ch);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_START);
+ assert(cqe->res_ == 0);
+ ring.CQAdvance(1);
+ }
+
+ /*
+ * The first read.
+ */
sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepStart(ch);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_START);
- assert(cqe->res == 0);
- assert(cqe->user_data == 9999);
+ sqe->PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 4);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("Hell", ch->read_buf(), cqe->res_));
ring.CQAdvance(1);
+
+ /*
+ * The second read.
+ */
sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepRead(ch, 1024);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->res == strlen(ua));
- assert(cqe->user_data == 9999);
- assert(!strncmp(ua, ch->read_buf(), strlen(ua)));
- ring.CQAdvance(1);
+ sqe->PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
- delete ch;
-}
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
-static void test_chnet_ring_single_post(void)
-{
- constexpr static const char buf[] = "AAAA Hello World!\n";
- CNRingCtx ring(4096);
- CNRingSQE *sqe;
- CNRingCQE *cqe;
- CHNet *ch;
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 4);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("o Wo", ch->read_buf(), cqe->res_));
+ ring.CQAdvance(1);
- ch = new CHNet;
- ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
- ch->SetMethod("POST");
- ch->SetPayload(buf, sizeof(buf));
+ /*
+ * The third read.
+ */
sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepStart(ch);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_START);
- assert(cqe->res == 0);
- assert(cqe->user_data == 9999);
+ sqe->PrepRead(ch, 1024);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 5);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("rld!\n", ch->read_buf(), cqe->res_));
ring.CQAdvance(1);
+
+ /*
+ * The fourth read.
+ */
sqe = ring.GetSQE();
- assert(sqe);
sqe->PrepRead(ch, 1024);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->res == sizeof(buf));
- assert(cqe->user_data == 9999);
- assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 0);
+ assert(ch->read_ret() == cqe->res_);
ring.CQAdvance(1);
delete ch;
}
-#define NR_REQ 1024
-
-static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring)
+#define NR_REQ 4096
+static void test_chnet_ring_read_parallel(bool do_sq_start = false)
{
- unsigned i, j, head;
+ CNRing ring(NR_REQ);
CNRingSQE *sqe;
CNRingCQE *cqe;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t i;
CHNet **ch;
- ch = new CHNet*[NR_REQ];
+ ch = new CHNet *[NR_REQ];
for (i = 0; i < NR_REQ; i++) {
ch[i] = new CHNet;
ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello");
}
- if (start_before_read) {
+ if (do_sq_start) {
for (i = 0; i < NR_REQ; i++) {
- sqe = ring->GetSQE();
+ sqe = ring.GetSQE();
assert(sqe);
sqe->PrepStart(ch[i]);
sqe->SetUserDataPtr(ch[i]);
}
- assert(ring->SubmitSQE() == i);
- ring->WaitCQE(i);
-
- j = 0;
- chnring_for_each_cqe(ring, head, cqe) {
- assert(cqe->op == CNRING_OP_START);
- assert(!cqe->res);
- j++;
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 0);
+ assert(cqe->op_ == CNRING_OP_START);
}
- assert(j == i);
- ring->CQAdvance(j);
+ ring.CQAdvance(NR_REQ);
}
+
for (i = 0; i < NR_REQ; i++) {
- sqe = ring->GetSQE();
+ sqe = ring.GetSQE();
assert(sqe);
sqe->PrepRead(ch[i], 1024);
sqe->SetUserDataPtr(ch[i]);
}
- assert(ring->SubmitSQE() == i);
- ring->WaitCQE(i);
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
- j = 0;
- chnring_for_each_cqe(ring, head, cqe) {
- CHNet *ch;
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
- ch = (CHNet *)cqe->GetUserDataPtr();
assert(ch);
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->res == 13);
- assert(ch->read_ret() == cqe->res);
- assert(!strncmp(ch->read_buf(), "Hello World!\n", 13));
- j++;
+ assert(cqe->res_ == 13);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("Hello World!\n", ch->read_buf(), cqe->res_));
}
- assert(j == i);
- ring->CQAdvance(j);
+ ring.CQAdvance(NR_REQ);
for (i = 0; i < NR_REQ; i++)
delete ch[i];
@@ -238,82 +253,157 @@ static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring)
delete[] ch;
}
-static void test_chnet_ring_multiple(bool start_before_read)
-{
- CNRingCtx ring(NR_REQ);
- unsigned i;
-
- for (i = 0; i < 3; i++)
- _test_chnet_ring_multiple(start_before_read, &ring);
-}
-
-static void test_chnet_chunked_body(void)
+static void test_chnet_ring_partial_read_parallel(bool do_sq_start = false)
{
- constexpr size_t len = 1024 * 64;
- constexpr size_t nr_loop = 100;
- size_t total_read = 0;
- CNRingCtx ring(4096);
+ CNRing ring(NR_REQ);
CNRingSQE *sqe;
CNRingCQE *cqe;
- CHNet *ch;
- char *buf;
- size_t i;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t i;
+ CHNet **ch;
- ch = new CHNet;
- ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
- ch->SetMethod("POST");
- ch->StartChunkedBody();
+ ch = new CHNet *[NR_REQ];
- sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepRead(ch, len);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- buf = new char[len];
- memset(buf, 'A', len);
-
- for (i = 0; i < nr_loop; i++)
- ch->Write(buf, len);
- ch->StopChunkedBody();
-
- while (total_read < (len * nr_loop)) {
- int res;
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->user_data == 9999);
- res = cqe->res;
- total_read += res;
- ring.CQAdvance(1);
+ for (i = 0; i < NR_REQ; i++) {
+ ch[i] = new CHNet;
+ ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch[i]->SetMethod("GET");
+ }
- if (!res)
- break;
+ if (do_sq_start) {
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepStart(ch[i]);
+ sqe->SetUserDataPtr(ch[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+ assert(ch);
+ assert(cqe->res_ == 0);
+ assert(cqe->op_ == CNRING_OP_START);
+ }
+ ring.CQAdvance(NR_REQ);
+ }
+
+
+ /*
+ * The first read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
sqe = ring.GetSQE();
assert(sqe);
- sqe->PrepRead(ch, len);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
+ sqe->PrepRead(ch[i], 4);
+ sqe->SetUserDataPtr(ch[i]);
}
- printf("total_read = %zu %zu\n", total_read, (len * nr_loop));
- assert(total_read == (len * nr_loop));
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
- delete[] buf;
- delete ch;
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 4);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("Hell", ch->read_buf(), cqe->res_));
+ }
+ ring.CQAdvance(NR_REQ);
+
+
+ /*
+ * The second read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch[i], 4);
+ sqe->SetUserDataPtr(ch[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 4);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("o Wo", ch->read_buf(), cqe->res_));
+ }
+ ring.CQAdvance(NR_REQ);
+
+
+ /*
+ * The third read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch[i], 100);
+ sqe->SetUserDataPtr(ch[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 5);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("rld!\n", ch->read_buf(), cqe->res_));
+ }
+ ring.CQAdvance(NR_REQ);
+
+
+ /*
+ * The fourth read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch[i], 100);
+ sqe->SetUserDataPtr(ch[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 0);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == 0);
+ }
+ ring.CQAdvance(NR_REQ);
+
+ for (i = 0; i < NR_REQ; i++)
+ delete ch[i];
+
+ delete[] ch;
}
int main(void)
{
+ int i;
+
test_nop();
chnet_global_init();
- // test_chnet_ring_single();
- // test_chnet_ring_set_header();
- // test_chnet_ring_single_post();
- // test_chnet_ring_multiple(false);
- // test_chnet_ring_multiple(true);
- test_chnet_chunked_body();
+ for (i = 0; i < 2; i++) {
+ test_chnet_ring_read(!!i);
+ test_chnet_ring_partial_read(!!i);
+ test_chnet_ring_read_parallel(!!i);
+ test_chnet_ring_partial_read_parallel(!!i);
+ }
chnet_global_stop();
return 0;
}
diff --git a/tests/js/ring.js b/tests/js/ring.js
index 372c6e6..2f3fe1c 100644
--- a/tests/js/ring.js
+++ b/tests/js/ring.js
@@ -3,263 +3,389 @@ const chnet = require("bindings")("chnet");
function assert(cond)
{
- if (!cond) {
- console.log("Assertion failed!");
- process.exit(1);
- }
+ if (!cond)
+ throw new Error("Assertion failed!");
}
function test_nop()
{
- const entry = 16;
- let ring;
- let cqe;
+ const nr_loop = 1024 * 8;
+ const entry = 4096;
+ let ring = chnet.CreateRing(entry);
let sqe;
- let a, b, i, j;
-
- i = 0;
- ring = chnet.create_ring(entry);
- for (j = 0; j < 3; j++) {
- while (1) {
- sqe = ring.get_sqe();
- if (!sqe)
- break;
- sqe.prep_nop();
- i++;
+ let cqe;
+ let total;
+ let ret;
+ let i;
+
+ for (i = 0; i < nr_loop; i++) {
+ sqe = ring.GetSQE();
+ if (!sqe) {
+ assert(ring.Submit() == entry);
+ sqe = ring.GetSQE();
+ assert(sqe);
}
- ring.submit_sqe();
+ sqe.PrepNop();
+ sqe.SetUserData(1);
+ }
+ assert(ring.Submit() == entry);
+ assert(ring.WaitCQE(1));
+
+ total = 0;
+ while (total < nr_loop) {
+ ret = ring.WaitCQE(1);
+ i = ring.ForEachCQE(function (cqe) {
+ assert(!cqe.res);
+ assert(cqe.user_data == 1);
+ });
+ ring.CQAdvance(i);
+ assert(ret >= 1);
+ assert(i >= ret);
+ total += ret;
}
- ring.wait_cqe(entry * 2);
-
- a = i;
- i = ring.for_each_cqe(function (cqe) { });
- b = i;
- ring.cq_advance(i);
- assert(b == entry * 2);
-
- ring.wait_cqe(entry);
- i = ring.for_each_cqe(function (cqe) { });
- b += i;
- ring.cq_advance(i);
- assert(a == b);
+ assert(total == nr_loop);
+ ring.Close();
}
-function test_chnet_ring_multiple()
+function test_chnet_ring_read(do_sq_start = false)
{
- const NR_REQ = 10;
- let ring;
+ let ring = chnet.CreateRing(1);
let sqe;
let cqe;
+ let ret;
let ch;
- let i;
- ring = chnet.create_ring(NR_REQ);
- ch = [];
+ ch = chnet.CreateNet();
+ ch.SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch.SetMethod("GET");
- for (i = 0; i < NR_REQ; i++) {
- ch[i] = chnet.create_net();
- ch[i].set_url("http://127.0.0.1:8000/index.php?action=hello");
- }
+ if (do_sq_start) {
+ sqe = ring.GetSQE();
+ sqe.PrepStart(ch);
+ assert(ring.Submit() == 1);
- for (i = 0; i < NR_REQ; i++) {
- sqe = ring.get_sqe();
- assert(sqe);
- sqe.prep_start(ch[i]);
- sqe.set_user_data(ch[i]);
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+ assert(ret == 1);
+ assert(cqe.res == 0);
+ ring.CQAdvance(1);
}
- assert(ring.submit_sqe() == i);
- ring.wait_cqe(i);
- j = ring.for_each_cqe(function (cqe) {
- assert(cqe.res == 0);
- });
- assert(j == i);
- ring.cq_advance(j);
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 1024);
+ assert(ring.Submit() == 1);
- for (i = 0; i < NR_REQ; i++) {
- sqe = ring.get_sqe();
- assert(sqe);
- sqe.prep_read(ch[i], 1024);
- sqe.set_user_data(ch[i]);
- }
- assert(ring.submit_sqe() == i);
- ring.wait_cqe(i);
- j = ring.for_each_cqe(function (cqe) {
- let ch = cqe.user_data;
- assert(cqe.res == 13);
- assert(ch.read_buf() === "Hello World!\n");
- });
- assert(j == i);
- ring.cq_advance(j);
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 13);
+ assert(ch.read_ret() == cqe.res);
+ assert("Hello World!\n" == ch.read_buf());
+ ring.CQAdvance(1);
+ ch.Close();
+ ring.Close();
}
-class SimpleHttp {
- constructor(url, method = "GET") {
- this.ring = chnet.create_ring(1);
- this.ch = chnet.create_net();
- this.ch.set_url(url);
- this.ch.set_method(method);
- this.res = -1;
- }
+function test_chnet_ring_partial_read(do_sq_start = false)
+{
+ let ring = chnet.CreateRing(1);
+ let sqe;
+ let cqe;
+ let ret;
+ let ch;
- prep_read(read_size) {
- let sqe = this.ring.get_sqe();
- sqe.prep_read(this.ch, read_size);
- sqe.set_user_data(this);
+ ch = chnet.CreateNet();
+ ch.SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch.SetMethod("GET");
+
+ if (do_sq_start) {
+ sqe = ring.GetSQE();
+ sqe.PrepStart(ch);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+ assert(ret == 1);
+ assert(cqe.res == 0);
+ ring.CQAdvance(1);
}
- set_payload(payload) {
- this.ch.set_payload(payload);
+ /*
+ * The first read.
+ */
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 4);
+ assert(ch.read_ret() == cqe.res);
+ assert("Hell" == ch.read_buf());
+ ring.CQAdvance(1);
+
+
+ /*
+ * The second read.
+ */
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 4);
+ assert(ch.read_ret() == cqe.res);
+ assert("o Wo" == ch.read_buf());
+ ring.CQAdvance(1);
+
+
+ /*
+ * The third read.
+ */
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 1024);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 5);
+ assert(ch.read_ret() == cqe.res);
+ assert("rld!\n" == ch.read_buf());
+ ring.CQAdvance(1);
+
+
+ /*
+ * The fourth read.
+ */
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 0);
+ assert(ch.read_ret() == cqe.res);
+ assert(!ch.read_buf());
+ ring.CQAdvance(1);
+ ch.Close();
+ ring.Close();
+}
+
+const NR_REQ = 4096;
+function test_chnet_ring_read_parallel(do_sq_start = false)
+{
+ let ring = chnet.CreateRing(NR_REQ);
+ let sqe;
+ let cqe;
+ let i;
+ let ch_arr;
+
+ ch_arr = [];
+
+ for (i = 0; i < NR_REQ; i++) {
+ ch_arr[i] = chnet.CreateNet();
+ ch_arr[i].SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch_arr[i].SetMethod("GET");
}
- run() {
- this.ring.submit_sqe();
- this.ring.wait_cqe(1);
- this.ring.for_each_cqe(function (cqe) {
- let that = cqe.user_data;
- that.res = cqe.res;
- if (cqe.res < 0)
- console.log("Error: "+that.ch.get_error());
+ if (do_sq_start) {
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepStart(ch_arr[i]);
+ sqe.SetUserData(ch_arr[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
+
+ assert(ch);
+ assert(cqe.res == 0);
});
- this.ring.cq_advance(1);
+ ring.CQAdvance(NR_REQ);
}
- get_buffer() {
- return this.ch.read_buf();
+
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 1024);
+ sqe.SetUserData(ch_arr[i]);
}
-}
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
-function test_simple_http()
-{
- const payload = "Hello World! AAAAAAAAAAAAAAAAAAAAAAAAA\n";
- let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=body", "POST");
- h.set_payload(payload);
- h.prep_read(1024);
- h.run();
- assert(h.get_buffer() === payload);
- assert(h.ch.read_ret() === payload.length);
-}
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
-function test_simple_http_set_header()
-{
- const ua = "This is just a test user agent!";
- let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=user_agent", "GET");
- h.ch.set_request_header("User-Agent", ua);
- h.prep_read(1024);
- h.run();
- assert(h.get_buffer() === ua);
- assert(h.ch.read_ret() === ua.length);
+ assert(ch);
+ assert(cqe.res == 13);
+ assert(ch.read_ret() == cqe.res);
+ assert("Hello World!\n" == ch.read_buf());
+ });
+ ring.CQAdvance(NR_REQ);
+
+ for (i = 0; i < NR_REQ; i++)
+ ch_arr[i].Close();
+
+ ring.Close();
}
-function test_simple_http_post_set_header()
+function test_chnet_ring_partial_read_parallel(do_sq_start = false)
{
- const ss = "This is just a test string!";
- let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=print_post&key=test", "POST");
- h.ch.set_request_header("User-Agent", ss);
- h.ch.set_request_header("Content-type", "application/x-www-form-urlencoded");
- h.ch.set_payload("test="+encodeURIComponent(ss));
- h.prep_read(1024);
- h.run();
- assert(h.get_buffer() === ss);
- assert(h.ch.read_ret() === ss.length);
-}
+ let ring = chnet.CreateRing(NR_REQ);
+ let sqe;
+ let cqe;
+ let i;
+ let ch_arr;
-class CHNet {
- constructor(cr, url, method) {
- this.ch = chnet.create_net();
- this.ch.set_url(url);
- this.ch.set_method(method);
- this.cr = cr;
- }
+ ch_arr = [];
- prep_read(read_size) {
- let ring = this.cr.ring;
- let sqe = ring.get_sqe();
- if (!sqe)
- return false;
- sqe.prep_read(this.ch, read_size);
- sqe.set_user_data(this);
- return true;
+ for (i = 0; i < NR_REQ; i++) {
+ ch_arr[i] = chnet.CreateNet();
+ ch_arr[i].SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch_arr[i].SetMethod("GET");
}
- set_user_data(data) {
- this.udata = data;
- }
+ if (do_sq_start) {
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepStart(ch_arr[i]);
+ sqe.SetUserData(ch_arr[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
- get_user_data() {
- return this.udata;
- }
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
- set_header(key, val, overwrite = true) {
- this.ch.set_request_header(key, val, overwrite);
+ assert(ch);
+ assert(cqe.res == 0);
+ });
+ ring.CQAdvance(NR_REQ);
}
-};
-class ChromiumNet {
- constructor() {
- this.ring = chnet.create_ring(4096);
+
+ /*
+ * The first read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 4);
+ sqe.SetUserData(ch_arr[i]);
}
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
+
+ assert(ch);
+ assert(cqe.res == 4);
+ assert(ch.read_ret() == cqe.res);
+ assert("Hell" == ch.read_buf());
+ });
+ ring.CQAdvance(NR_REQ);
+
- create(url, method = "GET") {
- return new CHNet(this, url, method);
+ /*
+ * The second read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 4);
+ sqe.SetUserData(ch_arr[i]);
}
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
- for_each_cqe(cb = null) {
- let ret = this.ring.for_each_cqe(function (cqe) {
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
- if (cb)
- cb(cqe);
+ assert(ch);
+ assert(cqe.res == 4);
+ assert(ch.read_ret() == cqe.res);
+ assert("o Wo" == ch.read_buf());
+ });
+ ring.CQAdvance(NR_REQ);
- let ch = cqe.user_data;
- let ret = cqe.res;
- let udata = ch.udata;
- if (ret < 0 && ch.onerror)
- ch.onerror(ch, ret, udata);
- else
- ch.onreadcomplete(ch, ret, udata);
- });
- this.ring.cq_advance(ret);
- return ret;
+ /*
+ * The third read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 1024);
+ sqe.SetUserData(ch_arr[i]);
}
-};
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
-function test_classes()
-{
- const UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36";
- const NR_REQ = 10;
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
- let cr = new ChromiumNet;
- let ch_arr = [];
- let i, j, ch;
+ assert(ch);
+ assert(cqe.res == 5);
+ assert(ch.read_ret() == cqe.res);
+ assert("rld!\n" == ch.read_buf());
+ });
+ ring.CQAdvance(NR_REQ);
+
+ /*
+ * The 4-th read.
+ */
for (i = 0; i < NR_REQ; i++) {
- ch = cr.create("http://127.0.0.1:8000/index.php?action=hello");
- ch.set_header("User-Agent", UA);
- ch.prep_read(4096);
- ch.set_user_data(i);
- ch.onerror = function (cr, ret, udata) {
- console.log(`Req: ${udata} returned error: ${cr.ch.get_error()}`);
- };
- ch.onreadcomplete = function (cr, ret, udata) {
- console.log(`Req: ${udata} returned ${ret} bytes: ${cr.ch.read_buf()}`);
- };
- ch_arr[i] = ch;
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 1024);
+ sqe.SetUserData(ch_arr[i]);
}
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
- cr.ring.submit_sqe();
- cr.ring.wait_cqe(1);
- cr.for_each_cqe();
+ assert(ch);
+ assert(cqe.res == 0);
+ assert(ch.read_ret() == cqe.res);
+ });
+ ring.CQAdvance(NR_REQ);
+
+ for (i = 0; i < NR_REQ; i++)
+ ch_arr[i].Close();
+
+ ring.Close();
}
function main()
{
+ let i;
+
test_nop();
- test_chnet_ring_multiple();
- test_simple_http();
- test_simple_http_set_header();
- test_simple_http_post_set_header();
- test_classes();
+ for (i = 0; i < 2; i++) {
+ test_chnet_ring_read(!!i);
+ test_chnet_ring_partial_read(!!i);
+ test_chnet_ring_read_parallel(!!i);
+ test_chnet_ring_partial_read_parallel(!!i);
+ }
}
main();
--
Ammar Faizi
next prev parent reply other threads:[~2022-08-21 11:25 UTC|newest]
Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 01/22] chnet: Add initial request body support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 07/22] chnet: node: Add get_error function to return the error string Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 10/22] chnet: Split construct URL req creation into a new function Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 11/22] chnet: Add set request header support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 12/22] chnet: node: Fix unused variable warning Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 14/22] tests/js/ring: Add more set header function test Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 15/22] chnet: node: Don't use static counter for data ID Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 17/22] chnet: Initial chunked request body support Ammar Faizi
2022-08-21 22:13 ` Alviro Iskandar Setiawan
2022-08-22 3:08 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 18/22] chnet: Rework the chunked request body interface Ammar Faizi
2022-08-21 22:20 ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` Ammar Faizi [this message]
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
2022-08-21 22:29 ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant Ammar Faizi
2022-08-21 22:21 ` Alviro Iskandar Setiawan
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=[email protected] \
--to=[email protected] \
--cc=[email protected] \
--cc=[email protected] \
--cc=[email protected] \
--cc=[email protected] \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.