From 690b3dce5d79f5812f381adec5c4646a225f8dae Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Mon, 13 Feb 2017 22:36:06 -0800 Subject: [PATCH 1/3] Fix ProxyFlow leak --- src/grpc/proxy_flow.cc | 10 +++++----- src/nginx/t/grpc_interop_cancel.t | 5 ++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/grpc/proxy_flow.cc b/src/grpc/proxy_flow.cc index 009f37491..0dbf65e32 100644 --- a/src/grpc/proxy_flow.cc +++ b/src/grpc/proxy_flow.cc @@ -272,18 +272,18 @@ void ProxyFlow::StartUpstreamWritesDone(std::shared_ptr flow, } flow->sent_upstream_writes_done_ = true; } + if (!status.ok()) { + StartUpstreamFinish(flow, status); + return; + } flow->upstream_reader_writer_->WritesDone( - flow->async_grpc_queue_->MakeTag([flow, status](bool ok) { + flow->async_grpc_queue_->MakeTag([flow](bool ok) { if (!ok) { // Upstream is not writable, call finish to get status and // and finish the call StartUpstreamFinish(flow, Status::OK); return; } - if (!status.ok()) { - StartUpstreamFinish(flow, status); - return; - } })); } diff --git a/src/nginx/t/grpc_interop_cancel.t b/src/nginx/t/grpc_interop_cancel.t index 1e9e08adc..3f4dd5aa4 100644 --- a/src/nginx/t/grpc_interop_cancel.t +++ b/src/nginx/t/grpc_interop_cancel.t @@ -44,7 +44,7 @@ my $ServiceControlPort = ApiManager::pick_port(); my $GrpcBackendPort = ApiManager::pick_port(); my $HttpBackendPort = ApiManager::pick_port(); -my $t = Test::Nginx->new()->has(qw/http proxy/)->plan(4); +my $t = Test::Nginx->new()->has(qw/http proxy/)->plan(5); $t->write_file( 'service.pb.txt', @@ -85,8 +85,7 @@ is($t->waitforsocket("127.0.0.1:${Http2NginxPort}"), 1, 'Nginx socket ready.'); ################################################################################ my @test_cases = ( -# Temporary disabled per b/35314304 -# 'cancel_after_begin', + 'cancel_after_begin', 'cancel_after_first_response', ); From 4740b8b61d61721684306c4ae30e095e95547e18 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Tue, 14 Feb 2017 00:29:52 -0800 Subject: [PATCH 2/3] Fix for large_streaming --- src/grpc/proxy_flow.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/grpc/proxy_flow.cc b/src/grpc/proxy_flow.cc index 0dbf65e32..1bec512aa 100644 --- a/src/grpc/proxy_flow.cc +++ b/src/grpc/proxy_flow.cc @@ -272,18 +272,18 @@ void ProxyFlow::StartUpstreamWritesDone(std::shared_ptr flow, } flow->sent_upstream_writes_done_ = true; } - if (!status.ok()) { - StartUpstreamFinish(flow, status); - return; - } flow->upstream_reader_writer_->WritesDone( - flow->async_grpc_queue_->MakeTag([flow](bool ok) { + flow->async_grpc_queue_->MakeTag([flow, status](bool ok) { if (!ok) { // Upstream is not writable, call finish to get status and // and finish the call StartUpstreamFinish(flow, Status::OK); return; } + if (status.code() == 413) { + StartUpstreamFinish(flow, status); + return; + } })); } From d70b4d9a61336f9a2df9b802f2d12e87a5bff7d6 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Tue, 14 Feb 2017 10:45:41 -0800 Subject: [PATCH 3/3] Not call upstream finish during write success --- src/grpc/proxy_flow.cc | 15 +++++++-------- src/grpc/proxy_flow.h | 3 +-- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/grpc/proxy_flow.cc b/src/grpc/proxy_flow.cc index 1bec512aa..c9ac8b3b3 100644 --- a/src/grpc/proxy_flow.cc +++ b/src/grpc/proxy_flow.cc @@ -277,11 +277,11 @@ void ProxyFlow::StartUpstreamWritesDone(std::shared_ptr flow, if (!ok) { // Upstream is not writable, call finish to get status and // and finish the call - StartUpstreamFinish(flow, Status::OK); + StartUpstreamFinish(flow); return; } - if (status.code() == 413) { - StartUpstreamFinish(flow, status); + if (!status.ok()) { + StartDownstreamFinish(flow, status); return; } })); @@ -297,7 +297,7 @@ void ProxyFlow::StartUpstreamWriteMessage(std::shared_ptr flow) { if (!ok) { // Upstream is not writable, call finish to get status and // and finish the call - StartUpstreamFinish(flow, Status::OK); + StartUpstreamFinish(flow); return; } // Now that the write has completed, it's safe to start the @@ -359,7 +359,7 @@ void ProxyFlow::StartUpstreamReadMessage(std::shared_ptr flow) { &flow->upstream_to_downstream_buffer_, flow->async_grpc_queue_->MakeTag([flow](bool ok) { if (!ok) { - StartUpstreamFinish(flow, Status::OK); + StartUpstreamFinish(flow); return; } StartDownstreamWriteMessage(flow); @@ -389,8 +389,7 @@ void ProxyFlow::StartDownstreamWriteMessage(std::shared_ptr flow) { }); } -void ProxyFlow::StartUpstreamFinish(std::shared_ptr flow, - Status status) { +void ProxyFlow::StartUpstreamFinish(std::shared_ptr flow) { { std::lock_guard lock(flow->mu_); if (flow->started_upstream_finish_) { @@ -401,7 +400,7 @@ void ProxyFlow::StartUpstreamFinish(std::shared_ptr flow, flow->upstream_reader_writer_->Finish( &flow->status_from_upstream_, flow->async_grpc_queue_->MakeTag( - [flow, status](bool ok) { StartDownstreamFinish(flow, status); })); + [flow](bool ok) { StartDownstreamFinish(flow, Status::OK); })); } void ProxyFlow::StartDownstreamFinish(std::shared_ptr flow, diff --git a/src/grpc/proxy_flow.h b/src/grpc/proxy_flow.h index 3d8671cbe..33ed3b622 100644 --- a/src/grpc/proxy_flow.h +++ b/src/grpc/proxy_flow.h @@ -77,8 +77,7 @@ class ProxyFlow { std::shared_ptr flow); static void StartUpstreamReadMessage(std::shared_ptr flow); static void StartDownstreamWriteMessage(std::shared_ptr flow); - static void StartUpstreamFinish(std::shared_ptr flow, - utils::Status status); + static void StartUpstreamFinish(std::shared_ptr flow); static void StartDownstreamFinish(std::shared_ptr flow, utils::Status status);