public inbox for [email protected]
 help / color / mirror / Atom feed
From: Ammar Faizi <[email protected]>
To: Alviro Iskandar Setiawan <[email protected]>
Cc: Ammar Faizi <[email protected]>,
	Muhammad Rizki <[email protected]>,
	Kanna Scarlet <[email protected]>,
	GNU/Weeb Mailing List <[email protected]>
Subject: [PATCH v1 19/22] chnet: ring: Refactor the ring completely
Date: Sun, 21 Aug 2022 18:24:50 +0700	[thread overview]
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>

This commit was multiple commits squashed into one, some interesting
things were:

  - chnet: ring: Support read operation again
  - chnet: ring: Support start operation again
  - tests/cpp/ring: Add more tests and delete old tests
  - chnet: node: Refactor the NodeJS interface

In short, rewriting them to make it cleaner and better...

Signed-off-by: Ammar Faizi <[email protected]>
---
 chnet/WorkQueue.cc  |  17 +-
 chnet/WorkQueue.h   |   5 +
 chnet/chnet_node.cc | 453 ++++++++++++++++++++++++--------------
 chnet/chnet_ring.cc | 460 +++++++++++++++++++--------------------
 chnet/chnet_ring.h  | 200 +++++++++--------
 tests/cpp/ring.cc   | 504 +++++++++++++++++++++++++------------------
 tests/js/ring.js    | 516 +++++++++++++++++++++++++++-----------------
 7 files changed, 1260 insertions(+), 895 deletions(-)

diff --git a/chnet/WorkQueue.cc b/chnet/WorkQueue.cc
index 30163bc..7411123 100644
--- a/chnet/WorkQueue.cc
+++ b/chnet/WorkQueue.cc
@@ -33,6 +33,12 @@ WorkQueue::~WorkQueue(void)
 
 	stop_ = true;
 
+	if (helper_thread_) {
+		jobs_cond_.notify_all();
+		helper_thread_->join();
+		delete helper_thread_;
+	}
+
 	i = nr_threads_;
 	while (i--) {
 		std::thread *t;
@@ -44,11 +50,6 @@ WorkQueue::~WorkQueue(void)
 		}
 	}
 
-	if (helper_thread_) {
-		helper_thread_->join();
-		delete helper_thread_;
-	}
-
 	delete[] threads_;
 	delete[] jobs_;
 }
@@ -99,7 +100,7 @@ void WorkQueue::_wq_thread_worker(struct wq_thread *t) noexcept
 	t->set_task_state(WQ_INTERRUPTIBLE);
 	while (!stop_) {
 		__wq_thread_worker(t, lk);
-		auto cv_ret = jobs_cond_.wait_for(lk, 1s);
+		auto cv_ret = jobs_cond_.wait_for(lk, 200ms);
 		if (unlikely(cv_ret == std::cv_status::timeout)) {
 			if (t->idx < nr_idle_thread_)
 				continue;
@@ -174,7 +175,7 @@ void WorkQueue::wq_helper(void)
 			_wq_helper(njob);
 			lk.lock();
 		}
-		jobs_cond_.wait_for(lk, 10s, [this]{ return stop_; });
+		jobs_cond_.wait_for(lk, 200ms, [this]{ return stop_; });
 		lk.unlock();
 	}
 }
@@ -284,7 +285,7 @@ int64_t WorkQueue::schedule_work(wq_job_callback_t cb, void *data) noexcept
 		}
 
 		lk.lock();
-		sched_idle_cond_.wait_for(lk, 10s);
+		sched_idle_cond_.wait_for(lk, 200ms);
 		lk.unlock();
 	}
 
diff --git a/chnet/WorkQueue.h b/chnet/WorkQueue.h
index ddb3bf5..ad4f831 100644
--- a/chnet/WorkQueue.h
+++ b/chnet/WorkQueue.h
@@ -234,6 +234,11 @@ public:
 	{
 		jobs_cond_.notify_one();
 	}
+
+	inline bool should_stop(void)
+	{
+		return stop_;
+	}
 };
 
 #endif /* #ifndef CHNET_WORKQUEUE_H */
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index 621d7aa..cea4ad6 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -4,22 +4,36 @@
 class NodeCHNetRing {
 public:
 	inline NodeCHNetRing(uint32_t entry):
-		ring_(entry)
+		ring_(new CNRing(entry))
 	{
-		ucount_.store(0, std::memory_order_relaxed);
+		id_seq_.store(0, std::memory_order_relaxed);
 	}
 
-	CNRingCtx		ring_;
+	inline ~NodeCHNetRing(void)
+	{
+		if (ring_)
+			delete ring_;
+	}
+
+	inline void Close(void)
+	{
+		if (ring_) {
+			delete ring_;
+			ring_ = nullptr;
+		}
+	}
+
+	CNRing			*ring_;
+	std::atomic<uint32_t>	id_seq_;
 	Napi::ObjectReference	ref_;
-	std::atomic<uint32_t>	ucount_;
 };
 
 class NodeCNRingSQE {
 public:
-	inline NodeCNRingSQE(NodeCHNetRing *ring, CNRingSQE *sqe)
+	inline NodeCNRingSQE(CNRingSQE *sqe, NodeCHNetRing *ring):
+		sqe_(sqe),
+		ring_(ring)
 	{
-		sqe_ = sqe;
-		ring_ = ring;
 	}
 
 	CNRingSQE		*sqe_;
@@ -33,7 +47,26 @@ struct NodeSQData {
 
 class NodeCHNet {
 public:
-	CHNet			ch_;
+	inline NodeCHNet(void):
+		ch_(new CHNet)
+	{
+	}
+
+	inline ~NodeCHNet(void)
+	{
+		if (ch_)
+			delete ch_;
+	}
+
+	inline void Close(void)
+	{
+		if (ch_) {
+			delete ch_;
+			ch_ = nullptr;
+		}
+	}
+
+	CHNet			*ch_;
 	std::string		payload_;
 };
 
@@ -51,8 +84,9 @@ static inline void obj_add_func(Napi::Env &env, Napi::Object &obj,
 
 static void CHN_SQEPrepNop(const Napi::CallbackInfo &info)
 {
-	NodeCNRingSQE *nsqe = (NodeCNRingSQE *)info.Data();
+	NodeCNRingSQE *nsqe;
 
+	nsqe = (NodeCNRingSQE *)info.Data();
 	nsqe->sqe_->PrepNop();
 }
 
@@ -77,27 +111,6 @@ static NodeCHNet *CHN_PrepGetCHObj(Napi::Env &env, const Napi::Value &arg)
 		ch_in_obj.As<Napi::Number>().Int64Value();
 }
 
-static void CHN_SQEPrepStart(const Napi::CallbackInfo &info)
-{
-	Napi::Env env = info.Env();
-	NodeCNRingSQE *nsqe;
-	NodeCHNet *ch;
-	CHNet *chn;
-
-	if (unlikely(info.Length() != 1 || !info[0].IsObject())) {
-		throw_js_exception(env, "sqe.prep_start must be given a chnet object argument");
-		return;
-	}
-
-	ch = CHN_PrepGetCHObj(env, info[0]);
-	if (unlikely(!ch))
-		return;
-
-	chn = &ch->ch_;
-	nsqe = (NodeCNRingSQE *)info.Data();
-	nsqe->sqe_->PrepStart(chn);
-}
-
 static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
 {
 	Napi::Env env = info.Env();
@@ -108,7 +121,13 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
 
 	if (unlikely(info.Length() != 2 || !info[0].IsObject() ||
 		     !info[1].IsNumber())) {
-		throw_js_exception(env, "sqe.prep_read must be given a chnet object argument and a read size");
+		throw_js_exception(env, "sqe.PrepRead must be given a chnet object argument and a read size");
+		return;
+	}
+
+	nsqe = (NodeCNRingSQE *)info.Data();
+	if (unlikely(!nsqe->ring_->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
 		return;
 	}
 
@@ -117,8 +136,7 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
 		return;
 
 	read_size = info[1].As<Napi::Number>().Int32Value();
-	chn = &ch->ch_;
-	nsqe = (NodeCNRingSQE *)info.Data();
+	chn = ch->ch_;
 	nsqe->sqe_->PrepRead(chn, read_size);
 }
 
@@ -129,77 +147,135 @@ static Napi::Value CHN_SQESetUserData(const Napi::CallbackInfo &info)
 	NodeCNRingSQE *nsqe;
 
 	if (unlikely(info.Length() != 1)) {
-		throw_js_exception(env, "sqe.set_user_data must be given exactly one argument");
+		throw_js_exception(env, "sqe.SetUserData must be given exactly one argument");
 		return env.Null();
 	}
 
 	nsqe = (NodeCNRingSQE *)info.Data();
+	if (unlikely(!nsqe->ring_->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
+		return env.Null();
+	}
+
 	data = new struct NodeSQData;
 	data->ring_ = nsqe->ring_;
-	data->id_   = nsqe->ring_->ucount_++ & nsqe->ring_->ring_.cq_mask_;
+	data->id_   = nsqe->ring_->id_seq_++ & nsqe->ring_->ring_->cq_mask_;
 	nsqe->ring_->ref_.Get("__udata")
 		.As<Napi::Object>()[data->id_] = info[0];
 	nsqe->sqe_->SetUserDataPtr(data);
 	return info[0];
 }
 
-static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *sqe)
+static void CHN_SQEPrepStart(const Napi::CallbackInfo &info)
 {
-	delete sqe;
-	(void)env;
+	Napi::Env env = info.Env();
+	NodeCNRingSQE *nsqe;
+	NodeCHNet *ch;
+	CHNet *chn;
+
+	if (unlikely(info.Length() != 1 || !info[0].IsObject())) {
+		throw_js_exception(env, "sqe.PrepStart must be given a chnet object argument");
+		return;
+	}
+
+	ch = CHN_PrepGetCHObj(env, info[0]);
+	if (unlikely(!ch))
+		return;
+
+	chn = ch->ch_;
+	nsqe = (NodeCNRingSQE *)info.Data();
+	if (unlikely(!nsqe->ring_->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
+		return;
+	}
+
+	nsqe->sqe_->PrepStart(chn);
 }
 
-static Napi::Object BuildSQEObject(Napi::Env &env, NodeCHNetRing *ring,
-				   CNRingSQE *sqe)
+static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *nsqe)
 {
-	Napi::Object obj = Napi::Object::New(env);
-	NodeCNRingSQE *nsqe;
-
-	sqe->SetUserData(0);
-	nsqe = new NodeCNRingSQE(ring, sqe);
-	obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "prep_nop");
-	obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "prep_start");
-	obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "prep_read");
-	obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "set_user_data");
-	obj.AddFinalizer(CHN_SQEDestruct, nsqe);
-	return obj;
+	delete nsqe;
 }
 
 static Napi::Value CHN_RingGetSQE(const Napi::CallbackInfo &info)
 {
 	Napi::Env env = info.Env();
 	NodeCHNetRing *ring;
+	NodeCNRingSQE *nsqe;
+	Napi::Object obj;
 	CNRingSQE *sqe;
 
 	ring = (NodeCHNetRing *)info.Data();
-	sqe = ring->ring_.GetSQE();
+	if (unlikely(!ring->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
+		return env.Null();
+	}
+
+	sqe = ring->ring_->GetSQE();
 	if (unlikely(!sqe))
 		return env.Null();
 
-	return BuildSQEObject(env, ring, sqe);
+	sqe->SetUserData(0);
+	nsqe = new NodeCNRingSQE(sqe, ring);
+	obj = Napi::Object::New(env);
+	obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "PrepNop");
+	obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "PrepRead");
+	obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "PrepStart");
+	obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "SetUserData");
+	obj.AddFinalizer(CHN_SQEDestruct, nsqe);
+	return obj;
 }
 
