* [PATCH v1 01/22] chnet: Add initial request body support
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE Ammar Faizi
` (20 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
This adds initial request body support. We can now perform HTTP methods
other than GET, with a non-chunked buffer for now. By default, if we
don't specify the HTTP method, it will perform an HTTP GET.
TODO(ammarfaizi2): Add chunked request body support.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 55 +++++++++++++++++++++++++++++++++++++++++++++++
chnet/chnet.h | 29 +++++++++++++++++++++++++
tests/cpp/ring.cc | 44 +++++++++++++++++++++++++++++++++++++
tests/index.php | 3 +++
4 files changed, 131 insertions(+)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 9d19882..93abe84 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -13,6 +13,43 @@ using namespace std::chrono_literals;
namespace net {
+/*
+ * Build an upload stream over a caller-provided buffer.
+ *
+ * Only the pointer and the length are recorded; the bytes themselves are
+ * not copied here. The base class is constructed with (false, 0) --
+ * presumably "not chunked" and identifier 0; confirm against
+ * UploadDataStream.
+ */
+CHNetDataStream::CHNetDataStream(CHNetDelegate *ch, const char *payload,
+				 size_t payload_len)
+	: UploadDataStream(false, 0)
+	, ch_(ch)
+	, payload_(payload)
+	, payload_len_(payload_len)
+{
+	/*
+	 * ch_ is stored but not read by the methods of this class; this
+	 * reference presumably just silences an unused-member warning.
+	 */
+	static_cast<void>(ch_);
+}
+
+int CHNetDataStream::InitInternal(const net::NetLogWithSource& net_log)
+{ /* net_log is not used by this implementation */
+ SetSize(payload_len_); /* report the full payload length as the stream size */
+ return net::OK;
+}
+
+/*
+ * Copy the next slice of the request body into buf.
+ *
+ * At most buf_len bytes are copied, starting at the stream's current
+ * position() and never reading past size(). Returns the number of bytes
+ * copied; that is 0 once the whole payload has been consumed or when
+ * buf_len is not positive.
+ */
+int CHNetDataStream::ReadInternal(net::IOBuffer *buf, int buf_len)
+{
+	/*
+	 * Keep the offset math in size_t. Narrowing position() or
+	 * size() - position() to int first wraps for large payloads and
+	 * would hand memcpy() a negative offset or length. size() was set
+	 * from payload_len_ (a size_t) in InitInternal(), so size_t is wide
+	 * enough for both values.
+	 */
+	const size_t pos = (size_t)position();
+	const size_t total = (size_t)size();
+	size_t len;
+
+	if (buf_len <= 0 || pos >= total)
+		return 0;
+
+	len = total - pos;
+	if (len > (size_t)buf_len)
+		len = (size_t)buf_len;
+
+	memcpy(buf->data(), &payload_[pos], len);
+	return (int)len;
+}
+
+void CHNetDataStream::ResetInternal()
+{ /* intentionally empty: this class keeps no read cursor of its own, ReadInternal() takes the offset from position() */
+}
+
CHNCallback::CHNCallback(void):
response_started_data_(nullptr),
response_started_(nullptr),
@@ -44,6 +81,8 @@ CHNetDelegate::CHNetDelegate(void):
base::Thread::Options options(base::MessagePumpType::IO, 0);
CHECK(thread_.StartWithOptions(std::move(options)));
read_ret_.store(0, std::memory_order_relaxed);
+ payload_ = nullptr;
+ method_ = "GET";
}
static void CHNetDelegateDestruct(std::unique_ptr<URLRequest> *url_req,
@@ -89,7 +128,13 @@ void CHNetDelegate::_Start(Waiter *sig)
url_req_ctx_ = url_req_ctx_b.Build();
url_req_ = url_req_ctx_->CreateRequest(url_, DEFAULT_PRIORITY, this,
traffic_annotation, false);
+ url_req_->set_method(method_);
sig->Signal();
+
+ if (payload_)
+ url_req_->set_upload(std::make_unique<CHNetDataStream>(
+ this, payload_, payload_len_));
+
url_req_->Start();
}
@@ -207,6 +252,16 @@ const char *CHNet::GetErrorStr(void)
return ch_->err_.c_str();
}
+void CHNet::SetMethod(const char *method)
+{
+ ch_->SetMethod(method); /* passed through unchanged to ch_ (presumably the CHNetDelegate -- confirm) */
+}
+
+void CHNet::SetPayload(const char *buf, size_t len)
+{
+ ch_->SetPayload(buf, len); /* pointer and length are passed through unchanged; no copy is made here */
+}
+
int CHNet::read_ret(void)
{
return ch_->read_ret();
diff --git a/chnet/chnet.h b/chnet/chnet.h
index 15ba8cd..b09d99d 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -53,6 +53,23 @@
*/
namespace net {
+class CHNetDelegate;
+
+/*
+ * Request body stream backed by one contiguous buffer.
+ *
+ * The buffer is referenced through payload_/payload_len_ and is not
+ * owned by this class, so it has to stay valid for as long as the
+ * stream may still be read.
+ */
+class CHNetDataStream: public UploadDataStream {
+public:
+	CHNetDataStream(CHNetDelegate *ch, const char *payload,
+			size_t payload_len);
+
+private:
+	/* UploadDataStream hooks. */
+	int InitInternal(const net::NetLogWithSource& net_log) override;
+	int ReadInternal(net::IOBuffer *buf, int buf_len) override;
+	void ResetInternal() override;
+
+	CHNetDelegate *ch_;	/* delegate that created this stream */
+	const char *payload_;	/* start of the body bytes (not owned) */
+	size_t payload_len_;	/* number of body bytes */
+};
+
struct CHNCallback {
CHNCallback(void);
~CHNCallback(void);
@@ -81,6 +98,12 @@ public:
inline int read_ret(void) { return read_ret_.load(); }
inline const char *read_buf(void) { return read_buf_->data(); }
inline bool started(void) { return started_; }
+ inline void SetMethod(const char *method) { method_ = method; } /* stores the pointer only; the string is not copied */
+ inline void SetPayload(const char *buf, size_t len)
+ { /* records where the request body lives; the bytes are not copied */
+ payload_ = buf;
+ payload_len_ = len;
+ }
struct CHNCallback cb;
@@ -92,8 +115,12 @@ private:
std::unique_ptr<URLRequest> url_req_;
std::atomic<int> read_ret_;
base::Thread thread_;
+ const char *method_;
GURL url_;
+ const char *payload_;
+ size_t payload_len_;
+
bool url_is_set_ = false;
bool started_ = false;
@@ -119,6 +146,8 @@ public:
int Start(void);
int Read(int size);
const char *GetErrorStr(void);
+ void SetMethod(const char *method);
+ void SetPayload(const char *buf, size_t len);
int read_ret(void);
const char *read_buf(void);
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index d1d79fd..0d2d391 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -87,6 +87,49 @@ static void test_chnet_ring_single(void)
delete ch;
}
+/*
+ * Round-trip a small POST body through the "body" action of
+ * tests/index.php and check that the same bytes come back.
+ *
+ * Flow: submit a START op and wait for its completion, then submit a
+ * READ op of up to 1024 bytes and compare the read buffer with what was
+ * sent. sizeof(buf) is used throughout, so the trailing NUL of the
+ * literal is part of the payload.
+ */
+static void test_chnet_ring_single_post(void)
+{
+	constexpr static const char buf[] = "AAAA Hello World!\n";
+	CNRingCtx ring(4096);
+	CNRingSQE *sqe;
+	CNRingCQE *cqe;
+	CHNet *ch;
+
+	ch = new CHNet;
+	ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
+	ch->SetMethod("POST");
+	ch->SetPayload(buf, sizeof(buf));
+
+	sqe = ring.GetSQE();
+	assert(sqe);
+	sqe->PrepStart(ch);
+	sqe->SetUserData(9999);
+	/*
+	 * Submit outside of assert(): with NDEBUG the macro argument is
+	 * not evaluated, so the submission itself would silently vanish.
+	 */
+	auto submitted = ring.SubmitSQE();
+	assert(submitted == 1);
+	(void)submitted;
+
+	ring.WaitCQE(1);
+	cqe = ring.HeadCQE();
+	assert(cqe->op == CNRING_OP_START);
+	assert(cqe->res == 0);
+	assert(cqe->user_data == 9999);
+	ring.CQAdvance(1);
+
+	sqe = ring.GetSQE();
+	assert(sqe);
+	sqe->PrepRead(ch, 1024);
+	sqe->SetUserData(9999);
+	submitted = ring.SubmitSQE();
+	assert(submitted == 1);
+	(void)submitted;
+
+	ring.WaitCQE(1);
+	cqe = ring.HeadCQE();
+	assert(cqe->op == CNRING_OP_READ);
+	assert(cqe->res == sizeof(buf));
+	assert(cqe->user_data == 9999);
+	assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
+	ring.CQAdvance(1);
+
+	delete ch;
+}
+
#define NR_REQ 1024
static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring)
@@ -167,6 +210,7 @@ int main(void)
test_nop();
chnet_global_init();
test_chnet_ring_single();
+ test_chnet_ring_single_post();
test_chnet_ring_multiple(false);
test_chnet_ring_multiple(true);
chnet_global_stop();
diff --git a/tests/index.php b/tests/index.php
index 12dec5c..afa3f3a 100644
--- a/tests/index.php
+++ b/tests/index.php
@@ -14,4 +14,7 @@ switch ($_GET["action"]) {
case "hello":
echo "Hello World!\n";
break;
+case "body":
+ echo file_get_contents("php://input");
+ break;
}
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 01/22] chnet: Add initial request body support Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data Ammar Faizi
` (19 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Make it possible to track the corresponding SQE when we are iterating
through the CQE.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_node.cc | 193 ++++++++++++++++++++++++++++++--------------
chnet/chnet_node.h | 6 --
2 files changed, 131 insertions(+), 68 deletions(-)
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index 06f624e..7d0c6bc 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -1,16 +1,38 @@
#include "chnet_node.h"
-#ifdef DEBUG_MODE
-#define PR_DEBUG(...) \
-do { \
- printf("Debug: "); \
- printf(__VA_ARGS__); \
- putchar('\n'); \
-} while (0);
-#else
-#define PR_DEBUG(...) do { } while (0)
-#endif
+/*
+ * Native side of a JS ring object: the ring context itself plus a
+ * reference to the JS object that wraps it.
+ */
+class NodeCHNetRing {
+public:
+	NodeCHNetRing(uint32_t entry) : ring_(entry) {}
+
+	CNRingCtx ring_;
+	/* Assigned in _CHN_CreateRing(); used to reach the "__udata" property. */
+	Napi::ObjectReference ref_;
+};
+
+/* Pairs an SQE with the ring wrapper it was obtained from. */
+class NodeCNRingSQE {
+public:
+	NodeCNRingSQE(NodeCHNetRing *ring, CNRingSQE *sqe)
+		: sqe_(sqe), ring_(ring)
+	{
+	}
+
+	CNRingSQE *sqe_;
+	NodeCHNetRing *ring_;
+};
+
+struct NodeSQData {
+ uint32_t id_; /* key of the stored value inside the ring's "__udata" JS object */
+ NodeCHNetRing *ring_; /* ring whose "__udata" object holds that value */
+};
+
+class NodeCHNet {
+public:
+ CHNet ch_; /* request object driven by the set_url/read_ret/read_buf bindings */
+};
static inline void throw_js_exception(Napi::Env &env, const char *err_msg)
{
@@ -24,18 +46,14 @@ static inline void obj_add_func(Napi::Env &env, Napi::Object &obj,
obj[name] = Napi::Function::New(env, func, name, data);
}
-struct sqe_udata {
- void *ptr;
-};
-
static void CHN_SQEPrepNop(const Napi::CallbackInfo &info)
{
- CNRingSQE *sqe = (CNRingSQE *)info.Data();
+ NodeCNRingSQE *nsqe = (NodeCNRingSQE *)info.Data();
- sqe->PrepNop();
+ nsqe->sqe_->PrepNop();
}
-static CHNetNode *CHN_PrepGetCHObj(Napi::Env &env, const Napi::Value &arg)
+static NodeCHNet *CHN_PrepGetCHObj(Napi::Env &env, const Napi::Value &arg)
{
Napi::Value ch_in_obj;
Napi::Object obj;
@@ -52,15 +70,15 @@ static CHNetNode *CHN_PrepGetCHObj(Napi::Env &env, const Napi::Value &arg)
return nullptr;
}
- return (CHNetNode *)(uintptr_t)
+ return (NodeCHNet *)(uintptr_t)
ch_in_obj.As<Napi::Number>().Int64Value();
}
static void CHN_SQEPrepStart(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
- CNRingSQE *sqe;
- CHNetNode *ch;
+ NodeCNRingSQE *nsqe;
+ NodeCHNet *ch;
CHNet *chn;
if (unlikely(info.Length() != 1 || !info[0].IsObject())) {
@@ -73,16 +91,16 @@ static void CHN_SQEPrepStart(const Napi::CallbackInfo &info)
return;
chn = &ch->ch_;
- sqe = (CNRingSQE *)info.Data();
- sqe->PrepStart(chn);
+ nsqe = (NodeCNRingSQE *)info.Data();
+ nsqe->sqe_->PrepStart(chn);
}
static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
int32_t read_size;
- CNRingSQE *sqe;
- CHNetNode *ch;
+ NodeCNRingSQE *nsqe;
+ NodeCHNet *ch;
CHNet *chn;
if (unlikely(info.Length() != 2 || !info[0].IsObject() ||
@@ -97,69 +115,116 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
read_size = info[1].As<Napi::Number>().Int32Value();
chn = &ch->ch_;
- sqe = (CNRingSQE *)info.Data();
- sqe->PrepRead(chn, read_size);
+ nsqe = (NodeCNRingSQE *)info.Data();
+ nsqe->sqe_->PrepRead(chn, read_size);
}
-static void CHN_SQESetUserData(const Napi::CallbackInfo &info)
+static Napi::Value CHN_SQESetUserData(const Napi::CallbackInfo &info)
{
- (void)info;
+ static std::atomic<uint32_t> count;
+ Napi::Env env = info.Env();
+ struct NodeSQData *data;
+ NodeCNRingSQE *nsqe;
+
+ if (unlikely(info.Length() != 1)) {
+ throw_js_exception(env, "sqe.set_user_data must be given exactly one argument");
+ return env.Null();
+ }
+
+ nsqe = (NodeCNRingSQE *)info.Data();
+ data = new struct NodeSQData;
+ data->ring_ = nsqe->ring_;
+ data->id_ = count++ & nsqe->ring_->ring_.cq_mask_;
+ nsqe->ring_->ref_.Get("__udata")
+ .As<Napi::Object>()[data->id_] = info[0];
+ nsqe->sqe_->SetUserDataPtr(data);
+ return info[0];
}
-static Napi::Object BuildSQEObject(Napi::Env &env, CNRingSQE *sqe)
+/*
+ * Finalizer for the JS SQE object: releases the NodeCNRingSQE that
+ * BuildSQEObject() allocated. Only the wrapper is freed; the CNRingSQE
+ * and the ring it points to are left alone.
+ */
+static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *sqe)
+{
+	static_cast<void>(env);	/* part of the finalizer signature, unused */
+	delete sqe;
+}
+
+static Napi::Object BuildSQEObject(Napi::Env &env, NodeCHNetRing *ring,
+ CNRingSQE *sqe)
{
Napi::Object obj = Napi::Object::New(env);
+ NodeCNRingSQE *nsqe;
- obj_add_func(env, obj, sqe, CHN_SQEPrepNop, "prep_nop");
- obj_add_func(env, obj, sqe, CHN_SQEPrepStart, "prep_start");
- obj_add_func(env, obj, sqe, CHN_SQEPrepRead, "prep_read");
- obj_add_func(env, obj, sqe, CHN_SQESetUserData, "set_user_data");
sqe->SetUserData(0);
+ nsqe = new NodeCNRingSQE(ring, sqe);
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "prep_nop");
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "prep_start");
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "prep_read");
+ obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "set_user_data");
+ obj.AddFinalizer(CHN_SQEDestruct, nsqe);
return obj;
}
static Napi::Value CHN_RingGetSQE(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
- CNRingCtx *ring;
+ NodeCHNetRing *ring;
CNRingSQE *sqe;
- ring = (CNRingCtx *)info.Data();
- sqe = ring->GetSQE();
+ ring = (NodeCHNetRing *)info.Data();
+ sqe = ring->ring_.GetSQE();
if (unlikely(!sqe))
return env.Null();
- return BuildSQEObject(env, sqe);
+ return BuildSQEObject(env, ring, sqe);
}
static Napi::Value CHN_RingSubmitSQE(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
- CNRingCtx *ring;
+ NodeCHNetRing *ring;
- ring = (CNRingCtx *)info.Data();
- return Napi::Number::New(env, ring->SubmitSQE());
+ ring = (NodeCHNetRing *)info.Data();
+ return Napi::Number::New(env, ring->ring_.SubmitSQE());
}
static void CHN_RingWaitCQE(const Napi::CallbackInfo &info)
{
uint32_t to_wait;
- CNRingCtx *ring;
+ NodeCHNetRing *ring;
if (unlikely(info.Length() < 1 || !info[0].IsNumber()))
to_wait = 1;
else
to_wait = info[0].ToNumber().Uint32Value();
- ring = (CNRingCtx *)info.Data();
- ring->WaitCQE(to_wait);
+ ring = (NodeCHNetRing *)info.Data();
+ ring->ring_.WaitCQE(to_wait);
+}
+
+/*
+ * Finalizer for a JS CQE object that carried user data: frees the
+ * NodeSQData attached to the completion.
+ *
+ * TODO(ammarfaizi2): Clean up user_data from the ring when it's no
+ * longer used, i.e. something along the lines of:
+ *
+ *	data->ring_->ref_.Get("__udata").As<Napi::Object>()
+ *		.Delete(data->id_);
+ */
+static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data)
+{
+	static_cast<void>(env);	/* part of the finalizer signature, unused */
+	delete data;
}
static Napi::Object BuildCQEObject(Napi::Env &env, CNRingCQE *cqe)
{
Napi::Object obj = Napi::Object::New(env);
+ NodeSQData *data = (NodeSQData *) cqe->GetUserDataPtr();
obj["res"] = Napi::Number::New(env, cqe->res);
+
+ if (data) {
+ obj["user_data"] = data->ring_->ref_.Get("__udata")
+ .As<Napi::Object>().Get(data->id_);
+ obj.AddFinalizer(CHN_CQEDestruct, data);
+ }
return obj;
}
@@ -171,7 +236,7 @@ static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info)
Napi::Env env = info.Env();
Napi::Function callback;
uint32_t head, i;
- CNRingCtx *ring;
+ NodeCHNetRing *ring;
CNRingCQE *cqe;
if (unlikely(info.Length() != 1 || !info[0].IsFunction())) {
@@ -182,8 +247,8 @@ static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info)
callback = info[0].As<Napi::Function>();
i = 0;
- ring = (CNRingCtx *)info.Data();
- chnring_for_each_cqe(ring, head, cqe) {
+ ring = (NodeCHNetRing *)info.Data();
+ chnring_for_each_cqe(&ring->ring_, head, cqe) {
Napi::Object cqe_obj = BuildCQEObject(env, cqe);
callback.Call({cqe_obj});
@@ -199,33 +264,35 @@ static void CHN_RingCQAdvance(const Napi::CallbackInfo &info)
"ring.cq_advance must be given exactly an integer argument";
Napi::Env env = info.Env();
- CNRingCtx *ring;
+ NodeCHNetRing *ring;
if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
throw_js_exception(env, err_msg);
return;
}
- ring = (CNRingCtx *)info.Data();
- ring->CQAdvance(info[0].ToNumber().Uint32Value());
+ ring = (NodeCHNetRing *)info.Data();
+ ring->ring_.CQAdvance(info[0].ToNumber().Uint32Value());
}
-static void CHN_RingDestruct(Napi::Env env, CNRingCtx *ring)
+static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring)
{
delete ring;
(void)env;
}
-static inline Napi::Value _CHN_CreateRing(Napi::Env &env, CNRingCtx *ring)
+static inline Napi::Value _CHN_CreateRing(Napi::Env &env, NodeCHNetRing *ring)
{
Napi::Object obj = Napi::Object::New(env);
+ obj["__udata"] = Napi::Object::New(env);
obj_add_func(env, obj, ring, CHN_RingGetSQE, "get_sqe");
obj_add_func(env, obj, ring, CHN_RingSubmitSQE, "submit_sqe");
obj_add_func(env, obj, ring, CHN_RingWaitCQE, "wait_cqe");
obj_add_func(env, obj, ring, CHN_RingForEachCQE, "for_each_cqe");
obj_add_func(env, obj, ring, CHN_RingCQAdvance, "cq_advance");
obj.AddFinalizer(CHN_RingDestruct, ring);
+ ring->ref_ = Napi::Persistent(obj);
return obj;
}
@@ -235,7 +302,7 @@ static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info)
"chnet.create_ring must be given exactly a positive integer argument";
Napi::Env env = info.Env();
- CNRingCtx *ring;
+ NodeCHNetRing *ring;
uint32_t entry;
if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
@@ -244,7 +311,7 @@ static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info)
}
entry = info[0].ToNumber().Uint32Value();
- ring = new CNRingCtx(entry);
+ ring = new NodeCHNetRing(entry);
return _CHN_CreateRing(env, ring);
}
@@ -254,14 +321,14 @@ static void CHN_NetSetURL(const Napi::CallbackInfo &info)
"chnet.set_url must be given exactly 1 string argument";
Napi::Env env = info.Env();
- CHNetNode *ch;
+ NodeCHNet *ch;
if (unlikely(info.Length() != 1 || !info[0].IsString())) {
throw_js_exception(env, err_msg);
return;
}
- ch = (CHNetNode *)info.Data();
+ ch = (NodeCHNet *)info.Data();
const std::string &url = info[0].ToString().Utf8Value();
ch->ch_.SetURL(url.c_str());
}
@@ -269,10 +336,10 @@ static void CHN_NetSetURL(const Napi::CallbackInfo &info)
static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
- CHNetNode *ch;
+ NodeCHNet *ch;
int ret;
- ch = (CHNetNode *)info.Data();
+ ch = (NodeCHNet *)info.Data();
ret = ch->ch_.read_ret();
if (ret > 0)
return Napi::String::New(env, ch->ch_.read_buf(), (size_t)ret);
@@ -283,13 +350,13 @@ static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
static Napi::Number CHN_NetReadRet(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
- CHNetNode *ch;
+ NodeCHNet *ch;
- ch = (CHNetNode *)info.Data();
+ ch = (NodeCHNet *)info.Data();
return Napi::Number::New(env, ch->ch_.read_ret());
}
-static void CHN_NetDestruct(Napi::Env env, CHNetNode *ch)
+static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
{
delete ch;
(void)env;
@@ -299,16 +366,18 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
Napi::Object obj = Napi::Object::New(env);
- CHNetNode *ch = new CHNetNode;
+ NodeCHNet *ch = new NodeCHNet;
int64_t ch_ptr;
obj_add_func(env, obj, ch, CHN_NetSetURL, "set_url");
obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret");
obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf");
- ch_ptr = (int64_t)ch;
+ ch_ptr = (int64_t) (intptr_t) ch;
obj["__ch"] = Napi::Number::New(env, ch_ptr);
obj.AddFinalizer(CHN_NetDestruct, ch);
+ obj.Seal();
+ obj.Freeze();
return obj;
}
diff --git a/chnet/chnet_node.h b/chnet/chnet_node.h
index 336a005..ea0ccb3 100644
--- a/chnet/chnet_node.h
+++ b/chnet/chnet_node.h
@@ -6,10 +6,4 @@
#include "chnet_ring.h"
#include "../node-addon-api/napi.h"
-class CHNetNode {
-public:
- CHNetNode(void) = default;
- CHNet ch_;
-};
-
#endif /* #ifndef CHNET_NODE_H */
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 01/22] chnet: Add initial request body support Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 02/22] chnet: node: Add set_user_data support on SQE Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience Ammar Faizi
` (18 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
A previous commit adds set_user_data support on the SQE. Test this new
functionality.
Signed-off-by: Ammar Faizi <[email protected]>
---
tests/js/ring.js | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/tests/js/ring.js b/tests/js/ring.js
index 94ebb2c..de4baaf 100644
--- a/tests/js/ring.js
+++ b/tests/js/ring.js
@@ -11,7 +11,7 @@ function assert(cond)
function test_nop()
{
- const entry = 4096;
+ const entry = 16;
let ring;
let cqe;
let sqe;
@@ -65,6 +65,7 @@ function test_chnet_ring_multiple()
sqe = ring.get_sqe();
assert(sqe);
sqe.prep_start(ch[i]);
+ sqe.set_user_data(ch[i]);
}
assert(ring.submit_sqe() == i);
ring.wait_cqe(i);
@@ -79,15 +80,14 @@ function test_chnet_ring_multiple()
sqe = ring.get_sqe();
assert(sqe);
sqe.prep_read(ch[i], 1024);
+ sqe.set_user_data(ch[i]);
}
assert(ring.submit_sqe() == i);
ring.wait_cqe(i);
j = ring.for_each_cqe(function (cqe) {
- /*
- * TODO(ammarfaizi2): Add buffer string assertion.
- */
-
+ let ch = cqe.user_data;
assert(cqe.res == 13);
+ assert(ch.read_buf());
});
assert(j == i);
ring.cq_advance(j);
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (2 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 03/22] tests/js/ring: Update the unit test to utilize set_user_data Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag Ammar Faizi
` (17 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
I was having a hard time debugging a crash in GDB because I couldn't
see the corresponding file and line number where the crash happens.
Add `-ggdb3` flag to add the debug symbols.
Signed-off-by: Ammar Faizi <[email protected]>
---
binding.gyp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/binding.gyp b/binding.gyp
index 3c0f133..4cdceac 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -3,10 +3,10 @@
{
"target_name": "chnet",
"cflags": [
- "-fno-exceptions"
+ "-fno-exceptions -ggdb3"
],
"cflags_cc": [
- "-fno-exceptions"
+ "-fno-exceptions -ggdb3"
],
"sources": [
"chnet/chnet_node.cc"
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (3 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 04/22] binding.gyp: Add `-ggdb3` flag for better debugging experience Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method Ammar Faizi
` (16 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Building with clang-16 yields the following error:
napi.h:1173:7: error: integer value -1 is outside the valid range of \
values [0, 15] for this enumeration type [-Wenum-constexpr-conversion]
static_cast<napi_typedarray_type>(-1);
^
1 error generated.
Just ignore `-Wenum-constexpr-conversion` for now. I have reported this
to upstream, see the link below.
Link: https://github.com/nodejs/node-addon-api/issues/1198
Signed-off-by: Ammar Faizi <[email protected]>
---
binding.gyp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/binding.gyp b/binding.gyp
index 4cdceac..1e4e8aa 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -3,10 +3,10 @@
{
"target_name": "chnet",
"cflags": [
- "-fno-exceptions -ggdb3"
+ "-fno-exceptions -ggdb3 -Wno-enum-constexpr-conversion"
],
"cflags_cc": [
- "-fno-exceptions -ggdb3"
+ "-fno-exceptions -ggdb3 -Wno-enum-constexpr-conversion"
],
"sources": [
"chnet/chnet_node.cc"
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (4 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 05/22] binding.gyp: Add `-Wno-enum-constexpr-conversion` flag Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 07/22] chnet: node: Add get_error function to return the error string Ammar Faizi
` (15 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
By default, the chromium HTTP request will perform an HTTP GET. Make
it possible to set the HTTP method from NodeJS. Also, change the
internal string storage to std::string, because if we use a
`const char *`, it makes the lifetime management harder with respect
to the caller.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 6 +++---
chnet/chnet.h | 4 ++--
chnet/chnet_node.cc | 19 +++++++++++++++++++
3 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 93abe84..058af89 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -76,13 +76,13 @@ net::DefineNetworkTrafficAnnotation("CHNetDelegate", R"(
})");
CHNetDelegate::CHNetDelegate(void):
- thread_("chromium_thread")
+ thread_("chromium_thread"),
+ method_("GET")
{
base::Thread::Options options(base::MessagePumpType::IO, 0);
CHECK(thread_.StartWithOptions(std::move(options)));
read_ret_.store(0, std::memory_order_relaxed);
payload_ = nullptr;
- method_ = "GET";
}
static void CHNetDelegateDestruct(std::unique_ptr<URLRequest> *url_req,
@@ -254,7 +254,7 @@ const char *CHNet::GetErrorStr(void)
void CHNet::SetMethod(const char *method)
{
- ch_->SetMethod(method);
+ ch_->SetMethod(std::string(method));
}
void CHNet::SetPayload(const char *buf, size_t len)
diff --git a/chnet/chnet.h b/chnet/chnet.h
index b09d99d..7f6359b 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -98,7 +98,7 @@ public:
inline int read_ret(void) { return read_ret_.load(); }
inline const char *read_buf(void) { return read_buf_->data(); }
inline bool started(void) { return started_; }
- inline void SetMethod(const char *method) { method_ = method; }
+ inline void SetMethod(std::string method) { method_ = method; }
inline void SetPayload(const char *buf, size_t len)
{
payload_ = buf;
@@ -115,7 +115,7 @@ private:
std::unique_ptr<URLRequest> url_req_;
std::atomic<int> read_ret_;
base::Thread thread_;
- const char *method_;
+ std::string method_;
GURL url_;
const char *payload_;
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index 7d0c6bc..fdac525 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -333,6 +333,24 @@ static void CHN_NetSetURL(const Napi::CallbackInfo &info)
ch->ch_.SetURL(url.c_str());
}
+static void CHN_NetSetMethod(const Napi::CallbackInfo &info)
+{ /* JS binding for set_method(): requires exactly one string argument */
+ constexpr static const char err_msg[] =
+ "chnet.set_method must be given exactly 1 string argument";
+
+ Napi::Env env = info.Env();
+ NodeCHNet *ch;
+
+ if (unlikely(info.Length() != 1 || !info[0].IsString())) {
+ throw_js_exception(env, err_msg);
+ return;
+ }
+
+ ch = (NodeCHNet *)info.Data(); /* the NodeCHNet bound to this function by obj_add_func() */
+ const std::string &url = info[0].ToString().Utf8Value(); /* despite its name, this holds the HTTP method, not a URL */
+ ch->ch_.SetMethod(url.c_str());
+}
+
static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
@@ -370,6 +388,7 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
int64_t ch_ptr;
obj_add_func(env, obj, ch, CHN_NetSetURL, "set_url");
+ obj_add_func(env, obj, ch, CHN_NetSetMethod, "set_method");
obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret");
obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf");
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 07/22] chnet: node: Add get_error function to return the error string
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (5 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 06/22] chnet: node: Add set_method function to set HTTP method Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body Ammar Faizi
` (14 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
This is very useful for getting the reason for the error as a string.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_node.cc | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index fdac525..4a60b0e 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -374,6 +374,20 @@ static Napi::Number CHN_NetReadRet(const Napi::CallbackInfo &info)
return Napi::Number::New(env, ch->ch_.read_ret());
}
+/*
+ * JS binding for get_error(): returns the string reported by
+ * CHNet::GetErrorStr() as a JS string, or null when that call yields a
+ * null pointer.
+ */
+static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info)
+{
+	Napi::Env env = info.Env();
+	NodeCHNet *net = (NodeCHNet *)info.Data();
+	const char *msg = net->ch_.GetErrorStr();
+
+	if (msg != nullptr)
+		return Napi::String::New(env, msg);
+
+	return env.Null();
+}
+
static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
{
delete ch;
@@ -391,6 +405,7 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
obj_add_func(env, obj, ch, CHN_NetSetMethod, "set_method");
obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret");
obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf");
+ obj_add_func(env, obj, ch, CHN_NetGetError, "get_error");
ch_ptr = (int64_t) (intptr_t) ch;
obj["__ch"] = Napi::Number::New(env, ch_ptr);
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (6 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 07/22] chnet: node: Add get_error function to return the error string Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS Ammar Faizi
` (13 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_node.cc | 25 +++++++++++++++++++++++--
1 file changed, 23 insertions(+), 2 deletions(-)
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index 4a60b0e..a301140 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -32,6 +32,7 @@ struct NodeSQData {
class NodeCHNet {
public:
CHNet ch_;
+ std::string payload_;
};
static inline void throw_js_exception(Napi::Env &env, const char *err_msg)
@@ -347,8 +348,8 @@ static void CHN_NetSetMethod(const Napi::CallbackInfo &info)
}
ch = (NodeCHNet *)info.Data();
- const std::string &url = info[0].ToString().Utf8Value();
- ch->ch_.SetMethod(url.c_str());
+ const std::string &method =info[0].ToString().Utf8Value();
+ ch->ch_.SetMethod(method.c_str());
}
static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
@@ -388,6 +389,25 @@ static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info)
return env.Null();
}
+/*
+ * JS binding for set_payload(payload).
+ *
+ * Requires exactly one string argument and uses its UTF-8 encoding as
+ * the HTTP request body. Any other call shape is reported through
+ * throw_js_exception() and leaves the current payload untouched.
+ *
+ * The bytes are kept in NodeCHNet::payload_ so that the pointer handed
+ * to CHNet::SetPayload() stays backed by storage owned by this
+ * NodeCHNet. NOTE(review): a later set_payload() call reassigns that
+ * string and may invalidate a pointer that is still in use by a
+ * request in flight -- confirm callers only set the payload before
+ * starting the request.
+ */
+static void CHN_NetSetPayload(const Napi::CallbackInfo &info)
+{
+	constexpr static const char err_msg[] =
+		"chnet.set_payload must be given exactly 1 string argument";
+
+	Napi::Env env = info.Env();
+	NodeCHNet *ch;
+
+	if (unlikely(info.Length() != 1 || !info[0].IsString())) {
+		throw_js_exception(env, err_msg);
+		return;
+	}
+
+	ch = (NodeCHNet *)info.Data();
+	ch->payload_ = info[0].ToString().Utf8Value();
+	ch->ch_.SetPayload(ch->payload_.c_str(), ch->payload_.size());
+}
+
static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
{
delete ch;
@@ -406,6 +426,7 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret");
obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf");
obj_add_func(env, obj, ch, CHN_NetGetError, "get_error");
+ obj_add_func(env, obj, ch, CHN_NetSetPayload, "set_payload");
ch_ptr = (int64_t) (intptr_t) ch;
obj["__ch"] = Napi::Number::New(env, ch_ptr);
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (7 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 08/22] chnet: node: Add set_payload function to set HTTP req body Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 10/22] chnet: Split construct URL req creation into a new function Ammar Faizi
` (12 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Signed-off-by: Ammar Faizi <[email protected]>
---
tests/js/ring.js | 50 +++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 49 insertions(+), 1 deletion(-)
diff --git a/tests/js/ring.js b/tests/js/ring.js
index de4baaf..3a4d25d 100644
--- a/tests/js/ring.js
+++ b/tests/js/ring.js
@@ -87,16 +87,64 @@ function test_chnet_ring_multiple()
j = ring.for_each_cqe(function (cqe) {
let ch = cqe.user_data;
assert(cqe.res == 13);
- assert(ch.read_buf());
+ assert(ch.read_buf() === "Hello World!\n");
});
assert(j == i);
ring.cq_advance(j);
}
+class SimpleHttp {
+ constructor(url, method = "GET") {
+ this.ring = chnet.create_ring(1);
+ this.ch = chnet.create_net();
+ this.ch.set_url(url);
+ this.ch.set_method(method);
+ this.res = -1;
+ }
+
+ prep_read(read_size) {
+ let sqe = this.ring.get_sqe();
+ sqe.prep_read(this.ch, read_size);
+ sqe.set_user_data(this);
+ }
+
+ set_payload(payload) {
+ this.ch.set_payload(payload);
+ }
+
+ run() {
+ this.ring.submit_sqe();
+ this.ring.wait_cqe(1);
+ this.ring.for_each_cqe(function (cqe) {
+ let that = cqe.user_data;
+ that.res = cqe.res;
+ if (cqe.res < 0)
+ console.log("Error: "+that.ch.get_error());
+ });
+ this.ring.cq_advance(1);
+ }
+
+ get_buffer() {
+ return this.ch.read_buf();
+ }
+}
+
+function test_simple_http()
+{
+ const payload = "Hello World! AAAAAAAAAAAAAAAAAAAAAAAAA\n";
+ let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=body", "POST");
+ h.set_payload(payload);
+ h.prep_read(1024);
+ h.run();
+ assert(h.get_buffer() === payload);
+ assert(h.ch.read_ret() === payload.length);
+}
+
function main()
{
test_nop();
test_chnet_ring_multiple();
+ test_simple_http();
}
main();
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 10/22] chnet: Split construct URL req creation into a new function
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (8 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 09/22] tests/js/ring: Add simple HTTP POST request example in NodeJS Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 11/22] chnet: Add set request header support Ammar Faizi
` (11 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
This is a preparation for adding the set header function. We can efficiently
set a new HTTP header if we create the URL request first, as we then don't
have to keep a copy of the HTTP headers in a vector.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 9 ++++++++-
chnet/chnet.h | 1 +
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 058af89..67d420b 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -111,7 +111,7 @@ CHNetDelegate::~CHNetDelegate(void)
sig.Wait();
}
-void CHNetDelegate::_Start(Waiter *sig)
+void CHNetDelegate::ConstructURLRequest(void)
{
net::URLRequestContextBuilder url_req_ctx_b;
@@ -128,6 +128,13 @@ void CHNetDelegate::_Start(Waiter *sig)
url_req_ctx_ = url_req_ctx_b.Build();
url_req_ = url_req_ctx_->CreateRequest(url_, DEFAULT_PRIORITY, this,
traffic_annotation, false);
+}
+
+void CHNetDelegate::_Start(Waiter *sig)
+{
+ if (!url_req_)
+ ConstructURLRequest();
+
url_req_->set_method(method_);
sig->Signal();
diff --git a/chnet/chnet.h b/chnet/chnet.h
index 7f6359b..5e2392c 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -109,6 +109,7 @@ public:
private:
void _Start(Waiter *sig);
+ void ConstructURLRequest(void);
std::unique_ptr<net::URLRequestContext> url_req_ctx_;
scoped_refptr<net::IOBufferWithSize> read_buf_;
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 11/22] chnet: Add set request header support
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (9 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 10/22] chnet: Split construct URL req creation into a new function Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 12/22] chnet: node: Fix unused variable warning Ammar Faizi
` (10 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Make it possible to set extra request headers.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 28 ++++++++++++++++++++++++--
chnet/chnet.h | 6 +++++-
tests/cpp/ring.cc | 51 +++++++++++++++++++++++++++++++++++++++++++----
tests/index.php | 3 +++
4 files changed, 81 insertions(+), 7 deletions(-)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 67d420b..915dfea 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -111,7 +111,7 @@ CHNetDelegate::~CHNetDelegate(void)
sig.Wait();
}
-void CHNetDelegate::ConstructURLRequest(void)
+void CHNetDelegate::ConstructURLRequest(Waiter *sig)
{
net::URLRequestContextBuilder url_req_ctx_b;
@@ -128,12 +128,14 @@ void CHNetDelegate::ConstructURLRequest(void)
url_req_ctx_ = url_req_ctx_b.Build();
url_req_ = url_req_ctx_->CreateRequest(url_, DEFAULT_PRIORITY, this,
traffic_annotation, false);
+ if (sig)
+ sig->Signal();
}
void CHNetDelegate::_Start(Waiter *sig)
{
if (!url_req_)
- ConstructURLRequest();
+ ConstructURLRequest(nullptr);
url_req_->set_method(method_);
sig->Signal();
@@ -151,6 +153,23 @@ void CHNetDelegate::SetURL(const char *url)
url_is_set_ = true;
}
+void CHNetDelegate::SetRequestHeader(const std::string &key,
+ const std::string &val,
+ bool overwrite)
+{
+ if (unlikely(!url_req_)) {
+ Waiter sig;
+ auto *r = thread_.task_runner().get();
+ r->PostTask(FROM_HERE,
+ base::BindOnce(&CHNetDelegate::ConstructURLRequest,
+ base::Unretained(this), &sig));
+ sig.Wait();
+ CHECK(url_req_);
+ }
+
+ url_req_->SetExtraRequestHeaderByName(key, val, overwrite);
+}
+
int CHNetDelegate::Start(void)
{
if (unlikely(!url_is_set_)) {
@@ -269,6 +288,11 @@ void CHNet::SetPayload(const char *buf, size_t len)
ch_->SetPayload(buf, len);
}
+void CHNet::SetRequestHeader(const char *key, const char *val, bool overwrite)
+{
+ ch_->SetRequestHeader(std::string(key), std::string(val), overwrite);
+}
+
int CHNet::read_ret(void)
{
return ch_->read_ret();
diff --git a/chnet/chnet.h b/chnet/chnet.h
index 5e2392c..8e995a1 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -92,6 +92,8 @@ public:
int Read(int size);
void _Read(Waiter *sig, int size);
void SetError(int err_code);
+ void SetRequestHeader(const std::string &key, const std::string &val,
+ bool overwrite = true);
void OnResponseStarted(URLRequest *url_req, int net_error) override;
void OnReadCompleted(URLRequest *url_req, int bytes_read) override;
@@ -109,7 +111,7 @@ public:
private:
void _Start(Waiter *sig);
- void ConstructURLRequest(void);
+ void ConstructURLRequest(Waiter *sig);
std::unique_ptr<net::URLRequestContext> url_req_ctx_;
scoped_refptr<net::IOBufferWithSize> read_buf_;
@@ -149,6 +151,8 @@ public:
const char *GetErrorStr(void);
void SetMethod(const char *method);
void SetPayload(const char *buf, size_t len);
+ void SetRequestHeader(const char *key, const char *val,
+ bool overwrite = true);
int read_ret(void);
const char *read_buf(void);
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index 0d2d391..8bedef5 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -87,6 +87,48 @@ static void test_chnet_ring_single(void)
delete ch;
}
+static void test_chnet_ring_set_header(void)
+{
+ constexpr static const char ua[] = "This is just a test user agent!!!";
+ CNRingCtx ring(4096);
+ CNRingSQE *sqe;
+ CNRingCQE *cqe;
+ CHNet *ch;
+
+ ch = new CHNet;
+ ch->SetURL("http://127.0.0.1:8000/index.php?action=user_agent");
+ ch->SetRequestHeader("User-agent", ua);
+
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepStart(ch);
+ sqe->SetUserData(9999);
+ assert(ring.SubmitSQE() == 1);
+
+ ring.WaitCQE(1);
+ cqe = ring.HeadCQE();
+ assert(cqe->op == CNRING_OP_START);
+ assert(cqe->res == 0);
+ assert(cqe->user_data == 9999);
+ ring.CQAdvance(1);
+
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch, 1024);
+ sqe->SetUserData(9999);
+ assert(ring.SubmitSQE() == 1);
+
+ ring.WaitCQE(1);
+ cqe = ring.HeadCQE();
+ assert(cqe->op == CNRING_OP_READ);
+ assert(cqe->res == strlen(ua));
+ assert(cqe->user_data == 9999);
+ assert(!strncmp(ua, ch->read_buf(), strlen(ua)));
+ ring.CQAdvance(1);
+
+ delete ch;
+}
+
static void test_chnet_ring_single_post(void)
{
constexpr static const char buf[] = "AAAA Hello World!\n";
@@ -209,10 +251,11 @@ int main(void)
{
test_nop();
chnet_global_init();
- test_chnet_ring_single();
- test_chnet_ring_single_post();
- test_chnet_ring_multiple(false);
- test_chnet_ring_multiple(true);
+ // test_chnet_ring_single();
+ test_chnet_ring_set_header();
+ // test_chnet_ring_single_post();
+ // test_chnet_ring_multiple(false);
+ // test_chnet_ring_multiple(true);
chnet_global_stop();
return 0;
}
diff --git a/tests/index.php b/tests/index.php
index afa3f3a..55639c6 100644
--- a/tests/index.php
+++ b/tests/index.php
@@ -17,4 +17,7 @@ case "hello":
case "body":
echo file_get_contents("php://input");
break;
+case "user_agent":
+ echo $_SERVER["HTTP_USER_AGENT"] ?? "";
+ break;
}
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 12/22] chnet: node: Fix unused variable warning
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (10 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 11/22] chnet: Add set request header support Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS Ammar Faizi
` (9 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
This variable is not used, remove it.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_node.cc | 1 -
1 file changed, 1 deletion(-)
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index a301140..cfa919b 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -395,7 +395,6 @@ static void CHN_NetSetPayload(const Napi::CallbackInfo &info)
"chnet.set_payload must be given exactly 1 string argument";
Napi::Env env = info.Env();
- const char *err;
NodeCHNet *ch;
if (unlikely(info.Length() != 1 || !info[0].IsString())) {
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (11 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 12/22] chnet: node: Fix unused variable warning Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 14/22] tests/js/ring: Add more set header function test Ammar Faizi
` (8 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
This adds set request header functionality. Also, update the ring.js
example to perform an HTTP request that executes this new function.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_node.cc | 41 +++++++++++++++++++++++++++++++++++++++++
tests/js/ring.js | 12 ++++++++++++
2 files changed, 53 insertions(+)
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index cfa919b..1269577 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -352,6 +352,46 @@ static void CHN_NetSetMethod(const Napi::CallbackInfo &info)
ch->ch_.SetMethod(method.c_str());
}
+static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info)
+{
+ constexpr static const char err_msg[] =
+ "chnet.set_request_header must be at least given 2 string arguments";
+ constexpr static const char err_msg2[] =
+ "chnet.set_request_header can only receive 3 arguments with the third argument being a boolean";
+
+ Napi::Env env = info.Env();
+ bool overwrite = true;
+ NodeCHNet *ch;
+ int nr_arg;
+
+
+ nr_arg = info.Length();
+ if (unlikely(nr_arg < 2 || !info[0].IsString() ||
+ !info[1].IsString())) {
+ throw_js_exception(env, err_msg);
+ return;
+ }
+
+ if (unlikely(nr_arg > 3)) {
+ throw_js_exception(env, err_msg2);
+ return;
+ }
+
+ if (nr_arg == 3) {
+ if (unlikely(!info[2].IsBoolean())) {
+ throw_js_exception(env, err_msg2);
+ return;
+ }
+ overwrite = info[2].As<Napi::Boolean>();
+ }
+
+ ch = (NodeCHNet *)info.Data();
+ const std::string &key = info[0].ToString().Utf8Value();
+ const std::string &val = info[1].ToString().Utf8Value();
+ ch->ch_.SetRequestHeader(key.c_str(), val.c_str(), overwrite);
+}
+
+
static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
@@ -422,6 +462,7 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
obj_add_func(env, obj, ch, CHN_NetSetURL, "set_url");
obj_add_func(env, obj, ch, CHN_NetSetMethod, "set_method");
+ obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "set_request_header");
obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret");
obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf");
obj_add_func(env, obj, ch, CHN_NetGetError, "get_error");
diff --git a/tests/js/ring.js b/tests/js/ring.js
index 3a4d25d..4b081ae 100644
--- a/tests/js/ring.js
+++ b/tests/js/ring.js
@@ -140,11 +140,23 @@ function test_simple_http()
assert(h.ch.read_ret() === payload.length);
}
+function test_simple_http_set_header()
+{
+ const ua = "This is just a test user agent!";
+ let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=user_agent", "GET");
+ h.ch.set_request_header("User-Agent", ua);
+ h.prep_read(1024);
+ h.run();
+ assert(h.get_buffer() === ua);
+ assert(h.ch.read_ret() === ua.length);
+}
+
function main()
{
test_nop();
test_chnet_ring_multiple();
test_simple_http();
+ test_simple_http_set_header();
}
main();
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 14/22] tests/js/ring: Add more set header function test
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (12 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 13/22] chnet: node: Add set request header function in NodeJS Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 15/22] chnet: node: Don't use static counter for data ID Ammar Faizi
` (7 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Now, let's test that the Content-Type header can be set properly.
Signed-off-by: Ammar Faizi <[email protected]>
---
tests/index.php | 8 ++++++++
tests/js/ring.js | 14 ++++++++++++++
2 files changed, 22 insertions(+)
diff --git a/tests/index.php b/tests/index.php
index 55639c6..4a56233 100644
--- a/tests/index.php
+++ b/tests/index.php
@@ -20,4 +20,12 @@ case "body":
case "user_agent":
echo $_SERVER["HTTP_USER_AGENT"] ?? "";
break;
+case "print_post":
+ if (!isset($_GET["key"]) || !is_string($_GET["key"])) {
+ echo "Missing key!";
+ break;
+ }
+
+ echo $_POST[$_GET["key"]] ?? "";
+ break;
}
diff --git a/tests/js/ring.js b/tests/js/ring.js
index 4b081ae..6752fe1 100644
--- a/tests/js/ring.js
+++ b/tests/js/ring.js
@@ -151,12 +151,26 @@ function test_simple_http_set_header()
assert(h.ch.read_ret() === ua.length);
}
+function test_simple_http_post_set_header()
+{
+ const ss = "This is just a test string!";
+ let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=print_post&key=test", "POST");
+ h.ch.set_request_header("User-Agent", ss);
+ h.ch.set_request_header("Content-type", "application/x-www-form-urlencoded");
+ h.ch.set_payload("test="+encodeURIComponent(ss));
+ h.prep_read(1024);
+ h.run();
+ assert(h.get_buffer() === ss);
+ assert(h.ch.read_ret() === ss.length);
+}
+
function main()
{
test_nop();
test_chnet_ring_multiple();
test_simple_http();
test_simple_http_set_header();
+ test_simple_http_post_set_header();
}
main();
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 15/22] chnet: node: Don't use static counter for data ID
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (13 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 14/22] tests/js/ring: Add more set header function test Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example Ammar Faizi
` (6 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
The user data ID calculation will be wrong if we have multiple rings,
because the static counter is shared by all of them. Provide a counter
for each ring instead of sharing one across multiple rings.
Fixes: 34c16780ba00890faa7139092965537ef713d853 ("chnet: node: Add set_user_data support on SQE")
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_node.cc | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index 1269577..621d7aa 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -6,10 +6,12 @@ public:
inline NodeCHNetRing(uint32_t entry):
ring_(entry)
{
+ ucount_.store(0, std::memory_order_relaxed);
}
CNRingCtx ring_;
Napi::ObjectReference ref_;
+ std::atomic<uint32_t> ucount_;
};
class NodeCNRingSQE {
@@ -122,7 +124,6 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
static Napi::Value CHN_SQESetUserData(const Napi::CallbackInfo &info)
{
- static std::atomic<uint32_t> count;
Napi::Env env = info.Env();
struct NodeSQData *data;
NodeCNRingSQE *nsqe;
@@ -135,7 +136,7 @@ static Napi::Value CHN_SQESetUserData(const Napi::CallbackInfo &info)
nsqe = (NodeCNRingSQE *)info.Data();
data = new struct NodeSQData;
data->ring_ = nsqe->ring_;
- data->id_ = count++ & nsqe->ring_->ring_.cq_mask_;
+ data->id_ = nsqe->ring_->ucount_++ & nsqe->ring_->ring_.cq_mask_;
nsqe->ring_->ref_.Get("__udata")
.As<Napi::Object>()[data->id_] = info[0];
nsqe->sqe_->SetUserDataPtr(data);
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (14 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 15/22] chnet: node: Don't use static counter for data ID Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 17/22] chnet: Initial chunked request body support Ammar Faizi
` (5 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Just an example of writing a wrapper class to simplify the SQ/CQ mechanism.
Signed-off-by: Ammar Faizi <[email protected]>
---
tests/js/ring.js | 89 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 89 insertions(+)
diff --git a/tests/js/ring.js b/tests/js/ring.js
index 6752fe1..372c6e6 100644
--- a/tests/js/ring.js
+++ b/tests/js/ring.js
@@ -164,6 +164,94 @@ function test_simple_http_post_set_header()
assert(h.ch.read_ret() === ss.length);
}
+class CHNet {
+ constructor(cr, url, method) {
+ this.ch = chnet.create_net();
+ this.ch.set_url(url);
+ this.ch.set_method(method);
+ this.cr = cr;
+ }
+
+ prep_read(read_size) {
+ let ring = this.cr.ring;
+ let sqe = ring.get_sqe();
+ if (!sqe)
+ return false;
+ sqe.prep_read(this.ch, read_size);
+ sqe.set_user_data(this);
+ return true;
+ }
+
+ set_user_data(data) {
+ this.udata = data;
+ }
+
+ get_user_data() {
+ return this.udata;
+ }
+
+ set_header(key, val, overwrite = true) {
+ this.ch.set_request_header(key, val, overwrite);
+ }
+};
+
+class ChromiumNet {
+ constructor() {
+ this.ring = chnet.create_ring(4096);
+ }
+
+ create(url, method = "GET") {
+ return new CHNet(this, url, method);
+ }
+
+ for_each_cqe(cb = null) {
+ let ret = this.ring.for_each_cqe(function (cqe) {
+
+ if (cb)
+ cb(cqe);
+
+ let ch = cqe.user_data;
+ let ret = cqe.res;
+ let udata = ch.udata;
+
+ if (ret < 0 && ch.onerror)
+ ch.onerror(ch, ret, udata);
+ else
+ ch.onreadcomplete(ch, ret, udata);
+ });
+ this.ring.cq_advance(ret);
+ return ret;
+ }
+};
+
+function test_classes()
+{
+ const UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36";
+ const NR_REQ = 10;
+
+ let cr = new ChromiumNet;
+ let ch_arr = [];
+ let i, j, ch;
+
+ for (i = 0; i < NR_REQ; i++) {
+ ch = cr.create("http://127.0.0.1:8000/index.php?action=hello");
+ ch.set_header("User-Agent", UA);
+ ch.prep_read(4096);
+ ch.set_user_data(i);
+ ch.onerror = function (cr, ret, udata) {
+ console.log(`Req: ${udata} returned error: ${cr.ch.get_error()}`);
+ };
+ ch.onreadcomplete = function (cr, ret, udata) {
+ console.log(`Req: ${udata} returned ${ret} bytes: ${cr.ch.read_buf()}`);
+ };
+ ch_arr[i] = ch;
+ }
+
+ cr.ring.submit_sqe();
+ cr.ring.wait_cqe(1);
+ cr.for_each_cqe();
+}
+
function main()
{
test_nop();
@@ -171,6 +259,7 @@ function main()
test_simple_http();
test_simple_http_set_header();
test_simple_http_post_set_header();
+ test_classes();
}
main();
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 17/22] chnet: Initial chunked request body support
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (15 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 16/22] tests/js/ring: Add JavaScript class wrapper example Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 22:13 ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 18/22] chnet: Rework the chunked request body interface Ammar Faizi
` (4 subsequent siblings)
21 siblings, 1 reply; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 155 +++++++++++++++++++++++++++++++++++++++++++++-
chnet/chnet.h | 106 ++++++++++++++++++++++++++++++-
tests/cpp/ring.cc | 47 ++++++++++++--
3 files changed, 302 insertions(+), 6 deletions(-)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 915dfea..1ba4d32 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -50,6 +50,116 @@ void CHNetDataStream::ResetInternal()
{
}
+CHNetDataStreamChunked::CHNetDataStreamChunked(CHNetDelegate *ch):
+ UploadDataStream(true, 0),
+ ch_(ch),
+ buf_queue_(1024)
+{
+ cur_pos_ = 0;
+ is_eof_ = false;
+ being_waited_.store(false, std::memory_order_relaxed);
+ ch_->chuncked_body_.store(this, std::memory_order_release);
+}
+
+CHNetDataStreamChunked::~CHNetDataStreamChunked(void)
+{
+ struct net::CHNetDataStreamChunked::buf *arr;
+ uint32_t i, len;
+
+ ch_->chuncked_body_.store(nullptr, std::memory_order_release);
+ arr = buf_queue_.arr_;
+ len = buf_queue_.mask_ + 1;
+
+ for (i = 0; i < len; i++) {
+ char *b = arr[i].buf_;
+
+ if (b)
+ delete[] b;
+ }
+}
+
+void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
+{
+ struct net::CHNetDataStreamChunked::buf b;
+
+ b.buf_ = new char[len];
+ b.len_ = len;
+ memcpy(b.buf_, buf, len);
+
+ std::unique_lock<std::mutex> lk(buf_lock_);
+ buf_queue_.push(b);
+ if (being_waited_.load(std::memory_order_acquire))
+ buf_cond_.notify_one();
+}
+
+int CHNetDataStreamChunked::InitInternal(const net::NetLogWithSource& net_log)
+{
+ return net::OK;
+}
+
+int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg)
+{
+ size_t read_size;
+ char *ptr;
+ int ret;
+
+ if (unlikely(read_size_arg < 0))
+ return net::ERR_IO_PENDING;
+
+ if (unlikely(is_eof_ || !read_size_arg))
+ return 0;
+
+ ret = 0;
+ ptr = buf->data();
+ read_size = (size_t)read_size_arg;
+
+ std::unique_lock<std::mutex> lk(buf_lock_);
+ while (read_size) {
+ struct net::CHNetDataStreamChunked::buf *b;
+ size_t copy_size;
+
+ if (unlikely(!buf_queue_.size())) {
+ /*
+ * At this point, the buffer is empty.
+ *
+ * Wait for the buffer queue to be
+ * filled in.
+ */
+ being_waited_.store(true, std::memory_order_release);
+ buf_cond_.wait(lk, [this]{ return buf_queue_.size(); });
+ being_waited_.store(false, std::memory_order_release);
+ }
+
+ b = buf_queue_.front();
+ if (b->len_ == -(size_t)1UL) {
+ is_eof_ = true;
+ break;
+ }
+
+ copy_size = b->len_ - cur_pos_;
+ if (copy_size > read_size)
+ copy_size = read_size;
+
+ memcpy(ptr, &b->buf_[cur_pos_], copy_size);
+ ptr += copy_size;
+ ret += copy_size;
+ read_size -= copy_size;
+ buf_queue_.head_++;
+ }
+
+ if (unlikely(is_eof_))
+ SetIsFinalChunk();
+
+ if (unlikely(!ret && !is_eof_))
+ return net::ERR_IO_PENDING;
+
+ return ret;
+}
+
+void CHNetDataStreamChunked::ResetInternal()
+{
+}
+
CHNCallback::CHNCallback(void):
response_started_data_(nullptr),
response_started_(nullptr),
@@ -82,6 +192,7 @@ CHNetDelegate::CHNetDelegate(void):
base::Thread::Options options(base::MessagePumpType::IO, 0);
CHECK(thread_.StartWithOptions(std::move(options)));
read_ret_.store(0, std::memory_order_relaxed);
+ chuncked_body_.store(nullptr, std::memory_order_relaxed);
payload_ = nullptr;
}
@@ -140,7 +251,9 @@ void CHNetDelegate::_Start(Waiter *sig)
url_req_->set_method(method_);
sig->Signal();
- if (payload_)
+ if (chuncked_body_up_)
+ url_req_->set_upload(std::move(chuncked_body_up_));
+ else if (payload_)
url_req_->set_upload(std::make_unique<CHNetDataStream>(
this, payload_, payload_len_));
@@ -218,6 +331,17 @@ int CHNetDelegate::Read(int size)
return read_ret_.load();
}
+void CHNetDelegate::Write(const char *buf, size_t len)
+{
+ CHNetDataStreamChunked *p;
+
+ p = chuncked_body_.load(std::memory_order_acquire);
+ if (unlikely(!p))
+ return;
+
+ p->WriteBuf(buf, len);
+}
+
void CHNetDelegate::OnResponseStarted(URLRequest *url_req, int net_err)
{
if (unlikely(net_err < 0))
@@ -243,6 +367,20 @@ void CHNetDelegate::SetError(int err_code)
err_ = "chromium_net_err:" + ErrorToString(err_code);
}
+void CHNetDelegate::StartChunkedBody(void)
+{
+ chuncked_body_up_ = std::make_unique<CHNetDataStreamChunked>(this);
+}
+
+void CHNetDelegate::StopChunkedBody(void)
+{
+ CHNetDataStreamChunked *p;
+
+ p = chuncked_body_.load(std::memory_order_acquire);
+ if (likely(p))
+ p->StopWriting();
+}
+
} /* namespace net */
CHNet::CHNet(void)
@@ -293,6 +431,21 @@ void CHNet::SetRequestHeader(const char *key, const char *val, bool overwrite)
ch_->SetRequestHeader(std::string(key), std::string(val), overwrite);
}
+void CHNet::Write(const char *buf, size_t len)
+{
+ ch_->Write(buf, len);
+}
+
+void CHNet::StartChunkedBody(void)
+{
+ ch_->StartChunkedBody();
+}
+
+void CHNet::StopChunkedBody(void)
+{
+ ch_->StopChunkedBody();
+}
+
int CHNet::read_ret(void)
{
return ch_->read_ret();
diff --git a/chnet/chnet.h b/chnet/chnet.h
index 8e995a1..d92ba6d 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -70,6 +70,102 @@ private:
void ResetInternal() override;
};
+class CHNetDataStreamChunked: public UploadDataStream {
+
+private:
+ struct buf {
+ char *buf_;
+ size_t len_;
+ };
+
+ template<typename T>
+ struct queue {
+ struct buf *arr_;
+ std::atomic<uint32_t> head_;
+ std::atomic<uint32_t> tail_;
+ uint32_t mask_;
+
+ inline queue(size_t want_max)
+ {
+ uint32_t max = 1;
+ uint32_t i;
+
+ head_.store(0, std::memory_order_relaxed);
+ tail_.store(0, std::memory_order_relaxed);
+
+ if (want_max < 4)
+ want_max = 4;
+
+ while (max < want_max)
+ max *= 2;
+
+ mask_ = max - 1;
+ arr_ = new T[max];
+ for (i = 0; i < max; i++) {
+ arr_[i].buf_ = nullptr;
+ arr_[i].len_ = 0;
+ }
+ }
+
+ inline ~queue(void)
+ {
+ delete[] arr_;
+ }
+
+ inline size_t size(void)
+ {
+ if (likely(tail_ > head_))
+ return tail_ - head_;
+ else
+ return head_ - tail_;
+ }
+
+ inline void push(T val)
+ {
+ arr_[tail_++ & mask_] = val;
+ }
+
+ inline struct buf *front(void)
+ {
+ return &arr_[head_.load() & mask_];
+ }
+
+ inline struct buf *pop(void)
+ {
+ return &arr_[head_++ & mask_];
+ }
+ };
+
+public:
+ CHNetDataStreamChunked(CHNetDelegate *ch);
+ ~CHNetDataStreamChunked(void) override;
+ void WriteBuf(const char *buf, size_t len);
+ inline void StopWriting(void)
+ {
+ struct buf b;
+
+ b.buf_ = nullptr;
+ b.len_ = -(size_t)1UL;
+ buf_lock_.lock();
+ buf_queue_.push(b);
+ buf_lock_.unlock();
+ }
+
+private:
+ CHNetDelegate *ch_;
+ size_t cur_pos_;
+ std::mutex buf_lock_;
+ std::condition_variable buf_cond_;
+ struct queue<struct buf> buf_queue_;
+ std::atomic<bool> being_waited_;
+ bool is_eof_;
+
+ int InitInternal(const net::NetLogWithSource& net_log) override;
+ int ReadInternal(net::IOBuffer *buf, int buf_len) override;
+ void ResetInternal() override;
+
+};
+
struct CHNCallback {
CHNCallback(void);
~CHNCallback(void);
@@ -97,6 +193,10 @@ public:
void OnResponseStarted(URLRequest *url_req, int net_error) override;
void OnReadCompleted(URLRequest *url_req, int bytes_read) override;
+ void StartChunkedBody(void);
+ void Write(const char *buf, size_t len);
+ void StopChunkedBody(void);
+
inline int read_ret(void) { return read_ret_.load(); }
inline const char *read_buf(void) { return read_buf_->data(); }
inline bool started(void) { return started_; }
@@ -108,7 +208,8 @@ public:
}
struct CHNCallback cb;
-
+ std::atomic<CHNetDataStreamChunked *> chuncked_body_;
+ std::unique_ptr<CHNetDataStreamChunked> chuncked_body_up_;
private:
void _Start(Waiter *sig);
void ConstructURLRequest(Waiter *sig);
@@ -151,11 +252,14 @@ public:
const char *GetErrorStr(void);
void SetMethod(const char *method);
void SetPayload(const char *buf, size_t len);
+ void StartChunkedBody(void);
+ void StopChunkedBody(void);
void SetRequestHeader(const char *key, const char *val,
bool overwrite = true);
int read_ret(void);
const char *read_buf(void);
+ void Write(const char *buf, size_t len);
#ifdef __FOR_CHROMIUM_INTERNAL
inline net::CHNetDelegate *ch(void) { return ch_; }
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index 8bedef5..a65457d 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -247,15 +247,54 @@ static void test_chnet_ring_multiple(bool start_before_read)
_test_chnet_ring_multiple(start_before_read, &ring);
}
+static void test_chnet_chunked_body(void)
+{
+ constexpr static const char buf[] = "AAAA Hello World!\n";
+ CNRingCtx ring(4096);
+ CNRingSQE *sqe;
+ CNRingCQE *cqe;
+ CHNet *ch;
+ int i;
+
+ ch = new CHNet;
+ ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
+ ch->SetMethod("POST");
+ ch->StartChunkedBody();
+ for (i = 0; i < 3; i++)
+ ch->Write(buf, strlen(buf));
+
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch, 1024);
+ sqe->SetUserData(9999);
+ assert(ring.SubmitSQE() == 1);
+
+ for (i = 0; i < 3; i++)
+ ch->Write(buf, strlen(buf));
+ ch->StopChunkedBody();
+
+ ring.WaitCQE(1);
+ cqe = ring.HeadCQE();
+ assert(cqe->op == CNRING_OP_READ);
+ // assert(cqe->res == sizeof(buf));
+ assert(cqe->user_data == 9999);
+ // assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
+ write(1, ch->read_buf(), cqe->res);
+ ring.CQAdvance(1);
+
+ delete ch;
+}
+
int main(void)
{
test_nop();
chnet_global_init();
- // test_chnet_ring_single();
+ test_chnet_ring_single();
test_chnet_ring_set_header();
- // test_chnet_ring_single_post();
- // test_chnet_ring_multiple(false);
- // test_chnet_ring_multiple(true);
+ test_chnet_ring_single_post();
+ test_chnet_ring_multiple(false);
+ test_chnet_ring_multiple(true);
+ test_chnet_chunked_body();
chnet_global_stop();
return 0;
}
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* Re: [PATCH v1 17/22] chnet: Initial chunked request body support
2022-08-21 11:24 ` [PATCH v1 17/22] chnet: Initial chunked request body support Ammar Faizi
@ 2022-08-21 22:13 ` Alviro Iskandar Setiawan
2022-08-22 3:08 ` Ammar Faizi
0 siblings, 1 reply; 28+ messages in thread
From: Alviro Iskandar Setiawan @ 2022-08-21 22:13 UTC (permalink / raw)
To: Ammar Faizi; +Cc: Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
On Sun, Aug 21, 2022 at 6:24 PM Ammar Faizi wrote:
> +int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg)
> +{
> + size_t read_size;
> + char *ptr;
> + int ret;
> +
> + if (unlikely(read_size_arg < 0))
> + return net::ERR_IO_PENDING;
> +
> + if (unlikely(is_eof_ || !read_size_arg))
> + return 0;
> +
> + ret = 0;
> + ptr = buf->data();
> + read_size = (size_t)read_size_arg;
> +
> + std::unique_lock<std::mutex> lk(buf_lock_);
> + while (read_size) {
> + struct net::CHNetDataStreamChunked::buf *b;
> + size_t copy_size;
> +
> + if (unlikely(!buf_queue_.size())) {
> + /*
> + * At this point, the buffer is empty.
> + *
> + * Wait for the buffer queue to be
> + * filled in.
> + */
> + being_waited_.store(true, std::memory_order_release);
> + buf_cond_.wait(lk, [this]{ return buf_queue_.size(); });
> + being_waited_.store(false, std::memory_order_release);
> + }
> +
> + b = buf_queue_.front();
> + if (b->len_ == -(size_t)1UL) {
> + is_eof_ = true;
> + break;
> + }
> +
> + copy_size = b->len_ - cur_pos_;
> + if (copy_size > read_size)
> + copy_size = read_size;
> +
> + memcpy(ptr, &b->buf_[cur_pos_], copy_size);
> + ptr += copy_size;
> + ret += copy_size;
> + read_size -= copy_size;
> + buf_queue_.head_++;
> + }
> +
> + if (unlikely(is_eof_))
> + SetIsFinalChunk();
> +
> + if (unlikely(!ret && !is_eof_))
> + return net::ERR_IO_PENDING;
> +
> + return ret;
> +}
I think you shouldn't return net::ERR_IO_PENDING. Just sleep on the
condition variable if the buffer is not ready yet; it is legal to
sleep in ReadInternal(). Why spend time spinning on -EAGAIN?
You can end up in OnReadCompleted() if you sleep on the condition
variable and wait for it to get signaled:
https://source.chromium.org/chromium/chromium/src/+/main:net/base/upload_data_stream.cc;l=82-87;drc=5eda14f193ef3e0131aad2f31ae172c0001b3f6d?q=upload_data_stream.cc&ss=chromium%2Fchromium%2Fsrc
tq
-- Viro
^ permalink raw reply [flat|nested] 28+ messages in thread
* [PATCH v1 18/22] chnet: Rework the chunked request body interface
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (16 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 17/22] chnet: Initial chunked request body support Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 22:20 ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 19/22] chnet: ring: Refactor the ring completely Ammar Faizi
` (3 subsequent siblings)
21 siblings, 1 reply; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
This seems to be working fine at a glance, but there is still an issue
with a double-free bug. It rarely happens, but it's a real bug that
ASAN has reported. I will look into this further on Monday.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 74 +++++++++++++++++++++++++++++++++--------------
chnet/chnet.h | 68 +++++--------------------------------------
tests/cpp/ring.cc | 59 ++++++++++++++++++++++++-------------
3 files changed, 99 insertions(+), 102 deletions(-)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index 1ba4d32..a34bb18 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -52,30 +52,42 @@ void CHNetDataStream::ResetInternal()
CHNetDataStreamChunked::CHNetDataStreamChunked(CHNetDelegate *ch):
UploadDataStream(true, 0),
- ch_(ch),
- buf_queue_(1024)
+ ch_(ch)
{
cur_pos_ = 0;
+ mem_hold_size_ = 0;
is_eof_ = false;
being_waited_.store(false, std::memory_order_relaxed);
ch_->chuncked_body_.store(this, std::memory_order_release);
}
+void CHNetDataStreamChunked::ClearDelQueue(void)
+{
+ char *b;
+
+ while (!del_queue_.empty()) {
+ b = del_queue_.front();
+ if (b)
+ delete[] b;
+
+ del_queue_.pop();
+ }
+}
+
CHNetDataStreamChunked::~CHNetDataStreamChunked(void)
{
- struct net::CHNetDataStreamChunked::buf *arr;
- uint32_t i, len;
+ char *b;
ch_->chuncked_body_.store(nullptr, std::memory_order_release);
- arr = buf_queue_.arr_;
- len = buf_queue_.mask_ + 1;
-
- for (i = 0; i < len; i++) {
- char *b = arr[i].buf_;
+ while (!buf_queue_.empty()) {
+ b = buf_queue_.front().buf_;
if (b)
delete[] b;
+
+ buf_queue_.pop();
}
+ ClearDelQueue();
}
void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
@@ -85,6 +97,7 @@ void CHNetDataStreamChunked::WriteBuf(const char *buf, size_t len)
b.buf_ = new char[len];
b.len_ = len;
memcpy(b.buf_, buf, len);
+ mem_hold_size_.fetch_add(len);
std::unique_lock<std::mutex> lk(buf_lock_);
buf_queue_.push(b);
@@ -115,44 +128,61 @@ int CHNetDataStreamChunked::ReadInternal(net::IOBuffer *buf, int read_size_arg)
std::unique_lock<std::mutex> lk(buf_lock_);
while (read_size) {
- struct net::CHNetDataStreamChunked::buf *b;
+ struct net::CHNetDataStreamChunked::buf b;
size_t copy_size;
- if (unlikely(!buf_queue_.size())) {
+ if (unlikely(buf_queue_.empty())) {
/*
- * At this point, the buffer is empty.
+ * At this point, the buffer queue is empty.
*
- * Wait for the buffer queue to be
- * filled in.
+ * Wait for the buffer queue to be filled in...
*/
being_waited_.store(true, std::memory_order_release);
- buf_cond_.wait(lk, [this]{ return buf_queue_.size(); });
+ ClearDelQueue();
+ buf_cond_.wait(lk, [this]{ return !buf_queue_.empty(); });
being_waited_.store(false, std::memory_order_release);
}
b = buf_queue_.front();
- if (b->len_ == -(size_t)1UL) {
+ if (unlikely(b.len_ == -(size_t)1UL)) {
is_eof_ = true;
+ buf_queue_.pop();
break;
}
- copy_size = b->len_ - cur_pos_;
+ copy_size = b.len_ - cur_pos_;
if (copy_size > read_size)
copy_size = read_size;
- memcpy(ptr, &b->buf_[cur_pos_], copy_size);
+ memcpy(ptr, &b.buf_[cur_pos_], copy_size);
ptr += copy_size;
ret += copy_size;
read_size -= copy_size;
- buf_queue_.head_++;
+ cur_pos_ += copy_size;
+
+ if (cur_pos_ == b.len_) {
+ size_t mem;
+
+ cur_pos_ = 0;
+ buf_queue_.pop();
+ del_queue_.push(b.buf_);
+ mem = mem_hold_size_.load(std::memory_order_acquire);
+
+ /*
+ * Make sure we don't hold too many delete
+ * queues to avoid memory pressure.
+ */
+ if (mem >= MEM_HOLD_THRESHOLD) {
+ ClearDelQueue();
+ mem_hold_size_.store(0,
+ std::memory_order_release);
+ }
+ }
}
if (unlikely(is_eof_))
SetIsFinalChunk();
- if (unlikely(!ret && !is_eof_))
- return net::ERR_IO_PENDING;
-
return ret;
}
diff --git a/chnet/chnet.h b/chnet/chnet.h
index d92ba6d..84db8d4 100644
--- a/chnet/chnet.h
+++ b/chnet/chnet.h
@@ -29,6 +29,8 @@
#include "net/base/net_errors.h"
#define CHNET_EXPORT COMPONENT_EXPORT(CHNET)
+#include <queue>
+
#else /* #ifdef __FOR_CHROMIUM_INTERNAL */
/*
@@ -78,64 +80,6 @@ private:
size_t len_;
};
- template<typename T>
- struct queue {
- struct buf *arr_;
- std::atomic<uint32_t> head_;
- std::atomic<uint32_t> tail_;
- uint32_t mask_;
-
- inline queue(size_t want_max)
- {
- uint32_t max = 1;
- uint32_t i;
-
- head_.store(0, std::memory_order_relaxed);
- tail_.store(0, std::memory_order_relaxed);
-
- if (want_max < 4)
- want_max = 4;
-
- while (max < want_max)
- max *= 2;
-
- mask_ = max - 1;
- arr_ = new T[max];
- for (i = 0; i < max; i++) {
- arr_[i].buf_ = nullptr;
- arr_[i].len_ = 0;
- }
- }
-
- inline ~queue(void)
- {
- delete[] arr_;
- }
-
- inline size_t size(void)
- {
- if (likely(tail_ > head_))
- return tail_ - head_;
- else
- return head_ - tail_;
- }
-
- inline void push(T val)
- {
- arr_[tail_++ & mask_] = val;
- }
-
- inline struct buf *front(void)
- {
- return &arr_[head_.load() & mask_];
- }
-
- inline struct buf *pop(void)
- {
- return &arr_[head_++ & mask_];
- }
- };
-
public:
CHNetDataStreamChunked(CHNetDelegate *ch);
~CHNetDataStreamChunked(void) override;
@@ -146,6 +90,7 @@ public:
b.buf_ = nullptr;
b.len_ = -(size_t)1UL;
+
buf_lock_.lock();
buf_queue_.push(b);
buf_lock_.unlock();
@@ -156,14 +101,17 @@ private:
size_t cur_pos_;
std::mutex buf_lock_;
std::condition_variable buf_cond_;
- struct queue<struct buf> buf_queue_;
+ std::queue<struct buf> buf_queue_;
+ std::queue<char *> del_queue_;
std::atomic<bool> being_waited_;
+ std::atomic<size_t> mem_hold_size_;
bool is_eof_;
+ constexpr static const uint32_t MEM_HOLD_THRESHOLD = 1024 * 1024 * 4;
int InitInternal(const net::NetLogWithSource& net_log) override;
int ReadInternal(net::IOBuffer *buf, int buf_len) override;
void ResetInternal() override;
-
+ void ClearDelQueue(void);
};
struct CHNCallback {
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index a65457d..54426ef 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -249,39 +249,58 @@ static void test_chnet_ring_multiple(bool start_before_read)
static void test_chnet_chunked_body(void)
{
- constexpr static const char buf[] = "AAAA Hello World!\n";
+ constexpr size_t len = 1024 * 64;
+ constexpr size_t nr_loop = 100;
+ size_t total_read = 0;
CNRingCtx ring(4096);
CNRingSQE *sqe;
CNRingCQE *cqe;
CHNet *ch;
- int i;
+ char *buf;
+ size_t i;
ch = new CHNet;
ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
ch->SetMethod("POST");
ch->StartChunkedBody();
- for (i = 0; i < 3; i++)
- ch->Write(buf, strlen(buf));
sqe = ring.GetSQE();
assert(sqe);
- sqe->PrepRead(ch, 1024);
+ sqe->PrepRead(ch, len);
sqe->SetUserData(9999);
assert(ring.SubmitSQE() == 1);
- for (i = 0; i < 3; i++)
- ch->Write(buf, strlen(buf));
+ buf = new char[len];
+ memset(buf, 'A', len);
+
+ for (i = 0; i < nr_loop; i++)
+ ch->Write(buf, len);
ch->StopChunkedBody();
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- // assert(cqe->res == sizeof(buf));
- assert(cqe->user_data == 9999);
- // assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
- write(1, ch->read_buf(), cqe->res);
- ring.CQAdvance(1);
+ while (total_read < (len * nr_loop)) {
+ int res;
+
+ ring.WaitCQE(1);
+ cqe = ring.HeadCQE();
+ assert(cqe->op == CNRING_OP_READ);
+ assert(cqe->user_data == 9999);
+ res = cqe->res;
+ total_read += res;
+ ring.CQAdvance(1);
+
+ if (!res)
+ break;
+
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch, len);
+ sqe->SetUserData(9999);
+ assert(ring.SubmitSQE() == 1);
+ }
+ printf("total_read = %zu %zu\n", total_read, (len * nr_loop));
+ assert(total_read == (len * nr_loop));
+ delete[] buf;
delete ch;
}
@@ -289,11 +308,11 @@ int main(void)
{
test_nop();
chnet_global_init();
- test_chnet_ring_single();
- test_chnet_ring_set_header();
- test_chnet_ring_single_post();
- test_chnet_ring_multiple(false);
- test_chnet_ring_multiple(true);
+ // test_chnet_ring_single();
+ // test_chnet_ring_set_header();
+ // test_chnet_ring_single_post();
+ // test_chnet_ring_multiple(false);
+ // test_chnet_ring_multiple(true);
test_chnet_chunked_body();
chnet_global_stop();
return 0;
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 19/22] chnet: ring: Refactor the ring completely
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (17 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 18/22] chnet: Rework the chunked request body interface Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
` (2 subsequent siblings)
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
This commit is multiple commits squashed into one. Some of the
interesting changes are:
- chnet: ring: Support read operation again
- chnet: ring: Support start operation again
- tests/cpp/ring: Add more tests and delete old tests
- chnet: node: Refactor the NodeJS interface
In short, this rewrites them to make them cleaner and better.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/WorkQueue.cc | 17 +-
chnet/WorkQueue.h | 5 +
chnet/chnet_node.cc | 453 ++++++++++++++++++++++++--------------
chnet/chnet_ring.cc | 460 +++++++++++++++++++--------------------
chnet/chnet_ring.h | 200 +++++++++--------
tests/cpp/ring.cc | 504 +++++++++++++++++++++++++------------------
tests/js/ring.js | 516 +++++++++++++++++++++++++++-----------------
7 files changed, 1260 insertions(+), 895 deletions(-)
diff --git a/chnet/WorkQueue.cc b/chnet/WorkQueue.cc
index 30163bc..7411123 100644
--- a/chnet/WorkQueue.cc
+++ b/chnet/WorkQueue.cc
@@ -33,6 +33,12 @@ WorkQueue::~WorkQueue(void)
stop_ = true;
+ if (helper_thread_) {
+ jobs_cond_.notify_all();
+ helper_thread_->join();
+ delete helper_thread_;
+ }
+
i = nr_threads_;
while (i--) {
std::thread *t;
@@ -44,11 +50,6 @@ WorkQueue::~WorkQueue(void)
}
}
- if (helper_thread_) {
- helper_thread_->join();
- delete helper_thread_;
- }
-
delete[] threads_;
delete[] jobs_;
}
@@ -99,7 +100,7 @@ void WorkQueue::_wq_thread_worker(struct wq_thread *t) noexcept
t->set_task_state(WQ_INTERRUPTIBLE);
while (!stop_) {
__wq_thread_worker(t, lk);
- auto cv_ret = jobs_cond_.wait_for(lk, 1s);
+ auto cv_ret = jobs_cond_.wait_for(lk, 200ms);
if (unlikely(cv_ret == std::cv_status::timeout)) {
if (t->idx < nr_idle_thread_)
continue;
@@ -174,7 +175,7 @@ void WorkQueue::wq_helper(void)
_wq_helper(njob);
lk.lock();
}
- jobs_cond_.wait_for(lk, 10s, [this]{ return stop_; });
+ jobs_cond_.wait_for(lk, 200ms, [this]{ return stop_; });
lk.unlock();
}
}
@@ -284,7 +285,7 @@ int64_t WorkQueue::schedule_work(wq_job_callback_t cb, void *data) noexcept
}
lk.lock();
- sched_idle_cond_.wait_for(lk, 10s);
+ sched_idle_cond_.wait_for(lk, 200ms);
lk.unlock();
}
diff --git a/chnet/WorkQueue.h b/chnet/WorkQueue.h
index ddb3bf5..ad4f831 100644
--- a/chnet/WorkQueue.h
+++ b/chnet/WorkQueue.h
@@ -234,6 +234,11 @@ public:
{
jobs_cond_.notify_one();
}
+
+ inline bool should_stop(void)
+ {
+ return stop_;
+ }
};
#endif /* #ifndef CHNET_WORKQUEUE_H */
diff --git a/chnet/chnet_node.cc b/chnet/chnet_node.cc
index 621d7aa..cea4ad6 100644
--- a/chnet/chnet_node.cc
+++ b/chnet/chnet_node.cc
@@ -4,22 +4,36 @@
class NodeCHNetRing {
public:
inline NodeCHNetRing(uint32_t entry):
- ring_(entry)
+ ring_(new CNRing(entry))
{
- ucount_.store(0, std::memory_order_relaxed);
+ id_seq_.store(0, std::memory_order_relaxed);
}
- CNRingCtx ring_;
+ inline ~NodeCHNetRing(void)
+ {
+ if (ring_)
+ delete ring_;
+ }
+
+ inline void Close(void)
+ {
+ if (ring_) {
+ delete ring_;
+ ring_ = nullptr;
+ }
+ }
+
+ CNRing *ring_;
+ std::atomic<uint32_t> id_seq_;
Napi::ObjectReference ref_;
- std::atomic<uint32_t> ucount_;
};
class NodeCNRingSQE {
public:
- inline NodeCNRingSQE(NodeCHNetRing *ring, CNRingSQE *sqe)
+ inline NodeCNRingSQE(CNRingSQE *sqe, NodeCHNetRing *ring):
+ sqe_(sqe),
+ ring_(ring)
{
- sqe_ = sqe;
- ring_ = ring;
}
CNRingSQE *sqe_;
@@ -33,7 +47,26 @@ struct NodeSQData {
class NodeCHNet {
public:
- CHNet ch_;
+ inline NodeCHNet(void):
+ ch_(new CHNet)
+ {
+ }
+
+ inline ~NodeCHNet(void)
+ {
+ if (ch_)
+ delete ch_;
+ }
+
+ inline void Close(void)
+ {
+ if (ch_) {
+ delete ch_;
+ ch_ = nullptr;
+ }
+ }
+
+ CHNet *ch_;
std::string payload_;
};
@@ -51,8 +84,9 @@ static inline void obj_add_func(Napi::Env &env, Napi::Object &obj,
static void CHN_SQEPrepNop(const Napi::CallbackInfo &info)
{
- NodeCNRingSQE *nsqe = (NodeCNRingSQE *)info.Data();
+ NodeCNRingSQE *nsqe;
+ nsqe = (NodeCNRingSQE *)info.Data();
nsqe->sqe_->PrepNop();
}
@@ -77,27 +111,6 @@ static NodeCHNet *CHN_PrepGetCHObj(Napi::Env &env, const Napi::Value &arg)
ch_in_obj.As<Napi::Number>().Int64Value();
}
-static void CHN_SQEPrepStart(const Napi::CallbackInfo &info)
-{
- Napi::Env env = info.Env();
- NodeCNRingSQE *nsqe;
- NodeCHNet *ch;
- CHNet *chn;
-
- if (unlikely(info.Length() != 1 || !info[0].IsObject())) {
- throw_js_exception(env, "sqe.prep_start must be given a chnet object argument");
- return;
- }
-
- ch = CHN_PrepGetCHObj(env, info[0]);
- if (unlikely(!ch))
- return;
-
- chn = &ch->ch_;
- nsqe = (NodeCNRingSQE *)info.Data();
- nsqe->sqe_->PrepStart(chn);
-}
-
static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
@@ -108,7 +121,13 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
if (unlikely(info.Length() != 2 || !info[0].IsObject() ||
!info[1].IsNumber())) {
- throw_js_exception(env, "sqe.prep_read must be given a chnet object argument and a read size");
+ throw_js_exception(env, "sqe.PrepRead must be given a chnet object argument and a read size");
+ return;
+ }
+
+ nsqe = (NodeCNRingSQE *)info.Data();
+ if (unlikely(!nsqe->ring_->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
return;
}
@@ -117,8 +136,7 @@ static void CHN_SQEPrepRead(const Napi::CallbackInfo &info)
return;
read_size = info[1].As<Napi::Number>().Int32Value();
- chn = &ch->ch_;
- nsqe = (NodeCNRingSQE *)info.Data();
+ chn = ch->ch_;
nsqe->sqe_->PrepRead(chn, read_size);
}
@@ -129,77 +147,135 @@ static Napi::Value CHN_SQESetUserData(const Napi::CallbackInfo &info)
NodeCNRingSQE *nsqe;
if (unlikely(info.Length() != 1)) {
- throw_js_exception(env, "sqe.set_user_data must be given exactly one argument");
+ throw_js_exception(env, "sqe.SetUserData must be given exactly one argument");
return env.Null();
}
nsqe = (NodeCNRingSQE *)info.Data();
+ if (unlikely(!nsqe->ring_->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
data = new struct NodeSQData;
data->ring_ = nsqe->ring_;
- data->id_ = nsqe->ring_->ucount_++ & nsqe->ring_->ring_.cq_mask_;
+ data->id_ = nsqe->ring_->id_seq_++ & nsqe->ring_->ring_->cq_mask_;
nsqe->ring_->ref_.Get("__udata")
.As<Napi::Object>()[data->id_] = info[0];
nsqe->sqe_->SetUserDataPtr(data);
return info[0];
}
-static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *sqe)
+static void CHN_SQEPrepStart(const Napi::CallbackInfo &info)
{
- delete sqe;
- (void)env;
+ Napi::Env env = info.Env();
+ NodeCNRingSQE *nsqe;
+ NodeCHNet *ch;
+ CHNet *chn;
+
+ if (unlikely(info.Length() != 1 || !info[0].IsObject())) {
+ throw_js_exception(env, "sqe.PrepStart must be given a chnet object argument");
+ return;
+ }
+
+ ch = CHN_PrepGetCHObj(env, info[0]);
+ if (unlikely(!ch))
+ return;
+
+ chn = ch->ch_;
+ nsqe = (NodeCNRingSQE *)info.Data();
+ if (unlikely(!nsqe->ring_->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return;
+ }
+
+ nsqe->sqe_->PrepStart(chn);
}
-static Napi::Object BuildSQEObject(Napi::Env &env, NodeCHNetRing *ring,
- CNRingSQE *sqe)
+static void CHN_SQEDestruct(Napi::Env env, NodeCNRingSQE *nsqe)
{
- Napi::Object obj = Napi::Object::New(env);
- NodeCNRingSQE *nsqe;
-
- sqe->SetUserData(0);
- nsqe = new NodeCNRingSQE(ring, sqe);
- obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "prep_nop");
- obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "prep_start");
- obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "prep_read");
- obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "set_user_data");
- obj.AddFinalizer(CHN_SQEDestruct, nsqe);
- return obj;
+ delete nsqe;
}
static Napi::Value CHN_RingGetSQE(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
NodeCHNetRing *ring;
+ NodeCNRingSQE *nsqe;
+ Napi::Object obj;
CNRingSQE *sqe;
ring = (NodeCHNetRing *)info.Data();
- sqe = ring->ring_.GetSQE();
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
+ sqe = ring->ring_->GetSQE();
if (unlikely(!sqe))
return env.Null();
- return BuildSQEObject(env, ring, sqe);
+ sqe->SetUserData(0);
+ nsqe = new NodeCNRingSQE(sqe, ring);
+ obj = Napi::Object::New(env);
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepNop, "PrepNop");
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepRead, "PrepRead");
+ obj_add_func(env, obj, nsqe, CHN_SQEPrepStart, "PrepStart");
+ obj_add_func(env, obj, nsqe, CHN_SQESetUserData, "SetUserData");
+ obj.AddFinalizer(CHN_SQEDestruct, nsqe);
+ return obj;
}
-static Napi::Value CHN_RingSubmitSQE(const Napi::CallbackInfo &info)
+static Napi::Value CHN_RingSubmit(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
NodeCHNetRing *ring;
+ uint32_t to_submit;
+
+ if (info.Length() > 0) {
+ if (unlikely(!info[0].IsNumber())) {
+ throw_js_exception(env, "ring.Submit the first argument must be a positive integer");
+ return env.Null();
+ }
+
+ to_submit = info[0].ToNumber().Uint32Value();
+ } else {
+ to_submit = -1U;
+ }
ring = (NodeCHNetRing *)info.Data();
- return Napi::Number::New(env, ring->ring_.SubmitSQE());
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
+ return Napi::Number::New(env, ring->ring_->Submit(to_submit));
}
-static void CHN_RingWaitCQE(const Napi::CallbackInfo &info)
+static Napi::Value CHN_RingWaitCQE(const Napi::CallbackInfo &info)
{
- uint32_t to_wait;
+ Napi::Env env = info.Env();
NodeCHNetRing *ring;
+ uint32_t to_wait;
+
+ if (info.Length() > 0) {
+ if (unlikely(!info[0].IsNumber())) {
+ throw_js_exception(env, "ring.WaitCQE the first argument must be a positive integer");
+ return env.Null();
+ }
- if (unlikely(info.Length() < 1 || !info[0].IsNumber()))
- to_wait = 1;
- else
to_wait = info[0].ToNumber().Uint32Value();
+ } else {
+ to_wait = 1;
+ }
ring = (NodeCHNetRing *)info.Data();
- ring->ring_.WaitCQE(to_wait);
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
+ return Napi::Number::New(env, ring->ring_->WaitCQE(to_wait));
}
static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data)
@@ -209,49 +285,56 @@ static void CHN_CQEDestruct(Napi::Env env, NodeSQData *data)
* when it's no longer used.
*/
- // data->ring_->ref_.Get("__udata").As<Napi::Object>()
- // .Delete(data->id_);
+#if 0
+ data->ring_->ref_.Get("__udata").As<Napi::Object>()
+ .Delete(data->id_);
+#endif
+
delete data;
(void)env;
}
static Napi::Object BuildCQEObject(Napi::Env &env, CNRingCQE *cqe)
{
- Napi::Object obj = Napi::Object::New(env);
NodeSQData *data = (NodeSQData *) cqe->GetUserDataPtr();
+ Napi::Object obj = Napi::Object::New(env);
- obj["res"] = Napi::Number::New(env, cqe->res);
+ obj["res"] = Napi::Number::New(env, cqe->res_);
+ if (!data)
+ return obj;
- if (data) {
- obj["user_data"] = data->ring_->ref_.Get("__udata")
- .As<Napi::Object>().Get(data->id_);
- obj.AddFinalizer(CHN_CQEDestruct, data);
- }
+ obj["user_data"] = data->ring_->ref_.Get("__udata").As<Napi::Object>()
+ .Get(data->id_);
+ obj.AddFinalizer(CHN_CQEDestruct, data);
return obj;
}
+
static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "ring.for_each_cqe must be given exactly a callback function argument";
-
Napi::Env env = info.Env();
Napi::Function callback;
- uint32_t head, i;
NodeCHNetRing *ring;
CNRingCQE *cqe;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t i;
if (unlikely(info.Length() != 1 || !info[0].IsFunction())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "ring.ForEachCQE must be given exactly one callback function argument");
return env.Null();
}
- callback = info[0].As<Napi::Function>();
+ ring = (NodeCHNetRing *)info.Data();
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
i = 0;
- ring = (NodeCHNetRing *)info.Data();
- chnring_for_each_cqe(&ring->ring_, head, cqe) {
- Napi::Object cqe_obj = BuildCQEObject(env, cqe);
+ callback = info[0].As<Napi::Function>();
+ cnring_for_each_cqe(ring->ring_, head, tail, cqe) {
+ Napi::Value cqe_obj = BuildCQEObject(env, cqe);
callback.Call({cqe_obj});
i++;
@@ -262,104 +345,93 @@ static Napi::Value CHN_RingForEachCQE(const Napi::CallbackInfo &info)
static void CHN_RingCQAdvance(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "ring.cq_advance must be given exactly an integer argument";
-
Napi::Env env = info.Env();
NodeCHNetRing *ring;
if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "ring.CQAdvance must be given exactly one positive integer argument");
return;
}
ring = (NodeCHNetRing *)info.Data();
- ring->ring_.CQAdvance(info[0].ToNumber().Uint32Value());
-}
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return;
+ }
-static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring)
-{
- delete ring;
- (void)env;
+ ring->ring_->CQAdvance(info[0].ToNumber().Uint32Value());
}
-static inline Napi::Value _CHN_CreateRing(Napi::Env &env, NodeCHNetRing *ring)
+static Napi::Value CHN_RingGetCQEHead(const Napi::CallbackInfo &info)
{
- Napi::Object obj = Napi::Object::New(env);
+ Napi::Env env = info.Env();
+ NodeCHNetRing *ring;
- obj["__udata"] = Napi::Object::New(env);
- obj_add_func(env, obj, ring, CHN_RingGetSQE, "get_sqe");
- obj_add_func(env, obj, ring, CHN_RingSubmitSQE, "submit_sqe");
- obj_add_func(env, obj, ring, CHN_RingWaitCQE, "wait_cqe");
- obj_add_func(env, obj, ring, CHN_RingForEachCQE, "for_each_cqe");
- obj_add_func(env, obj, ring, CHN_RingCQAdvance, "cq_advance");
- obj.AddFinalizer(CHN_RingDestruct, ring);
- ring->ref_ = Napi::Persistent(obj);
- return obj;
+ ring = (NodeCHNetRing *)info.Data();
+ if (unlikely(!ring->ring_)) {
+ throw_js_exception(env, "ring has been closed!");
+ return env.Null();
+ }
+
+ return BuildCQEObject(env, ring->ring_->GetCQEHead());
}
-static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info)
+static void CHN_RingClose(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.create_ring must be given exactly a positive integer argument";
-
- Napi::Env env = info.Env();
NodeCHNetRing *ring;
- uint32_t entry;
- if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
- throw_js_exception(env, err_msg);
- return env.Null();
- }
+ ring = (NodeCHNetRing *)info.Data();
+ ring->Close();
+}
- entry = info[0].ToNumber().Uint32Value();
- ring = new NodeCHNetRing(entry);
- return _CHN_CreateRing(env, ring);
+static void CHN_RingDestruct(Napi::Env env, NodeCHNetRing *ring)
+{
+ delete ring;
}
static void CHN_NetSetURL(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.set_url must be given exactly 1 string argument";
-
Napi::Env env = info.Env();
NodeCHNet *ch;
if (unlikely(info.Length() != 1 || !info[0].IsString())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "net.SetURL must be given exactly 1 string argument");
return;
}
ch = (NodeCHNet *)info.Data();
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return;
+ }
+
const std::string &url = info[0].ToString().Utf8Value();
- ch->ch_.SetURL(url.c_str());
+ ch->ch_->SetURL(url.c_str());
}
static void CHN_NetSetMethod(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.set_method must be given exactly 1 string argument";
-
Napi::Env env = info.Env();
NodeCHNet *ch;
if (unlikely(info.Length() != 1 || !info[0].IsString())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "net.SetMethod must be given exactly 1 string argument");
return;
}
ch = (NodeCHNet *)info.Data();
- const std::string &method =info[0].ToString().Utf8Value();
- ch->ch_.SetMethod(method.c_str());
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return;
+ }
+
+ const std::string &method = info[0].ToString().Utf8Value();
+ ch->ch_->SetMethod(method.c_str());
}
static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.set_request_header must be at least given 2 string arguments";
- constexpr static const char err_msg2[] =
- "chnet.set_request_header can only receive 3 arguments with the third argument being a boolean";
-
+ constexpr static const char err_arg3[] = "chnet.SetRequestHeader can only receive 3 arguments with the third argument being a boolean";
Napi::Env env = info.Env();
bool overwrite = true;
NodeCHNet *ch;
@@ -369,61 +441,80 @@ static void CHN_NetSetRequestHeader(const Napi::CallbackInfo &info)
nr_arg = info.Length();
if (unlikely(nr_arg < 2 || !info[0].IsString() ||
!info[1].IsString())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "chnet.SetRequestHeader must be at least given 2 string arguments");
return;
}
if (unlikely(nr_arg > 3)) {
- throw_js_exception(env, err_msg2);
+ throw_js_exception(env, err_arg3);
return;
}
if (nr_arg == 3) {
if (unlikely(!info[2].IsBoolean())) {
- throw_js_exception(env, err_msg2);
+ throw_js_exception(env, err_arg3);
return;
}
overwrite = info[2].As<Napi::Boolean>();
}
ch = (NodeCHNet *)info.Data();
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return;
+ }
+
const std::string &key = info[0].ToString().Utf8Value();
const std::string &val = info[1].ToString().Utf8Value();
- ch->ch_.SetRequestHeader(key.c_str(), val.c_str(), overwrite);
+ ch->ch_->SetRequestHeader(key.c_str(), val.c_str(), overwrite);
}
-
-static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetReadRet(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
NodeCHNet *ch;
- int ret;
ch = (NodeCHNet *)info.Data();
- ret = ch->ch_.read_ret();
- if (ret > 0)
- return Napi::String::New(env, ch->ch_.read_buf(), (size_t)ret);
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return env.Null();
+ }
- return env.Null();
+ return Napi::Number::New(env, ch->ch_->read_ret());
}
-static Napi::Number CHN_NetReadRet(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetReadBuf(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
NodeCHNet *ch;
+ int ret;
ch = (NodeCHNet *)info.Data();
- return Napi::Number::New(env, ch->ch_.read_ret());
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return env.Null();
+ }
+
+ ret = ch->ch_->read_ret();
+ if (ret > 0)
+ return Napi::String::New(env, ch->ch_->read_buf(), (size_t)ret);
+
+ return env.Null();
}
-static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info)
+static Napi::Value CHN_NetGetErrorStr(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
const char *err;
NodeCHNet *ch;
ch = (NodeCHNet *)info.Data();
- err = ch->ch_.GetErrorStr();
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return env.Null();
+ }
+
+ err = ch->ch_->GetErrorStr();
if (err)
return Napi::String::New(env, err);
@@ -432,20 +523,30 @@ static Napi::Value CHN_NetGetError(const Napi::CallbackInfo &info)
static void CHN_NetSetPayload(const Napi::CallbackInfo &info)
{
- constexpr static const char err_msg[] =
- "chnet.set_payload must be given exactly 1 string argument";
-
Napi::Env env = info.Env();
NodeCHNet *ch;
if (unlikely(info.Length() != 1 || !info[0].IsString())) {
- throw_js_exception(env, err_msg);
+ throw_js_exception(env, "chnet.SetPayload must be given exactly 1 string argument");
return;
}
ch = (NodeCHNet *)info.Data();
+ if (unlikely(!ch->ch_)) {
+ throw_js_exception(env, "chnet has been closed!");
+ return;
+ }
+
ch->payload_ = info[0].ToString().Utf8Value();
- ch->ch_.SetPayload(ch->payload_.c_str(), ch->payload_.size());
+ ch->ch_->SetPayload(ch->payload_.c_str(), ch->payload_.size());
+}
+
+static void CHN_NetClose(const Napi::CallbackInfo &info)
+{
+ NodeCHNet *ch;
+
+ ch = (NodeCHNet *)info.Data();
+ ch->Close();
}
static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
@@ -454,6 +555,42 @@ static void CHN_NetDestruct(Napi::Env env, NodeCHNet *ch)
(void)env;
}
+static Napi::Value _CHN_CreateRing(Napi::Env env, uint32_t entry)
+{
+ NodeCHNetRing *ring;
+ Napi::Object obj;
+
+ obj = Napi::Object::New(env);
+ ring = new NodeCHNetRing(entry);
+ obj["__udata"] = Napi::Object::New(env);
+ obj_add_func(env, obj, ring, CHN_RingGetSQE, "GetSQE");
+ obj_add_func(env, obj, ring, CHN_RingSubmit, "Submit");
+ obj_add_func(env, obj, ring, CHN_RingWaitCQE, "WaitCQE");
+ obj_add_func(env, obj, ring, CHN_RingForEachCQE, "ForEachCQE");
+ obj_add_func(env, obj, ring, CHN_RingCQAdvance, "CQAdvance");
+ obj_add_func(env, obj, ring, CHN_RingGetCQEHead, "GetCQEHead");
+ obj_add_func(env, obj, ring, CHN_RingClose, "Close");
+ obj.AddFinalizer(CHN_RingDestruct, ring);
+ obj.Seal();
+ obj.Freeze();
+ ring->ref_ = Napi::Persistent(obj);
+ return obj;
+}
+
+static Napi::Value CHN_CreateRing(const Napi::CallbackInfo &info)
+{
+ Napi::Env env = info.Env();
+ uint32_t entry;
+
+ if (unlikely(info.Length() < 1 || !info[0].IsNumber())) {
+ throw_js_exception(env, "chnet.CreateRing must be given exactly one positive integer argument");
+ return env.Null();
+ }
+
+ entry = info[0].ToNumber().Uint32Value();
+ return _CHN_CreateRing(env, entry);
+}
+
static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
{
Napi::Env env = info.Env();
@@ -461,14 +598,14 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
NodeCHNet *ch = new NodeCHNet;
int64_t ch_ptr;
- obj_add_func(env, obj, ch, CHN_NetSetURL, "set_url");
- obj_add_func(env, obj, ch, CHN_NetSetMethod, "set_method");
- obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "set_request_header");
+ obj_add_func(env, obj, ch, CHN_NetSetURL, "SetURL");
+ obj_add_func(env, obj, ch, CHN_NetSetMethod, "SetMethod");
+ obj_add_func(env, obj, ch, CHN_NetSetRequestHeader, "SetRequestHeader");
obj_add_func(env, obj, ch, CHN_NetReadRet, "read_ret");
obj_add_func(env, obj, ch, CHN_NetReadBuf, "read_buf");
- obj_add_func(env, obj, ch, CHN_NetGetError, "get_error");
- obj_add_func(env, obj, ch, CHN_NetSetPayload, "set_payload");
-
+ obj_add_func(env, obj, ch, CHN_NetGetErrorStr, "GetErrorStr");
+ obj_add_func(env, obj, ch, CHN_NetSetPayload, "SetPayload");
+ obj_add_func(env, obj, ch, CHN_NetClose, "Close");
ch_ptr = (int64_t) (intptr_t) ch;
obj["__ch"] = Napi::Number::New(env, ch_ptr);
obj.AddFinalizer(CHN_NetDestruct, ch);
@@ -480,8 +617,10 @@ static Napi::Object CHN_CreateNet(const Napi::CallbackInfo &info)
static Napi::Object CHN_GlobalInit(Napi::Env env, Napi::Object exports)
{
chnet_global_init();
- obj_add_func(env, exports, NULL, CHN_CreateRing, "create_ring");
- obj_add_func(env, exports, NULL, CHN_CreateNet, "create_net");
+ obj_add_func(env, exports, NULL, CHN_CreateRing, "CreateRing");
+ obj_add_func(env, exports, NULL, CHN_CreateNet, "CreateNet");
+ exports.Seal();
+ exports.Freeze();
return exports;
}
NODE_API_MODULE(chnet, CHN_GlobalInit);
diff --git a/chnet/chnet_ring.cc b/chnet/chnet_ring.cc
index a5f3094..b858421 100644
--- a/chnet/chnet_ring.cc
+++ b/chnet/chnet_ring.cc
@@ -8,15 +8,14 @@
#include <chrono>
using namespace std::chrono_literals;
-#include <unistd.h>
-CNRingCtx::CNRingCtx(uint32_t entry)
+CNRing::CNRing(uint32_t entry)
{
uint32_t sq_max;
uint32_t cq_max;
- if (entry < 16)
- entry = 16;
+ if (entry < 2)
+ entry = 2;
if (entry > 1048576)
entry = 1048576;
@@ -28,372 +27,353 @@ CNRingCtx::CNRingCtx(uint32_t entry)
sq_mask_ = sq_max - 1;
cq_mask_ = cq_max - 1;
-
- sqes_ = new CNRingSQE[sq_max + 1];
- cqes_ = new CNRingCQE[cq_max + 1];
- state_ = new CNRingState(sq_max, cq_max, 2);
-
- state_->cqe_to_wait_.store(0, std::memory_order_relaxed);
- state_->nr_post_cqe_wait_.store(0, std::memory_order_relaxed);
sq_head_.store(0, std::memory_order_relaxed);
sq_tail_.store(0, std::memory_order_relaxed);
cq_head_.store(0, std::memory_order_relaxed);
cq_tail_.store(0, std::memory_order_relaxed);
+ cq_max_to_wait_.store(0, std::memory_order_relaxed);
+ sqes_ = new CNRingSQE[sq_max];
+ cqes_ = new CNRingCQE[cq_max];
+ state_ = new CNRingState(4096, 8192, 1);
}
-CNRingCtx::~CNRingCtx(void)
+CNRing::~CNRing(void)
{
+ state_->stop_ = true;
+ NotifyFreeCQEWaiter(true);
delete state_;
delete[] sqes_;
delete[] cqes_;
}
-static inline uint32_t big_min_small(uint32_t a, uint32_t b)
-{
- if (a < b)
- return b - a;
- else
- return a - b;
-}
-
-uint32_t CNRingCtx::sqe_size(void)
-{
- return big_min_small(sq_head_.load(), sq_tail_.load());
-}
-
-uint32_t CNRingCtx::cqe_size(void)
-{
- return big_min_small(cq_head_.load(), cq_tail_.load());
-}
-
-CNRingSQE *CNRingCtx::GetSQE(void)
-{
- uint32_t head = sq_head_.load();
- uint32_t tail = sq_tail_.load();
- uint32_t mask = sq_mask_;
- CNRingSQE *sqe;
-
- if (unlikely(big_min_small(head, tail) > mask))
- return nullptr;
-
- sqe = &sqes_[tail & mask];
- sq_tail_++;
- return sqe;
-}
-
-CNRingCQE *CNRingCtx::GetCQENoTailIncrement(void)
- __must_hold(&state_->cqe_lock_)
+CNRingState::CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
+ uint32_t nr_idle_thread):
+ wq_(nr_thread, nr_jobs, nr_idle_thread)
{
- uint32_t head = cq_head_.load();
- uint32_t tail = cq_tail_.load();
- uint32_t mask = cq_mask_;
-
- if (unlikely(big_min_small(head, tail) > mask))
- return nullptr;
-
- return &cqes_[tail & mask];
+ cqe_to_wait_.store(0, std::memory_order_relaxed);
+ nr_free_cqe_slot_waiter_.store(0, std::memory_order_relaxed);
+ stop_ = false;
+ wq_.run();
}
-void CNRingCtx::NotifyCQEWaiter(void)
+void CNRing::NotifyCQEWaiter(void)
__must_hold(&state_->cqe_lock_)
{
- uint32_t to_wait = state_->cqe_to_wait_.load();
+ uint32_t to_wait = state_->cqe_to_wait_.load(std::memory_order_acquire);
if (!to_wait)
return;
- if (to_wait > cqe_size())
+ if (to_wait > CQESize())
return;
- state_->cqe_cond_.notify_one();
+ state_->cqe_to_wait_cond_.notify_one();
}
-bool CNRingCtx::TryPostCQE(CNRingSQE *sqe, int64_t res)
+void CNRing::NotifyFreeCQEWaiter(bool force)
{
- CNRingCQE *cqe;
-
- state_->cqe_lock_.lock();
- cqe = GetCQENoTailIncrement();
- if (unlikely(!cqe)) {
- NotifyCQEWaiter();
- state_->cqe_lock_.unlock();
- return false;
+ uint32_t n;
+
+ n = state_->nr_free_cqe_slot_waiter_.load(std::memory_order_acquire);
+ if (unlikely(n > 0 || force)) {
+ std::condition_variable *cond;
+ std::mutex *mutex;
+
+ mutex = &state_->wait_for_a_free_cqe_slot_lock_;
+ cond = &state_->wait_for_a_free_cqe_slot_cond_;
+ mutex->lock();
+ if (n == 1)
+ cond->notify_one();
+ else
+ cond->notify_all();
+ mutex->unlock();
}
-
- cqe->op = sqe->op;
- cqe->user_data = sqe->user_data;
- cqe->res = res;
- cq_tail_++;
- NotifyCQEWaiter();
- state_->cqe_lock_.unlock();
- return true;
}
-void CNRingCtx::WaitForCQEFreeSlot(void)
+void CNRing::SleepToWaitForAFreeCQE(void)
{
- std::unique_lock<std::mutex> lock(state_->post_cqe_lock_);
+ std::unique_lock<std::mutex> lk(state_->wait_for_a_free_cqe_slot_lock_);
+ std::atomic<uint32_t> *nr_waiter = &state_->nr_free_cqe_slot_waiter_;
- state_->nr_post_cqe_wait_++;
- state_->post_cqe_cond_.wait(lock, [this]{
- bool ret;
+ (*nr_waiter)++;
+ state_->wait_for_a_free_cqe_slot_cond_.wait(lk, [this]{
+ uint32_t ret;
state_->cqe_lock_.lock();
- ret = !!GetCQENoTailIncrement();
+ ret = CQESize();
state_->cqe_lock_.unlock();
- return ret;
+ return ret < (cq_mask_ + 1) || state_->should_stop();
});
- state_->nr_post_cqe_wait_--;
+ (*nr_waiter)--;
}
-void CNRingCtx::PostCQE(CNRingSQE *sqe, int64_t res)
+void CNRing::PostCQE(const CNRingSQE *sqe, int64_t res)
{
while (1) {
+ if (unlikely(state_->should_stop()))
+ return;
+
if (TryPostCQE(sqe, res))
- break;
+ return;
- WaitForCQEFreeSlot();
+ SleepToWaitForAFreeCQE();
}
}
-void CNRingCtx::NotifyWaitCQEFreeSlot(void)
+bool CNRing::TryPostCQE(const CNRingSQE *sqe, int64_t res)
{
- uint32_t nr_wait = state_->nr_post_cqe_wait_.load();
+ bool ret = false;
+ CNRingCQE *cqe;
+ uint32_t mask;
- if (unlikely(nr_wait)) {
- state_->post_cqe_lock_.lock();
- if (nr_wait == 1)
- state_->post_cqe_cond_.notify_one();
- else
- state_->post_cqe_cond_.notify_all();
- state_->post_cqe_lock_.unlock();
+ state_->cqe_lock_.lock();
+ mask = cq_mask_;
+
+ if (likely(CQESize() < (mask + 1))) {
+ cqe = &cqes_[cq_tail_++ & mask];
+ ret = true;
+ cqe->op_ = sqe->op_;
+ cqe->res_ = res;
+ cqe->user_data_ = sqe->user_data_;
+ NotifyCQEWaiter();
}
-}
-void CNRingCtx::CQAdvance(uint32_t n)
-{
- state_->cqe_lock_.lock();
- cq_head_ += n;
state_->cqe_lock_.unlock();
- NotifyWaitCQEFreeSlot();
+ return ret;
}
-/*
- * Use this when the caller is not allowed to block.
- */
-void CNRingCtx::CallPostCQE(CNRingSQE *sqe, int64_t res)
+bool CNRing::PostCQENoSleep(const CNRingSQE *sqe, int64_t res)
{
- CNRingSQE *tmp;
-
if (likely(TryPostCQE(sqe, res)))
- return;
+ return true;
- /*
- * The CQE slot is full, but the caller is not allowed to
- * block, let's schedule the post in the workqueue thread.
- */
- tmp = new CNRingSQE;
- *tmp = *sqe;
- state_->wq_.schedule_work([this, res](struct wq_job_data *data){
- CNRingSQE *sqe = (CNRingSQE *)data->data;
-
- this->PostCQE(sqe, res);
- delete sqe;
- }, tmp);
+ auto func = [this, res, copy_sqe = *sqe](auto *x){
+ this->PostCQE(&copy_sqe, res);
+ };
+ return state_->wq_.try_schedule_work(std::move(func), nullptr) >= 0;
}
-void CNRingCtx::IssueNopSQE(CNRingSQE *sqe)
+bool CNRing::IssueSQENop(const CNRingSQE *sqe)
{
- CallPostCQE(sqe, 0);
+ return PostCQENoSleep(sqe, 0);
}
-static void issue_start_sqe(void *udata, net::URLRequest *url_req, int net_err)
+static inline void post_sqe_read(CNRing *ring, const CNRingSQE *sqe, int size)
{
- CNRingOpStart *sop;
- CNRingSQE *sqe;
-
- sqe = (CNRingSQE *)udata;
- sop = (CNRingOpStart *)sqe->sq_data;
- sop->ring->PostCQE(sqe, net_err);
- delete sop;
- delete sqe;
+ ring->PostCQE(sqe, size);
}
-void CNRingCtx::IssueStartSQE(CNRingSQE *sqe)
+static int issue_sqe_read(CNRing *ring, const CNRingSQE *sqe)
{
+ CHNet *ch = sqe->sq_start_.ch_;
struct net::CHNCallback *cb;
- CNRingOpStart *sop;
- CNRingSQE *tmp;
- int ret;
- tmp = new CNRingSQE;
- *tmp = *sqe;
+ auto f = [ring, sqe_cp = *sqe](void *u, net::URLRequest *ur, int size){
+ post_sqe_read(ring, &sqe_cp, size);
+ };
- sop = (CNRingOpStart *)tmp->sq_data;
- sop->ring = this;
-
- cb = &sop->chnet->ch()->cb;
- cb->response_started_ = issue_start_sqe;
- cb->response_started_data_ = tmp;
- ret = sop->chnet->Start();
- if (unlikely(ret)) {
- CallPostCQE(sqe, ret);
- delete tmp;
- }
-}
-
-static void issue_read_sqe(void *udata, net::URLRequest *url_req, int read_ret)
-{
- CNRingOpStart *sop;
- CNRingSQE *sqe;
-
- sqe = (CNRingSQE *)udata;
- sop = (CNRingOpStart *)sqe->sq_data;
- sop->ring->PostCQE(sqe, read_ret);
- delete sop;
- delete sqe;
+ cb = &ch->ch()->cb;
+ cb->read_completed_ = std::move(f);
+ return ch->Read(sqe->sq_read_.size_);
}
-static void issue_read_sqe_need_start(void *udata, net::URLRequest *url_req,
- int net_err)
+static void _issue_sqe_read_need_start(CNRing *ring, const CNRingSQE *sqe,
+ int net_err)
{
+ CHNet *ch = sqe->sq_start_.ch_;
struct net::CHNCallback *cb;
- CNRingOpRead *sop;
- CNRingSQE *sqe;
- sqe = (CNRingSQE *)udata;
- sop = (CNRingOpRead *)sqe->sq_data;
-
- if (likely(net_err == net::OK)) {
- /*
- * The start operation succeeds, now we can do
- * the read operation directly without issuing
- * extra SQE.
- */
- cb = &sop->chnet->ch()->cb;
- cb->read_completed_ = issue_read_sqe;
- cb->read_completed_data_ = sqe;
- sop->chnet->ch()->_Read(nullptr, sop->read_size);
+ if (unlikely(net_err != net::OK)) {
+ ring->PostCQE(sqe, net_err);
return;
}
+ auto f = [ring, sqe_cp = *sqe](void *u, net::URLRequest *ur, int size){
+ post_sqe_read(ring, &sqe_cp, size);
+ };
+
/*
- * The start operation fails, just post the CQE directly.
+ * The start operation succeeds, now we can do
+ * the read operation directly without issuing
+ * extra SQE.
*/
- sop->ring->PostCQE(sqe, net_err);
- delete sop;
- delete sqe;
+ cb = &ch->ch()->cb;
+ cb->read_completed_ = std::move(f);
+ ch->ch()->_Read(nullptr, sqe->sq_read_.size_);
}
-void CNRingCtx::IssueReadSQE(CNRingSQE *sqe)
+static int issue_sqe_read_need_start(CNRing *ring, const CNRingSQE *sqe)
{
+ CHNet *ch = sqe->sq_start_.ch_;
struct net::CHNCallback *cb;
- net::CHNetDelegate *ch;
- CNRingOpRead *sop;
- CNRingSQE *tmp;
- int ret;
- tmp = new CNRingSQE;
- *tmp = *sqe;
+ auto f = [ring, sqe_cp = *sqe](void *u, auto *ur, int net_err){
+ _issue_sqe_read_need_start(ring, &sqe_cp, net_err);
+ };
+
+ cb = &ch->ch()->cb;
+ cb->response_started_ = std::move(f);
+ return ch->Start();
+}
- sop = (CNRingOpRead *)tmp->sq_data;
- sop->ring = this;
+bool CNRing::IssueSQERead(const CNRingSQE *sqe)
+{
+ int ret;
- ch = sop->chnet->ch();
- cb = &ch->cb;
- if (unlikely(!ch->started())) {
+ if (!sqe->sq_start_.ch_->ch()->started()) {
/*
* The request hasn't been started, we can't perform
* a read operation at this point, do the start
* operation first!
*/
- cb->response_started_ = issue_read_sqe_need_start;
- cb->response_started_data_ = tmp;
- ret = sop->chnet->Start();
+ ret = issue_sqe_read_need_start(this, sqe);
} else {
/*
* Normal read operation here.
*/
- cb->read_completed_ = issue_read_sqe;
- cb->read_completed_data_ = tmp;
- ret = sop->chnet->Read(sop->read_size);
+ ret = issue_sqe_read(this, sqe);
}
if (likely(ret >= 0))
- return;
+ return true;
- /*
- * Aiee, the operation fails!
- */
- CallPostCQE(sqe, ret);
- delete tmp;
+ return PostCQENoSleep(sqe, ret);
+}
+
+bool CNRing::IssueSQEStart(const CNRingSQE *sqe)
+{
+ CHNet *ch = sqe->sq_start_.ch_;
+ struct net::CHNCallback *cb;
+ int ret;
+
+ auto f = [this, sqe_cp = *sqe](void *u, auto *ur, int net_err){
+ this->PostCQE(&sqe_cp, net_err);
+ };
+
+ cb = &ch->ch()->cb;
+ cb->response_started_ = std::move(f);
+ ret = ch->Start();
+
+ if (likely(ret >= 0))
+ return true;
+
+ return PostCQENoSleep(sqe, ret);
}
-void CNRingCtx::IssueSQE(CNRingSQE *sqe)
+void CNRing::IssueSQE(const CNRingSQE *sqe)
{
- switch (sqe->op) {
+ bool ok = false;
+
+ switch (sqe->op_) {
case CNRING_OP_NOP:
- IssueNopSQE(sqe);
- break;
- case CNRING_OP_START:
- IssueStartSQE(sqe);
+ ok = IssueSQENop(sqe);
break;
case CNRING_OP_READ:
- IssueReadSQE(sqe);
+ ok = IssueSQERead(sqe);
+ break;
+ case CNRING_OP_START:
+ ok = IssueSQEStart(sqe);
break;
}
+
+ if (likely(ok))
+ cq_max_to_wait_++;
}
-uint32_t CNRingCtx::SubmitSQE(uint32_t to_submit)
+uint32_t CNRing::Submit(uint32_t to_submit)
{
- uint32_t head = sq_head_.load();
- uint32_t tail = sq_tail_.load();
- uint32_t ret = 0;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t mask;
+ uint32_t ret;
+
+ if (unlikely(!to_submit))
+ return 0;
+
+ state_->sqe_lock_.lock();
+ head = sq_head_.load(std::memory_order_acquire);
+ tail = sq_tail_.load(std::memory_order_acquire);
+ mask = sq_mask_;
+ ret = 0;
while (to_submit--) {
if (unlikely(head == tail))
break;
- IssueSQE(&sqes_[head++ & sq_mask_]);
+ IssueSQE(&sqes_[head++ & mask]);
ret++;
}
- if (ret)
- sq_head_.fetch_add(ret, std::memory_order_release);
+ sq_head_.store(head, std::memory_order_release);
+ state_->sqe_lock_.unlock();
+ return ret;
+}
+
+CNRingSQE *CNRing::GetSQE(void)
+{
+ CNRingSQE *sqe = nullptr;
+
+ state_->sqe_lock_.lock();
+ if (likely(SQESize() < (sq_mask_ + 1)))
+ sqe = &sqes_[sq_tail_++ & sq_mask_];
+
+ state_->sqe_lock_.unlock();
+ return sqe;
+}
+
+uint32_t CNRing::_WaitCQE(uint32_t to_wait, uint32_t timeout_ms,
+ std::unique_lock<std::mutex> &lock)
+ __must_hold(&state_->cqe_lock_)
+{
+ std::atomic<uint32_t> *cqe_to_wait = &state_->cqe_to_wait_;
+ std::condition_variable *cond = &state_->cqe_to_wait_cond_;
+ uint32_t ret;
+
+ cqe_to_wait->store(to_wait, std::memory_order_release);
+ auto callback = [this, to_wait, &ret]{
+ ret = CQESize();
+ return ret >= to_wait;
+ };
+
+ if (timeout_ms == -1U)
+ cond->wait(lock, std::move(callback));
+ else
+ cond->wait_for(lock, timeout_ms*1ms, std::move(callback));
+
+ cqe_to_wait->store(0, std::memory_order_release);
return ret;
}
-void CNRingCtx::WaitCQE(uint32_t to_wait)
+uint32_t CNRing::WaitCQE(uint32_t to_wait, uint32_t timeout_ms)
{
uint32_t max_to_wait;
- uint32_t cq_size;
+ uint32_t ret;
if (unlikely(!to_wait))
- return;
+ return 0;
+
+ max_to_wait = cq_max_to_wait_.load(std::memory_order_acquire);
+ if (unlikely(!max_to_wait))
+ return 0;
- max_to_wait = cq_mask_;
if (to_wait > max_to_wait)
to_wait = max_to_wait;
- std::unique_lock<std::mutex> cqe_lock(state_->cqe_lock_);
- cq_size = cqe_size();
-
- if (cq_size < cq_mask_ + 1)
- NotifyWaitCQEFreeSlot();
+ std::unique_lock<std::mutex> lock(state_->cqe_lock_);
- if (to_wait <= cq_size)
- return;
+ ret = CQESize();
+ if (unlikely(ret >= to_wait))
+ return ret;
- state_->cqe_to_wait_.store(to_wait);
- state_->cqe_cond_.wait(cqe_lock, [this, to_wait]{
- return to_wait <= cqe_size();
- });
- state_->cqe_to_wait_.store(0);
+ return _WaitCQE(to_wait, timeout_ms, lock);
}
-CNRingState::CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
- uint32_t nr_idle_thread):
- wq_(nr_thread, nr_jobs, nr_idle_thread)
+void CNRing::CQAdvance(uint32_t n)
{
- wq_.run();
+ state_->cqe_lock_.lock();
+ cq_head_ += n;
+ cq_max_to_wait_ -= n;
+ state_->cqe_lock_.unlock();
+ NotifyFreeCQEWaiter();
}
diff --git a/chnet/chnet_ring.h b/chnet/chnet_ring.h
index f41d75b..0630050 100644
--- a/chnet/chnet_ring.h
+++ b/chnet/chnet_ring.h
@@ -8,169 +8,193 @@
#include "common.h"
#ifdef __FOR_CHROMIUM_INTERNAL
-#include <condition_variable>
-
/*
* Chromium internal includes.
*/
#include "base/component_export.h"
#include "WorkQueue.h"
+#include <condition_variable>
#define CHNET_EXPORT COMPONENT_EXPORT(CHNET)
+
#else /* #ifdef __FOR_CHROMIUM_INTERNAL */
+
#define CHNET_EXPORT
-#endif
+
+#endif /* #ifdef __FOR_CHROMIUM_INTERNAL */
+
enum CNRingOp {
CNRING_OP_NOP = 0,
- CNRING_OP_START = 1,
- CNRING_OP_READ = 2,
+ CNRING_OP_READ = 1,
+ CNRING_OP_START = 2,
+ CNRING_OP_WRITE = 3,
};
-class CNRingCtx;
-
-struct CHNET_EXPORT CNRingOpStart {
- CHNet *chnet;
- CNRingCtx *ring;
+struct CHNET_EXPORT CNRingOpRead {
+ CHNet *ch_;
+ uint32_t size_;
};
-struct CHNET_EXPORT CNRingOpRead {
- CHNet *chnet;
- CNRingCtx *ring;
- int read_size;
+struct CHNET_EXPORT CNRingOpStart {
+ CHNet *ch_;
};
class CHNET_EXPORT CNRingSQE {
public:
- uint8_t op;
- void *sq_data;
- uint64_t user_data;
+ uint8_t op_;
+ uint64_t user_data_;
+ union {
+ struct CNRingOpStart sq_start_;
+ struct CNRingOpRead sq_read_;
+ };
inline void PrepNop(void)
{
- op = CNRING_OP_NOP;
+ op_ = CNRING_OP_NOP;
}
- inline void PrepStart(CHNet *chnet)
+ inline void PrepRead(CHNet *ch, uint32_t size)
{
- CNRingOpStart *d = new CNRingOpStart;
-
- d->chnet = chnet;
- d->ring = nullptr;
- sq_data = (void *)d;
- op = CNRING_OP_START;
+ op_ = CNRING_OP_READ;
+ sq_read_.ch_ = ch;
+ sq_read_.size_ = size;
}
- inline void PrepRead(CHNet *chnet, int read_size)
+ inline void PrepStart(CHNet *ch)
{
- CNRingOpRead *d = new CNRingOpRead;
-
- d->chnet = chnet;
- d->read_size = read_size;
- sq_data = (void *)d;
- op = CNRING_OP_READ;
+ op_ = CNRING_OP_START;
+ sq_start_.ch_ = ch;
}
- inline void SetUserDataPtr(void *ptr)
+ inline void SetUserData(uint64_t data)
{
- user_data = (uint64_t) ptr;
+ user_data_ = data;
}
- inline void SetUserData(uint64_t ptr)
+ inline void SetUserDataPtr(void *data)
{
- user_data = ptr;
+ user_data_ = (uint64_t) (uintptr_t) data;
}
};
class CHNET_EXPORT CNRingCQE {
public:
- uint8_t op;
- uint64_t user_data;
- union {
- int64_t res;
- };
+ uint8_t op_;
+ int64_t res_;
+ uint64_t user_data_;
inline uint64_t GetUserData(void)
{
- return user_data;
+ return user_data_;
}
inline void *GetUserDataPtr(void)
{
- return (void *) (uintptr_t) user_data;
+ return (void *) (uintptr_t) user_data_;
}
};
#ifdef __FOR_CHROMIUM_INTERNAL
class CNRingState {
public:
- CNRingState(uint32_t nr_thread = 64, uint32_t nr_jobs = 4096,
- uint32_t nr_idle_thread = -1U);
- WorkQueue wq_;
- std::mutex cqe_lock_;
- std::condition_variable cqe_cond_;
- std::atomic<uint32_t> cqe_to_wait_;
-
- std::mutex post_cqe_lock_;
- std::condition_variable post_cqe_cond_;
- std::atomic<uint32_t> nr_post_cqe_wait_;
+ CNRingState(uint32_t nr_thread, uint32_t nr_jobs,
+ uint32_t nr_idle_thread);
+
+ std::atomic<uint32_t> cqe_to_wait_;
+ std::atomic<uint32_t> nr_free_cqe_slot_waiter_;
+
+ std::condition_variable cqe_to_wait_cond_;
+ std::condition_variable wait_for_a_free_cqe_slot_cond_;
+
+ std::mutex wait_for_a_free_cqe_slot_lock_;
+ std::mutex sqe_lock_;
+ std::mutex cqe_lock_;
+ WorkQueue wq_;
+ volatile bool stop_;
+
+ inline bool should_stop(void)
+ {
+ return stop_ || wq_.should_stop();
+ }
};
-#endif
+#endif /* #ifdef __FOR_CHROMIUM_INTERNAL */
-class CHNET_EXPORT CNRingCtx {
+
+static inline uint32_t u32_diff(uint32_t a, uint32_t b)
+{
+ if (b > a)
+ return b - a;
+ else
+ return a - b;
+}
+
+class CHNET_EXPORT CNRing {
public:
- CNRingCtx(uint32_t entry);
- ~CNRingCtx(void);
- uint32_t SubmitSQE(uint32_t to_submit = -1U);
+ CNRing(uint32_t entry);
+ ~CNRing(void);
+ uint32_t Submit(uint32_t to_submit = -1U);
CNRingSQE *GetSQE(void);
- void WaitCQE(uint32_t to_wait);
- uint32_t sqe_size(void);
- uint32_t cqe_size(void);
-
- void PostCQE(CNRingSQE *sqe, int64_t res);
- void CallPostCQE(CNRingSQE *sqe, int64_t res);
+ uint32_t WaitCQE(uint32_t to_wait = 1, uint32_t timeout_ms = -1U);
void CQAdvance(uint32_t n);
+ bool TryPostCQE(const CNRingSQE *sqe, int64_t res);
+ void PostCQE(const CNRingSQE *sqe, int64_t res);
+ bool PostCQENoSleep(const CNRingSQE *sqe, int64_t res);
- inline CNRingCQE *HeadCQE(void)
+ inline CNRingCQE *GetCQEHead(void)
{
- return &cqes_[cq_head_.load()];
+ return &cqes_[cq_head_.load(std::memory_order_acquire) &
+ cq_mask_];
}
+ CNRingSQE *sqes_;
+ CNRingCQE *cqes_;
+
#ifdef __FOR_CHROMIUM_INTERNAL
- CNRingState *state_;
+ CNRingState *state_;
#else
- void *state_;
+ void *state_;
#endif
- CNRingSQE *sqes_;
- CNRingCQE *cqes_;
-
- uint32_t sq_mask_;
- std::atomic<uint32_t> sq_head_;
std::atomic<uint32_t> sq_tail_;
+ std::atomic<uint32_t> sq_head_;
+ uint32_t sq_mask_;
- uint32_t cq_mask_;
- std::atomic<uint32_t> cq_head_;
std::atomic<uint32_t> cq_tail_;
+ std::atomic<uint32_t> cq_head_;
+ uint32_t cq_mask_;
+ std::atomic<uint32_t> cq_max_to_wait_;
private:
- void HandleSQE(CNRingSQE *sqe);
- CNRingSQE *ConsumeSQE(void);
- bool TryPostCQE(CNRingSQE *sqe, int64_t res);
- CNRingCQE *GetCQENoTailIncrement(void);
+ uint32_t _WaitCQE(uint32_t to_wait, uint32_t timeout_ms,
+ std::unique_lock<std::mutex> &lock);
+ void IssueSQE(const CNRingSQE *sqe);
+ bool IssueSQENop(const CNRingSQE *sqe);
+ bool IssueSQERead(const CNRingSQE *sqe);
+ bool IssueSQEStart(const CNRingSQE *sqe);
void NotifyCQEWaiter(void);
- void NotifyWaitCQEFreeSlot(void);
- void WaitForCQEFreeSlot(void);
+ void NotifyFreeCQEWaiter(bool force = false);
+ void SleepToWaitForAFreeCQE(void);
+
+ int _IssueSQEReadNeedStart(const CNRingSQE *sqe);
- void IssueSQE(CNRingSQE *sqe);
- void IssueNopSQE(CNRingSQE *sqe);
- void IssueStartSQE(CNRingSQE *sqe);
- void IssueReadSQE(CNRingSQE *sqe);
+ inline uint32_t CQESize(void)
+ {
+ return u32_diff(cq_head_.load(std::memory_order_acquire),
+ cq_tail_.load(std::memory_order_acquire));
+ }
+
+ inline uint32_t SQESize(void)
+ {
+ return u32_diff(sq_head_.load(std::memory_order_acquire),
+ sq_tail_.load(std::memory_order_acquire));
+ }
};
-#define chnring_for_each_cqe(ring, head, cqe) \
+#define cnring_for_each_cqe(ring, head, tail, cqe) \
for ( \
- head = (ring)->cq_head_; \
- (cqe = (head != (ring)->cq_tail_) ? \
+ head = (ring)->cq_head_.load(std::memory_order_acquire),\
+ tail = (ring)->cq_tail_.load(std::memory_order_acquire);\
+ (cqe = (head != tail) ? \
&(ring)->cqes_[head & (ring)->cq_mask_] : \
NULL); \
head++ \
diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc
index 54426ef..22da7ea 100644
--- a/tests/cpp/ring.cc
+++ b/tests/cpp/ring.cc
@@ -6,231 +6,246 @@
static void test_nop(void)
{
- /*
- * Simple nop test.
- *
- * This shows that CQE size is double of SQE size.
- */
-
- static constexpr uint32_t entry = 4096;
- unsigned head, i, j, a, b;
- CNRingCtx ring(entry);
+ constexpr static uint32_t nr_loop = 1024 * 8;
+ constexpr static uint32_t entry = 4096;
+ CNRing ring(entry);
CNRingSQE *sqe;
CNRingCQE *cqe;
+ uint32_t total;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t ret;
+ uint32_t i;
- i = 0;
- for (j = 0; j < 3; j++) {
- while (1) {
+ for (i = 0; i < nr_loop; i++) {
+ sqe = ring.GetSQE();
+ if (!sqe) {
+ assert(ring.Submit() == entry);
sqe = ring.GetSQE();
- if (!sqe)
- break;
- sqe->PrepNop();
- sqe->SetUserData(i++);
+ assert(sqe);
}
- ring.SubmitSQE();
+ sqe->PrepNop();
+ sqe->SetUserData(1);
}
- ring.WaitCQE(entry * 2);
-
- a = i;
- i = 0;
- chnring_for_each_cqe(&ring, head, cqe)
- i++;
- b = i;
- ring.CQAdvance(i);
- assert(b == entry * 2);
-
- ring.WaitCQE(entry);
- i = 0;
- chnring_for_each_cqe(&ring, head, cqe)
- i++;
- ring.CQAdvance(i);
- b += i;
- assert(a == b);
+ assert(ring.Submit() == entry);
+
+ total = 0;
+ while (total < nr_loop) {
+ ret = ring.WaitCQE(1);
+ i = 0;
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ assert(cqe->op_ == CNRING_OP_NOP);
+ assert(cqe->res_ == 0);
+ assert(cqe->user_data_ == 1);
+ i++;
+ }
+ ring.CQAdvance(i);
+ assert(ret >= 1);
+ assert(i >= ret);
+ total += ret;
+ }
+ assert(total == nr_loop);
}
-static void test_chnet_ring_single(void)
+static void test_chnet_ring_read(bool do_sq_start = false)
{
- CNRingCtx ring(4096);
+ CNRing ring(1);
CNRingSQE *sqe;
CNRingCQE *cqe;
+ uint32_t ret;
CHNet *ch;
ch = new CHNet;
ch->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch->SetMethod("GET");
- sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepStart(ch);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_START);
- assert(cqe->res == 0);
- assert(cqe->user_data == 9999);
- ring.CQAdvance(1);
+ if (do_sq_start) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepStart(ch);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_START);
+ assert(cqe->res_ == 0);
+ ring.CQAdvance(1);
+ }
sqe = ring.GetSQE();
- assert(sqe);
sqe->PrepRead(ch, 1024);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->res == 13);
- assert(cqe->user_data == 9999);
+ ring.Submit();
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 13);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("Hello World!\n", ch->read_buf(), cqe->res_));
ring.CQAdvance(1);
delete ch;
}
-static void test_chnet_ring_set_header(void)
+static void test_chnet_ring_partial_read(bool do_sq_start = false)
{
- constexpr static const char ua[] = "This is just a test user agent!!!";
- CNRingCtx ring(4096);
+ CNRing ring(1);
CNRingSQE *sqe;
CNRingCQE *cqe;
+ uint32_t ret;
CHNet *ch;
ch = new CHNet;
- ch->SetURL("http://127.0.0.1:8000/index.php?action=user_agent");
- ch->SetRequestHeader("User-agent", ua);
+ ch->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch->SetMethod("GET");
+ if (do_sq_start) {
+ sqe = ring.GetSQE();
+ sqe->PrepStart(ch);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_START);
+ assert(cqe->res_ == 0);
+ ring.CQAdvance(1);
+ }
+
+ /*
+ * The first read.
+ */
sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepStart(ch);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_START);
- assert(cqe->res == 0);
- assert(cqe->user_data == 9999);
+ sqe->PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 4);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("Hell", ch->read_buf(), cqe->res_));
ring.CQAdvance(1);
+
+ /*
+ * The second read.
+ */
sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepRead(ch, 1024);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->res == strlen(ua));
- assert(cqe->user_data == 9999);
- assert(!strncmp(ua, ch->read_buf(), strlen(ua)));
- ring.CQAdvance(1);
+ sqe->PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
- delete ch;
-}
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
-static void test_chnet_ring_single_post(void)
-{
- constexpr static const char buf[] = "AAAA Hello World!\n";
- CNRingCtx ring(4096);
- CNRingSQE *sqe;
- CNRingCQE *cqe;
- CHNet *ch;
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 4);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("o Wo", ch->read_buf(), cqe->res_));
+ ring.CQAdvance(1);
- ch = new CHNet;
- ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
- ch->SetMethod("POST");
- ch->SetPayload(buf, sizeof(buf));
+ /*
+ * The third read.
+ */
sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepStart(ch);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_START);
- assert(cqe->res == 0);
- assert(cqe->user_data == 9999);
+ sqe->PrepRead(ch, 1024);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 5);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("rld!\n", ch->read_buf(), cqe->res_));
ring.CQAdvance(1);
+
+ /*
+ * The 4-th read.
+ */
sqe = ring.GetSQE();
- assert(sqe);
sqe->PrepRead(ch, 1024);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->res == sizeof(buf));
- assert(cqe->user_data == 9999);
- assert(!strncmp(ch->read_buf(), buf, sizeof(buf)));
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(cqe->res_ == 0);
+ assert(ch->read_ret() == cqe->res_);
ring.CQAdvance(1);
delete ch;
}
-#define NR_REQ 1024
-
-static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring)
+#define NR_REQ 4096
+static void test_chnet_ring_read_parallel(bool do_sq_start = false)
{
- unsigned i, j, head;
+ CNRing ring(NR_REQ);
CNRingSQE *sqe;
CNRingCQE *cqe;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t i;
CHNet **ch;
- ch = new CHNet*[NR_REQ];
+ ch = new CHNet *[NR_REQ];
for (i = 0; i < NR_REQ; i++) {
ch[i] = new CHNet;
ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello");
}
- if (start_before_read) {
+ if (do_sq_start) {
for (i = 0; i < NR_REQ; i++) {
- sqe = ring->GetSQE();
+ sqe = ring.GetSQE();
assert(sqe);
sqe->PrepStart(ch[i]);
sqe->SetUserDataPtr(ch[i]);
}
- assert(ring->SubmitSQE() == i);
- ring->WaitCQE(i);
-
- j = 0;
- chnring_for_each_cqe(ring, head, cqe) {
- assert(cqe->op == CNRING_OP_START);
- assert(!cqe->res);
- j++;
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 0);
+ assert(cqe->op_ == CNRING_OP_START);
}
- assert(j == i);
- ring->CQAdvance(j);
+ ring.CQAdvance(NR_REQ);
}
+
for (i = 0; i < NR_REQ; i++) {
- sqe = ring->GetSQE();
+ sqe = ring.GetSQE();
assert(sqe);
sqe->PrepRead(ch[i], 1024);
sqe->SetUserDataPtr(ch[i]);
}
- assert(ring->SubmitSQE() == i);
- ring->WaitCQE(i);
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
- j = 0;
- chnring_for_each_cqe(ring, head, cqe) {
- CHNet *ch;
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
- ch = (CHNet *)cqe->GetUserDataPtr();
assert(ch);
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->res == 13);
- assert(ch->read_ret() == cqe->res);
- assert(!strncmp(ch->read_buf(), "Hello World!\n", 13));
- j++;
+ assert(cqe->res_ == 13);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("Hello World!\n", ch->read_buf(), cqe->res_));
}
- assert(j == i);
- ring->CQAdvance(j);
+ ring.CQAdvance(NR_REQ);
for (i = 0; i < NR_REQ; i++)
delete ch[i];
@@ -238,82 +253,157 @@ static void _test_chnet_ring_multiple(bool start_before_read, CNRingCtx *ring)
delete[] ch;
}
-static void test_chnet_ring_multiple(bool start_before_read)
-{
- CNRingCtx ring(NR_REQ);
- unsigned i;
-
- for (i = 0; i < 3; i++)
- _test_chnet_ring_multiple(start_before_read, &ring);
-}
-
-static void test_chnet_chunked_body(void)
+static void test_chnet_ring_partial_read_parallel(bool do_sq_start = false)
{
- constexpr size_t len = 1024 * 64;
- constexpr size_t nr_loop = 100;
- size_t total_read = 0;
- CNRingCtx ring(4096);
+ CNRing ring(NR_REQ);
CNRingSQE *sqe;
CNRingCQE *cqe;
- CHNet *ch;
- char *buf;
- size_t i;
+ uint32_t head;
+ uint32_t tail;
+ uint32_t i;
+ CHNet **ch;
- ch = new CHNet;
- ch->SetURL("http://127.0.0.1:8000/index.php?action=body");
- ch->SetMethod("POST");
- ch->StartChunkedBody();
+ ch = new CHNet *[NR_REQ];
- sqe = ring.GetSQE();
- assert(sqe);
- sqe->PrepRead(ch, len);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
-
- buf = new char[len];
- memset(buf, 'A', len);
-
- for (i = 0; i < nr_loop; i++)
- ch->Write(buf, len);
- ch->StopChunkedBody();
-
- while (total_read < (len * nr_loop)) {
- int res;
-
- ring.WaitCQE(1);
- cqe = ring.HeadCQE();
- assert(cqe->op == CNRING_OP_READ);
- assert(cqe->user_data == 9999);
- res = cqe->res;
- total_read += res;
- ring.CQAdvance(1);
+ for (i = 0; i < NR_REQ; i++) {
+ ch[i] = new CHNet;
+ ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch[i]->SetMethod("GET");
+ }
- if (!res)
- break;
+ if (do_sq_start) {
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepStart(ch[i]);
+ sqe->SetUserDataPtr(ch[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+ assert(ch);
+ assert(cqe->res_ == 0);
+ assert(cqe->op_ == CNRING_OP_START);
+ }
+ ring.CQAdvance(NR_REQ);
+ }
+
+
+ /*
+ * The first read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
sqe = ring.GetSQE();
assert(sqe);
- sqe->PrepRead(ch, len);
- sqe->SetUserData(9999);
- assert(ring.SubmitSQE() == 1);
+ sqe->PrepRead(ch[i], 4);
+ sqe->SetUserDataPtr(ch[i]);
}
- printf("total_read = %zu %zu\n", total_read, (len * nr_loop));
- assert(total_read == (len * nr_loop));
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
- delete[] buf;
- delete ch;
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 4);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("Hell", ch->read_buf(), cqe->res_));
+ }
+ ring.CQAdvance(NR_REQ);
+
+
+ /*
+ * The second read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch[i], 4);
+ sqe->SetUserDataPtr(ch[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 4);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("o Wo", ch->read_buf(), cqe->res_));
+ }
+ ring.CQAdvance(NR_REQ);
+
+
+ /*
+ * The third read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch[i], 100);
+ sqe->SetUserDataPtr(ch[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 5);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == cqe->res_);
+ assert(!strncmp("rld!\n", ch->read_buf(), cqe->res_));
+ }
+ ring.CQAdvance(NR_REQ);
+
+
+ /*
+ * The 4-th read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe->PrepRead(ch[i], 100);
+ sqe->SetUserDataPtr(ch[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ cnring_for_each_cqe(&ring, head, tail, cqe) {
+ CHNet *ch = (CHNet *) cqe->GetUserDataPtr();
+
+ assert(ch);
+ assert(cqe->res_ == 0);
+ assert(cqe->op_ == CNRING_OP_READ);
+ assert(ch->read_ret() == 0);
+ }
+ ring.CQAdvance(NR_REQ);
+
+ for (i = 0; i < NR_REQ; i++)
+ delete ch[i];
+
+ delete[] ch;
}
int main(void)
{
+ int i;
+
test_nop();
chnet_global_init();
- // test_chnet_ring_single();
- // test_chnet_ring_set_header();
- // test_chnet_ring_single_post();
- // test_chnet_ring_multiple(false);
- // test_chnet_ring_multiple(true);
- test_chnet_chunked_body();
+ for (i = 0; i < 2; i++) {
+ test_chnet_ring_read(!!i);
+ test_chnet_ring_partial_read(!!i);
+ test_chnet_ring_read_parallel(!!i);
+ test_chnet_ring_partial_read_parallel(!!i);
+ }
chnet_global_stop();
return 0;
}
diff --git a/tests/js/ring.js b/tests/js/ring.js
index 372c6e6..2f3fe1c 100644
--- a/tests/js/ring.js
+++ b/tests/js/ring.js
@@ -3,263 +3,389 @@ const chnet = require("bindings")("chnet");
function assert(cond)
{
- if (!cond) {
- console.log("Assertion failed!");
- process.exit(1);
- }
+ if (!cond)
+ throw new Error("Assertion failed!");
}
function test_nop()
{
- const entry = 16;
- let ring;
- let cqe;
+ const nr_loop = 1024 * 8;
+ const entry = 4096;
+ let ring = chnet.CreateRing(entry);
let sqe;
- let a, b, i, j;
-
- i = 0;
- ring = chnet.create_ring(entry);
- for (j = 0; j < 3; j++) {
- while (1) {
- sqe = ring.get_sqe();
- if (!sqe)
- break;
- sqe.prep_nop();
- i++;
+ let cqe;
+ let total;
+ let ret;
+ let i;
+
+ for (i = 0; i < nr_loop; i++) {
+ sqe = ring.GetSQE();
+ if (!sqe) {
+ assert(ring.Submit() == entry);
+ sqe = ring.GetSQE();
+ assert(sqe);
}
- ring.submit_sqe();
+ sqe.PrepNop();
+ sqe.SetUserData(1);
+ }
+ assert(ring.Submit() == entry);
+ assert(ring.WaitCQE(1));
+
+ total = 0;
+ while (total < nr_loop) {
+ ret = ring.WaitCQE(1);
+ i = ring.ForEachCQE(function (cqe) {
+ assert(!cqe.res);
+ assert(cqe.user_data == 1);
+ });
+ ring.CQAdvance(i);
+ assert(ret >= 1);
+ assert(i >= ret);
+ total += ret;
}
- ring.wait_cqe(entry * 2);
-
- a = i;
- i = ring.for_each_cqe(function (cqe) { });
- b = i;
- ring.cq_advance(i);
- assert(b == entry * 2);
-
- ring.wait_cqe(entry);
- i = ring.for_each_cqe(function (cqe) { });
- b += i;
- ring.cq_advance(i);
- assert(a == b);
+ assert(total == nr_loop);
+ ring.Close();
}
-function test_chnet_ring_multiple()
+function test_chnet_ring_read(do_sq_start = false)
{
- const NR_REQ = 10;
- let ring;
+ let ring = chnet.CreateRing(1);
let sqe;
let cqe;
+ let ret;
let ch;
- let i;
- ring = chnet.create_ring(NR_REQ);
- ch = [];
+ ch = chnet.CreateNet();
+ ch.SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch.SetMethod("GET");
- for (i = 0; i < NR_REQ; i++) {
- ch[i] = chnet.create_net();
- ch[i].set_url("http://127.0.0.1:8000/index.php?action=hello");
- }
+ if (do_sq_start) {
+ sqe = ring.GetSQE();
+ sqe.PrepStart(ch);
+ assert(ring.Submit() == 1);
- for (i = 0; i < NR_REQ; i++) {
- sqe = ring.get_sqe();
- assert(sqe);
- sqe.prep_start(ch[i]);
- sqe.set_user_data(ch[i]);
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+ assert(ret == 1);
+ assert(cqe.res == 0);
+ ring.CQAdvance(1);
}
- assert(ring.submit_sqe() == i);
- ring.wait_cqe(i);
- j = ring.for_each_cqe(function (cqe) {
- assert(cqe.res == 0);
- });
- assert(j == i);
- ring.cq_advance(j);
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 1024);
+ assert(ring.Submit() == 1);
- for (i = 0; i < NR_REQ; i++) {
- sqe = ring.get_sqe();
- assert(sqe);
- sqe.prep_read(ch[i], 1024);
- sqe.set_user_data(ch[i]);
- }
- assert(ring.submit_sqe() == i);
- ring.wait_cqe(i);
- j = ring.for_each_cqe(function (cqe) {
- let ch = cqe.user_data;
- assert(cqe.res == 13);
- assert(ch.read_buf() === "Hello World!\n");
- });
- assert(j == i);
- ring.cq_advance(j);
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 13);
+ assert(ch.read_ret() == cqe.res);
+ assert("Hello World!\n" == ch.read_buf());
+ ring.CQAdvance(1);
+ ch.Close();
+ ring.Close();
}
-class SimpleHttp {
- constructor(url, method = "GET") {
- this.ring = chnet.create_ring(1);
- this.ch = chnet.create_net();
- this.ch.set_url(url);
- this.ch.set_method(method);
- this.res = -1;
- }
+function test_chnet_ring_partial_read(do_sq_start = false)
+{
+ let ring = chnet.CreateRing(1);
+ let sqe;
+ let cqe;
+ let ret;
+ let ch;
- prep_read(read_size) {
- let sqe = this.ring.get_sqe();
- sqe.prep_read(this.ch, read_size);
- sqe.set_user_data(this);
+ ch = chnet.CreateNet();
+ ch.SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch.SetMethod("GET");
+
+ if (do_sq_start) {
+ sqe = ring.GetSQE();
+ sqe.PrepStart(ch);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+ assert(ret == 1);
+ assert(cqe.res == 0);
+ ring.CQAdvance(1);
}
- set_payload(payload) {
- this.ch.set_payload(payload);
+ /*
+ * The first read.
+ */
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 4);
+ assert(ch.read_ret() == cqe.res);
+ assert("Hell" == ch.read_buf());
+ ring.CQAdvance(1);
+
+
+ /*
+ * The second read.
+ */
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 4);
+ assert(ch.read_ret() == cqe.res);
+ assert("o Wo" == ch.read_buf());
+ ring.CQAdvance(1);
+
+
+ /*
+ * The third read.
+ */
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 1024);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 5);
+ assert(ch.read_ret() == cqe.res);
+ assert("rld!\n" == ch.read_buf());
+ ring.CQAdvance(1);
+
+
+ /*
+ * The 4-th read.
+ */
+ sqe = ring.GetSQE();
+ sqe.PrepRead(ch, 4);
+ assert(ring.Submit() == 1);
+
+ ret = ring.WaitCQE(1);
+ cqe = ring.GetCQEHead();
+
+ assert(ret == 1);
+ assert(cqe.res == 0);
+ assert(ch.read_ret() == cqe.res);
+ assert(!ch.read_buf());
+ ring.CQAdvance(1);
+ ch.Close();
+ ring.Close();
+}
+
+const NR_REQ = 4096;
+function test_chnet_ring_read_parallel(do_sq_start = false)
+{
+ let ring = chnet.CreateRing(NR_REQ);
+ let sqe;
+ let cqe;
+ let i;
+ let ch_arr;
+
+ ch_arr = [];
+
+ for (i = 0; i < NR_REQ; i++) {
+ ch_arr[i] = chnet.CreateNet();
+ ch_arr[i].SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch_arr[i].SetMethod("GET");
}
- run() {
- this.ring.submit_sqe();
- this.ring.wait_cqe(1);
- this.ring.for_each_cqe(function (cqe) {
- let that = cqe.user_data;
- that.res = cqe.res;
- if (cqe.res < 0)
- console.log("Error: "+that.ch.get_error());
+ if (do_sq_start) {
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepStart(ch_arr[i]);
+ sqe.SetUserData(ch_arr[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
+
+ assert(ch);
+ assert(cqe.res == 0);
});
- this.ring.cq_advance(1);
+ ring.CQAdvance(NR_REQ);
}
- get_buffer() {
- return this.ch.read_buf();
+
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 1024);
+ sqe.SetUserData(ch_arr[i]);
}
-}
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
-function test_simple_http()
-{
- const payload = "Hello World! AAAAAAAAAAAAAAAAAAAAAAAAA\n";
- let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=body", "POST");
- h.set_payload(payload);
- h.prep_read(1024);
- h.run();
- assert(h.get_buffer() === payload);
- assert(h.ch.read_ret() === payload.length);
-}
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
-function test_simple_http_set_header()
-{
- const ua = "This is just a test user agent!";
- let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=user_agent", "GET");
- h.ch.set_request_header("User-Agent", ua);
- h.prep_read(1024);
- h.run();
- assert(h.get_buffer() === ua);
- assert(h.ch.read_ret() === ua.length);
+ assert(ch);
+ assert(cqe.res == 13);
+ assert(ch.read_ret() == cqe.res);
+ assert("Hello World!\n" == ch.read_buf());
+ });
+ ring.CQAdvance(NR_REQ);
+
+ for (i = 0; i < NR_REQ; i++)
+ ch_arr[i].Close();
+
+ ring.Close();
}
-function test_simple_http_post_set_header()
+function test_chnet_ring_partial_read_parallel(do_sq_start = false)
{
- const ss = "This is just a test string!";
- let h = new SimpleHttp("http://127.0.0.1:8000/index.php?action=print_post&key=test", "POST");
- h.ch.set_request_header("User-Agent", ss);
- h.ch.set_request_header("Content-type", "application/x-www-form-urlencoded");
- h.ch.set_payload("test="+encodeURIComponent(ss));
- h.prep_read(1024);
- h.run();
- assert(h.get_buffer() === ss);
- assert(h.ch.read_ret() === ss.length);
-}
+ let ring = chnet.CreateRing(NR_REQ);
+ let sqe;
+ let cqe;
+ let i;
+ let ch_arr;
-class CHNet {
- constructor(cr, url, method) {
- this.ch = chnet.create_net();
- this.ch.set_url(url);
- this.ch.set_method(method);
- this.cr = cr;
- }
+ ch_arr = [];
- prep_read(read_size) {
- let ring = this.cr.ring;
- let sqe = ring.get_sqe();
- if (!sqe)
- return false;
- sqe.prep_read(this.ch, read_size);
- sqe.set_user_data(this);
- return true;
+ for (i = 0; i < NR_REQ; i++) {
+ ch_arr[i] = chnet.CreateNet();
+ ch_arr[i].SetURL("http://127.0.0.1:8000/index.php?action=hello");
+ ch_arr[i].SetMethod("GET");
}
- set_user_data(data) {
- this.udata = data;
- }
+ if (do_sq_start) {
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepStart(ch_arr[i]);
+ sqe.SetUserData(ch_arr[i]);
+ }
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
- get_user_data() {
- return this.udata;
- }
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
- set_header(key, val, overwrite = true) {
- this.ch.set_request_header(key, val, overwrite);
+ assert(ch);
+ assert(cqe.res == 0);
+ });
+ ring.CQAdvance(NR_REQ);
}
-};
-class ChromiumNet {
- constructor() {
- this.ring = chnet.create_ring(4096);
+
+ /*
+ * The first read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 4);
+ sqe.SetUserData(ch_arr[i]);
}
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
+
+ assert(ch);
+ assert(cqe.res == 4);
+ assert(ch.read_ret() == cqe.res);
+ assert("Hell" == ch.read_buf());
+ });
+ ring.CQAdvance(NR_REQ);
+
- create(url, method = "GET") {
- return new CHNet(this, url, method);
+ /*
+ * The second read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 4);
+ sqe.SetUserData(ch_arr[i]);
}
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
- for_each_cqe(cb = null) {
- let ret = this.ring.for_each_cqe(function (cqe) {
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
- if (cb)
- cb(cqe);
+ assert(ch);
+ assert(cqe.res == 4);
+ assert(ch.read_ret() == cqe.res);
+ assert("o Wo" == ch.read_buf());
+ });
+ ring.CQAdvance(NR_REQ);
- let ch = cqe.user_data;
- let ret = cqe.res;
- let udata = ch.udata;
- if (ret < 0 && ch.onerror)
- ch.onerror(ch, ret, udata);
- else
- ch.onreadcomplete(ch, ret, udata);
- });
- this.ring.cq_advance(ret);
- return ret;
+ /*
+ * The third read.
+ */
+ for (i = 0; i < NR_REQ; i++) {
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 1024);
+ sqe.SetUserData(ch_arr[i]);
}
-};
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
-function test_classes()
-{
- const UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36";
- const NR_REQ = 10;
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
- let cr = new ChromiumNet;
- let ch_arr = [];
- let i, j, ch;
+ assert(ch);
+ assert(cqe.res == 5);
+ assert(ch.read_ret() == cqe.res);
+ assert("rld!\n" == ch.read_buf());
+ });
+ ring.CQAdvance(NR_REQ);
+
+ /*
+ * The 4-th read.
+ */
for (i = 0; i < NR_REQ; i++) {
- ch = cr.create("http://127.0.0.1:8000/index.php?action=hello");
- ch.set_header("User-Agent", UA);
- ch.prep_read(4096);
- ch.set_user_data(i);
- ch.onerror = function (cr, ret, udata) {
- console.log(`Req: ${udata} returned error: ${cr.ch.get_error()}`);
- };
- ch.onreadcomplete = function (cr, ret, udata) {
- console.log(`Req: ${udata} returned ${ret} bytes: ${cr.ch.read_buf()}`);
- };
- ch_arr[i] = ch;
+ sqe = ring.GetSQE();
+ assert(sqe);
+ sqe.PrepRead(ch_arr[i], 1024);
+ sqe.SetUserData(ch_arr[i]);
}
+ assert(ring.Submit() == NR_REQ);
+ assert(ring.WaitCQE(NR_REQ) == NR_REQ);
+
+ ring.ForEachCQE(function (cqe) {
+ let ch = cqe.user_data;
- cr.ring.submit_sqe();
- cr.ring.wait_cqe(1);
- cr.for_each_cqe();
+ assert(ch);
+ assert(cqe.res == 0);
+ assert(ch.read_ret() == cqe.res);
+ });
+ ring.CQAdvance(NR_REQ);
+
+ for (i = 0; i < NR_REQ; i++)
+ ch_arr[i].Close();
+
+ ring.Close();
}
function main()
{
+ let i;
+
test_nop();
- test_chnet_ring_multiple();
- test_simple_http();
- test_simple_http_set_header();
- test_simple_http_post_set_header();
- test_classes();
+ for (i = 0; i < 2; i++) {
+ test_chnet_ring_read(!!i);
+ test_chnet_ring_partial_read(!!i);
+ test_chnet_ring_read_parallel(!!i);
+ test_chnet_ring_partial_read_parallel(!!i);
+ }
}
main();
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (18 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 19/22] chnet: ring: Refactor the ring completely Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 22:29 ` Alviro Iskandar Setiawan
2022-08-21 11:24 ` [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant Ammar Faizi
21 siblings, 1 reply; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Sometimes the worker gets stuck on sig.Wait(); it seems the signaling
mechanism was implemented wrongly. Just use busy-waiting, which
is easier to implement. There is not much to worry about regarding
wasted CPU cycles, as the waiting period must be very short; if
it's not short, then something has gone very wrong!
Also, PostTask() may fail; handle this failure.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet.cc | 48 ++++++++++++++++++++++++++++++++++++------------
chnet/common.h | 20 +++-----------------
2 files changed, 39 insertions(+), 29 deletions(-)
diff --git a/chnet/chnet.cc b/chnet/chnet.cc
index a34bb18..bafa0fd 100644
--- a/chnet/chnet.cc
+++ b/chnet/chnet.cc
@@ -315,6 +315,9 @@ void CHNetDelegate::SetRequestHeader(const std::string &key,
int CHNetDelegate::Start(void)
{
+ uint32_t try_count = 0;
+ bool p;
+
if (unlikely(!url_is_set_)) {
err_ = "URL is not set";
return -EINVAL;
@@ -322,11 +325,21 @@ int CHNetDelegate::Start(void)
Waiter sig;
auto *r = thread_.task_runner().get();
- r->PostTask(FROM_HERE, base::BindOnce(&CHNetDelegate::_Start,
- base::Unretained(this), &sig));
- sig.Wait();
- started_ = true;
- return 0;
+ do {
+ p = r->PostTask(FROM_HERE,
+ base::BindOnce(&CHNetDelegate::_Start,
+ base::Unretained(this), &sig));
+ try_count++;
+ } while (unlikely(!p && try_count < 10));
+
+ if (likely(p)) {
+ sig.Wait();
+ started_ = true;
+ return 0;
+ }
+
+ err_ = "Resource temporarily unavailable";
+ return -EAGAIN;
}
void CHNetDelegate::_Read(Waiter *sig, int size)
@@ -346,19 +359,31 @@ void CHNetDelegate::_Read(Waiter *sig, int size)
int CHNetDelegate::Read(int size)
{
+ uint32_t try_count = 0;
+ bool p;
+
if (unlikely(!url_req_)) {
err_ = "The request has not been started";
return -EINVAL;
}
read_ret_.store(-EAGAIN);
+
Waiter sig;
auto *r = thread_.task_runner().get();
- r->PostTask(FROM_HERE, base::BindOnce(&CHNetDelegate::_Read,
- base::Unretained(this), &sig,
- size));
- sig.Wait();
- return read_ret_.load();
+ do {
+ p = r->PostTask(FROM_HERE, base::BindOnce(&CHNetDelegate::_Read,
+ base::Unretained(this), &sig, size));
+ try_count++;
+ } while (unlikely(!p && try_count < 10));
+
+ if (likely(p)) {
+ sig.Wait();
+ return read_ret_.load();
+ }
+
+ err_ = "Resource temporarily unavailable";
+ return -EAGAIN;
}
void CHNetDelegate::Write(const char *buf, size_t len)
@@ -486,8 +511,7 @@ const char *CHNet::read_buf(void)
return ch_->read_buf();
}
-Waiter::Waiter(void):
- lock_(mutex_)
+Waiter::Waiter(void)
{
is_signaled_.store(false);
}
diff --git a/chnet/common.h b/chnet/common.h
index 5e225ed..5b2a3f2 100644
--- a/chnet/common.h
+++ b/chnet/common.h
@@ -51,30 +51,16 @@ public:
inline void Signal(void)
{
- is_signaled_.store(true);
- cond_.notify_one();
+ is_signaled_.store(true, std::memory_order_release);
}
inline void Wait(void)
{
- cond_.wait(lock_, [this]{ return is_signaled_.load(); });
- is_signaled_.store(false);
- }
-
- template<typename Rep, typename Period>
- inline bool WaitFor(const std::chrono::duration<Rep, Period> &rel_time)
- {
- bool ret = cond_.wait_for(lock_, rel_time, [this]{
- return is_signaled_.load();
- });
- is_signaled_.store(false);
- return ret;
+ while (!is_signaled_.load(std::memory_order_acquire))
+ __asm__ volatile ("pause" ::: "memory");
}
private:
- std::mutex mutex_;
- std::unique_lock<std::mutex> lock_;
- std::condition_variable cond_;
std::atomic<bool> is_signaled_;
};
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* Re: [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
@ 2022-08-21 22:29 ` Alviro Iskandar Setiawan
0 siblings, 0 replies; 28+ messages in thread
From: Alviro Iskandar Setiawan @ 2022-08-21 22:29 UTC (permalink / raw)
To: Ammar Faizi; +Cc: Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
On Sun, Aug 21, 2022 at 6:24 PM Ammar Faizi wrote:
>
> Sometimes the worker stuck on sig.Wait(), it seems the signaling
> mechanism was implemeted wrongly. Just use a busy-waiting which
> is easier to implement. There is nothing to worry much about
> wasting CPU cycle as the waiting period must be very short, if
> it's not short, then something has gone very wrong!
>
> Also, PostTask() may fail, handle this failure.
>
> Signed-off-by: Ammar Faizi <[email protected]>
LGTM, busy spinning on this may reveal a real issue if PostTask() is
doing something wrong.
Several typos:
s/implemeted/implemented/
s/CPU cycle/CPU cycles/
with that fixed:
Acked-by: Alviro Iskandar Setiawan <[email protected]>
-- Viro
^ permalink raw reply [flat|nested] 28+ messages in thread
* [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (19 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 20/22] chnet: Use busy-waiting for signal waiter Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 11:24 ` [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant Ammar Faizi
21 siblings, 0 replies; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
Make it able to handle a bigger ring size. I have tested this with 2G
entries and it works fine. I didn't manage to test chnet though, only
nop SQEs, as I don't have enough memory to run chnet with a 2G ring
size.
Signed-off-by: Ammar Faizi <[email protected]>
---
chnet/chnet_ring.cc | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/chnet/chnet_ring.cc b/chnet/chnet_ring.cc
index b858421..031c100 100644
--- a/chnet/chnet_ring.cc
+++ b/chnet/chnet_ring.cc
@@ -11,13 +11,15 @@ using namespace std::chrono_literals;
CNRing::CNRing(uint32_t entry)
{
+ constexpr static uint32_t min_entry = 2U;
+ constexpr static uint32_t max_entry = 1024U * 1024U * 1024U * 2U;
uint32_t sq_max;
uint32_t cq_max;
- if (entry < 2)
- entry = 2;
- if (entry > 1048576)
- entry = 1048576;
+ if (entry < min_entry)
+ entry = min_entry;
+ if (entry > max_entry)
+ entry = max_entry;
sq_max = 1;
while (sq_max < entry)
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH v1 22/22] tests/cpp: Delete basic.cpp as it's no longer relevant
2022-08-21 11:24 [PATCH v1 00/22] ncns updates Ammar Faizi
` (20 preceding siblings ...)
2022-08-21 11:24 ` [PATCH v1 21/22] chnet: ring: Bump max_entry to 2G Ammar Faizi
@ 2022-08-21 11:24 ` Ammar Faizi
2022-08-21 22:21 ` Alviro Iskandar Setiawan
21 siblings, 1 reply; 28+ messages in thread
From: Ammar Faizi @ 2022-08-21 11:24 UTC (permalink / raw)
To: Alviro Iskandar Setiawan
Cc: Ammar Faizi, Muhammad Rizki, Kanna Scarlet, GNU/Weeb Mailing List
This test is no longer relevant, as we now intend to always use the
CHNet class with ring support.
Signed-off-by: Ammar Faizi <[email protected]>
---
tests/cpp/basic.cc | 97 ----------------------------------------------
1 file changed, 97 deletions(-)
delete mode 100644 tests/cpp/basic.cc
diff --git a/tests/cpp/basic.cc b/tests/cpp/basic.cc
deleted file mode 100644
index 8ead825..0000000
--- a/tests/cpp/basic.cc
+++ /dev/null
@@ -1,97 +0,0 @@
-
-#include <assert.h>
-#include <string.h>
-#include <chnet.h>
-#include <unistd.h>
-
-// #define NR_ASYNC_REQ 100
-// static void test_basic_async(void)
-// {
-// CHNet *ch[NR_ASYNC_REQ];
-// int i;
-
-// for (i = 0; i < NR_ASYNC_REQ; i++) {
-// ch[i] = new CHNet;
-// ch[i]->SetURL("http://127.0.0.1:8000/index.php?action=hello");
-// }
-
-// for (i = 0; i < NR_ASYNC_REQ; i++)
-// ch[i]->Start();
-
-// for (i = 0; i < NR_ASYNC_REQ; i++)
-// ch[i]->WaitForResponseStarted();
-
-// for (i = 0; i < NR_ASYNC_REQ; i++)
-// ch[i]->Read(4096);
-
-// for (i = 0; i < NR_ASYNC_REQ; i++)
-// ch[i]->WaitForReadCompleted();
-
-// for (i = 0; i < NR_ASYNC_REQ; i++) {
-// const char *buf;
-// int ret;
-
-// ret = ch[i]->read_ret();
-// buf = ch[i]->buffer();
-// assert(ret == 13);
-// assert(!strncmp(buf, "Hello World!\n", 13));
-// }
-
-// for (i = 0; i < NR_ASYNC_REQ; i++)
-// delete ch[i];
-// }
-
-// static void test_basic_sync(void)
-// {
-// bool async = false;
-// const char *buf;
-// CHNet *ch;
-// int ret;
-
-// ch = new CHNet;
-// ch->SetURL("http://127.0.0.1:8000/index.php?action=hello");
-// ch->Start(async);
-// ch->Read(4096, async);
-// ret = ch->read_ret();
-// buf = ch->buffer();
-
-// assert(ret == 13);
-// assert(!strncmp(buf, "Hello World!\n", 13));
-// delete ch;
-// }
-
-static int test_url_not_set(void)
-{
- // CHNet *ch = new CHNet;
-
- // assert(ch->Start() == -EINVAL);
- // assert(!strcmp(ch->GetErrorStr(), "URL is not set"));
- // delete ch;
- return 0;
-}
-
-static int test_basic(void)
-{
- CHNet *ch = new CHNet;
- int ret;
-
- ch->SetURL("http://127.0.0.1:8000/index.php?action=hello");
- ret = ch->Start();
- assert(!ret);
- sleep(100);
- delete ch;
- return 0;
-}
-
-int main(void)
-{
- int ret = 0;
-
- chnet_global_init();
- ret |= test_basic();
- ret |= test_url_not_set();
- // test_basic_sync();
- // test_basic_async();
- chnet_global_stop();
- return ret;
-}
--
Ammar Faizi
^ permalink raw reply related [flat|nested] 28+ messages in thread