diff --git a/chnet/chnet.cc b/chnet/chnet.cc index 093660c..d064d52 100644 --- a/chnet/chnet.cc +++ b/chnet/chnet.cc @@ -110,23 +110,24 @@ int CHNetChunkedPayload::FlushBufferQueue(void) int ret = 0; while (!buf_queue_.empty()) { - struct chunk_vec c; + struct chunk_vec &c = buf_queue_.front(); size_t copy_size; - - c = buf_queue_.front(); - buf_queue_.pop(); + bool did_pop; if (unlikely(c.len_ == (size_t)~0)) { + buf_queue_.pop(); SetIsFinalChunk(); dst_ptr_ = nullptr; return ret; } if (c.len_ <= dst_len_) { + buf_queue_.pop(); copy_size = c.len_; + did_pop = true; } else { copy_size = dst_len_; - EnqueueBuffer(&c.buf_[copy_size], c.len_ - copy_size); + did_pop = false; } memcpy(dst_ptr_, c.buf_, copy_size); @@ -134,23 +135,43 @@ int CHNetChunkedPayload::FlushBufferQueue(void) dst_len_ -= copy_size; copied_size_ += copy_size; ret += copy_size; - free(c.buf_); + + if (did_pop) { + free(c.buf_); + } else { + c.len_ -= copy_size; + memmove(c.buf_, &c.buf_[copy_size], c.len_); + } if (!dst_len_) { + + if (did_pop) { + struct chunk_vec &c = buf_queue_.front(); + + if (unlikely(c.len_ == (size_t)~0)) { + buf_queue_.pop(); + printf("test\n"); + SetIsFinalChunk(); + } + } + dst_ptr_ = nullptr; break; } } - if (!ret) + if (likely(!IsEOF())) return net::ERR_IO_PENDING; + printf("read ok = %d\n", ret); return ret; } int CHNetChunkedPayload::WriteBuffer(const char *buf, size_t len) { size_t copy_size; + static std::atomic p; + uint32_t x = p++; if (unlikely(!len)) return 0; @@ -164,7 +185,7 @@ int CHNetChunkedPayload::WriteBuffer(const char *buf, size_t len) * consume the queued buffers in FIFO order. */ EnqueueBuffer(buf, len); - printf("1 waaa = %zu\n", len); + printf("%u 1 waaa = %zu\n", x, len); goto out; } @@ -178,17 +199,17 @@ int CHNetChunkedPayload::WriteBuffer(const char *buf, size_t len) if (!dst_ptr_) { InvokeOnReadCompleted(copied_size_); EnqueueBuffer(buf, len); - printf("2 waaa = %zu\n", len); + printf("%u 2 waaa = %zu\n", x, len); goto out; } if (len < dst_len_) { copy_size = len; - printf("3 waaa = %zu\n", len); + printf("%u 3 waaa = %zu\n", x, len); } else { copy_size = dst_len_; EnqueueBuffer(&buf[copy_size], len - copy_size); - printf("4 waaa = %zu\n", len); + printf("%u 4 waaa = %zu\n", x, len); } memcpy(dst_ptr_, buf, copy_size); @@ -196,8 +217,10 @@ int CHNetChunkedPayload::WriteBuffer(const char *buf, size_t len) dst_len_ -= copy_size; copied_size_ += copy_size; + printf("dst_len = %zu\n", dst_len_); if (!dst_len_) { dst_ptr_ = nullptr; + printf("x\n"); InvokeOnReadCompleted(copied_size_); } out: @@ -225,6 +248,9 @@ int CHNetChunkedPayload::ReadInternal(net::IOBuffer *buf, int buf_len) { int ret; + if (buf_len > 1024) + buf_len = 1024; + buf_lock_.lock(); dst_ptr_ = buf->data(); dst_len_ = buf_len; diff --git a/chromium/src b/chromium/src --- a/chromium/src +++ b/chromium/src @@ -1 +1 @@ -Subproject commit f7ad9cd81801ffb398777ca64485d5d1af429558 +Subproject commit f7ad9cd81801ffb398777ca64485d5d1af429558-dirty diff --git a/tests/cpp/ring.cc b/tests/cpp/ring.cc index bf135c9..0232ae0 100644 --- a/tests/cpp/ring.cc +++ b/tests/cpp/ring.cc @@ -503,7 +503,7 @@ static void test_chnet_ring_simple_post_parallel(bool do_sq_start = false) static void test_chnet_ring_chunked_post(bool do_sq_start = false) { - constexpr static const uint32_t nr_body_A = 1000 * 15; + constexpr static const uint32_t nr_body_A = 1000 * 30; const char *buf; CNRing ring(1); CNRingSQE *sqe; @@ -513,6 +513,8 @@ static void test_chnet_ring_chunked_post(bool do_sq_start = false) uint32_t i; CHNet *ch; + setvbuf(stdout, NULL, _IOLBF, 4096); + ch = new CHNet; ch->SetURL("http://127.0.0.1:8000/index.php?action=print_post&key=data"); ch->SetMethod("POST"); @@ -542,11 +544,13 @@ static void test_chnet_ring_chunked_post(bool do_sq_start = false) assert(ring.Submit() == 1); ch->WriteBody("67890", 5); - char q[nr_body_A]; - memset(q, 'A', sizeof(q)); - ch->WriteBody(q, sizeof(q)); - ch->WriteBody(q, sizeof(q)); - ch->WriteBody(q, sizeof(q)); + // char q[nr_body_A]; + // memset(q, 'A', sizeof(q)); + // ch->WriteBody(q, sizeof(q)); + // ch->WriteBody(q, sizeof(q)); + // ch->WriteBody(q, sizeof(q)); + for (i = 0; i < nr_body_A*3; i++) + ch->WriteBody("AAAAAAAAAAAAAAA", 15); ch->StopChunkedBody(); assert(ring.WaitCQE(1) == 1); @@ -558,6 +562,7 @@ static void test_chnet_ring_chunked_post(bool do_sq_start = false) assert(!strncmp("1234567890", buf, cqe->res_)); ring.CQAdvance(1); + printf("xxxxxxxxx\n"); total = 0; while (total < nr_body_A*3) { sqe = ring.GetSQE(); @@ -576,7 +581,7 @@ static void test_chnet_ring_chunked_post(bool do_sq_start = false) assert(buf[i] == 'A'); ring.CQAdvance(1); total += (uint32_t)cqe->res_; - printf("total = %u\n", total); + // printf("total = %u\n", total); } delete ch;