-static Napi::Value CHN_RingSubmitSQE(const Napi::CallbackInfo &info)
+static Napi::Value CHN_RingSubmit(const Napi::CallbackInfo &info)
 {
 	Napi::Env env = info.Env();
 	NodeCHNetRing *ring;
+	uint32_t to_submit;
+
+	if (info.Length() > 0) {
+		if (unlikely(!info[0].IsNumber())) {
+			throw_js_exception(env, "ring.Submit the first argument must be a positive integer");
+			return env.Null();
+		}
+
+		to_submit = info[0].ToNumber().Uint32Value();
+	} else {
+		to_submit = -1U;
+	}
 
 	ring = (NodeCHNetRing *)info.Data();
-	return Napi::Number::New(env, ring->ring_.SubmitSQE());
+	if (unlikely(!ring->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
+		return env.Null();
+	}
+
+	return Napi::Number::New(env, ring->ring_->Submit(to_submit));
 }
 
-static void CHN_RingWaitCQE(const Napi::CallbackInfo &info)
+static Napi::Value CHN_RingWaitCQE(const Napi::CallbackInfo &info)
 {
-	uint32_t to_wait;
+	Napi::Env env = info.Env();
 	NodeCHNetRing *ring;
+	uint32_t to_wait;
+
+	if (info.Length() > 0) {
+		if (unlikely(!info[0].IsNumber())) {
+			throw_js_exception(env, "ring.WaitCQE the first argument must be a positive integer");
+			return env.Null();
+		}
 
-	if (unlikely(info.Length() < 1 || !info[0].IsNumber()))
-		to_wait = 1;
-	else
 		to_wait = info[0].ToNumber().Uint32Value();
+	} else {
+		to_wait = 1;
+	}
 
 	ring = (NodeCHNetRing *)info.Data();
-	ring->ring_.WaitCQE(to_wait);
+	if (unlikely(!ring->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
+		return env.Null();
+	}
+
+	return Napi::Number::New(env, ring->ring_->WaitCQE(to_wait));
 }
 
 static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data)
@@ -209,49 +285,56 @@ static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data)
 	 *                    when it's no longer used.
 	 */
 
-	// data->ring_->ref_.Get("__udata").As<Napi::Object>()
-	// 		.Delete(data->id_);
+#if 0
+	data->ring_->ref_.Get("__udata").As<Napi::Object>()
+			.Delete(data->id_);
+#endif
+
 	delete data;
 	(void)env;
 }
 
 static Napi::Object BuildCQEObject(Napi::Env &env, CNRingCQE *cqe)
 {
-	Napi::Object obj = Napi::Object::New(env);
 	NodeSQData *data = (NodeSQData *) cqe->GetUserDataPtr();
+	Napi::Object obj = Napi::Object::New(env);
 
-	obj["res"] = Napi::Number::New(env, cqe->res);
+	obj["res"] = Napi::Number::New(env, cqe->res_);
+	if (!data)
+		return obj;
 
-	if (data) {
-		obj["user_data"] = data->ring_->ref_.Get("__udata")
-					.As<Napi::Object>().Get(data->id_);
-		obj.AddFinalizer(CHN_CQEDestruct, data);
-	}
+	obj["user_data"] = data->ring_->ref_.Get("__udata").As<Napi::Object>()
+				.Get(data->id_);
+	obj.AddFinalizer(CHN_CQEDestruct, data);
 	return obj;
 }
 
+
 static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info)
 {
-	constexpr static const char err_msg[] =
-		"ring.for_each_cqe must be given exactly a callback function argument";
-
 	Napi::Env env = info.Env();
 	Napi::Function callback;
-	uint32_t head, i;
 	NodeCHNetRing *ring;
 	CNRingCQE *cqe;
+	uint32_t head;
+	uint32_t tail;
+	uint32_t i;
 
 	if (unlikely(info.Length() != 1 || !info[0].IsFunction())) {
-		throw_js_exception(env, err_msg);
+		throw_js_exception(env, "ring.ForEachCQE must be given exactly one callback function argument");
 		return env.Null();
 	}
 
-	callback = info[0].As<Napi::Function>();
+	ring = (NodeCHNetRing *)info.Data();
+	if (unlikely(!ring->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
+		return env.Null();
+	}
 
 	i = 0;
-	ring = (NodeCHNetRing *)info.Data();
-	chnring_for_each_cqe(&ring->ring_, head, cqe) {
-		Napi::Object cqe_obj = BuildCQEObject(env, cqe);
+	callback = info[0].As<Napi::Function>();
+	cnring_for_each_cqe(ring->ring_, head, tail, cqe) {
+		Napi::Value cqe_obj = BuildCQEObject(env, cqe);
 
 		callback.Call({cqe_obj});
 		i++;
@@ -262,104 +345,93 @@ static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info)
 
 static void CHN_RingCQAdvance(const Napi::CallbackInfo &info)
 {
-	constexpr static const char err_msg[] =
-		"ring.cq_advance must be given exactly an integer argument";
-
 	Napi::Env env = info.Env();
 	NodeCHNetRing *ring;
 
 	if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
-		throw_js_exception(env, err_msg);
+		throw_js_exception(env, "ring.CQAdvance must be given exactly one positive integer argument");
 		return;
 	}
 
 	ring = (NodeCHNetRing *)info.Data();
-	ring->ring_.CQAdvance(info[0].ToNumber().Uint32Value());
-}
+	if (unlikely(!ring->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
+		return;
+	}
 
-static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring)
-{
-	delete ring;
-	(void)env;
+	ring->ring_->CQAdvance(info[0].ToNumber().Uint32Value());
 }
 
-static inline Napi::Value _CHN_CreateRing(Napi::Env &env, NodeCHNetRing *ring)
+static Napi::Value CHN_RingGetCQEHead(const Napi::CallbackInfo &info)
 {
-	Napi::Object obj = Napi::Object::New(env);
+	Napi::Env env = info.Env();
+	NodeCHNetRing *ring;
 
-	obj["__udata"] = Napi::Object::New(env);
-	obj_add_func(env, obj, ring, CHN_RingGetSQE, "get_sqe");
-	obj_add_func(env, obj, ring, CHN_RingSubmitSQE, "submit_sqe");
-	obj_add_func(env, obj, ring, CHN_RingWaitCQE, "wait_cqe");
-	obj_add_func(env, obj, ring, CHN_RingForEachCQE, "for_each_cqe");
-	obj_add_func(env, obj, ring, CHN_RingCQAdvance, "cq_advance");
-	obj.AddFinalizer(CHN_RingDestruct, ring);
-	ring->ref_ = Napi::Persistent(obj);
-	return obj;
+	ring = (NodeCHNetRing *)info.Data();
+	if (unlikely(!ring->ring_)) {
+		throw_js_exception(env, "ring has been closed!");
+		return env.Null();
+	}
+
+	return BuildCQEObject(env, ring->ring_->GetCQEHead());
 }
 
-static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info)
+static void CHN_RingClose(const Napi::CallbackInfo &info)
 {
-	constexpr static const char err_msg[] =
-		"chnet.create_ring must be given exactly a positive integer argument";
-
-	Napi::Env env = info.Env();
 	NodeCHNetRing *ring;
-	uint32_t entry;
 
-	if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
-		throw_js_exception(env, err_msg);
-		return env.Null();
-	}
+	ring = (NodeCHNetRing *)info.Data();
+	ring->Close();
+}
 
-	entry = info[0].ToNumber().Uint32Value();
-	ring = new NodeCHNetRing(entry);
-	return _CHN_CreateRing(env, ring);
+static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring)
+{
+	delete ring;
 }
 
 static void CHN_NetSetURL(const Napi::CallbackInfo &info)
 {
-	constexpr static const char err_msg[] =
-		"chnet.set_url must be given exactly 1 string argument";
-
 	Napi::Env env = info.Env();
 	NodeCHNet *ch;
 
 	if (unlikely(info.Length() != 1 || !info[0].IsString())) {
-		throw_js_exception(env, err_msg);
+		throw_js_exception(env, "net.SetURL must be given exactly 1 string argument");
 		return;
 	}
 
 	ch = (NodeCHNet *)info.Data();
+	if (unlikely(!ch->ch_)) {
+		throw_js_exception(env, "chnet has been closed!");
+		return;
+	}
+
 	const std::string &url = info[0].ToString().Utf8Value();
-	ch->ch_.SetURL(url.c_str());
+	ch->ch_->SetURL(url.c_str());
 }
 
 static void CHN_NetSetMethod(const Napi::CallbackInfo &info)
 {
-	constexpr static const char err_msg[] =
-		"chnet.set_method must be given exactly 1 string argument";
-
 	Napi::Env env = info.Env();
 	NodeCHNet *ch;
 
 	if (unlikely(info.Length() != 1 || !info[0].IsString())) {
-		throw_js_exception(env, err_msg);
+		throw_js_exception(env, "net.SetMethod must be given exactly 1 string argument");
 		return;
 	}
 
 	ch = (NodeCHNet *)info.Data();
-	const std::string &method =info[0].ToString().Utf8Value();
-	ch->ch_.SetMethod(method.c_str());
+	if (unlikely(!ch->ch_)) {
+		throw_js_exception(env, "chnet has been closed!");
+		return;
+	}
+
+	const std::string &method = info[0].ToString().Utf8Value();
+	ch->ch_->SetMethod(method.c_str());
 }
 
 static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info)
 {
-	constexpr static const char err_msg[] =
-		"chnet.set_request_header must be at least given 2 string arguments";
-	constexpr static const char err_msg2[] =
-		"chnet.set_request_header can only receive 3 arguments with the third argument being a boolean";
-
+	constexpr static const char err_arg3[] = "chnet.SetRequestHeader can only receive 3 arguments with the third argument being a boolean";
 	Napi::Env env = info.Env();
 	bool overwrite = true;
 	NodeCHNet *ch;
@@ -369,61 +441,80 @@ static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info)
 	nr_arg = info.Length();
 	if (unlikely(nr_arg < 2 || !info[0].IsString() ||
 		     !info[1].IsString())) {
-		throw_js_exception(env, err_msg);
+		throw_js_exception(env, "chnet.SetRequestHeader must be at least given 2 string arguments");
 		return;
 	}
 
 	if (unlikely(nr_arg > 3)) {
-		throw_js_exception(env, err_msg2);
+		throw_js_exception(env, err_arg3);
 		return;
 	}
 
 	if (nr_arg == 3) {
 		if (unlikely(!info[2].IsBoolean())) {
-			throw_js_exception(env, err_msg2);
+			throw_js_exception(env, err_arg3);
 			return;
 		}
 		overwrite = info[2].As<Napi::Boolean>();
 	}
 
 	ch = (NodeCHNet *)info.Data();
+	if (unlikely(!ch->ch_)) {
+		throw_js_exception(env, "chnet has been closed!");
+		return;
+	}
+
 	const std::string &key = info[0].ToString().Utf8Value();
 	const std::string &val = info[1].ToString().Utf8Value();
-	ch->ch_.SetRequestHeader(key.c_str(), val.c_str(), overwrite);
+	ch->ch_->SetRequestHeader(key.c_str(), val.c_str(), overwrite);
 }
 
-
-static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetReadRet(const Napi::CallbackInfo &info)
 {
 	Napi::Env env = info.Env();
 	NodeCHNet *ch;
-	int ret;
 
 	ch = (NodeCHNet *)info.Data();
-	ret = ch->ch_.read_ret();
-	if (ret > 0)
-		return Napi::String::New(env, ch->ch_.read_buf(), (size_t)ret);
+	if (unlikely(!ch->ch_)) {
+		throw_js_exception(env, "chnet has been closed!");
+		return env.Null();
+	}
 
-	return env.Null();
+	return Napi::Number::New(env, ch->ch_->read_ret());
 }
 
