diff --git a/google/cloud/spanner/internal/connection_impl.cc b/google/cloud/spanner/internal/connection_impl.cc index 01a0b1c5243d9..b9be0021f85e5 100644 --- a/google/cloud/spanner/internal/connection_impl.cc +++ b/google/cloud/spanner/internal/connection_impl.cc @@ -137,15 +137,16 @@ class DefaultPartialResultSetReader : public PartialResultSetReader { void TryCancel() override { context_->TryCancel(); } - absl::optional Read( - absl::optional const&) override { - google::spanner::v1::PartialResultSet result; - auto status = reader_->Read(&result); - if (status.has_value()) { - final_status_ = *std::move(status); - return absl::nullopt; + bool Read(absl::optional const&, + UnownedPartialResultSet& response) override { + auto opt_status = reader_->Read(&response.result); + response.resumption = false; + + if (opt_status.has_value()) { + final_status_ = *std::move(opt_status); + return false; } - return PartialResultSet{std::move(result), false}; + return true; } Status Finish() override { return final_status_; } diff --git a/google/cloud/spanner/internal/logging_result_set_reader.cc b/google/cloud/spanner/internal/logging_result_set_reader.cc index c9434a9a7db1c..60d6d2592d5fd 100644 --- a/google/cloud/spanner/internal/logging_result_set_reader.cc +++ b/google/cloud/spanner/internal/logging_result_set_reader.cc @@ -25,22 +25,23 @@ using ::google::cloud::internal::DebugString; void LoggingResultSetReader::TryCancel() { impl_->TryCancel(); } -absl::optional LoggingResultSetReader::Read( - absl::optional const& resume_token) { +bool LoggingResultSetReader::Read( + absl::optional const& resume_token, + UnownedPartialResultSet& result) { if (resume_token) { GCP_LOG(DEBUG) << __func__ << "() << resume_token=\"" << DebugString(*resume_token, tracing_options_) << "\""; } else { GCP_LOG(DEBUG) << __func__ << "() << (unresumable)"; } - auto result = impl_->Read(resume_token); - if (!result) { - GCP_LOG(DEBUG) << __func__ << "() >> (optional-with-no-value)"; + bool success = impl_->Read(resume_token, result); + if (!success) { + GCP_LOG(DEBUG) << __func__ << "() >> (failed)"; } else { GCP_LOG(DEBUG) << __func__ << "() >> resumption=" - << (result->resumption ? "true" : "false"); + << (result.resumption ? "true" : "false"); } - return result; + return success; } Status LoggingResultSetReader::Finish() { return impl_->Finish(); } diff --git a/google/cloud/spanner/internal/logging_result_set_reader.h b/google/cloud/spanner/internal/logging_result_set_reader.h index 5735cf4ea1131..795c801a3bcdf 100644 --- a/google/cloud/spanner/internal/logging_result_set_reader.h +++ b/google/cloud/spanner/internal/logging_result_set_reader.h @@ -39,8 +39,8 @@ class LoggingResultSetReader : public PartialResultSetReader { ~LoggingResultSetReader() override = default; void TryCancel() override; - absl::optional Read( - absl::optional const& resume_token) override; + bool Read(absl::optional const& resume_token, + UnownedPartialResultSet& result) override; Status Finish() override; private: diff --git a/google/cloud/spanner/internal/logging_result_set_reader_test.cc b/google/cloud/spanner/internal/logging_result_set_reader_test.cc index 715fc3337cdd2..8ca414c28b7e2 100644 --- a/google/cloud/spanner/internal/logging_result_set_reader_test.cc +++ b/google/cloud/spanner/internal/logging_result_set_reader_test.cc @@ -49,30 +49,32 @@ TEST_F(LoggingResultSetReaderTest, TryCancel) { TEST_F(LoggingResultSetReaderTest, Read) { auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([] { - google::spanner::v1::PartialResultSet result; - result.set_resume_token("test-token"); - return PartialResultSet{std::move(result), false}; + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce([](absl::optional const&, + UnownedPartialResultSet& result) { + result.resumption = false; + result.result.set_resume_token("test-token"); + return true; }) - .WillOnce([] { return absl::optional{}; }); + .WillOnce([] { return false; }); LoggingResultSetReader reader(std::move(mock), TracingOptions{}); - auto result = reader.Read(""); - ASSERT_TRUE(result.has_value()); - EXPECT_EQ("test-token", result->result.resume_token()); + google::spanner::v1::PartialResultSet partial_result_set; + auto result = + UnownedPartialResultSet::FromPartialResultSet(partial_result_set); + ASSERT_TRUE(reader.Read("", result)); + EXPECT_EQ("test-token", result.result.resume_token()); auto log_lines = log_.ExtractLines(); EXPECT_THAT(log_lines, AllOf(Contains(StartsWith("Read()")))); EXPECT_THAT(log_lines, Contains(HasSubstr("resume_token=\"\""))); EXPECT_THAT(log_lines, Contains(HasSubstr("resumption=false"))); - result = reader.Read("test-token"); - ASSERT_FALSE(result.has_value()); + ASSERT_FALSE(reader.Read("test-token", result)); log_lines = log_.ExtractLines(); EXPECT_THAT(log_lines, AllOf(Contains(StartsWith("Read()")))); EXPECT_THAT(log_lines, Contains(HasSubstr("resume_token=\"test-token\""))); - EXPECT_THAT(log_lines, Contains(HasSubstr("(optional-with-no-value)"))); + EXPECT_THAT(log_lines, Contains(HasSubstr("(failed)"))); } TEST_F(LoggingResultSetReaderTest, Finish) { diff --git a/google/cloud/spanner/internal/partial_result_set_reader.h b/google/cloud/spanner/internal/partial_result_set_reader.h index 4ce16cad130fc..8d71f8588aff7 100644 --- a/google/cloud/spanner/internal/partial_result_set_reader.h +++ b/google/cloud/spanner/internal/partial_result_set_reader.h @@ -36,8 +36,13 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN * caller should discard any pending state not covered by the token, as that * data will be replayed. */ -struct PartialResultSet { - google::spanner::v1::PartialResultSet result; +struct UnownedPartialResultSet { + static UnownedPartialResultSet FromPartialResultSet( + google::spanner::v1::PartialResultSet& result) { + return UnownedPartialResultSet{result, false}; + } + + google::spanner::v1::PartialResultSet& result; bool resumption; }; @@ -54,8 +59,8 @@ class PartialResultSetReader { public: virtual ~PartialResultSetReader() = default; virtual void TryCancel() = 0; - virtual absl::optional Read( - absl::optional const& resume_token) = 0; + virtual bool Read(absl::optional const& resume_token, + UnownedPartialResultSet& result) = 0; virtual Status Finish() = 0; }; diff --git a/google/cloud/spanner/internal/partial_result_set_resume.cc b/google/cloud/spanner/internal/partial_result_set_resume.cc index 39fc2deeb0206..f798927c07366 100644 --- a/google/cloud/spanner/internal/partial_result_set_resume.cc +++ b/google/cloud/spanner/internal/partial_result_set_resume.cc @@ -22,36 +22,36 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN void PartialResultSetResume::TryCancel() { child_->TryCancel(); } -absl::optional PartialResultSetResume::Read( - absl::optional const& resume_token) { +bool PartialResultSetResume::Read( + absl::optional const& resume_token, + UnownedPartialResultSet& result) { bool resumption = false; do { - absl::optional result = child_->Read(resume_token); - if (result) { + if (child_->Read(resume_token, result)) { // Let the caller know if we recreated the PartialResultSetReader using // the resume_token so that they might discard any previous results that // will be contained in the new stream. - if (resumption) result->resumption = true; - return result; + if (resumption) result.resumption = true; + return true; } auto status = Finish(); - if (status.ok()) return {}; + if (status.ok()) return false; if (!resume_token) { // Our caller has requested that we not try to resume the stream, // probably because they have already delivered previous results that // would otherwise be replayed. - return {}; + return false; } if (idempotency_ == google::cloud::Idempotency::kNonIdempotent || !retry_policy_prototype_->OnFailure(status)) { - return {}; + return false; } std::this_thread::sleep_for(backoff_policy_prototype_->OnCompletion()); resumption = true; last_status_.reset(); child_ = factory_(*resume_token); } while (!retry_policy_prototype_->IsExhausted()); - return {}; + return false; } Status PartialResultSetResume::Finish() { diff --git a/google/cloud/spanner/internal/partial_result_set_resume.h b/google/cloud/spanner/internal/partial_result_set_resume.h index c5bc9c6c4b1f3..240983751fbc2 100644 --- a/google/cloud/spanner/internal/partial_result_set_resume.h +++ b/google/cloud/spanner/internal/partial_result_set_resume.h @@ -52,8 +52,8 @@ class PartialResultSetResume : public PartialResultSetReader { ~PartialResultSetResume() override = default; void TryCancel() override; - absl::optional Read( - absl::optional const& resume_token) override; + bool Read(absl::optional const& resume_token, + UnownedPartialResultSet& result) override; Status Finish() override; private: diff --git a/google/cloud/spanner/internal/partial_result_set_resume_test.cc b/google/cloud/spanner/internal/partial_result_set_resume_test.cc index 8c71c29600936..844e9ceb14442 100644 --- a/google/cloud/spanner/internal/partial_result_set_resume_test.cc +++ b/google/cloud/spanner/internal/partial_result_set_resume_test.cc @@ -43,14 +43,6 @@ using ::testing::AtLeast; using ::testing::HasSubstr; using ::testing::Return; -absl::optional ReadReturn( - google::spanner::v1::PartialResultSet response) { - bool const resumption = false; // only a PartialResultSetResume returns true - return PartialResultSet{std::move(response), resumption}; -} - -absl::optional ReadReturn() { return {}; } - struct MockFactory { MOCK_METHOD(std::unique_ptr, MakeReader, (std::string const& token)); @@ -81,6 +73,18 @@ MATCHER_P(IsValidAndEquals, expected, return arg && *arg == expected; } +// Helper function for MockPartialResultSetReader::Read to return true and +// populate result +auto ReadAction(google::spanner::v1::PartialResultSet& response_proto, + bool resumption_val) { + return [&response_proto, resumption_val](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = response_proto; + result.resumption = resumption_val; + return true; + }; +}; + TEST(PartialResultSetResume, Success) { google::spanner::v1::PartialResultSet response; auto constexpr kText = R"pb( @@ -103,9 +107,9 @@ TEST(PartialResultSetResume, Success) { .WillOnce([&response](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&response] { return ReadReturn(response); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(response, false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status())); return mock; }); @@ -114,11 +118,11 @@ TEST(PartialResultSetResume, Success) { return mock_factory.MakeReader(token); }; auto reader = MakeTestResume(factory, Idempotency::kIdempotent); - auto v = reader->Read(""); - ASSERT_TRUE(v.has_value()); - EXPECT_THAT(v->result, IsProtoEqual(response)); - v = reader->Read("resume-after-2"); - ASSERT_FALSE(v.has_value()); + google::spanner::v1::PartialResultSet raw_result; + auto result = UnownedPartialResultSet::FromPartialResultSet(raw_result); + ASSERT_TRUE(reader->Read("", result)); + EXPECT_THAT(result.result, IsProtoEqual(response)); + ASSERT_FALSE(reader->Read("resume-after-2", result)); auto status = reader->Finish(); EXPECT_STATUS_OK(status); } @@ -153,9 +157,9 @@ TEST(PartialResultSetResume, SuccessWithRestart) { .WillOnce([&r12](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&r12] { return ReadReturn(r12); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(r12, false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again 1"))); return mock; @@ -163,9 +167,9 @@ TEST(PartialResultSetResume, SuccessWithRestart) { .WillOnce([&r34](std::string const& token) { EXPECT_EQ("resume-after-2", token); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&r34] { return ReadReturn(r34); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(r34, false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again 2"))); return mock; @@ -173,7 +177,7 @@ TEST(PartialResultSetResume, SuccessWithRestart) { .WillOnce([](std::string const& token) { EXPECT_EQ("resume-after-4", token); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)).WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)).WillOnce(Return(false)); EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status())); return mock; }); @@ -182,14 +186,13 @@ TEST(PartialResultSetResume, SuccessWithRestart) { return mock_factory.MakeReader(token); }; auto reader = MakeTestResume(factory, Idempotency::kIdempotent); - auto v = reader->Read(""); - ASSERT_TRUE(v.has_value()); - EXPECT_THAT(v->result, IsProtoEqual(r12)); - v = reader->Read("resume-after-2"); - ASSERT_TRUE(v.has_value()); - EXPECT_THAT(v->result, IsProtoEqual(r34)); - v = reader->Read("resume-after-4"); - ASSERT_FALSE(v.has_value()); + google::spanner::v1::PartialResultSet raw_result; + auto result = UnownedPartialResultSet::FromPartialResultSet(raw_result); + ASSERT_TRUE(reader->Read("", result)); + EXPECT_THAT(raw_result, IsProtoEqual(r12)); + ASSERT_TRUE(reader->Read("resume-after-2", result)); + EXPECT_THAT(raw_result, IsProtoEqual(r34)); + ASSERT_FALSE(reader->Read("resume-after-4", result)); auto status = reader->Finish(); EXPECT_STATUS_OK(status); } @@ -216,9 +219,9 @@ TEST(PartialResultSetResume, PermanentError) { .WillOnce([&r12](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&r12] { return ReadReturn(r12); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(r12, false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again"))); return mock; @@ -226,7 +229,7 @@ TEST(PartialResultSetResume, PermanentError) { .WillOnce([](std::string const& token) { EXPECT_EQ("resume-after-2", token); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)).WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)).WillOnce(Return(false)); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kPermissionDenied, "uh-oh"))); return mock; @@ -236,11 +239,11 @@ TEST(PartialResultSetResume, PermanentError) { return mock_factory.MakeReader(token); }; auto reader = MakeTestResume(factory, Idempotency::kIdempotent); - auto v = reader->Read(""); - ASSERT_TRUE(v.has_value()); - EXPECT_THAT(v->result, IsProtoEqual(r12)); - v = reader->Read("resume-after-2"); - ASSERT_FALSE(v.has_value()); + google::spanner::v1::PartialResultSet raw_result; + auto result = UnownedPartialResultSet::FromPartialResultSet(raw_result); + ASSERT_TRUE(reader->Read("", result)); + EXPECT_THAT(result.result, IsProtoEqual(r12)); + ASSERT_FALSE(reader->Read("resume-after-2", result)); auto status = reader->Finish(); EXPECT_THAT(status, StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh"))); @@ -268,9 +271,9 @@ TEST(PartialResultSetResume, TransientNonIdempotent) { .WillOnce([&r12](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&r12] { return ReadReturn(r12); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(r12, false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again"))); return mock; @@ -280,11 +283,11 @@ TEST(PartialResultSetResume, TransientNonIdempotent) { return mock_factory.MakeReader(token); }; auto reader = MakeTestResume(factory, Idempotency::kNonIdempotent); - auto v = reader->Read(""); - ASSERT_TRUE(v.has_value()); - EXPECT_THAT(v->result, IsProtoEqual(r12)); - v = reader->Read("resume-after-2"); - ASSERT_FALSE(v.has_value()); + google::spanner::v1::PartialResultSet raw_result; + auto result = UnownedPartialResultSet::FromPartialResultSet(raw_result); + ASSERT_TRUE(reader->Read("", result)); + EXPECT_THAT(result.result, IsProtoEqual(r12)); + ASSERT_FALSE(reader->Read("resume-after-2", result)); auto status = reader->Finish(); EXPECT_THAT(status, StatusIs(StatusCode::kUnavailable, HasSubstr("Try again"))); @@ -297,7 +300,7 @@ TEST(PartialResultSetResume, TooManyTransients) { .WillRepeatedly([](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)).WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)).WillOnce(Return(false)); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again"))); return mock; @@ -307,8 +310,9 @@ TEST(PartialResultSetResume, TooManyTransients) { return mock_factory.MakeReader(token); }; auto reader = MakeTestResume(factory, Idempotency::kIdempotent); - auto v = reader->Read(""); - ASSERT_FALSE(v.has_value()); + google::spanner::v1::PartialResultSet raw_result; + auto result = UnownedPartialResultSet::FromPartialResultSet(raw_result); + ASSERT_FALSE(reader->Read("", result)); auto status = reader->Finish(); EXPECT_THAT(status, StatusIs(StatusCode::kUnavailable, HasSubstr("Try again"))); @@ -348,10 +352,10 @@ TEST(PartialResultSetResume, ResumptionStart) { .WillOnce([&response](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&response] { return ReadReturn(response[0]); }) - .WillOnce([&response] { return ReadReturn(response[1]); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(response[0], false)) + .WillOnce(ReadAction(response[1], false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, TryCancel()).Times(0); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again"))); @@ -360,11 +364,11 @@ TEST(PartialResultSetResume, ResumptionStart) { .WillOnce([&response](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&response] { return ReadReturn(response[0]); }) - .WillOnce([&response] { return ReadReturn(response[1]); }) - .WillOnce([&response] { return ReadReturn(response[2]); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(response[0], false)) + .WillOnce(ReadAction(response[1], false)) + .WillOnce(ReadAction(response[2], false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, TryCancel()).Times(0); EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status())); return mock; @@ -424,10 +428,10 @@ TEST(PartialResultSetResume, ResumptionMidway) { .WillOnce([&response](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&response] { return ReadReturn(response[0]); }) - .WillOnce([&response] { return ReadReturn(response[1]); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(response[0], false)) + .WillOnce(ReadAction(response[1], false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, TryCancel()).Times(0); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again"))); @@ -436,9 +440,9 @@ TEST(PartialResultSetResume, ResumptionMidway) { .WillOnce([&response](std::string const& token) { EXPECT_EQ("resume-after-4", token); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&response] { return ReadReturn(response[2]); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(response[2], false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, TryCancel()).Times(0); EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status())); return mock; @@ -498,10 +502,10 @@ TEST(PartialResultSetResume, ResumptionAfterResync) { .WillOnce([&response](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&response] { return ReadReturn(response[0]); }) - .WillOnce([&response] { return ReadReturn(response[1]); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(response[0], false)) + .WillOnce(ReadAction(response[1], false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, TryCancel()).Times(0); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again"))); @@ -510,9 +514,9 @@ TEST(PartialResultSetResume, ResumptionAfterResync) { .WillOnce([&response](std::string const& token) { EXPECT_EQ("resume-after-4", token); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&response] { return ReadReturn(response[2]); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(response[2], false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, TryCancel()).Times(0); EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status())); return mock; @@ -568,9 +572,9 @@ TEST(PartialResultSetResume, ResumptionBeforeResync) { .WillOnce([&response](std::string const& token) { EXPECT_TRUE(token.empty()); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Read(_)) - .WillOnce([&response] { return ReadReturn(response[0]); }) - .WillOnce(Return(ReadReturn())); + EXPECT_CALL(*mock, Read(_, _)) + .WillOnce(ReadAction(response[0], false)) + .WillOnce(Return(false)); EXPECT_CALL(*mock, TryCancel()).Times(0); EXPECT_CALL(*mock, Finish()) .WillOnce(Return(Status(StatusCode::kUnavailable, "Try again"))); diff --git a/google/cloud/spanner/internal/partial_result_set_source.cc b/google/cloud/spanner/internal/partial_result_set_source.cc index b4221636cc574..7eae6c55d4573 100644 --- a/google/cloud/spanner/internal/partial_result_set_source.cc +++ b/google/cloud/spanner/internal/partial_result_set_source.cc @@ -18,6 +18,7 @@ #include "google/cloud/internal/make_status.h" #include "google/cloud/log.h" #include "absl/container/fixed_array.h" +#include "absl/types/optional.h" namespace google { namespace cloud { @@ -81,7 +82,11 @@ PartialResultSetSource::Create(std::unique_ptr reader) { PartialResultSetSource::PartialResultSetSource( std::unique_ptr reader) - : options_(internal::CurrentOptions()), reader_(std::move(reader)) { + : options_(internal::CurrentOptions()), + reader_(std::move(reader)), + values_(absl::make_optional< + google::protobuf::RepeatedPtrField>( + &arena_)) { if (options_.has()) { values_space_limit_ = options_.get(); @@ -110,46 +115,76 @@ PartialResultSetSource::~PartialResultSetSource() { } StatusOr PartialResultSetSource::NextRow() { - while (rows_.empty()) { + if (usable_rows_ == 0 && rows_returned_ > 0) { + // There may be complete or partial rows in values_ that haven't been + // returned to the clients yet. Let's copy it over before we reset + // the arena. + int partial_size = + static_cast(values_->size() - rows_returned_ * columns_->size()); + absl::FixedArray tmp(partial_size); + if (!tmp.empty()) { + values_->ExtractSubrange(values_->size() - partial_size, partial_size, + tmp.data()); + } + values_.reset(); + values_space_.Clear(); + arena_.Reset(); + values_.emplace(&arena_); + for (auto* elem : tmp) { + values_->Add(std::move(*elem)); + delete elem; + } + rows_returned_ = 0; + } + while (usable_rows_ == 0) { if (state_ == kFinished) return spanner::Row(); internal::OptionsSpan span(options_); auto status = ReadFromStream(); if (!status.ok()) return status; } - auto row = std::move(rows_.front()); - rows_.pop_front(); - return row; + auto value_it = values_->begin() + rows_returned_ * columns_->size(); + ++rows_returned_; + --usable_rows_; + std::vector values; + values.reserve(metadata_->row_type().fields_size()); + for (auto const& field : metadata_->row_type().fields()) { + values.push_back(FromProto(field.type(), std::move(*value_it))); + ++value_it; + } + return RowFriend::MakeRow(std::move(values), columns_); } Status PartialResultSetSource::ReadFromStream() { - absl::optional result_set; - if (state_ == kFinished || !rows_.empty()) { + if (state_ == kFinished || usable_rows_ != 0 || rows_returned_ != 0) { return internal::InternalError("PartialResultSetSource state error", GCP_ERROR_INFO()); } + auto* raw_result_set = + google::protobuf::Arena::Create( + &arena_); + auto result_set = + UnownedPartialResultSet::FromPartialResultSet(*raw_result_set); if (state_ == kReading) { - result_set = reader_->Read(resume_token_); - if (!result_set) state_ = kEndOfStream; + if (!reader_->Read(resume_token_, result_set)) state_ = kEndOfStream; } if (state_ == kEndOfStream) { // If we have no buffered data, we're done. - if (values_.empty()) { + if (values_->empty()) { state_ = kFinished; return reader_->Finish(); } // Otherwise, proceed with a `PartialResultSet` using a fake resume // token to flush the buffer. The service does not appear to yield // a resume token in its final response, despite it completing a row. - result_set = PartialResultSet{{}, false}; - result_set->result.set_resume_token(""); + result_set.result.set_resume_token(""); } - if (result_set->result.has_metadata()) { + if (result_set.result.has_metadata()) { // If we get metadata more than once, log it, but use the first one. if (metadata_) { GCP_LOG(WARNING) << "PartialResultSetSource: Additional metadata"; } else { - metadata_ = std::move(*result_set->result.mutable_metadata()); + metadata_ = std::move(*result_set.result.mutable_metadata()); // Copy the column names into a vector that will be shared with // every Row object returned from NextRow(). columns_ = std::make_shared>(); @@ -159,21 +194,21 @@ Status PartialResultSetSource::ReadFromStream() { } } } - if (result_set->result.has_stats()) { + if (result_set.result.has_stats()) { // If we get stats more than once, log it, but use the last one. if (stats_) { GCP_LOG(WARNING) << "PartialResultSetSource: Additional stats"; } - stats_ = std::move(*result_set->result.mutable_stats()); + stats_ = std::move(*result_set.result.mutable_stats()); } - if (result_set->result.has_precommit_token()) { - precommit_token_ = std::move(*result_set->result.mutable_precommit_token()); + if (result_set.result.has_precommit_token()) { + precommit_token_ = std::move(*result_set.result.mutable_precommit_token()); } // If reader_->Read() resulted in a new PartialResultSetReader (i.e., it // used the token to resume an interrupted stream), then we must discard // any buffered data as it will be replayed. - if (result_set->resumption) { + if (result_set.resumption) { if (!resume_token_) { // The reader claims to have resumed the stream even though we said it // should not. That leaves us in the untenable position of possibly @@ -184,29 +219,30 @@ Status PartialResultSetSource::ReadFromStream() { GCP_ERROR_INFO()); } values_back_incomplete_ = false; - values_.Clear(); + values_->Clear(); + values_space_.Clear(); } // If the final value in the previous `PartialResultSet` was incomplete, // it must be combined with the first value from the new set. And then // we move everything remaining from the new set to the end of `values_`. - if (!result_set->result.values().empty()) { - auto& new_values = *result_set->result.mutable_values(); + if (!result_set.result.values().empty()) { + auto& new_values = *result_set.result.mutable_values(); int append_start = 0; if (values_back_incomplete_) { auto& first = *new_values.Mutable(append_start++); - auto status = MergeChunk(*values_.rbegin(), std::move(first)); + auto status = MergeChunk(*values_->rbegin(), std::move(first)); if (!status.ok()) return status; } - ExtractSubrangeAndAppend(new_values, append_start, values_); - values_back_incomplete_ = result_set->result.chunked_value(); + ExtractSubrangeAndAppend(new_values, append_start, *values_); + values_back_incomplete_ = result_set.result.chunked_value(); } // Deliver whatever rows we can muster. - auto const n_values = values_.size() - (values_back_incomplete_ ? 1 : 0); + auto const n_values = values_->size() - (values_back_incomplete_ ? 1 : 0); auto const n_columns = columns_ ? static_cast(columns_->size()) : 0; auto n_rows = n_columns ? n_values / n_columns : 0; - if (n_columns == 0 && !values_.empty()) { + if (n_columns == 0 && !values_->empty()) { return internal::InternalError( "PartialResultSetSource metadata is missing row type", GCP_ERROR_INFO()); @@ -214,8 +250,13 @@ Status PartialResultSetSource::ReadFromStream() { // If we didn't receive a resume token, and have not exceeded our buffer // limit, then we choose to `Read()` again so as to maintain resumability. - if (result_set->result.resume_token().empty()) { - if (values_.SpaceUsedExcludingSelfLong() < values_space_limit_) { + if (result_set.result.resume_token().empty() && values_space_limit_ > 0) { + for (auto it = values_->begin() + values_space_.index; it != values_->end(); + ++it) { + values_space_.space_used += it->SpaceUsedLong(); + } + values_space_.index = values_->size(); + if (values_space_.space_used < values_space_limit_) { return {}; // OK } } @@ -223,9 +264,9 @@ Status PartialResultSetSource::ReadFromStream() { // If we did receive a resume token then everything should be deliverable, // and we'll be able to resume the stream at this point after a breakage. // Otherwise, if we deliver anything at all, we must disable resumability. - if (!result_set->result.resume_token().empty()) { - resume_token_ = result_set->result.resume_token(); - if (n_rows * n_columns != values_.size()) { + if (!result_set.result.resume_token().empty()) { + resume_token_ = result_set.result.resume_token(); + if (n_rows * n_columns != values_->size()) { if (state_ != kEndOfStream) { return internal::InternalError( "PartialResultSetSource reader produced a resume token" @@ -243,24 +284,7 @@ Status PartialResultSetSource::ReadFromStream() { resume_token_ = absl::nullopt; } - // Combine the available values into new elements of `rows_`. - int values_pos = 0; - std::vector values; - values.reserve(n_columns); - for (; n_rows != 0; --n_rows) { - for (auto const& field : metadata_->row_type().fields()) { - auto& value = *values_.Mutable(values_pos++); - values.push_back(FromProto(field.type(), std::move(value))); - } - rows_.push_back(RowFriend::MakeRow(std::move(values), columns_)); - values.clear(); - } - - // If we didn't combine all the values, leave the remainder for next time. - auto* rem_values = result_set->result.mutable_values(); - ExtractSubrangeAndAppend(values_, values_pos, *rem_values); - values_.Swap(rem_values); - + usable_rows_ = n_rows; return {}; // OK } diff --git a/google/cloud/spanner/internal/partial_result_set_source.h b/google/cloud/spanner/internal/partial_result_set_source.h index 0f625843719f1..59f09ae6c570b 100644 --- a/google/cloud/spanner/internal/partial_result_set_source.h +++ b/google/cloud/spanner/internal/partial_result_set_source.h @@ -23,6 +23,7 @@ #include "google/cloud/status.h" #include "google/cloud/status_or.h" #include "absl/types/optional.h" +#include #include #include #include @@ -85,6 +86,9 @@ class PartialResultSetSource : public PartialResultSourceInterface { Status ReadFromStream(); + // Arena for the values_ field. + google::protobuf::Arena arena_; + Options options_; std::unique_ptr reader_; @@ -102,8 +106,31 @@ class PartialResultSetSource : public PartialResultSourceInterface { absl::optional precommit_token_ = absl::nullopt; - // `Row`s ready to be returned by `NextRow()`. - std::deque rows_; + // Number of rows returned to the client. + int rows_returned_ = 0; + + // Number of rows that can be created from `values_` in NextRow(). Note there + // may be more data in values_ but it's not ready to be returned to the + // client. + int usable_rows_ = 0; + + // Values that can be assembled into `Row`s ready to be returned by + // `NextRow()`. + absl::optional> + values_; + + // `space_used` is the sum of the SpaceUsedLong() by the values at indexes [0, + // index) in `values_`. + struct PrecomputedSpaceUsed { + void Clear() { + space_used = 0; + index = 0; + } + + std::size_t space_used = 0; + int index = 0; + }; + PrecomputedSpaceUsed values_space_; // When engaged, the token we can use to resume the stream immediately after // any data in (or previously in) `rows_`. When disengaged, we have already @@ -111,10 +138,6 @@ class PartialResultSetSource : public PartialResultSourceInterface { // see a new token. absl::optional resume_token_ = ""; - // `Value`s that could be combined into `rows_` when we have enough to fill - // an entire row, plus a token that would resume the stream after such rows. - google::protobuf::RepeatedPtrField values_; - // Should the space used by `values_` get larger than this limit, we will // move complete rows into `rows_` and disable resumption until we see a // new token. During this time, an error in the stream will be returned by diff --git a/google/cloud/spanner/internal/partial_result_set_source_test.cc b/google/cloud/spanner/internal/partial_result_set_source_test.cc index a3aef9aae52b3..1238436be66a9 100644 --- a/google/cloud/spanner/internal/partial_result_set_source_test.cc +++ b/google/cloud/spanner/internal/partial_result_set_source_test.cc @@ -39,6 +39,7 @@ using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; using ::testing::_; +using ::testing::Return; using ::testing::UnitTest; std::string CurrentTestName() { @@ -81,13 +82,6 @@ std::function ResultMock(T const& result) { }; } -absl::optional ReadResult( - google::spanner::v1::PartialResultSet response) { - return PartialResultSet{std::move(response), false}; -} - -absl::optional ReadResult() { return {}; } - MATCHER_P(IsValidAndEquals, expected, "Verifies that a StatusOr contains the given Row") { return arg && *arg == expected; @@ -96,7 +90,10 @@ MATCHER_P(IsValidAndEquals, expected, /// @test Verify the behavior when the initial `Read()` fails. TEST(PartialResultSetSourceTest, InitialReadFailure) { auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)).WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce( + [](absl::optional const&, + spanner_internal::UnownedPartialResultSet&) { return false; }); EXPECT_CALL(*grpc_reader, Finish()) .WillOnce(ResultMock(Status(StatusCode::kInvalidArgument, "invalid"))); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -126,9 +123,13 @@ TEST(PartialResultSetSourceTest, ReadSuccessThenFailure) { ASSERT_TRUE(TextFormat::ParseFromString(kText, &response)); auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()) .WillOnce(ResultMock(Status(StatusCode::kCancelled, "cancelled"))); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -148,7 +149,12 @@ TEST(PartialResultSetSourceTest, MissingMetadata) { google::spanner::v1::PartialResultSet response; auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)).WillOnce(ResultMock(ReadResult(response))); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response; + return true; + }); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).WillOnce(VoidMock()); @@ -171,9 +177,13 @@ TEST(PartialResultSetSourceTest, MissingRowTypeNoData) { ASSERT_TRUE(TextFormat::ParseFromString(kText, &response)); auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -196,9 +206,13 @@ TEST(PartialResultSetSourceTest, MissingRowTypeWithData) { ASSERT_TRUE(TextFormat::ParseFromString(kText, &response)); auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -252,9 +266,13 @@ TEST(PartialResultSetSourceTest, SingleResponse) { ASSERT_TRUE(TextFormat::ParseFromString(kText, &response)); auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -377,13 +395,38 @@ TEST(PartialResultSetSourceTest, MultipleResponses) { // `ReadFromStream()`. for (std::size_t buffer_size : {0, 153, 161, 321, 385, 448}) { auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response[0]))) - .WillOnce(ResultMock(ReadResult(response[1]))) - .WillOnce(ResultMock(ReadResult(response[2]))) - .WillOnce(ResultMock(ReadResult(response[3]))) - .WillOnce(ResultMock(ReadResult(response[4]))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce( + [&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[0]; + return true; + }) + .WillOnce( + [&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[1]; + return true; + }) + .WillOnce( + [&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[2]; + return true; + }) + .WillOnce( + [&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[3]; + return true; + }) + .WillOnce( + [&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[4]; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -444,11 +487,23 @@ TEST(PartialResultSetSourceTest, ResponseWithNoValues) { } auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response[0]))) - .WillOnce(ResultMock(ReadResult(response[1]))) - .WillOnce(ResultMock(ReadResult(response[2]))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[0]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[1]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[2]; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -508,13 +563,33 @@ TEST(PartialResultSetSourceTest, ChunkedStringValueWellFormed) { } auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response[0]))) - .WillOnce(ResultMock(ReadResult(response[1]))) - .WillOnce(ResultMock(ReadResult(response[2]))) - .WillOnce(ResultMock(ReadResult(response[3]))) - .WillOnce(ResultMock(ReadResult(response[4]))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[0]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[1]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[2]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[3]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[4]; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -562,10 +637,18 @@ TEST(PartialResultSetSourceTest, ChunkedValueSetNoValue) { } auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response[0]))) - .WillOnce(ResultMock(ReadResult(response[1]))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[0]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[1]; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -605,10 +688,18 @@ TEST(PartialResultSetSourceTest, ChunkedValueSetNoFollowingValue) { } auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response[0]))) - .WillOnce(ResultMock(ReadResult(response[1]))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[0]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[1]; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -650,10 +741,18 @@ TEST(PartialResultSetSourceTest, ChunkedValueSetAtEndOfStream) { } auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response[0]))) - .WillOnce(ResultMock(ReadResult(response[1]))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[0]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[1]; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); @@ -699,10 +798,22 @@ TEST(PartialResultSetSourceTest, ChunkedValueMergeFailure) { } auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response[0]))) - .WillOnce(ResultMock(ReadResult(response[1]))) - .WillOnce(ResultMock(ReadResult(response[2]))); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[0]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[1]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[2]; + return true; + }); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).WillOnce(VoidMock()); @@ -771,13 +882,33 @@ TEST(PartialResultSetSourceTest, ErrorOnIncompleteRow) { } auto grpc_reader = std::make_unique(); - EXPECT_CALL(*grpc_reader, Read(_)) - .WillOnce(ResultMock(ReadResult(response[0]))) - .WillOnce(ResultMock(ReadResult(response[1]))) - .WillOnce(ResultMock(ReadResult(response[2]))) - .WillOnce(ResultMock(ReadResult(response[3]))) - .WillOnce(ResultMock(ReadResult(response[4]))) - .WillOnce(ResultMock(ReadResult())); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[0]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[1]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[2]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[3]; + return true; + }) + .WillOnce([&response](absl::optional const&, + spanner_internal::UnownedPartialResultSet& result) { + result.result = response[4]; + return true; + }) + .WillOnce(Return(false)); EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); diff --git a/google/cloud/spanner/testing/mock_partial_result_set_reader.h b/google/cloud/spanner/testing/mock_partial_result_set_reader.h index 8214358c56473..bc34c18bab1b6 100644 --- a/google/cloud/spanner/testing/mock_partial_result_set_reader.h +++ b/google/cloud/spanner/testing/mock_partial_result_set_reader.h @@ -28,8 +28,10 @@ class MockPartialResultSetReader : public spanner_internal::PartialResultSetReader { public: MOCK_METHOD(void, TryCancel, (), (override)); - MOCK_METHOD(absl::optional, Read, - (absl::optional const& resume_token), (override)); + MOCK_METHOD(bool, Read, + (absl::optional const& resume_token, + spanner_internal::UnownedPartialResultSet& result), + (override)); MOCK_METHOD(Status, Finish, (), (override)); };