Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,16 @@ class DefaultPartialResultSetReader : public PartialResultSetReader {

void TryCancel() override { context_->TryCancel(); }

absl::optional<PartialResultSet> Read(
absl::optional<std::string> const&) override {
google::spanner::v1::PartialResultSet result;
auto status = reader_->Read(&result);
if (status.has_value()) {
final_status_ = *std::move(status);
return absl::nullopt;
bool Read(absl::optional<std::string> const&,
UnownedPartialResultSet& response) override {
auto opt_status = reader_->Read(&response.result);
response.resumption = false;

if (opt_status.has_value()) {
final_status_ = *std::move(opt_status);
return false;
}
return PartialResultSet{std::move(result), false};
return true;
}

Status Finish() override { return final_status_; }
Expand Down
15 changes: 8 additions & 7 deletions google/cloud/spanner/internal/logging_result_set_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ using ::google::cloud::internal::DebugString;

void LoggingResultSetReader::TryCancel() { impl_->TryCancel(); }

absl::optional<PartialResultSet> LoggingResultSetReader::Read(
absl::optional<std::string> const& resume_token) {
bool LoggingResultSetReader::Read(
absl::optional<std::string> const& resume_token,
UnownedPartialResultSet& result) {
if (resume_token) {
GCP_LOG(DEBUG) << __func__ << "() << resume_token=\""
<< DebugString(*resume_token, tracing_options_) << "\"";
} else {
GCP_LOG(DEBUG) << __func__ << "() << (unresumable)";
}
auto result = impl_->Read(resume_token);
if (!result) {
GCP_LOG(DEBUG) << __func__ << "() >> (optional-with-no-value)";
bool success = impl_->Read(resume_token, result);
if (!success) {
GCP_LOG(DEBUG) << __func__ << "() >> (failed)";
} else {
GCP_LOG(DEBUG) << __func__ << "() >> resumption="
<< (result->resumption ? "true" : "false");
<< (result.resumption ? "true" : "false");
}
return result;
return success;
}

Status LoggingResultSetReader::Finish() { return impl_->Finish(); }
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/spanner/internal/logging_result_set_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class LoggingResultSetReader : public PartialResultSetReader {
~LoggingResultSetReader() override = default;

void TryCancel() override;
absl::optional<PartialResultSet> Read(
absl::optional<std::string> const& resume_token) override;
bool Read(absl::optional<std::string> const& resume_token,
UnownedPartialResultSet& result) override;
Status Finish() override;

private:
Expand Down
26 changes: 14 additions & 12 deletions google/cloud/spanner/internal/logging_result_set_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,30 +49,32 @@ TEST_F(LoggingResultSetReaderTest, TryCancel) {

TEST_F(LoggingResultSetReaderTest, Read) {
auto mock = std::make_unique<spanner_testing::MockPartialResultSetReader>();
EXPECT_CALL(*mock, Read(_))
.WillOnce([] {
google::spanner::v1::PartialResultSet result;
result.set_resume_token("test-token");
return PartialResultSet{std::move(result), false};
EXPECT_CALL(*mock, Read(_, _))
.WillOnce([](absl::optional<std::string> const&,
UnownedPartialResultSet& result) {
result.resumption = false;
result.result.set_resume_token("test-token");
return true;
})
.WillOnce([] { return absl::optional<PartialResultSet>{}; });
.WillOnce([] { return false; });
LoggingResultSetReader reader(std::move(mock), TracingOptions{});
auto result = reader.Read("");
ASSERT_TRUE(result.has_value());
EXPECT_EQ("test-token", result->result.resume_token());
google::spanner::v1::PartialResultSet partial_result_set;
auto result =
UnownedPartialResultSet::FromPartialResultSet(partial_result_set);
ASSERT_TRUE(reader.Read("", result));
EXPECT_EQ("test-token", result.result.resume_token());

auto log_lines = log_.ExtractLines();
EXPECT_THAT(log_lines, AllOf(Contains(StartsWith("Read()"))));
EXPECT_THAT(log_lines, Contains(HasSubstr("resume_token=\"\"")));
EXPECT_THAT(log_lines, Contains(HasSubstr("resumption=false")));

result = reader.Read("test-token");
ASSERT_FALSE(result.has_value());
ASSERT_FALSE(reader.Read("test-token", result));

log_lines = log_.ExtractLines();
EXPECT_THAT(log_lines, AllOf(Contains(StartsWith("Read()"))));
EXPECT_THAT(log_lines, Contains(HasSubstr("resume_token=\"test-token\"")));
EXPECT_THAT(log_lines, Contains(HasSubstr("(optional-with-no-value)")));
EXPECT_THAT(log_lines, Contains(HasSubstr("(failed)")));
}

TEST_F(LoggingResultSetReaderTest, Finish) {
Expand Down
13 changes: 9 additions & 4 deletions google/cloud/spanner/internal/partial_result_set_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
* caller should discard any pending state not covered by the token, as that
* data will be replayed.
*/
struct PartialResultSet {
google::spanner::v1::PartialResultSet result;
struct UnownedPartialResultSet {
static UnownedPartialResultSet FromPartialResultSet(
google::spanner::v1::PartialResultSet& result) {
return UnownedPartialResultSet{result, false};
}

google::spanner::v1::PartialResultSet& result;
bool resumption;
};

Expand All @@ -54,8 +59,8 @@ class PartialResultSetReader {
public:
virtual ~PartialResultSetReader() = default;
virtual void TryCancel() = 0;
virtual absl::optional<PartialResultSet> Read(
absl::optional<std::string> const& resume_token) = 0;
virtual bool Read(absl::optional<std::string> const& resume_token,
UnownedPartialResultSet& result) = 0;
virtual Status Finish() = 0;
};

Expand Down
20 changes: 10 additions & 10 deletions google/cloud/spanner/internal/partial_result_set_resume.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,36 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

void PartialResultSetResume::TryCancel() { child_->TryCancel(); }

absl::optional<PartialResultSet> PartialResultSetResume::Read(
absl::optional<std::string> const& resume_token) {
bool PartialResultSetResume::Read(
absl::optional<std::string> const& resume_token,
UnownedPartialResultSet& result) {
bool resumption = false;
do {
absl::optional<PartialResultSet> result = child_->Read(resume_token);
if (result) {
if (child_->Read(resume_token, result)) {
// Let the caller know if we recreated the PartialResultSetReader using
// the resume_token so that they might discard any previous results that
// will be contained in the new stream.
if (resumption) result->resumption = true;
return result;
if (resumption) result.resumption = true;
return true;
}
auto status = Finish();
if (status.ok()) return {};
if (status.ok()) return false;
if (!resume_token) {
// Our caller has requested that we not try to resume the stream,
// probably because they have already delivered previous results that
// would otherwise be replayed.
return {};
return false;
}
if (idempotency_ == google::cloud::Idempotency::kNonIdempotent ||
!retry_policy_prototype_->OnFailure(status)) {
return {};
return false;
}
std::this_thread::sleep_for(backoff_policy_prototype_->OnCompletion());
resumption = true;
last_status_.reset();
child_ = factory_(*resume_token);
} while (!retry_policy_prototype_->IsExhausted());
return {};
return false;
}

Status PartialResultSetResume::Finish() {
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/spanner/internal/partial_result_set_resume.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class PartialResultSetResume : public PartialResultSetReader {
~PartialResultSetResume() override = default;

void TryCancel() override;
absl::optional<PartialResultSet> Read(
absl::optional<std::string> const& resume_token) override;
bool Read(absl::optional<std::string> const& resume_token,
UnownedPartialResultSet& result) override;
Status Finish() override;

private:
Expand Down
Loading
Loading