-static Napi::Number CHN_NetReadRet(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
 {
 	Napi::Env env = info.Env();
 	NodeCHNet *ch;
+	int ret;
 
 	ch = (NodeCHNet *)info.Data();
-	return Napi::Number::New(env, ch->ch_.read_ret());
+	if (unlikely(!ch->ch_)) {
+		throw_js_exception(env, "chnet has been closed!");
+		return env.Null();
+	}
+
+	ret = ch->ch_->read_ret();
+	if (ret > 0)
+		return Napi::String::New(env, ch->ch_->read_buf(), (size_t)ret);
+
+	return env.Null();
 }
 
-static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetGetErrorStr(const Napi::CallbackInfo &info)
 {
 	Napi::Env env = info.Env();
 	const char *err;
 	NodeCHNet *ch;
 
 	ch = (NodeCHNet *)info.Data();
-	err = ch->ch_.GetErrorStr();
+	if (unlikely(!ch->ch_)) {
+		throw_js_exception(env, "chnet has been closed!");
+		return env.Null();
+	}
+
+	err = ch->ch_->GetErrorStr();
 	if (err)
 		return Napi::String::New(env, err);
 
@@ -432,20 +523,30 @@ static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info)
 
 static void CHN_NetSetPayload(const Napi::CallbackInfo &info)
 {
-	constexpr static const char err_msg[] =
-		"chnet.set_payload must be given exactly 1 string argument";
-
 	Napi::Env env = info.Env();
 	NodeCHNet *ch;
 
 	if (unlikely(info.Length() != 1 || !info[0].IsString())) {
-		throw_js_exception(env, err_msg);
+		throw_js_exception(env, "chnet.SetPayload must be given exactly 1 string argument");
 		return;
 	}
 
 	ch = (NodeCHNet *)info.Data();
+	if (unlikely(!ch->ch_)) {
+		throw_js_exception(env, "chnet has been closed!");
+		return;
+	}
+
 	ch->payload_ = info[0].ToString().Utf8Value();
-	ch->ch_.SetPayload(ch->payload_.c_str(), ch->payload_.size());
+	ch->ch_->SetPayload(ch->payload_.c_str(), ch->payload_.size());
+}
+
+static void CHN_NetClose(const Napi::CallbackInfo &info)
+{
+	NodeCHNet *ch;
+
+	ch = (NodeCHNet *)info.Data();
+	ch->Close();
 }
 
 static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
@@ -454,6 +555,42 @@ static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
 	(void)env;
 }
 
+static Napi::Value _CHN_CreateRing(Napi::Env env, uint32_t entry)
+{
+	NodeCHNetRing *ring;
+	Napi::Object obj;
+
+	obj = Napi::Object::New(env);
+	ring = new NodeCHNetRing(entry);
+	obj["__udata"] = Napi::Object::New(env);
+	obj_add_func(env, obj, ring, CHN_RingGetSQE, "GetSQE");
+	obj_add_func(env, obj, ring, CHN_RingSubmit, "Submit");
+	obj_add_func(env, obj, ring, CHN_RingWaitCQE, "WaitCQE");
+	obj_add_func(env, obj, ring, CHN_RingForEachCQE, "ForEachCQE");
+	obj_add_func(env, obj, ring, CHN_RingCQAdvance, "CQAdvance");
+	obj_add_func(env, obj, ring, CHN_RingGetCQEHead, "GetCQEHead");
+	obj_add_func(env, obj, ring, CHN_RingClose, "Close");
+	obj.AddFinalizer(CHN_RingDestruct, ring);
+	obj.Seal();
+	obj.Freeze();
+	ring->ref_ = Napi::Persistent(obj);
+	return obj;
+}
+
+static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info)
+{
+	Napi::Env env = info.Env();
+	uint32_t entry;
+
+	if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
+		throw_js_exception(env, "chnet.CreateRing must be given exactly one positive integer argument");
+		return env.Null();
+	}
+
+	entry = info[0].ToNumber().Uint32Value();
+	return _CHN_CreateRing(env, entry);
+}
+
 static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
 {
 	Napi::Env env = info.Env();
@@ -461,14 +598,14 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
 	NodeCHNet *ch = new NodeCHNet;
 	int64_t ch_ptr;
 
-	obj_add_func(env, obj, ch, CHN_NetSetURL, "set_url");
-	obj_add_func(env, obj, ch, CHN_NetSetMethod, "set_method");
-	obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "set_request_header");
+	obj_add_func(env, obj, ch, CHN_NetSetURL, "SetURL");
+	obj_add_func(env, obj, ch, CHN_NetSetMethod, "SetMethod");
+	obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "SetRequestHeader");
 	obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret");
 	obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf");
-	obj_add_func(env, obj, ch, CHN_NetGetError, "get_error");
-	obj_add_func(env, obj, ch, CHN_NetSetPayload, "set_payload");
-
+	obj_add_func(env, obj, ch, CHN_NetGetErrorStr, "GetErrorStr");
+	obj_add_func(env, obj, ch, CHN_NetSetPayload, "SetPayload");
+	obj_add_func(env, obj, ch, CHN_NetClose, "Close");
 	ch_ptr = (int64_t) (intptr_t) ch;
 	obj["__ch"] = Napi::Number::New(env, ch_ptr);
 	obj.AddFinalizer(CHN_NetDestruct, ch);
@@ -480,8 +617,10 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
 static Napi::Object CHN_GlobalInit(Napi::Env env, Napi::Object exports)
 {
 	chnet_global_init();
-	obj_add_func(env, exports, NULL, CHN_CreateRing, "create_ring");
-	obj_add_func(env, exports, NULL, CHN_CreateNet, "create_net");
+	obj_add_func(env, exports, NULL, CHN_CreateRing, "CreateRing");
+	obj_add_func(env, exports, NULL, CHN_CreateNet, "CreateNet");
+	exports.Seal();
+	exports.Freeze();
 	return exports;
 }
 NODE_API_MODULE(chnet, CHN_GlobalInit);
diff --git a/chnet/chnet_ring.cc b/chnet/chnet_ring.cc
index a5f3094..b858421 100644
--- a/chnet/chnet_ring.cc
+++ b/chnet/chnet_ring.cc
@@ -8,15 +8,14 @@
 #include <chrono>
 using namespace std::chrono_literals;
 
-#include <unistd.h>
 
-CNRingCtx::CNRingCtx(uint32_t entry)
+CNRing::CNRing(uint32_t entry)
 {
 	uint32_t sq_max;
 	uint32_t cq_max;
 
-	if (entry < 16)
-		entry = 16;
+	if (entry < 2)
+		entry = 2;
 	if (entry > 1048576)
 		entry = 1048576;
 
@@ -28,372 +27,353 @@ CNRingCtx::CNRingCtx(uint32_t entry)
 
 	sq_mask_ = sq_max - 1;
 	cq_mask_ = cq_max - 1;
-
-	sqes_ = new CNRingSQE[sq_max + 1];
-	cqes_ = new CNRingCQE[cq_max + 1];
-	state_ = new CNRingState(sq_max, cq_max, 2);
-
-	state_->cqe_to_wait_.store(0, std::memory_order_relaxed);
-	state_->nr_post_cqe_wait_.store(0, std::memory_order_relaxed);
 	sq_head_.store(0, std::memory_order_relaxed);
 	sq_tail_.store(0, std::memory_order_relaxed);
 	cq_head_.store(0, std::memory_order_relaxed);
 	cq_tail_.store(0, std::memory_order_relaxed);
+	cq_max_to_wait_.store(0, std::memory_order_relaxed);
+	sqes_ = new CNRingSQE[sq_max];
+	cqes_ = new CNRingCQE[cq_max];
+	state_ = new CNRingState(4096, 8192, 1);
 }
 
-CNRingCtx::~CNRingCtx(void)
+CNRing::~CNRing(void)
 {
+	state_->stop_ = true;
+	NotifyFreeCQEWaiter(true);
 	delete state_;
 	delete[] sqes_;
 	delete[] cqes_;
 }
 
-static inline uint32_t big_min_small(uint32_t a, uint32_t b)
-{
-	if (a < b)
-		return b - a;
-	else
-		return a - b;
-}
-
-uint32_t CNRingCtx::sqe_size(void)
-{
-	return big_min_small(sq_head_.load(), sq_tail_.load());
-}
-
-uint32_t CNRingCtx::cqe_size(void)
-{
-	return big_min_small(cq_head_.load(), cq_tail_.load());
-}
-
-CNRingSQE *CNRingCtx::GetSQE(void)
-{
-	uint32_t head = sq_head_.load();
-	uint32_t tail = sq_tail_.load();
-	uint32_t mask = sq_mask_;
-	CNRingSQE *sqe;
-
-	if (unlikely(big_min_small(head, tail) > mask))
-		return nullptr;
-
-	sqe = &sqes_[tail & mask];
-	sq_tail_++;
-	return sqe;
-}
-
-CNRingCQE *CNRingCtx::GetCQENoTailIncrement(void)
-	__must_hold(&state_->cqe_lock_)
+CNRingState::CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
+			 uint32_t nr_idle_thread):
+	wq_(nr_thread, nr_jobs, nr_idle_thread)
 {
-	uint32_t head = cq_head_.load();
-	uint32_t tail = cq_tail_.load();
-	uint32_t mask = cq_mask_;
-
-	if (unlikely(big_min_small(head, tail) > mask))
-		return nullptr;
-
-	return &cqes_[tail & mask];
+	cqe_to_wait_.store(0, std::memory_order_relaxed);
+	nr_free_cqe_slot_waiter_.store(0, std::memory_order_relaxed);
+	stop_ = false;
+	wq_.run();
 }
 
-void CNRingCtx::NotifyCQEWaiter(void)
+void CNRing::NotifyCQEWaiter(void)
 	__must_hold(&state_->cqe_lock_)
 {
-	uint32_t to_wait = state_->cqe_to_wait_.load();
+	uint32_t to_wait = state_->cqe_to_wait_.load(std::memory_order_acquire);
 
 	if (!to_wait)
 		return;
 
-	if (to_wait > cqe_size())
+	if (to_wait > CQESize())
 		return;
 
-	state_->cqe_cond_.notify_one();
+	state_->cqe_to_wait_cond_.notify_one();
 }
 
