From ac5ca42dfd1bd80a6e30da6619f5c91d06de60e9 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 9 Feb 2016 16:24:32 -0500 Subject: [PATCH 1/3] stream_base: introduce `OnClose` and `close_cb` Invoke `close_cb` on `~StreamResource`, it will be used to unconsume streams on destruction. --- src/stream_base.cc | 5 +++++ src/stream_base.h | 11 ++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/stream_base.cc b/src/stream_base.cc index 27ae0fee7b1309..5cb300d9dfc838 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -444,6 +444,11 @@ Local StreamBase::GetObject() { } +StreamResource::~StreamResource() { + OnClose(); +} + + int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { // No TryWrite by default return 0; diff --git a/src/stream_base.h b/src/stream_base.h index fad2ddd2e086f0..788967c7ae5efa 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -135,10 +135,11 @@ class StreamResource { const uv_buf_t* buf, uv_handle_type pending, void* ctx); + typedef void (*CloseCb)(void* ctx); StreamResource() { } - virtual ~StreamResource() = default; + ~StreamResource(); virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); @@ -167,21 +168,29 @@ class StreamResource { read_cb_.fn(nread, buf, pending, read_cb_.ctx); } + inline void OnClose() { + if (!close_cb_.is_empty()) + close_cb_.fn(close_cb_.ctx); + } + inline void set_after_write_cb(Callback c) { after_write_cb_ = c; } inline void set_alloc_cb(Callback c) { alloc_cb_ = c; } inline void set_read_cb(Callback c) { read_cb_ = c; } + inline void set_close_cb(Callback c) { close_cb_ = c; } inline Callback after_write_cb() { return after_write_cb_; } inline Callback alloc_cb() { return alloc_cb_; } inline Callback read_cb() { return read_cb_; } + inline Callback close_cb() { return close_cb_; } private: Callback after_write_cb_; Callback alloc_cb_; Callback read_cb_; + Callback close_cb_; }; class StreamBase : public StreamResource { From 272470230b5beb2e4bccc0fa2ab3ca0ac1f42150 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 9 Feb 2016 16:25:04 -0500 Subject: [PATCH 2/3] tls_wrap: unconsume stream on destruction When parent stream is destroyed - remove references to it, and error when attempting to access its properties. --- src/tls_wrap.cc | 24 ++++++++++++++++++++++++ src/tls_wrap.h | 1 + 2 files changed, 25 insertions(+) diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 85730b34936b55..2cc9d7230c88c2 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -63,6 +63,7 @@ TLSWrap::TLSWrap(Environment* env, stream_->set_after_write_cb({ OnAfterWriteImpl, this }); stream_->set_alloc_cb({ OnAllocImpl, this }); stream_->set_read_cb({ OnReadImpl, this }); + stream_->set_close_cb({ OnCloseImpl, this }); set_alloc_cb({ OnAllocSelf, this }); set_read_cb({ OnReadSelf, this }); @@ -191,6 +192,9 @@ void TLSWrap::Wrap(const FunctionCallbackInfo& args) { void TLSWrap::Receive(const FunctionCallbackInfo& args) { TLSWrap* wrap = Unwrap(args.Holder()); + if (wrap->stream_ == nullptr) + return; + CHECK(Buffer::HasInstance(args[0])); char* data = Buffer::Data(args[0]); size_t len = Buffer::Length(args[0]); @@ -508,31 +512,43 @@ AsyncWrap* TLSWrap::GetAsyncWrap() { bool TLSWrap::IsIPCPipe() { + if (stream_ == nullptr) + return false; return stream_->IsIPCPipe(); } int TLSWrap::GetFD() { + if (stream_ == nullptr) + return UV_EINVAL; return stream_->GetFD(); } bool TLSWrap::IsAlive() { + if (stream_ == nullptr) + return false; return ssl_ != nullptr && stream_->IsAlive(); } bool TLSWrap::IsClosing() { + if (stream_ == nullptr) + return false; return stream_->IsClosing(); } int TLSWrap::ReadStart() { + if (stream_ == nullptr) + return UV_EINVAL; return stream_->ReadStart(); } int TLSWrap::ReadStop() { + if (stream_ == nullptr) + return UV_EINVAL; return stream_->ReadStop(); } @@ -656,6 +672,12 @@ void TLSWrap::OnReadImpl(ssize_t nread, } +void TLSWrap::OnCloseImpl(void* ctx) { + TLSWrap* wrap = static_cast(ctx); + wrap->stream_ = nullptr; +} + + void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) { buf->base = static_cast(malloc(suggested_size)); CHECK_NE(buf->base, nullptr); @@ -724,6 +746,8 @@ int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) { shutdown_ = true; EncOut(); + if (stream_ == nullptr) + return UV_EINVAL; return stream_->DoShutdown(req_wrap); } diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 471a92056dd848..5066b9cf1c5ceb 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -113,6 +113,7 @@ class TLSWrap : public AsyncWrap, const uv_buf_t* buf, uv_handle_type pending, void* ctx); + static void OnCloseImpl(void* ctx); static void OnAfterWriteSelf(WriteWrap* w, void* ctx); static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx); static void OnReadSelf(ssize_t nread, From 6807dc1e2167d3daa63fbef318e9954a755dc0b4 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 9 Feb 2016 16:39:31 -0500 Subject: [PATCH 3/3] stream_base: lint issue --- src/stream_base.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream_base.h b/src/stream_base.h index 788967c7ae5efa..9d84618a515aef 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -139,7 +139,7 @@ class StreamResource { StreamResource() { } - ~StreamResource(); + virtual ~StreamResource(); virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);