Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,8 @@ Http2Stream::Http2Stream(
MakeWeak<Http2Stream>(this);
statistics_.start_time = uv_hrtime();

stream_resource_flags_ |= StreamResource::kFlagWantsWrite;

// Limit the number of header pairs
max_header_pairs_ = session->GetMaxHeaderPairs();
if (max_header_pairs_ == 0)
Expand Down
2 changes: 0 additions & 2 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,6 @@ class Http2Stream : public AsyncWrap,
// Required for StreamBase
int DoShutdown(ShutdownWrap* req_wrap) override;

bool HasWantsWrite() const override { return true; }

// Initiate a response on this stream.
int SubmitResponse(nghttp2_nv* nva, size_t len, int options);

Expand Down
6 changes: 5 additions & 1 deletion src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ inline StreamResource::~StreamResource() {
}
}

inline void StreamResource::DisableDoTryWrite() {
stream_resource_flags_ &= ~kFlagDoTryWrite;
}

inline void StreamResource::PushStreamListener(StreamListener* listener) {
CHECK_NE(listener, nullptr);
CHECK_EQ(listener->stream_, nullptr);
Expand Down Expand Up @@ -199,7 +203,7 @@ inline StreamWriteResult StreamBase::Write(
total_bytes += bufs[i].len;
bytes_written_ += total_bytes;

if (send_handle == nullptr) {
if (HasDoTryWrite() && send_handle == nullptr) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
return StreamWriteResult { false, err, nullptr, total_bytes };
Expand Down
3 changes: 2 additions & 1 deletion src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
size_t synchronously_written = 0;
uv_buf_t buf;

bool try_write = storage_size <= sizeof(stack_storage) &&
bool try_write = HasDoTryWrite() &&
storage_size <= sizeof(stack_storage) &&
(!IsIPCPipe() || send_handle_obj.IsEmpty());
if (try_write) {
data_size = StringBytes::Write(env->isolate(),
Expand Down
20 changes: 19 additions & 1 deletion src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
// A stream is always controlled through one `StreamListener` instance.
class StreamResource {
public:
enum Flags {
kFlagNone = 0x0,
kFlagDoTryWrite = 0x1,
kFlagWantsWrite = 0x2
};

virtual ~StreamResource();

// These need to be implemented on the readable side of this stream:
Expand Down Expand Up @@ -216,8 +222,18 @@ class StreamResource {
size_t count,
uv_stream_t* send_handle) = 0;

// Return true if the stream supports `DoTryWrite`.
inline bool HasDoTryWrite() const {
return stream_resource_flags_ & kFlagDoTryWrite;
}
// Returns true if the stream supports the `OnStreamWantsWrite()` interface.
virtual bool HasWantsWrite() const { return false; }
inline bool HasWantsWrite() const {
return stream_resource_flags_ & kFlagWantsWrite;
}

// Disable DoTryWrite for this StreamResource, useful for consumers such
// as TLSWrap that don't support sync writes.
inline void DisableDoTryWrite();

// Optionally, this may provide an error message to be used for
// failing writes.
Expand Down Expand Up @@ -249,6 +265,8 @@ class StreamResource {
uint64_t bytes_read_ = 0;
uint64_t bytes_written_ = 0;

unsigned int stream_resource_flags_ = kFlagNone;

friend class StreamListener;
};

Expand Down
1 change: 1 addition & 0 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
provider),
StreamBase(env),
stream_(stream) {
stream_resource_flags_ |= StreamResource::kFlagDoTryWrite;
}


Expand Down
59 changes: 16 additions & 43 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ TLSWrap::TLSWrap(Environment* env,
SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap<TLSWrap>::GetSessionCallback);
SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap<TLSWrap>::NewSessionCallback);

stream->DisableDoTryWrite();
stream->PushStreamListener(this);

InitSSL();
Expand All @@ -91,17 +92,12 @@ TLSWrap::~TLSWrap() {
}


bool TLSWrap::InvokeQueued(int status, const char* error_str) {
if (!write_callback_scheduled_)
return false;

void TLSWrap::InvokeQueued(int status, const char* error_str) {
if (current_write_ != nullptr) {
WriteWrap* w = current_write_;
current_write_ = nullptr;
w->Done(status, error_str);
}

return true;
}


Expand Down Expand Up @@ -256,16 +252,12 @@ void TLSWrap::EncOut() {
if (is_waiting_new_session())
return;

// Split-off queue
if (established_ && current_write_ != nullptr)
write_callback_scheduled_ = true;

if (ssl_ == nullptr)
return;

// No data to write
if (BIO_pending(enc_out_) == 0) {
if (pending_cleartext_input_.empty())
if (established_ && pending_cleartext_input_.empty())
InvokeQueued(0);
return;
}
Expand All @@ -291,22 +283,13 @@ void TLSWrap::EncOut() {

NODE_COUNT_NET_BYTES_SENT(write_size_);

if (!res.async) {
HandleScope handle_scope(env()->isolate());

// Simulate asynchronous finishing, TLS cannot handle this at the moment.
env()->SetImmediate([](Environment* env, void* data) {
static_cast<TLSWrap*>(data)->OnStreamAfterWrite(nullptr, 0);
}, this, object());
}
CHECK(res.async);
}


void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) {
if (current_empty_write_ != nullptr) {
WriteWrap* finishing = current_empty_write_;
current_empty_write_ = nullptr;
finishing->Done(status);
if (write_size_ == 0) {
InvokeQueued(status);
return;
}

Expand Down Expand Up @@ -460,6 +443,9 @@ bool TLSWrap::ClearIn() {
if (ssl_ == nullptr)
return false;

if (pending_cleartext_input_.empty())
return true;

std::vector<uv_buf_t> buffers;
buffers.swap(pending_cleartext_input_);

Expand Down Expand Up @@ -490,7 +476,6 @@ bool TLSWrap::ClearIn() {
std::string error_str;
Local<Value> arg = GetSSLError(written, &err, &error_str);
if (!arg.IsEmpty()) {
write_callback_scheduled_ = true;
InvokeQueued(UV_EPROTO, error_str.c_str());
} else {
// Push back the not-yet-written pending buffers into their queue.
Expand Down Expand Up @@ -569,6 +554,10 @@ int TLSWrap::DoWrite(WriteWrap* w,
return UV_EPROTO;
}

// Store the current write wrap
CHECK_EQ(current_write_, nullptr);
current_write_ = w;

bool empty = true;

// Empty writes should not go through encryption process
Expand All @@ -583,26 +572,13 @@ int TLSWrap::DoWrite(WriteWrap* w,
// However, if there is any data that should be written to the socket,
// the callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0) {
CHECK_EQ(current_empty_write_, nullptr);
current_empty_write_ = w;
StreamWriteResult res =
underlying_stream()->Write(bufs, count, send_handle);
if (!res.async) {
env()->SetImmediate([](Environment* env, void* data) {
TLSWrap* self = static_cast<TLSWrap*>(data);
self->OnStreamAfterWrite(self->current_empty_write_, 0);
}, this, object());
}
CHECK(res.async);
return 0;
}
}

// Store the current write wrap
CHECK_EQ(current_write_, nullptr);
current_write_ = w;

// Write queued data
if (empty) {
// Write queued data
EncOut();
return 0;
}
Expand Down Expand Up @@ -748,10 +724,7 @@ void TLSWrap::DestroySSL(const FunctionCallbackInfo<Value>& args) {
TLSWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

// If there is a write happening, mark it as finished.
wrap->write_callback_scheduled_ = true;

// And destroy
// Finish current write, if one exists.
wrap->InvokeQueued(UV_ECANCELED, "Canceled because of SSL destruction");

// Destroy the SSL structure and friends
Expand Down
4 changes: 1 addition & 3 deletions src/tls_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class TLSWrap : public AsyncWrap,
void EncOut();
bool ClearIn();
void ClearOut();
bool InvokeQueued(int status, const char* error_str = nullptr);
void InvokeQueued(int status, const char* error_str = nullptr);

inline void Cycle() {
// Prevent recursion
Expand Down Expand Up @@ -151,8 +151,6 @@ class TLSWrap : public AsyncWrap,
std::vector<uv_buf_t> pending_cleartext_input_;
size_t write_size_;
WriteWrap* current_write_ = nullptr;
WriteWrap* current_empty_write_ = nullptr;
bool write_callback_scheduled_ = false;
bool started_;
bool established_;
bool shutdown_;
Expand Down
2 changes: 1 addition & 1 deletion test/async-hooks/test-graph.tls-write.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ function onexit() {
id: 'getaddrinforeq:1', triggerAsyncId: 'tls:1' },
{ type: 'TCPCONNECTWRAP',
id: 'tcpconnect:1', triggerAsyncId: 'tcp:1' },
{ type: 'WRITEWRAP', id: 'write:1', triggerAsyncId: 'tcpconnect:1' },
{ type: 'WRITEWRAP', id: 'write:1', triggerAsyncId: 'tcp:1' },
{ type: 'TCPWRAP', id: 'tcp:2', triggerAsyncId: 'tcpserver:1' },
{ type: 'TLSWRAP', id: 'tls:2', triggerAsyncId: 'tcpserver:1' },
{ type: 'TIMERWRAP', id: 'timer:1', triggerAsyncId: 'tcpserver:1' },
Expand Down
Loading