-bool CNRingCtx::TryPostCQE(CNRingSQE *sqe, int64_t res)
+void CNRing::NotifyFreeCQEWaiter(bool force)
 {
-	CNRingCQE *cqe;
-
-	state_->cqe_lock_.lock();
-	cqe = GetCQENoTailIncrement();
-	if (unlikely(!cqe)) {
-		NotifyCQEWaiter();
-		state_->cqe_lock_.unlock();
-		return false;
+	uint32_t n;
+
+	n = state_->nr_free_cqe_slot_waiter_.load(std::memory_order_acquire);
+	if (unlikely(n > 0 || force)) {
+		std::condition_variable *cond;
+		std::mutex *mutex;
+
+		mutex = &state_->wait_for_a_free_cqe_slot_lock_;
+		cond = &state_->wait_for_a_free_cqe_slot_cond_;
+		mutex->lock();
+		if (n == 1)
+			cond->notify_one();
+		else
+			cond->notify_all();
+		mutex->unlock();
 	}
-
-	cqe->op = sqe->op;
-	cqe->user_data = sqe->user_data;
-	cqe->res = res;
-	cq_tail_++;
-	NotifyCQEWaiter();
-	state_->cqe_lock_.unlock();
-	return true;
 }
 
-void CNRingCtx::WaitForCQEFreeSlot(void)
+void CNRing::SleepToWaitForAFreeCQE(void)
 {
-	std::unique_lock<std::mutex> lock(state_->post_cqe_lock_);
+	std::unique_lock<std::mutex> lk(state_->wait_for_a_free_cqe_slot_lock_);
+	std::atomic<uint32_t> *nr_waiter = &state_->nr_free_cqe_slot_waiter_;
 
-	state_->nr_post_cqe_wait_++;
-	state_->post_cqe_cond_.wait(lock, [this]{
-		bool ret;
+	(*nr_waiter)++;
+	state_->wait_for_a_free_cqe_slot_cond_.wait(lk, [this]{
+		uint32_t ret;
 
 		state_->cqe_lock_.lock();
-		ret = !!GetCQENoTailIncrement();
+		ret = CQESize();
 		state_->cqe_lock_.unlock();
-		return ret;
+		return ret < (cq_mask_ + 1) || state_->should_stop();
 	});
-	state_->nr_post_cqe_wait_--;
+	(*nr_waiter)--;
 }
 
-void CNRingCtx::PostCQE(CNRingSQE *sqe, int64_t res)
+void CNRing::PostCQE(const CNRingSQE *sqe, int64_t res)
 {
 	while (1) {
+		if (unlikely(state_->should_stop()))
+			return;
+
 		if (TryPostCQE(sqe, res))
-			break;
+			return;
 
-		WaitForCQEFreeSlot();
+		SleepToWaitForAFreeCQE();
 	}
 }
 
-void CNRingCtx::NotifyWaitCQEFreeSlot(void)
+bool CNRing::TryPostCQE(const CNRingSQE *sqe, int64_t res)
 {
-	uint32_t nr_wait = state_->nr_post_cqe_wait_.load();
+	bool ret = false;
+	CNRingCQE *cqe;
+	uint32_t mask;
 
-	if (unlikely(nr_wait)) {
-		state_->post_cqe_lock_.lock();
-		if (nr_wait == 1)
-			state_->post_cqe_cond_.notify_one();
-		else
-			state_->post_cqe_cond_.notify_all();
-		state_->post_cqe_lock_.unlock();
+	state_->cqe_lock_.lock();
+	mask = cq_mask_;
+
+	if (likely(CQESize() < (mask + 1))) {
+		cqe = &cqes_[cq_tail_++ & mask];
+		ret = true;
+		cqe->op_ = sqe->op_;
+		cqe->res_ = res;
+		cqe->user_data_ = sqe->user_data_;
+		NotifyCQEWaiter();
 	}
-}
 
-void CNRingCtx::CQAdvance(uint32_t n)
-{
-	state_->cqe_lock_.lock();
-	cq_head_ += n;
 	state_->cqe_lock_.unlock();
-	NotifyWaitCQEFreeSlot();
+	return ret;
 }
 
-/*
- * Use this when the caller is not allowed to block.
- */
-void CNRingCtx::CallPostCQE(CNRingSQE *sqe, int64_t res)
+bool CNRing::PostCQENoSleep(const CNRingSQE *sqe, int64_t res)
 {
-	CNRingSQE *tmp;
-
 	if (likely(TryPostCQE(sqe, res)))
-		return;
+		return true;
 
-	/*
-	 * The CQE slot is full, but the caller is not allowed to
-	 * block, let's schedule the post in the workqueue thread.
-	 */
-	tmp = new CNRingSQE;
-	*tmp = *sqe;
-	state_->wq_.schedule_work([this, res](struct wq_job_data *data){
-		CNRingSQE *sqe = (CNRingSQE *)data->data;
-
-		this->PostCQE(sqe, res);
-		delete sqe;
-	}, tmp);
+	auto func = [this, res, copy_sqe = *sqe](auto *x){
+		this->PostCQE(&copy_sqe, res);
+	};
+	return state_->wq_.try_schedule_work(std::move(func), nullptr) >= 0;
 }
 
-void CNRingCtx::IssueNopSQE(CNRingSQE *sqe)
+bool CNRing::IssueSQENop(const CNRingSQE *sqe)
 {
-	CallPostCQE(sqe, 0);
+	return PostCQENoSleep(sqe, 0);
 }
 
-static void issue_start_sqe(void *udata, net::URLRequest *url_req, int net_err)
+static inline void post_sqe_read(CNRing *ring, const CNRingSQE *sqe, int size)
 {
-	CNRingOpStart *sop;
-	CNRingSQE *sqe;
-
-	sqe = (CNRingSQE *)udata;
-	sop = (CNRingOpStart *)sqe->sq_data;
-	sop->ring->PostCQE(sqe, net_err);
-	delete sop;
-	delete sqe;
+	ring->PostCQE(sqe, size);
 }
 
-void CNRingCtx::IssueStartSQE(CNRingSQE *sqe)
+static int issue_sqe_read(CNRing *ring, const CNRingSQE *sqe)
 {
+	CHNet *ch = sqe->sq_start_.ch_;
 	struct net::CHNCallback *cb;
-	CNRingOpStart *sop;
-	CNRingSQE *tmp;
-	int ret;
 
-	tmp = new CNRingSQE;
-	*tmp = *sqe;
+	auto f = [ring, sqe_cp = *sqe](void *u, net::URLRequest *ur, int size){
+		post_sqe_read(ring, &sqe_cp, size);
+	};
 
-	sop = (CNRingOpStart *)tmp->sq_data;
-	sop->ring = this;
-
-	cb = &sop->chnet->ch()->cb;
-	cb->response_started_ = issue_start_sqe;
-	cb->response_started_data_ = tmp;
-	ret = sop->chnet->Start();
-	if (unlikely(ret)) {
-		CallPostCQE(sqe, ret);
-		delete tmp;
-	}
-}
-
-static void issue_read_sqe(void *udata, net::URLRequest *url_req, int read_ret)
-{
-	CNRingOpStart *sop;
-	CNRingSQE *sqe;
-
-	sqe = (CNRingSQE *)udata;
-	sop = (CNRingOpStart *)sqe->sq_data;
-	sop->ring->PostCQE(sqe, read_ret);
-	delete sop;
-	delete sqe;
+	cb = &ch->ch()->cb;
+	cb->read_completed_ = std::move(f);
+	return ch->Read(sqe->sq_read_.size_);
 }
 
-static void issue_read_sqe_need_start(void *udata, net::URLRequest *url_req,
-				      int net_err)
+static void _issue_sqe_read_need_start(CNRing *ring, const CNRingSQE *sqe,
+				       int net_err)
 {
+	CHNet *ch = sqe->sq_start_.ch_;
 	struct net::CHNCallback *cb;
-	CNRingOpRead *sop;
-	CNRingSQE *sqe;
 
-	sqe = (CNRingSQE *)udata;
-	sop = (CNRingOpRead *)sqe->sq_data;
-
-	if (likely(net_err == net::OK)) {
-		/*
-		 * The start operation succeeds, now we can do
-		 * the read operation directly without issuing
-		 * extra SQE.
-		 */
-		cb = &sop->chnet->ch()->cb;
-		cb->read_completed_ = issue_read_sqe;
-		cb->read_completed_data_ = sqe;
-		sop->chnet->ch()->_Read(nullptr, sop->read_size);
+	if (unlikely(net_err != net::OK)) {
+		ring->PostCQE(sqe, net_err);
 		return;
 	}
 
+	auto f = [ring, sqe_cp = *sqe](void *u, net::URLRequest *ur, int size){
+		post_sqe_read(ring, &sqe_cp, size);
+	};
+
 	/*
-	 * The start operation fails, just post the CQE directly.
+	 * The start operation succeeds, now we can do
+	 * the read operation directly without issuing
+	 * extra SQE.
 	 */
-	sop->ring->PostCQE(sqe, net_err);
-	delete sop;
-	delete sqe;
+	cb = &ch->ch()->cb;
+	cb->read_completed_ = std::move(f);
+	ch->ch()->_Read(nullptr, sqe->sq_read_.size_);
 }
 
-void CNRingCtx::IssueReadSQE(CNRingSQE *sqe)
+static int issue_sqe_read_need_start(CNRing *ring, const CNRingSQE *sqe)
 {
+	CHNet *ch = sqe->sq_start_.ch_;
 	struct net::CHNCallback *cb;
-	net::CHNetDelegate *ch;
-	CNRingOpRead *sop;
-	CNRingSQE *tmp;
-	int ret;
 
-	tmp = new CNRingSQE;
-	*tmp = *sqe;
+	auto f = [ring, sqe_cp = *sqe](void *u, auto *ur, int net_err){
+		_issue_sqe_read_need_start(ring, &sqe_cp, net_err);
+	};
+
+	cb = &ch->ch()->cb;
+	cb->response_started_ = std::move(f);
+	return ch->Start();
+}
 
-	sop = (CNRingOpRead *)tmp->sq_data;
-	sop->ring = this;
+bool CNRing::IssueSQERead(const CNRingSQE *sqe)
+{
+	int ret;
 
-	ch = sop->chnet->ch();
-	cb = &ch->cb;
-	if (unlikely(!ch->started())) {
+	if (!sqe->sq_start_.ch_->ch()->started()) {
 		/*
 		 * The request hasn't been started, we can't perform
 		 * a read operation at this point, do the start
 		 * operation first!
 		 */
-		cb->response_started_ = issue_read_sqe_need_start;
-		cb->response_started_data_ = tmp;
-		ret = sop->chnet->Start();
+		ret = issue_sqe_read_need_start(this, sqe);
 	} else {
 		/*
 		 * Normal read operation here.
 		 */
-		cb->read_completed_ = issue_read_sqe;
-		cb->read_completed_data_ = tmp;
-		ret = sop->chnet->Read(sop->read_size);
+		ret = issue_sqe_read(this, sqe);
 	}
 
 	if (likely(ret >= 0))
-		return;
+		return true;
 
-	/*
-	 * Aiee, the operation fails!
-	 */
-	CallPostCQE(sqe, ret);
-	delete tmp;
+	return PostCQENoSleep(sqe, ret);
+}
+
+bool CNRing::IssueSQEStart(const CNRingSQE *sqe)
+{
+	CHNet *ch = sqe->sq_start_.ch_;
+	struct net::CHNCallback *cb;
+	int ret;
+
+	auto f = [this, sqe_cp = *sqe](void *u, auto *ur, int net_err){
+		this->PostCQE(&sqe_cp, net_err);
+	};
+
+	cb = &ch->ch()->cb;
+	cb->response_started_ = std::move(f);
+	ret = ch->Start();
+
+	if (likely(ret >= 0))
+		return true;
+
+	return PostCQENoSleep(sqe, ret);
 }
 
-void CNRingCtx::IssueSQE(CNRingSQE *sqe)
+void CNRing::IssueSQE(const CNRingSQE *sqe)
 {
-	switch (sqe->op) {
+	bool ok = false;
+
+	switch (sqe->op_) {
 	case CNRING_OP_NOP:
-		IssueNopSQE(sqe);
-		break;
-	case CNRING_OP_START:
-		IssueStartSQE(sqe);
+		ok = IssueSQENop(sqe);
 		break;
 	case CNRING_OP_READ:
-		IssueReadSQE(sqe);
+		ok = IssueSQERead(sqe);
+		break;
+	case CNRING_OP_START:
+		ok = IssueSQEStart(sqe);
 		break;
 	}
+
+	if (likely(ok))
+		cq_max_to_wait_++;
 }
 
-uint32_t CNRingCtx::SubmitSQE(uint32_t to_submit)
+uint32_t CNRing::Submit(uint32_t to_submit)
 {
-	uint32_t head = sq_head_.load();
-	uint32_t tail = sq_tail_.load();
-	uint32_t ret = 0;
+	uint32_t head;
+	uint32_t tail;
+	uint32_t mask;
+	uint32_t ret;
+
+	if (unlikely(!to_submit))
+		return 0;
+
+	state_->sqe_lock_.lock();
+	head = sq_head_.load(std::memory_order_acquire);
+	tail = sq_tail_.load(std::memory_order_acquire);
+	mask = sq_mask_;
+	ret = 0;
 
 	while (to_submit--) {
 		if (unlikely(head == tail))
 			break;
 
-		IssueSQE(&sqes_[head++ & sq_mask_]);
+		IssueSQE(&sqes_[head++ & mask]);
 		ret++;
 	}
 
-	if (ret)
-		sq_head_.fetch_add(ret, std::memory_order_release);
+	sq_head_.store(head, std::memory_order_release);
+	state_->sqe_lock_.unlock();
+	return ret;
+}
+
+CNRingSQE *CNRing::GetSQE(void)
+{
+	CNRingSQE *sqe = nullptr;
+
+	state_->sqe_lock_.lock();
 
+	if (likely(SQESize() < (sq_mask_ + 1)))
+		sqe = &sqes_[sq_tail_++ & sq_mask_];
+
+	state_->sqe_lock_.unlock();
+	return sqe;
+}
+
+uint32_t CNRing::_WaitCQE(uint32_t to_wait, uint32_t timeout_ms,
+			  std::unique_lock<std::mutex> &lock)
+	__must_hold(&state_->cqe_lock_)
+{
+	std::atomic<uint32_t> *cqe_to_wait = &state_->cqe_to_wait_;
+	std::condition_variable *cond = &state_->cqe_to_wait_cond_;
+	uint32_t ret;
+
+	cqe_to_wait->store(to_wait, std::memory_order_release);
+	auto callback = [this, to_wait, &ret]{
+		ret = CQESize();
+		return ret >= to_wait;
+	};
+
+	if (timeout_ms == -1U)
+		cond->wait(lock, std::move(callback));
+	else
+		cond->wait_for(lock, timeout_ms*1ms, std::move(callback));
+
+	cqe_to_wait->store(0, std::memory_order_release);
 	return ret;
 }
 
-void CNRingCtx::WaitCQE(uint32_t to_wait)
+uint32_t CNRing::WaitCQE(uint32_t to_wait, uint32_t timeout_ms)
 {
 	uint32_t max_to_wait;
-	uint32_t cq_size;
+	uint32_t ret;
 
 	if (unlikely(!to_wait))
-		return;
+		return 0;
+
+	max_to_wait = cq_max_to_wait_.load(std::memory_order_acquire);
+	if (unlikely(!max_to_wait))
+		return 0;
 
-	max_to_wait = cq_mask_;
 	if (to_wait > max_to_wait)
 		to_wait = max_to_wait;
 
-	std::unique_lock<std::mutex> cqe_lock(state_->cqe_lock_);
-	cq_size = cqe_size();
-
-	if (cq_size < cq_mask_ + 1)
-		NotifyWaitCQEFreeSlot();
+	std::unique_lock<std::mutex> lock(state_->cqe_lock_);
 
-	if (to_wait <= cq_size)
-		return;
+	ret = CQESize();
+	if (unlikely(ret >= to_wait))
+		return ret;
 
-	state_->cqe_to_wait_.store(to_wait);
-	state_->cqe_cond_.wait(cqe_lock, [this, to_wait]{
-		return to_wait <= cqe_size();
-	});
-	state_->cqe_to_wait_.store(0);
+	return _WaitCQE(to_wait, timeout_ms, lock);
 }
 
-CNRingState::CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
-			       uint32_t nr_idle_thread):
-	wq_(nr_thread, nr_jobs, nr_idle_thread)
+void CNRing::CQAdvance(uint32_t n)
 {
-	wq_.run();
+	state_->cqe_lock_.lock();
+	cq_head_ += n;
+	cq_max_to_wait_ -= n;
+	state_->cqe_lock_.unlock();
+	NotifyFreeCQEWaiter();
 }
diff --git a/chnet/chnet_ring.h b/chnet/chnet_ring.h
index f41d75b..0630050 100644
--- a/chnet/chnet_ring.h
+++ b/chnet/chnet_ring.h
@@ -8,169 +8,193 @@
 #include "common.h"
 
 #ifdef __FOR_CHROMIUM_INTERNAL
-#include <condition_variable>
-
 /*
  * Chromium internal includes.
  */
 #include "base/component_export.h"
 #include "WorkQueue.h"
+#include <condition_variable>
 #define CHNET_EXPORT COMPONENT_EXPORT(CHNET)
+
 #else /* #ifdef __FOR_CHROMIUM_INTERNAL */
+
 #define CHNET_EXPORT
-#endif
+
+#endif /* #ifdef __FOR_CHROMIUM_INTERNAL */
+
 
 enum CNRingOp {
 	CNRING_OP_NOP	= 0,
-	CNRING_OP_START	= 1,
-	CNRING_OP_READ	= 2,
+	CNRING_OP_READ	= 1,
+	CNRING_OP_START	= 2,
+	CNRING_OP_WRITE	= 3,
 };
 
-class CNRingCtx;
-
-struct CHNET_EXPORT CNRingOpStart {
-	CHNet		*chnet;
-	CNRingCtx	*ring;
+struct CHNET_EXPORT CNRingOpRead {
+	CHNet		*ch_;
+	uint32_t	size_;
 };
 
-struct CHNET_EXPORT CNRingOpRead {
-	CHNet		*chnet;
-	CNRingCtx	*ring;
-	int		read_size;
+struct CHNET_EXPORT CNRingOpStart {
+	CHNet		*ch_;
 };
 
 class CHNET_EXPORT CNRingSQE {
 public:
-	uint8_t		op;
-	void		*sq_data;
-	uint64_t	user_data;
+	uint8_t		op_;
+	uint64_t	user_data_;
+	union {
+		struct CNRingOpStart	sq_start_;
+		struct CNRingOpRead	sq_read_;
+	};
 
 	inline void PrepNop(void)
 	{
-		op = CNRING_OP_NOP;
+		op_ = CNRING_OP_NOP;
 	}
 
-	inline void PrepStart(CHNet *chnet)
+	inline void PrepRead(CHNet *ch, uint32_t size)
 	{
-		CNRingOpStart *d = new CNRingOpStart;
-
-		d->chnet = chnet;
-		d->ring = nullptr;
-		sq_data = (void *)d;
-		op = CNRING_OP_START;
+		op_ = CNRING_OP_READ;
+		sq_read_.ch_ = ch;
+		sq_read_.size_ = size;
 	}
 
-	inline void PrepRead(CHNet *chnet, int read_size)
+	inline void PrepStart(CHNet *ch)
 	{
-		CNRingOpRead *d = new CNRingOpRead;
-
-		d->chnet = chnet;
-		d->read_size = read_size;
-		sq_data = (void *)d;
-		op = CNRING_OP_READ;
+		op_ = CNRING_OP_START;
+		sq_start_.ch_ = ch;
 	}
 
-	inline void SetUserDataPtr(void *ptr)
+	inline void SetUserData(uint64_t data)
 	{
-		user_data = (uint64_t) ptr;
+		user_data_ = data;
 	}
 
-	inline void SetUserData(uint64_t ptr)
+	inline void SetUserDataPtr(void *data)
 	{
-		user_data = ptr;
+		user_data_ = (uint64_t) (uintptr_t) data;
 	}
 };
 
 class CHNET_EXPORT CNRingCQE {
 public:
-	uint8_t		op;
-	uint64_t	user_data;
-	union {
-		int64_t		res;
-	};
+	uint8_t		op_;
+	int64_t		res_;
+	uint64_t	user_data_;
 
 	inline uint64_t GetUserData(void)
 	{
-		return user_data;
+		return user_data_;
 	}
 
 	inline void *GetUserDataPtr(void)
 	{
-		return (void *) (uintptr_t) user_data;
+		return (void *) (uintptr_t) user_data_;
 	}
 };
 
 #ifdef __FOR_CHROMIUM_INTERNAL
 class CNRingState {
 public:
-	CNRingState(uint32_t nr_thread = 64, uint32_t nr_jobs = 4096,
-		    uint32_t nr_idle_thread = -1U);
-	WorkQueue	wq_;
-	std::mutex	cqe_lock_;
-	std::condition_variable cqe_cond_;
-	std::atomic<uint32_t> cqe_to_wait_;
-
-	std::mutex	post_cqe_lock_;
-	std::condition_variable post_cqe_cond_;
-	std::atomic<uint32_t> nr_post_cqe_wait_;
+	CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
+		    uint32_t nr_idle_thread);
+
+	std::atomic<uint32_t>	cqe_to_wait_;
+	std::atomic<uint32_t>	nr_free_cqe_slot_waiter_;
+
+	std::condition_variable	cqe_to_wait_cond_;
+	std::condition_variable	wait_for_a_free_cqe_slot_cond_;
+
+	std::mutex		wait_for_a_free_cqe_slot_lock_;
+	std::mutex		sqe_lock_;
+	std::mutex		cqe_lock_;
+	WorkQueue		wq_;
+	volatile bool		stop_;
+
+	inline bool should_stop(void)
+	{
+		return stop_ || wq_.should_stop();
+	}
 };
-#endif
+#endif /* #ifdef __FOR_CHROMIUM_INTERNAL */
 
-class CHNET_EXPORT CNRingCtx {
+
+static inline uint32_t u32_diff(uint32_t a, uint32_t b)
+{
+	if (b > a)
+		return b - a;
+	else
+		return a - b;
+}
+
+class CHNET_EXPORT CNRing {
 public:
-	CNRingCtx(uint32_t entry);
-	~CNRingCtx(void);
-	uint32_t SubmitSQE(uint32_t to_submit = -1U);
+	CNRing(uint32_t entry);
+	~CNRing(void);
+	uint32_t Submit(uint32_t to_submit = -1U);
 	CNRingSQE *GetSQE(void);
-	void WaitCQE(uint32_t to_wait);
-	uint32_t sqe_size(void);
-	uint32_t cqe_size(void);
-
-	void PostCQE(CNRingSQE *sqe, int64_t res);
-	void CallPostCQE(CNRingSQE *sqe, int64_t res);
+	uint32_t WaitCQE(uint32_t to_wait = 1, uint32_t timeout_ms = -1U);
 	void CQAdvance(uint32_t n);
+	bool TryPostCQE(const CNRingSQE *sqe, int64_t res);
+	void PostCQE(const CNRingSQE *sqe, int64_t res);
+	bool PostCQENoSleep(const CNRingSQE *sqe, int64_t res);
 
-	inline CNRingCQE *HeadCQE(void)
+	inline CNRingCQE *GetCQEHead(void)
 	{
-		return &cqes_[cq_head_.load()];
+		return &cqes_[cq_head_.load(std::memory_order_acquire) &
+			      cq_mask_];
 	}
 
+	CNRingSQE		*sqes_;
+	CNRingCQE		*cqes_;
+
 #ifdef __FOR_CHROMIUM_INTERNAL
-	CNRingState	*state_;
+	CNRingState		*state_;
 #else
-	void		*state_;
+	void			*state_;
 #endif
 
-	CNRingSQE	*sqes_;
-	CNRingCQE	*cqes_;
-
-	uint32_t		sq_mask_;
-	std::atomic<uint32_t>	sq_head_;
 	std::atomic<uint32_t>	sq_tail_;
+	std::atomic<uint32_t>	sq_head_;
+	uint32_t		sq_mask_;
 
-	uint32_t		cq_mask_;
-	std::atomic<uint32_t>	cq_head_;
 	std::atomic<uint32_t>	cq_tail_;
+	std::atomic<uint32_t>	cq_head_;
+	uint32_t		cq_mask_;
+	std::atomic<uint32_t>	cq_max_to_wait_;
 
 private:
-	void HandleSQE(CNRingSQE *sqe);
-	CNRingSQE *ConsumeSQE(void);
-	bool TryPostCQE(CNRingSQE *sqe, int64_t res);
-	CNRingCQE *GetCQENoTailIncrement(void);
+	uint32_t _WaitCQE(uint32_t to_wait, uint32_t timeout_ms,
+			  std::unique_lock<std::mutex> &lock);
+	void IssueSQE(const CNRingSQE *sqe);
+	bool IssueSQENop(const CNRingSQE *sqe);
+	bool IssueSQERead(const CNRingSQE *sqe);
+	bool IssueSQEStart(const CNRingSQE *sqe);
 	void NotifyCQEWaiter(void);
-	void NotifyWaitCQEFreeSlot(void);
-	void WaitForCQEFreeSlot(void);
+	void NotifyFreeCQEWaiter(bool force = false);
+	void SleepToWaitForAFreeCQE(void);
+
+	int _IssueSQEReadNeedStart(const CNRingSQE *sqe);
 
-	void IssueSQE(CNRingSQE *sqe);
-	void IssueNopSQE(CNRingSQE *sqe);
-	void IssueStartSQE(CNRingSQE *sqe);
-	void IssueReadSQE(CNRingSQE *sqe);
+	inline uint32_t CQESize(void)
+	{
+		return u32_diff(cq_head_.load(std::memory_order_acquire),
+				cq_tail_.load(std::memory_order_acquire));
+	}
+
+	inline uint32_t SQESize(void)
+	{
+		return u32_diff(sq_head_.load(std::memory_order_acquire),
+				sq_tail_.load(std::memory_order_acquire));
+	}
 };
 
-#define chnring_for_each_cqe(ring, head, cqe)				\
+#define cnring_for_each_cqe(ring, head, tail, cqe)			\
 	for (								\
-		head = (ring)->cq_head_;				\
-		(cqe  = (head != (ring)->cq_tail_) ?			\
+		head = (ring)->cq_head_.load(std::memory_order_acquire),\
+		tail = (ring)->cq_tail_.load(std::memory_order_acquire);\
+		(cqe = (head != tail) ?					\
 			&(ring)->cqes_[head & (ring)->cq_mask_] :	\
 			NULL);						\
 		head++							\
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index 54426ef..22da7ea 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -6,231 +6,246 @@
 
 static void test_nop(void)
 {
-	/*
-	 * Simple nop test.
-	 *
-	 * This shows that CQE size is double of SQE size.
-	 */
-
-	static constexpr uint32_t entry = 4096;
-	unsigned head, i, j, a, b;
-	CNRingCtx ring(entry);
+	constexpr static uint32_t nr_loop = 1024 * 8;
+	constexpr static uint32_t entry = 4096;
+	CNRing ring(entry);
 	CNRingSQE *sqe;
 	CNRingCQE *cqe;
+	uint32_t total;
+	uint32_t head;
+	uint32_t tail;
+	uint32_t ret;
+	uint32_t i;
 
-	i = 0;
-	for (j = 0; j < 3; j++) {
-		while (1) {
+	for (i = 0; i < nr_loop; i++) {
+		sqe = ring.GetSQE();
+		if (!sqe) {
+			assert(ring.Submit() == entry);
 			sqe = ring.GetSQE();
-			if (!sqe)
-				break;
-			sqe->PrepNop();
-			sqe->SetUserData(i++);
+			assert(sqe);
 		}
-		ring.SubmitSQE();
+		sqe->PrepNop();
+		sqe->SetUserData(1);
 	}
-	ring.WaitCQE(entry * 2);
-
-	a = i;
-	i = 0;
-	chnring_for_each_cqe(&ring, head, cqe)
-		i++;
-	b = i;
-	ring.CQAdvance(i);
-	assert(b == entry * 2);
-
-	ring.WaitCQE(entry);
-	i = 0;
-	chnring_for_each_cqe(&ring, head, cqe)
-		i++;
-	ring.CQAdvance(i);
-	b += i;
-	assert(a == b);
+	assert(ring.Submit() == entry);
+
+	total = 0;
+	while (total < nr_loop) {
+		ret = ring.WaitCQE(1);
+		i = 0;
+		cnring_for_each_cqe(&ring, head, tail, cqe) {
+			assert(cqe->op_ == CNRING_OP_NOP);
+			assert(cqe->res_ == 0);
+			assert(cqe->user_data_ == 1);
+			i++;
+		}
+		ring.CQAdvance(i);
+		assert(ret >= 1);
+		assert(i >= ret);
+		total += ret;
+	}
+	assert(total == nr_loop);
 }
 
-static void test_chnet_ring_single(void)
+static void test_chnet_ring_read(bool do_sq_start = false)
 {
-	CNRingCtx ring(4096);
+	CNRing ring(1);
 	CNRingSQE *sqe;
 	CNRingCQE *cqe;
+	uint32_t ret;
 	CHNet *ch;
 
 	ch = new CHNet;
 	ch->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+	ch->SetMethod("GET");
 
-	sqe = ring.GetSQE();
-	assert(sqe);
-	sqe->PrepStart(ch);
-	sqe->SetUserData(9999);
-	assert(ring.SubmitSQE() == 1);
-
-	ring.WaitCQE(1);
-	cqe = ring.HeadCQE();
-	assert(cqe->op == CNRING_OP_START);
-	assert(cqe->res == 0);
-	assert(cqe->user_data == 9999);
-	ring.CQAdvance(1);
+	if (do_sq_start) {
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe->PrepStart(ch);
+		assert(ring.Submit() == 1);
+
+		ret = ring.WaitCQE(1);
+		cqe = ring.GetCQEHead();
+		assert(ret == 1);
+		assert(cqe->op_ == CNRING_OP_START);
+		assert(cqe->res_ == 0);
+		ring.CQAdvance(1);
+	}
 
 	sqe = ring.GetSQE();
-	assert(sqe);
 	sqe->PrepRead(ch, 1024);
-	sqe->SetUserData(9999);
-	assert(ring.SubmitSQE() == 1);
-
-	ring.WaitCQE(1);
-	cqe = ring.HeadCQE();
-	assert(cqe->op == CNRING_OP_READ);
-	assert(cqe->res == 13);
-	assert(cqe->user_data == 9999);
+	ring.Submit();
+
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe->op_ == CNRING_OP_READ);
+	assert(cqe->res_ == 13);
+	assert(ch->read_ret() == cqe->res_);
+	assert(!strncmp("Hello World!\n", ch->read_buf(), cqe->res_));
 	ring.CQAdvance(1);
 
 	delete ch;
 }
 
-static void test_chnet_ring_set_header(void)
+static void test_chnet_ring_partial_read(bool do_sq_start = false)
 {
-	constexpr static const char ua[] = "This is just a test user agent!!!";
-	CNRingCtx ring(4096);
+	CNRing ring(1);
 	CNRingSQE *sqe;
 	CNRingCQE *cqe;
+	uint32_t ret;
 	CHNet *ch;
 
 	ch = new CHNet;
-	ch->SetURL("http://127.0.0.1:8000/index.php?action=user_agent");
-	ch->SetRequestHeader("User-agent", ua);
+	ch->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+	ch->SetMethod("GET");
 
+	if (do_sq_start) {
+		sqe = ring.GetSQE();
+		sqe->PrepStart(ch);
+		assert(ring.Submit() == 1);
+
+		ret = ring.WaitCQE(1);
+		cqe = ring.GetCQEHead();
+		assert(ret == 1);
+		assert(cqe->op_ == CNRING_OP_START);
+		assert(cqe->res_ == 0);
+		ring.CQAdvance(1);
+	}
+
+	/*
+	 * The first read.
+	 */
 	sqe = ring.GetSQE();
-	assert(sqe);
-	sqe->PrepStart(ch);
-	sqe->SetUserData(9999);
-	assert(ring.SubmitSQE() == 1);
-
-	ring.WaitCQE(1);
-	cqe = ring.HeadCQE();
-	assert(cqe->op == CNRING_OP_START);
-	assert(cqe->res == 0);
-	assert(cqe->user_data == 9999);
+	sqe->PrepRead(ch, 4);
+	assert(ring.Submit() == 1);
+
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe->op_ == CNRING_OP_READ);
+	assert(cqe->res_ == 4);
+	assert(ch->read_ret() == cqe->res_);
+	assert(!strncmp("Hell", ch->read_buf(), cqe->res_));
 	ring.CQAdvance(1);
 
+
+	/*
+	 * The second read.
+	 */
 	sqe = ring.GetSQE();
-	assert(sqe);
-	sqe->PrepRead(ch, 1024);
-	sqe->SetUserData(9999);
-	assert(ring.SubmitSQE() == 1);
-
-	ring.WaitCQE(1);
-	cqe = ring.HeadCQE();
-	assert(cqe->op == CNRING_OP_READ);
-	assert(cqe->res == strlen(ua));
-	assert(cqe->user_data == 9999);
-	assert(!strncmp(ua, ch->read_buf(), strlen(ua)));
-	ring.CQAdvance(1);
+	sqe->PrepRead(ch, 4);
+	assert(ring.Submit() == 1);
 
-	delete ch;
-}
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
 
-static void test_chnet_ring_single_post(void)
-{
-	constexpr static const char buf[] = "AAAA Hello World!\n";
-	CNRingCtx ring(4096);
-	CNRingSQE *sqe;
-	CNRingCQE *cqe;
-	CHNet *ch;
+	assert(ret == 1);
+	assert(cqe->op_ == CNRING_OP_READ);
+	assert(cqe->res_ == 4);
+	assert(ch->read_ret() == cqe->res_);
+	assert(!strncmp("o Wo", ch->read_buf(), cqe->res_));
+	ring.CQAdvance(1);
 
-	ch = new CHNet;
-	ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
-	ch->SetMethod("POST");
-	ch->SetPayload(buf, sizeof(buf));
 
+	/*
+	 * The third read.
+	 */
 	sqe = ring.GetSQE();
-	assert(sqe);
-	sqe->PrepStart(ch);
-	sqe->SetUserData(9999);
-	assert(ring.SubmitSQE() == 1);
-
-	ring.WaitCQE(1);
-	cqe = ring.HeadCQE();
-	assert(cqe->op == CNRING_OP_START);
-	assert(cqe->res == 0);
-	assert(cqe->user_data == 9999);
+	sqe->PrepRead(ch, 1024);
+	assert(ring.Submit() == 1);
+
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe->op_ == CNRING_OP_READ);
+	assert(cqe->res_ == 5);
+	assert(ch->read_ret() == cqe->res_);
+	assert(!strncmp("rld!\n", ch->read_buf(), cqe->res_));
 	ring.CQAdvance(1);
 
+
+	/*
+	 * The 4-th read.
+	 */
 	sqe = ring.GetSQE();
-	assert(sqe);
 	sqe->PrepRead(ch, 1024);
-	sqe->SetUserData(9999);
-	assert(ring.SubmitSQE() == 1);
-
-	ring.WaitCQE(1);
-	cqe = ring.HeadCQE();
-	assert(cqe->op == CNRING_OP_READ);
-	assert(cqe->res == sizeof(buf));
-	assert(cqe->user_data == 9999);
-	assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
+	assert(ring.Submit() == 1);
+
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe->op_ == CNRING_OP_READ);
+	assert(cqe->res_ == 0);
+	assert(ch->read_ret() == cqe->res_);
 	ring.CQAdvance(1);
 
 	delete ch;
 }
 
-#define NR_REQ 1024
-
-static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring)
+#define NR_REQ 4096
+static void test_chnet_ring_read_parallel(bool do_sq_start = false)
 {
-	unsigned i, j, head;
+	CNRing ring(NR_REQ);
 	CNRingSQE *sqe;
 	CNRingCQE *cqe;
+	uint32_t head;
+	uint32_t tail;
+	uint32_t i;
 	CHNet **ch;
 
-	ch = new CHNet*[NR_REQ];
+	ch = new CHNet *[NR_REQ];
 
 	for (i = 0; i < NR_REQ; i++) {
 		ch[i] = new CHNet;
 		ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello");
 	}
 
-	if (start_before_read) {
+	if (do_sq_start) {
 		for (i = 0; i < NR_REQ; i++) {
-			sqe = ring->GetSQE();
+			sqe = ring.GetSQE();
 			assert(sqe);
 			sqe->PrepStart(ch[i]);
 			sqe->SetUserDataPtr(ch[i]);
 		}
-		assert(ring->SubmitSQE() == i);
-		ring->WaitCQE(i);
-
-		j = 0;
-		chnring_for_each_cqe(ring, head, cqe) {
-			assert(cqe->op == CNRING_OP_START);
-			assert(!cqe->res);
-			j++;
+		assert(ring.Submit() == NR_REQ);
+		assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+		cnring_for_each_cqe(&ring, head, tail, cqe) {
+			CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+			assert(ch);
+			assert(cqe->res_ == 0);
+			assert(cqe->op_ == CNRING_OP_START);
 		}
-		assert(j == i);
-		ring->CQAdvance(j);
+		ring.CQAdvance(NR_REQ);
 	}
 
+
 	for (i = 0; i < NR_REQ; i++) {
-		sqe = ring->GetSQE();
+		sqe = ring.GetSQE();
 		assert(sqe);
 		sqe->PrepRead(ch[i], 1024);
 		sqe->SetUserDataPtr(ch[i]);
 	}
-	assert(ring->SubmitSQE() == i);
-	ring->WaitCQE(i);
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
 
-	j = 0;
-	chnring_for_each_cqe(ring, head, cqe) {
-		CHNet *ch;
+	cnring_for_each_cqe(&ring, head, tail, cqe) {
+		CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
 
-		ch = (CHNet *)cqe->GetUserDataPtr();
 		assert(ch);
-		assert(cqe->op == CNRING_OP_READ);
-		assert(cqe->res == 13);
-		assert(ch->read_ret() == cqe->res);
-		assert(!strncmp(ch->read_buf(), "Hello World!\n", 13));
-		j++;
+		assert(cqe->res_ == 13);
+		assert(cqe->op_ == CNRING_OP_READ);
+		assert(ch->read_ret() == cqe->res_);
+		assert(!strncmp("Hello World!\n", ch->read_buf(), cqe->res_));
 	}
-	assert(j == i);
-	ring->CQAdvance(j);
+	ring.CQAdvance(NR_REQ);
 
 	for (i = 0; i < NR_REQ; i++)
 		delete ch[i];
@@ -238,82 +253,157 @@ static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring)
 	delete[] ch;
 }
 
-static void test_chnet_ring_multiple(bool start_before_read)
-{
-	CNRingCtx ring(NR_REQ);
-	unsigned i;
-
-	for (i = 0; i < 3; i++)
-		_test_chnet_ring_multiple(start_before_read, &ring);
-}
-
-static void test_chnet_chunked_body(void)
+static void test_chnet_ring_partial_read_parallel(bool do_sq_start = false)
 {
-	constexpr size_t len = 1024 * 64;
-	constexpr size_t nr_loop = 100;
-	size_t total_read = 0;
-	CNRingCtx ring(4096);
+	CNRing ring(NR_REQ);
 	CNRingSQE *sqe;
 	CNRingCQE *cqe;
-	CHNet *ch;
-	char *buf;
-	size_t i;
+	uint32_t head;
+	uint32_t tail;
+	uint32_t i;
+	CHNet **ch;
 
-	ch = new CHNet;
-	ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
-	ch->SetMethod("POST");
-	ch->StartChunkedBody();
+	ch = new CHNet *[NR_REQ];
 
-	sqe = ring.GetSQE();
-	assert(sqe);
-	sqe->PrepRead(ch, len);
-	sqe->SetUserData(9999);
-	assert(ring.SubmitSQE() == 1);
-
-	buf = new char[len];
-	memset(buf, 'A', len);
-
-	for (i = 0; i < nr_loop; i++)
-		ch->Write(buf, len);
-	ch->StopChunkedBody();
-
-	while (total_read < (len * nr_loop)) {
-		int res;
-
-		ring.WaitCQE(1);
-		cqe = ring.HeadCQE();
-		assert(cqe->op == CNRING_OP_READ);
-		assert(cqe->user_data == 9999);
-		res = cqe->res;
-		total_read += res;
-		ring.CQAdvance(1);
+	for (i = 0; i < NR_REQ; i++) {
+		ch[i] = new CHNet;
+		ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+		ch[i]->SetMethod("GET");
+	}
 
-		if (!res)
-			break;
+	if (do_sq_start) {
+		for (i = 0; i < NR_REQ; i++) {
+			sqe = ring.GetSQE();
+			assert(sqe);
+			sqe->PrepStart(ch[i]);
+			sqe->SetUserDataPtr(ch[i]);
+		}
+		assert(ring.Submit() == NR_REQ);
+		assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+		cnring_for_each_cqe(&ring, head, tail, cqe) {
+			CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
 
+			assert(ch);
+			assert(cqe->res_ == 0);
+			assert(cqe->op_ == CNRING_OP_START);
+		}
+		ring.CQAdvance(NR_REQ);
+	}
+
+
+	/*
+	 * The first read.
+	 */
+	for (i = 0; i < NR_REQ; i++) {
 		sqe = ring.GetSQE();
 		assert(sqe);
-		sqe->PrepRead(ch, len);
-		sqe->SetUserData(9999);
-		assert(ring.SubmitSQE() == 1);
+		sqe->PrepRead(ch[i], 4);
+		sqe->SetUserDataPtr(ch[i]);
 	}
-	printf("total_read = %zu %zu\n", total_read, (len * nr_loop));
-	assert(total_read == (len * nr_loop));
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
 
-	delete[] buf;
-	delete ch;
+	cnring_for_each_cqe(&ring, head, tail, cqe) {
+		CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+		assert(ch);
+		assert(cqe->res_ == 4);
+		assert(cqe->op_ == CNRING_OP_READ);
+		assert(ch->read_ret() == cqe->res_);
+		assert(!strncmp("Hell", ch->read_buf(), cqe->res_));
+	}
+	ring.CQAdvance(NR_REQ);
+
+
+	/*
+	 * The second read.
+	 */
+	for (i = 0; i < NR_REQ; i++) {
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe->PrepRead(ch[i], 4);
+		sqe->SetUserDataPtr(ch[i]);
+	}
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+	cnring_for_each_cqe(&ring, head, tail, cqe) {
+		CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+		assert(ch);
+		assert(cqe->res_ == 4);
+		assert(cqe->op_ == CNRING_OP_READ);
+		assert(ch->read_ret() == cqe->res_);
+		assert(!strncmp("o Wo", ch->read_buf(), cqe->res_));
+	}
+	ring.CQAdvance(NR_REQ);
+
+
+	/*
+	 * The third read.
+	 */
+	for (i = 0; i < NR_REQ; i++) {
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe->PrepRead(ch[i], 100);
+		sqe->SetUserDataPtr(ch[i]);
+	}
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+	cnring_for_each_cqe(&ring, head, tail, cqe) {
+		CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+		assert(ch);
+		assert(cqe->res_ == 5);
+		assert(cqe->op_ == CNRING_OP_READ);
+		assert(ch->read_ret() == cqe->res_);
+		assert(!strncmp("rld!\n", ch->read_buf(), cqe->res_));
+	}
+	ring.CQAdvance(NR_REQ);
+
+
+	/*
+	 * The 4-th read.
+	 */
+	for (i = 0; i < NR_REQ; i++) {
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe->PrepRead(ch[i], 100);
+		sqe->SetUserDataPtr(ch[i]);
+	}
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+	cnring_for_each_cqe(&ring, head, tail, cqe) {
+		CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+		assert(ch);
+		assert(cqe->res_ == 0);
+		assert(cqe->op_ == CNRING_OP_READ);
+		assert(ch->read_ret() == 0);
+	}
+	ring.CQAdvance(NR_REQ);
+
+	for (i = 0; i < NR_REQ; i++)
+		delete ch[i];
+
+	delete[] ch;
 }
 
 int main(void)
 {
+	int i;
+
 	test_nop();
 	chnet_global_init();
-	// test_chnet_ring_single();
-	// test_chnet_ring_set_header();
-	// test_chnet_ring_single_post();
-	// test_chnet_ring_multiple(false);
-	// test_chnet_ring_multiple(true);
-	test_chnet_chunked_body();
+	for (i = 0; i < 2; i++) {
+		test_chnet_ring_read(!!i);
+		test_chnet_ring_partial_read(!!i);
+		test_chnet_ring_read_parallel(!!i);
+		test_chnet_ring_partial_read_parallel(!!i);
+	}
 	chnet_global_stop();
 	return 0;
 }
diff --git a/tests/js/ring.js b/tests/js/ring.js
index 372c6e6..2f3fe1c 100644
--- a/tests/js/ring.js
+++ b/tests/js/ring.js
@@ -3,263 +3,389 @@ const chnet = require("bindings")("chnet");
 
 function assert(cond)
 {
-	if (!cond) {
-		console.log("Assertion failed!");
-		process.exit(1);
-	}
+	if (!cond)
+		throw new Error("Assertion failed!");
 }
 
 function test_nop()
 {
-	const entry = 16;
-	let ring;
-	let cqe;
+	const nr_loop = 1024 * 8;
+	const entry = 4096;
+	let ring = chnet.CreateRing(entry);
 	let sqe;
-	let a, b, i, j;
-
-	i = 0;
-	ring = chnet.create_ring(entry);
-	for (j = 0; j < 3; j++) {
-		while (1) {
-			sqe = ring.get_sqe();
-			if (!sqe)
-				break;
-			sqe.prep_nop();
-			i++;
+	let cqe;
+	let total;
+	let ret;
+	let i;
+
+	for (i = 0; i < nr_loop; i++) {
+		sqe = ring.GetSQE();
+		if (!sqe) {
+			assert(ring.Submit() == entry);
+			sqe = ring.GetSQE();
+			assert(sqe);
 		}
-		ring.submit_sqe();
+		sqe.PrepNop();
+		sqe.SetUserData(1);
+	}
+	assert(ring.Submit() == entry);
+	assert(ring.WaitCQE(1));
+
+	total = 0;
+	while (total < nr_loop) {
+		ret = ring.WaitCQE(1);
+		i = ring.ForEachCQE(function (cqe) {
+			assert(!cqe.res);
+			assert(cqe.user_data == 1);
+		});
+		ring.CQAdvance(i);
+		assert(ret >= 1);
+		assert(i >= ret);
+		total += ret;
 	}
-	ring.wait_cqe(entry * 2);
-
-	a = i;
-	i = ring.for_each_cqe(function (cqe) { });
-	b = i;
-	ring.cq_advance(i);
-	assert(b == entry * 2);
-
-	ring.wait_cqe(entry);
-	i = ring.for_each_cqe(function (cqe) { });
-	b += i;
-	ring.cq_advance(i);
-	assert(a == b);
+	assert(total == nr_loop);
+	ring.Close();
 }
 
-function test_chnet_ring_multiple()
+function test_chnet_ring_read(do_sq_start = false)
 {
-	const NR_REQ = 10;
-	let ring;
+	let ring = chnet.CreateRing(1);
 	let sqe;
 	let cqe;
+	let ret;
 	let ch;
-	let i;
 
-	ring = chnet.create_ring(NR_REQ);
-	ch = [];
+	ch = chnet.CreateNet();
+	ch.SetURL("http://127.0.0.1:8000/index.php?action=hello");
+	ch.SetMethod("GET");
 
-	for (i = 0; i < NR_REQ; i++) {
-		ch[i] = chnet.create_net();
-		ch[i].set_url("http://127.0.0.1:8000/index.php?action=hello");
-	}
+	if (do_sq_start) {
+		sqe = ring.GetSQE();
+		sqe.PrepStart(ch);
+		assert(ring.Submit() == 1);
 
-	for (i = 0; i < NR_REQ; i++) {
-		sqe = ring.get_sqe();
-		assert(sqe);
-		sqe.prep_start(ch[i]);
-		sqe.set_user_data(ch[i]);
+		ret = ring.WaitCQE(1);
+		cqe = ring.GetCQEHead();
+		assert(ret == 1);
+		assert(cqe.res == 0);
+		ring.CQAdvance(1);
 	}
-	assert(ring.submit_sqe() == i);
-	ring.wait_cqe(i);
 
-	j = ring.for_each_cqe(function (cqe) {
-		assert(cqe.res == 0);
-	});
-	assert(j == i);
-	ring.cq_advance(j);
+	sqe = ring.GetSQE();
+	sqe.PrepRead(ch, 1024);
+	assert(ring.Submit() == 1);
 
-	for (i = 0; i < NR_REQ; i++) {
-		sqe = ring.get_sqe();
-		assert(sqe);
-		sqe.prep_read(ch[i], 1024);
-		sqe.set_user_data(ch[i]);
-	}
-	assert(ring.submit_sqe() == i);
-	ring.wait_cqe(i);
-	j = ring.for_each_cqe(function (cqe) {
-		let ch = cqe.user_data;
-		assert(cqe.res == 13);
-		assert(ch.read_buf() === "Hello World!\n");
-	});
-	assert(j == i);
-	ring.cq_advance(j);
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe.res == 13);
+	assert(ch.read_ret() == cqe.res);
+	assert("Hello World!\n" == ch.read_buf());
+	ring.CQAdvance(1);
+	ch.Close();
+	ring.Close();
 }
 
-class SimpleHttp {
-	constructor(url, method = "GET") {
-		this.ring = chnet.create_ring(1);
-		this.ch = chnet.create_net();
-		this.ch.set_url(url);
-		this.ch.set_method(method);
-		this.res = -1;
-	}
+function test_chnet_ring_partial_read(do_sq_start = false)
+{
+	let ring = chnet.CreateRing(1);
+	let sqe;
+	let cqe;
+	let ret;
+	let ch;
 
-	prep_read(read_size) {
-		let sqe = this.ring.get_sqe();
-		sqe.prep_read(this.ch, read_size);
-		sqe.set_user_data(this);
+	ch = chnet.CreateNet();
+	ch.SetURL("http://127.0.0.1:8000/index.php?action=hello");
+	ch.SetMethod("GET");
+
+	if (do_sq_start) {
+		sqe = ring.GetSQE();
+		sqe.PrepStart(ch);
+		assert(ring.Submit() == 1);
+
+		ret = ring.WaitCQE(1);
+		cqe = ring.GetCQEHead();
+		assert(ret == 1);
+		assert(cqe.res == 0);
+		ring.CQAdvance(1);
 	}
 
-	set_payload(payload) {
-		this.ch.set_payload(payload);
+	/*
+	 * The first read.
+	 */
+	sqe = ring.GetSQE();
+	sqe.PrepRead(ch, 4);
+	assert(ring.Submit() == 1);
+
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe.res == 4);
+	assert(ch.read_ret() == cqe.res);
+	assert("Hell" == ch.read_buf());
+	ring.CQAdvance(1);
+
+
+	/*
+	 * The second read.
+	 */
+	sqe = ring.GetSQE();
+	sqe.PrepRead(ch, 4);
+	assert(ring.Submit() == 1);
+
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe.res == 4);
+	assert(ch.read_ret() == cqe.res);
+	assert("o Wo" == ch.read_buf());
+	ring.CQAdvance(1);
+
+
+	/*
+	 * The third read.
+	 */
+	sqe = ring.GetSQE();
+	sqe.PrepRead(ch, 1024);
+	assert(ring.Submit() == 1);
+
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe.res == 5);
+	assert(ch.read_ret() == cqe.res);
+	assert("rld!\n" == ch.read_buf());
+	ring.CQAdvance(1);
+
+
+	/*
+	 * The 4-th read.
+	 */
+	sqe = ring.GetSQE();
+	sqe.PrepRead(ch, 4);
+	assert(ring.Submit() == 1);
+
+	ret = ring.WaitCQE(1);
+	cqe = ring.GetCQEHead();
+
+	assert(ret == 1);
+	assert(cqe.res == 0);
+	assert(ch.read_ret() == cqe.res);
+	assert(!ch.read_buf());
+	ring.CQAdvance(1);
+	ch.Close();
+	ring.Close();
+}
+
+const NR_REQ = 4096;
+function test_chnet_ring_read_parallel(do_sq_start = false)
+{
+	let ring = chnet.CreateRing(NR_REQ);
+	let sqe;
+	let cqe;
+	let i;
+	let ch_arr;
+
+	ch_arr = [];
+
+	for (i = 0; i < NR_REQ; i++) {
+		ch_arr[i] = chnet.CreateNet();
+		ch_arr[i].SetURL("http://127.0.0.1:8000/index.php?action=hello");
+		ch_arr[i].SetMethod("GET");
 	}
 
-	run() {
-		this.ring.submit_sqe();
-		this.ring.wait_cqe(1);
-		this.ring.for_each_cqe(function (cqe) {
-			let that = cqe.user_data;
-			that.res = cqe.res;
-			if (cqe.res < 0)
-				console.log("Error: "+that.ch.get_error());
+	if (do_sq_start) {
+		for (i = 0; i < NR_REQ; i++) {
+			sqe = ring.GetSQE();
+			assert(sqe);
+			sqe.PrepStart(ch_arr[i]);
+			sqe.SetUserData(ch_arr[i]);
+		}
+		assert(ring.Submit() == NR_REQ);
+		assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+		ring.ForEachCQE(function (cqe) {
+			let ch = cqe.user_data;
+
+			assert(ch);
+			assert(cqe.res == 0);
 		});
-		this.ring.cq_advance(1);
+		ring.CQAdvance(NR_REQ);
 	}
 
-	get_buffer() {
-		return this.ch.read_buf();
+
+	for (i = 0; i < NR_REQ; i++) {
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe.PrepRead(ch_arr[i], 1024);
+		sqe.SetUserData(ch_arr[i]);
 	}
-}
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
 
-function test_simple_http()
-{
-	const payload = "Hello World! AAAAAAAAAAAAAAAAAAAAAAAAA\n";
-	let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=body", "POST");
-	h.set_payload(payload);
-	h.prep_read(1024);
-	h.run();
-	assert(h.get_buffer() === payload);
-	assert(h.ch.read_ret() === payload.length);
-}
+	ring.ForEachCQE(function (cqe) {
+		let ch = cqe.user_data;
 
-function test_simple_http_set_header()
-{
-	const ua = "This is just a test user agent!";
-	let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=user_agent", "GET");
-	h.ch.set_request_header("User-Agent", ua);
-	h.prep_read(1024);
-	h.run();
-	assert(h.get_buffer() === ua);
-	assert(h.ch.read_ret() === ua.length);
+		assert(ch);
+		assert(cqe.res == 13);
+		assert(ch.read_ret() == cqe.res);
+		assert("Hello World!\n" == ch.read_buf());
+	});
+	ring.CQAdvance(NR_REQ);
+
+	for (i = 0; i < NR_REQ; i++)
+		ch_arr[i].Close();
+
+	ring.Close();
 }
 
-function test_simple_http_post_set_header()
+function test_chnet_ring_partial_read_parallel(do_sq_start = false)
 {
-	const ss = "This is just a test string!";
-	let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=print_post&key=test", "POST");
-	h.ch.set_request_header("User-Agent", ss);
-	h.ch.set_request_header("Content-type", "application/x-www-form-urlencoded");
-	h.ch.set_payload("test="+encodeURIComponent(ss));
-	h.prep_read(1024);
-	h.run();
-	assert(h.get_buffer() === ss);
-	assert(h.ch.read_ret() === ss.length);
-}
+	let ring = chnet.CreateRing(NR_REQ);
+	let sqe;
+	let cqe;
+	let i;
+	let ch_arr;
 
-class CHNet {
-	constructor(cr, url, method) {
-		this.ch = chnet.create_net();
-		this.ch.set_url(url);
-		this.ch.set_method(method);
-		this.cr = cr;
-	}
+	ch_arr = [];
 
-	prep_read(read_size) {
-		let ring = this.cr.ring;
-		let sqe  = ring.get_sqe();
-		if (!sqe)
-			return false;
-		sqe.prep_read(this.ch, read_size);
-		sqe.set_user_data(this);
-		return true;
+	for (i = 0; i < NR_REQ; i++) {
+		ch_arr[i] = chnet.CreateNet();
+		ch_arr[i].SetURL("http://127.0.0.1:8000/index.php?action=hello");
+		ch_arr[i].SetMethod("GET");
 	}
 
-	set_user_data(data) {
-		this.udata = data;
-	}
+	if (do_sq_start) {
+		for (i = 0; i < NR_REQ; i++) {
+			sqe = ring.GetSQE();
+			assert(sqe);
+			sqe.PrepStart(ch_arr[i]);
+			sqe.SetUserData(ch_arr[i]);
+		}
+		assert(ring.Submit() == NR_REQ);
+		assert(ring.WaitCQE(NR_REQ) == NR_REQ);
 
-	get_user_data() {
-		return this.udata;
-	}
+		ring.ForEachCQE(function (cqe) {
+			let ch = cqe.user_data;
 
-	set_header(key, val, overwrite = true) {
-		this.ch.set_request_header(key, val, overwrite);
+			assert(ch);
+			assert(cqe.res == 0);
+		});
+		ring.CQAdvance(NR_REQ);
 	}
-};
 
-class ChromiumNet {
-	constructor() {
-		this.ring = chnet.create_ring(4096);
+
+	/*
+	 * The first read.
+	 */
+	for (i = 0; i < NR_REQ; i++) {
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe.PrepRead(ch_arr[i], 4);
+		sqe.SetUserData(ch_arr[i]);
 	}
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+	ring.ForEachCQE(function (cqe) {
+		let ch = cqe.user_data;
+
+		assert(ch);
+		assert(cqe.res == 4);
+		assert(ch.read_ret() == cqe.res);
+		assert("Hell" == ch.read_buf());
+	});
+	ring.CQAdvance(NR_REQ);
+
 
-	create(url, method = "GET") {
-		return new CHNet(this, url, method);
+	/*
+	 * The second read.
+	 */
+	for (i = 0; i < NR_REQ; i++) {
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe.PrepRead(ch_arr[i], 4);
+		sqe.SetUserData(ch_arr[i]);
 	}
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
 
-	for_each_cqe(cb = null) {
-		let ret = this.ring.for_each_cqe(function (cqe) {
+	ring.ForEachCQE(function (cqe) {
+		let ch = cqe.user_data;
 
-			if (cb)
-				cb(cqe);
+		assert(ch);
+		assert(cqe.res == 4);
+		assert(ch.read_ret() == cqe.res);
+		assert("o Wo" == ch.read_buf());
+	});
+	ring.CQAdvance(NR_REQ);
 
-			let ch    = cqe.user_data;
-			let ret   = cqe.res;
-			let udata = ch.udata;
 
-			if (ret < 0 && ch.onerror)
-				ch.onerror(ch, ret, udata);
-			else
-				ch.onreadcomplete(ch, ret, udata);
-		});
-		this.ring.cq_advance(ret);
-		return ret;
+	/*
+	 * The third read.
+	 */
+	for (i = 0; i < NR_REQ; i++) {
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe.PrepRead(ch_arr[i], 1024);
+		sqe.SetUserData(ch_arr[i]);
 	}
-};
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
 
-function test_classes()
-{
-	const UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36";
-	const NR_REQ = 10;
+	ring.ForEachCQE(function (cqe) {
+		let ch = cqe.user_data;
 
-	let cr = new ChromiumNet;
-	let ch_arr = [];
-	let i, j, ch;
+		assert(ch);
+		assert(cqe.res == 5);
+		assert(ch.read_ret() == cqe.res);
+		assert("rld!\n" == ch.read_buf());
+	});
+	ring.CQAdvance(NR_REQ);
 
+
+	/*
+	 * The 4-th read.
+	 */
 	for (i = 0; i < NR_REQ; i++) {
-		ch = cr.create("http://127.0.0.1:8000/index.php?action=hello");
-		ch.set_header("User-Agent", UA);
-		ch.prep_read(4096);
-		ch.set_user_data(i);
-		ch.onerror = function (cr, ret, udata) {
-			console.log(`Req: ${udata} returned error: ${cr.ch.get_error()}`);
-		};
-		ch.onreadcomplete = function (cr, ret, udata) {
-			console.log(`Req: ${udata} returned ${ret} bytes: ${cr.ch.read_buf()}`);
-		};
-		ch_arr[i] = ch;
+		sqe = ring.GetSQE();
+		assert(sqe);
+		sqe.PrepRead(ch_arr[i], 1024);
+		sqe.SetUserData(ch_arr[i]);
 	}
+	assert(ring.Submit() == NR_REQ);
+	assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+	ring.ForEachCQE(function (cqe) {
+		let ch = cqe.user_data;
 
-	cr.ring.submit_sqe();
-	cr.ring.wait_cqe(1);
-	cr.for_each_cqe();
+		assert(ch);
+		assert(cqe.res == 0);
+		assert(ch.read_ret() == cqe.res);
+	});
+	ring.CQAdvance(NR_REQ);
+
+	for (i = 0; i < NR_REQ; i++)
+		ch_arr[i].Close();
+
+	ring.Close();
 }
 
 function main()
 {
+	let i;
+
 	test_nop();
-	test_chnet_ring_multiple();
-	test_simple_http();
-	test_simple_http_set_header();
-	test_simple_http_post_set_header();
-	test_classes();
+	for (i = 0; i < 2; i++) {
+		test_chnet_ring_read(!!i);
+		test_chnet_ring_partial_read(!!i);
+		test_chnet_ring_read_parallel(!!i);
+		test_chnet_ring_partial_read_parallel(!!i);
+	}
 }
 
 main();
-- 
Ammar Faizi


  parent reply	other threads:[~2022-08-21 11:25 UTC|newest]

Thread overview: 28+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 01/22] chnet: Add initial request body support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 07/22] chnet: node: Add get_error function to return the error string Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 10/22] chnet: Split construct URL req creation into a new function Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 11/22] chnet: Add set request header support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 12/22] chnet: node: Fix unused variable warning Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 14/22] tests/js/ring: Add more set header function test Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 15/22] chnet: node: Don't use static counter for data ID Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 17/22] chnet: Initial chunked request body support Ammar Faizi
2022-08-21 22:13   ` Alviro Iskandar Setiawan
2022-08-22  3:08     ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 18/22] chnet: Rework the chunked request body interface Ammar Faizi
2022-08-21 22:20   ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` Ammar Faizi [this message]
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
2022-08-21 22:29   ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant Ammar Faizi
2022-08-21 22:21   ` Alviro Iskandar Setiawan

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    [email protected] \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox