From 7533f59bd288e65247508940194ca97f3e93bd67 Mon Sep 17 00:00:00 2001
From: Anatoli Papirovski
Date: Thu, 5 Oct 2017 16:16:20 -0400
Subject: [PATCH 1/3] tls: properly track writeQueueSize during writes

Make writeQueueSize represent the actual size of the write queue
within the TLS socket. Add tls test to confirm that bufferSize works
as expected.

PR-URL: https://github.com/nodejs/node/pull/15791
Fixes: https://github.com/nodejs/node/issues/15005
Refs: https://github.com/nodejs/node/pull/15006
Reviewed-By: Anna Henningsen
Reviewed-By: Matteo Collina
Reviewed-By: Refael Ackermann
Reviewed-By: Fedor Indutny
---
 lib/_tls_wrap.js                     |  5 ++--
 src/tls_wrap.cc                      | 24 ++++++++++++++--
 src/tls_wrap.h                       |  1 +
 test/parallel/test-tls-buffersize.js | 43 ++++++++++++++++++++++++++++
 4 files changed, 68 insertions(+), 5 deletions(-)
 create mode 100644 test/parallel/test-tls-buffersize.js

diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js
index c435d792e0a2cc..c1ad58c0e87b4f 100644
--- a/lib/_tls_wrap.js
+++ b/lib/_tls_wrap.js
@@ -401,9 +401,8 @@ TLSSocket.prototype._init = function(socket, wrap) {
   var ssl = this._handle;
 
   // lib/net.js expect this value to be non-zero if write hasn't been flushed
-  // immediately
-  // TODO(indutny): revise this solution, it might be 1 before handshake and
-  // represent real writeQueueSize during regular writes.
+  // immediately. After the handshake is done this will represent the actual
+  // write queue size
   ssl.writeQueueSize = 1;
 
   this.server = options.server;
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index a0a2232884a805..bc0b9711438a8d 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -21,6 +21,7 @@ using v8::Exception;
 using v8::Function;
 using v8::FunctionCallbackInfo;
 using v8::FunctionTemplate;
+using v8::Integer;
 using v8::Local;
 using v8::Object;
 using v8::String;
@@ -276,6 +277,7 @@ void TLSWrap::EncOut() {
 
   // No data to write
   if (BIO_pending(enc_out_) == 0) {
+    UpdateWriteQueueSize();
     if (clear_in_->Length() == 0)
       InvokeQueued(0);
     return;
@@ -530,6 +532,18 @@ bool TLSWrap::IsClosing() {
 }
 
 
+uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
+  HandleScope scope(env()->isolate());
+  if (write_queue_size == 0)
+    write_queue_size = BIO_pending(enc_out_);
+  object()->Set(env()->context(),
+                env()->write_queue_size_string(),
+                Integer::NewFromUnsigned(env()->isolate(),
+                                         write_queue_size)).FromJust();
+  return write_queue_size;
+}
+
+
 int TLSWrap::ReadStart() {
   return stream_->ReadStart();
 }
@@ -570,8 +584,12 @@ int TLSWrap::DoWrite(WriteWrap* w,
     ClearOut();
     // However, if there is any data that should be written to the socket,
     // the callback should not be invoked immediately
-    if (BIO_pending(enc_out_) == 0)
+    if (BIO_pending(enc_out_) == 0) {
+      // net.js expects writeQueueSize to be > 0 if the write isn't
+      // immediately flushed
+      UpdateWriteQueueSize(1);
       return stream_->DoWrite(w, bufs, count, send_handle);
+    }
   }
 
   // Queue callback to execute it on next tick
@@ -621,13 +639,15 @@ int TLSWrap::DoWrite(WriteWrap* w,
 
   // Try writing data immediately
   EncOut();
+  UpdateWriteQueueSize();
 
   return 0;
 }
 
 
 void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
-  // Intentionally empty
+  TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
+  wrap->UpdateWriteQueueSize();
 }
 
 
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index dbb1a0199e4e66..0a59c80688ef25 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -111,6 +111,7 @@ class TLSWrap : public AsyncWrap,
 
   AsyncWrap* GetAsyncWrap() override;
   bool IsIPCPipe() override;
+  uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0);
 
   // Resource implementation
   static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
diff --git a/test/parallel/test-tls-buffersize.js b/test/parallel/test-tls-buffersize.js
new file mode 100644
index 00000000000000..49848cd865aca5
--- /dev/null
+++ b/test/parallel/test-tls-buffersize.js
@@ -0,0 +1,43 @@
+'use strict';
+const common = require('../common');
+if (!common.hasCrypto)
+  common.skip('missing crypto');
+const assert = require('assert');
+const fixtures = require('../common/fixtures');
+const tls = require('tls');
+
+const iter = 10;
+const overhead = 30;
+
+const server = tls.createServer({
+  key: fixtures.readKey('agent2-key.pem'),
+  cert: fixtures.readKey('agent2-cert.pem')
+}, common.mustCall((socket) => {
+  socket.on('readable', common.mustCallAtLeast(() => {
+    socket.read();
+  }, 1));
+
+  socket.on('end', common.mustCall(() => {
+    server.close();
+  }));
+}));
+
+server.listen(0, common.mustCall(() => {
+  const client = tls.connect({
+    port: server.address().port,
+    rejectUnauthorized: false
+  }, common.mustCall(() => {
+    assert.strictEqual(client.bufferSize, 0);
+
+    for (let i = 1; i < iter; i++) {
+      client.write('a');
+      assert.strictEqual(client.bufferSize, i + overhead);
+    }
+
+    client.on('finish', common.mustCall(() => {
+      assert.strictEqual(client.bufferSize, 0);
+    }));
+
+    client.end();
+  }));
+}));

From 3b5ac774ba9cd480fc0e9e358782d60bcd0517d5 Mon Sep 17 00:00:00 2001
From: Anatoli Papirovski
Date: Thu, 12 Oct 2017 06:57:42 -0700
Subject: [PATCH 2/3] net: fix timeouts during long writes

Add updateWriteQueueSize which updates and returns queue size (net &
tls). Make _onTimeout check whether an active write is ongoing and if
so, call _unrefTimer rather than emitting a timeout event. Add http &
https test that checks whether long-lasting (but active) writes
timeout or can finish writing as expected.

PR-URL: https://github.com/nodejs/node/pull/15791
Fixes: https://github.com/nodejs/node/issues/15082
Reviewed-By: Anna Henningsen
Reviewed-By: Matteo Collina
Reviewed-By: Refael Ackermann
Reviewed-By: Fedor Indutny
---
 lib/net.js                               |  8 ++
 src/stream_wrap.cc                       | 22 ++++-
 src/stream_wrap.h                        |  4 +-
 src/tls_wrap.cc                          | 10 +++
 src/tls_wrap.h                           |  4 +
 .../test-http-keep-alive-large-write.js  | 80 +++++++++++++++++
 .../test-https-keep-alive-large-write.js | 87 +++++++++++++++++++
 7 files changed, 210 insertions(+), 5 deletions(-)
 create mode 100644 test/sequential/test-http-keep-alive-large-write.js
 create mode 100644 test/sequential/test-https-keep-alive-large-write.js

diff --git a/lib/net.js b/lib/net.js
index 50c4a5cf3fd148..21e25854a52fc9 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -334,6 +334,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {
 
 
 Socket.prototype._onTimeout = function() {
+  // `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
+  // an active write in progress, so we suppress the timeout.
+  const prevWriteQueueSize = this._handle.writeQueueSize;
+  if (prevWriteQueueSize > 0 &&
+      prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
+    this._unrefTimer();
+    return;
+  }
   debug('_onTimeout');
   this.emit('timeout');
 };
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index 09c954b9c783e6..885e4b006e157c 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -73,6 +73,7 @@ StreamWrap::StreamWrap(Environment* env,
 void StreamWrap::AddMethods(Environment* env,
                             v8::Local<v8::FunctionTemplate> target,
                             int flags) {
+  env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
   env->SetProtoMethod(target, "setBlocking", SetBlocking);
   StreamBase::AddMethods<StreamWrap>(env, target, flags);
 }
@@ -113,11 +114,14 @@ bool StreamWrap::IsIPCPipe() {
 }
 
 
-void StreamWrap::UpdateWriteQueueSize() {
+uint32_t StreamWrap::UpdateWriteQueueSize() {
   HandleScope scope(env()->isolate());
-  Local<Integer> write_queue_size =
-      Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
-  object()->Set(env()->write_queue_size_string(), write_queue_size);
+  uint32_t write_queue_size = stream()->write_queue_size;
+  object()->Set(env()->context(),
+                env()->write_queue_size_string(),
+                Integer::NewFromUnsigned(env()->isolate(),
+                                         write_queue_size)).FromJust();
+  return write_queue_size;
 }
 
 
@@ -242,6 +246,16 @@ void StreamWrap::OnRead(uv_stream_t* handle,
 }
 
 
+void StreamWrap::UpdateWriteQueueSize(
+    const FunctionCallbackInfo<Value>& args) {
+  StreamWrap* wrap;
+  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
+
+  uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
+  args.GetReturnValue().Set(write_queue_size);
+}
+
+
 void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
   StreamWrap* wrap;
   ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index 3b2ce8ee3beb00..3b076373e5abf0 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -67,13 +67,15 @@ class StreamWrap : public HandleWrap, public StreamBase {
   }
 
   AsyncWrap* GetAsyncWrap() override;
-  void UpdateWriteQueueSize();
+  uint32_t UpdateWriteQueueSize();
 
   static void AddMethods(Environment* env,
                          v8::Local<v8::FunctionTemplate> target,
                          int flags = StreamBase::kFlagNone);
 
  private:
+  static void UpdateWriteQueueSize(
+      const v8::FunctionCallbackInfo<v8::Value>& args);
   static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
 
   // Callbacks for libuv
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index bc0b9711438a8d..6b8c5207cb76bc 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -911,6 +911,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
 #endif  // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
 
 
+void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
+  TLSWrap* wrap;
+  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
+
+  uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
+  args.GetReturnValue().Set(write_queue_size);
+}
+
+
 void TLSWrap::Initialize(Local<Object> target,
                          Local<Value> unused,
                          Local<Context> context) {
@@ -932,6 +941,7 @@ void TLSWrap::Initialize(Local<Object> target,
   env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
   env->SetProtoMethod(t, "destroySSL", DestroySSL);
   env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
+  env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);
 
   StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
   SSLWrap<TLSWrap>::AddMethods(env, t);
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index 0a59c80688ef25..92c7e2cd16292c 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -167,6 +167,10 @@ class TLSWrap : public AsyncWrap,
   // If true - delivered EOF to the js-land, either after `close_notify`, or
   // after the `UV_EOF` on socket.
   bool eof_;
+
+ private:
+  static void UpdateWriteQueueSize(
+      const v8::FunctionCallbackInfo<v8::Value>& args);
 };
 
 }  // namespace node
diff --git a/test/sequential/test-http-keep-alive-large-write.js b/test/sequential/test-http-keep-alive-large-write.js
new file mode 100644
index 00000000000000..2cdf539e76b2fc
--- /dev/null
+++ b/test/sequential/test-http-keep-alive-large-write.js
@@ -0,0 +1,80 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const http = require('http');
+
+// This test assesses whether long-running writes can complete
+// or timeout because the socket is not aware that the backing
+// stream is still writing.
+// To simulate a slow client, we write a really large chunk and
+// then proceed through the following cycle:
+// 1) Receive first 'data' event and record currently written size
+// 2) Once we've read up to currently written size recorded above,
+//    we pause the stream and wait longer than the server timeout
+// 3) Socket.prototype._onTimeout triggers and should confirm
+//    that the backing stream is still active and writing
+// 4) Our timer fires, we resume the socket and start at 1)
+
+const minReadSize = 250000;
+const serverTimeout = common.platformTimeout(500);
+let offsetTimeout = common.platformTimeout(100);
+let serverConnectionHandle;
+let writeSize = 3000000;
+let didReceiveData = false;
+// this represents each cycles write size, where the cycle consists
+// of `write > read > _onTimeout`
+let currentWriteSize = 0;
+
+const server = http.createServer(common.mustCall((req, res) => {
+  const content = Buffer.alloc(writeSize, 0x44);
+
+  res.writeHead(200, {
+    'Content-Type': 'application/octet-stream',
+    'Content-Length': content.length.toString(),
+    'Vary': 'Accept-Encoding'
+  });
+
+  serverConnectionHandle = res.socket._handle;
+  res.write(content);
+  res.end();
+}));
+server.setTimeout(serverTimeout);
+server.on('timeout', () => {
+  assert.strictEqual(didReceiveData, false, 'Should not timeout');
+});
+
+server.listen(0, common.mustCall(() => {
+  http.get({
+    path: '/',
+    port: server.address().port
+  }, common.mustCall((res) => {
+    const resume = () => res.resume();
+    let receivedBufferLength = 0;
+    let firstReceivedAt;
+    res.on('data', common.mustCallAtLeast((buf) => {
+      if (receivedBufferLength === 0) {
+        currentWriteSize = Math.max(
+          minReadSize,
+          writeSize - serverConnectionHandle.writeQueueSize
+        );
+        didReceiveData = false;
+        firstReceivedAt = Date.now();
+      }
+      receivedBufferLength += buf.length;
+      if (receivedBufferLength >= currentWriteSize) {
+        didReceiveData = true;
+        writeSize = serverConnectionHandle.writeQueueSize;
+        receivedBufferLength = 0;
+        res.pause();
+        setTimeout(
+          resume,
+          serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
+        );
+        offsetTimeout = 0;
+      }
+    }, 1));
+    res.on('end', common.mustCall(() => {
+      server.close();
+    }));
+  }));
+}));
diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js
new file mode 100644
index 00000000000000..88468dc03fc99b
--- /dev/null
+++ b/test/sequential/test-https-keep-alive-large-write.js
@@ -0,0 +1,87 @@
+'use strict';
+const common = require('../common');
+if (!common.hasCrypto)
+  common.skip('missing crypto');
+const assert = require('assert');
+const fixtures = require('../common/fixtures');
+const https = require('https');
+
+// This test assesses whether long-running writes can complete
+// or timeout because the socket is not aware that the backing
+// stream is still writing.
+// To simulate a slow client, we write a really large chunk and
+// then proceed through the following cycle:
+// 1) Receive first 'data' event and record currently written size
+// 2) Once we've read up to currently written size recorded above,
+//    we pause the stream and wait longer than the server timeout
+// 3) Socket.prototype._onTimeout triggers and should confirm
+//    that the backing stream is still active and writing
+// 4) Our timer fires, we resume the socket and start at 1)
+
+const minReadSize = 250000;
+const serverTimeout = common.platformTimeout(500);
+let offsetTimeout = common.platformTimeout(100);
+let serverConnectionHandle;
+let writeSize = 2000000;
+let didReceiveData = false;
+// this represents each cycles write size, where the cycle consists
+// of `write > read > _onTimeout`
+let currentWriteSize = 0;
+
+const server = https.createServer({
+  key: fixtures.readKey('agent1-key.pem'),
+  cert: fixtures.readKey('agent1-cert.pem')
+}, common.mustCall((req, res) => {
+  const content = Buffer.alloc(writeSize, 0x44);
+
+  res.writeHead(200, {
+    'Content-Type': 'application/octet-stream',
+    'Content-Length': content.length.toString(),
+    'Vary': 'Accept-Encoding'
+  });
+
+  serverConnectionHandle = res.socket._handle;
+  res.write(content);
+  res.end();
+}));
+server.setTimeout(serverTimeout);
+server.on('timeout', () => {
+  assert.strictEqual(didReceiveData, false, 'Should not timeout');
+});
+
+server.listen(0, common.mustCall(() => {
+  https.get({
+    path: '/',
+    port: server.address().port,
+    rejectUnauthorized: false
+  }, common.mustCall((res) => {
+    const resume = () => res.resume();
+    let receivedBufferLength = 0;
+    let firstReceivedAt;
+    res.on('data', common.mustCallAtLeast((buf) => {
+      if (receivedBufferLength === 0) {
+        currentWriteSize = Math.max(
+          minReadSize,
+          writeSize - serverConnectionHandle.writeQueueSize
+        );
+        didReceiveData = false;
+        firstReceivedAt = Date.now();
+      }
+      receivedBufferLength += buf.length;
+      if (receivedBufferLength >= currentWriteSize) {
+        didReceiveData = true;
+        writeSize = serverConnectionHandle.writeQueueSize;
+        receivedBufferLength = 0;
+        res.pause();
+        setTimeout(
+          resume,
+          serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
+        );
+        offsetTimeout = 0;
+      }
+    }, 1));
+    res.on('end', common.mustCall(() => {
+      server.close();
+    }));
+  }));
+}));

From 920f2ff656ef65973ddf678779d4ab56cbc53a10 Mon Sep 17 00:00:00 2001
From: Anatoli Papirovski
Date: Wed, 25 Oct 2017 09:26:20 -0400
Subject: [PATCH 3/3] net: fix timeout with null handle

This commit handles the case where _onTimeout is called with a null
handle.

Refs: https://github.com/nodejs/node/pull/15791
Fixes: https://github.com/nodejs/node/issues/16484
PR-URL: https://github.com/nodejs/node/pull/16489
Reviewed-By: Anna Henningsen
Reviewed-By: James M Snell
Reviewed-By: Colin Ihrig
Reviewed-By: Refael Ackermann
---
 lib/net.js                                  | 16 +++++++++-------
 test/parallel/test-net-timeout-no-handle.js | 17 +++++++++++++++++
 2 files changed, 26 insertions(+), 7 deletions(-)
 create mode 100644 test/parallel/test-net-timeout-no-handle.js

diff --git a/lib/net.js b/lib/net.js
index 21e25854a52fc9..78337cda6dd808 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -334,13 +334,15 @@ Socket.prototype.setTimeout = function(msecs, callback) {
 
 
 Socket.prototype._onTimeout = function() {
-  // `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
-  // an active write in progress, so we suppress the timeout.
-  const prevWriteQueueSize = this._handle.writeQueueSize;
-  if (prevWriteQueueSize > 0 &&
-      prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
-    this._unrefTimer();
-    return;
+  if (this._handle) {
+    // `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
+    // an active write in progress, so we suppress the timeout.
+    const prevWriteQueueSize = this._handle.writeQueueSize;
+    if (prevWriteQueueSize > 0 &&
+        prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
+      this._unrefTimer();
+      return;
+    }
   }
   debug('_onTimeout');
   this.emit('timeout');
diff --git a/test/parallel/test-net-timeout-no-handle.js b/test/parallel/test-net-timeout-no-handle.js
new file mode 100644
index 00000000000000..539f661cae8414
--- /dev/null
+++ b/test/parallel/test-net-timeout-no-handle.js
@@ -0,0 +1,17 @@
+'use strict';
+
+const common = require('../common');
+const net = require('net');
+const assert = require('assert');
+
+const socket = new net.Socket();
+socket.setTimeout(common.platformTimeout(50));
+
+socket.on('timeout', common.mustCall(() => {
+  assert.strictEqual(socket._handle, null);
+}));
+
+socket.on('connect', common.mustNotCall());
+
+// since the timeout is unrefed, the code will exit without this
+setTimeout(() => {}, common.platformTimeout(200));