From 8e8abd8e61537e7f3eb46207d4ab2d6a55be28bc Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Thu, 9 Oct 2025 05:14:52 +0000 Subject: [PATCH 01/26] feat(bigtable): introduce PartialResultSetSource --- google/cloud/bigtable/CMakeLists.txt | 4 + .../bigtable/bigtable_client_testing.bzl | 1 + .../bigtable/google_cloud_cpp_bigtable.bzl | 3 + .../internal/partial_result_set_reader.h | 72 ++++++ .../internal/partial_result_set_source.cc | 225 ++++++++++++++++++ .../internal/partial_result_set_source.h | 131 ++++++++++ .../partial_result_set_source_test.cc | 137 +++++++++++ google/cloud/bigtable/options.h | 12 + google/cloud/bigtable/results.h | 6 - .../testing/mock_partial_result_set_reader.h | 43 ++++ 10 files changed, 628 insertions(+), 6 deletions(-) create mode 100644 google/cloud/bigtable/internal/partial_result_set_reader.h create mode 100644 google/cloud/bigtable/internal/partial_result_set_source.cc create mode 100644 google/cloud/bigtable/internal/partial_result_set_source.h create mode 100644 google/cloud/bigtable/internal/partial_result_set_source_test.cc create mode 100644 google/cloud/bigtable/testing/mock_partial_result_set_reader.h diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 6b0fb282d60e6..90808467d9322 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -205,6 +205,9 @@ add_library( internal/operation_context.h internal/operation_context_factory.cc internal/operation_context_factory.h + internal/partial_result_set_reader.h + internal/partial_result_set_source.cc + internal/partial_result_set_source.h internal/prefix_range_end.cc internal/prefix_range_end.h internal/rate_limiter.cc @@ -390,6 +393,7 @@ if (BUILD_TESTING) testing/mock_mutate_rows_limiter.h testing/mock_mutate_rows_reader.h testing/mock_policies.h + testing/mock_partial_result_set_reader.h testing/mock_read_rows_reader.h testing/mock_response_reader.h testing/mock_sample_row_keys_reader.h diff --git 
a/google/cloud/bigtable/bigtable_client_testing.bzl b/google/cloud/bigtable/bigtable_client_testing.bzl index d3e8a11cba1cd..c1a2666637d63 100644 --- a/google/cloud/bigtable/bigtable_client_testing.bzl +++ b/google/cloud/bigtable/bigtable_client_testing.bzl @@ -26,6 +26,7 @@ bigtable_client_testing_hdrs = [ "testing/mock_mutate_rows_limiter.h", "testing/mock_mutate_rows_reader.h", "testing/mock_policies.h", + "testing/mock_partial_result_set_reader.h", "testing/mock_read_rows_reader.h", "testing/mock_response_reader.h", "testing/mock_sample_row_keys_reader.h", diff --git a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl index 1b5b8675775cb..c264bc654d06d 100644 --- a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl +++ b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl @@ -101,6 +101,8 @@ google_cloud_cpp_bigtable_hdrs = [ "internal/mutate_rows_limiter.h", "internal/operation_context.h", "internal/operation_context_factory.h", + "internal/partial_result_set_reader.h", + "internal/partial_result_set_source.h", "internal/prefix_range_end.h", "internal/rate_limiter.h", "internal/readrowsparser.h", @@ -211,6 +213,7 @@ google_cloud_cpp_bigtable_srcs = [ "internal/mutate_rows_limiter.cc", "internal/operation_context.cc", "internal/operation_context_factory.cc", + "internal/partial_result_set_source.cc", "internal/prefix_range_end.cc", "internal/rate_limiter.cc", "internal/readrowsparser.cc", diff --git a/google/cloud/bigtable/internal/partial_result_set_reader.h b/google/cloud/bigtable/internal/partial_result_set_reader.h new file mode 100644 index 0000000000000..5710984f50dbe --- /dev/null +++ b/google/cloud/bigtable/internal/partial_result_set_reader.h @@ -0,0 +1,72 @@ +// Copyright 2019 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H + +#include "google/cloud/bigtable/version.h" +#include "google/cloud/status.h" +#include "google/cloud/status_or.h" +#include "absl/types/optional.h" +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +/** + * The result of a successful `PartialResultSetReader::Read()`, which may + * be the next partial result of a stream, or a resumption of an interrupted + * stream from the `resume_token` if it was engaged. In the latter case, the + * caller should discard any pending state not covered by the token, as that + * data will be replayed. + */ +struct UnownedPartialResultSet { + static UnownedPartialResultSet FromPartialResultSet( + google::bigtable::v2::PartialResultSet& result) { + return UnownedPartialResultSet{result, false}; + } + + google::bigtable::v2::PartialResultSet& result; + bool resumption; +}; + +/** + * Wrap `grpc::ClientReaderInterface`. + * + * This defines an interface to handle a streaming RPC returning a sequence + * of `google::bigtable::v2::PartialResultSet`. Its main purpose is to + * simplify memory management, as each streaming RPC requires two separate + * `std::unique_ptr<>`. As a side-effect, it is also easier to mock as it + * has a narrower interface. 
+ */ +class PartialResultSetReader { + public: + virtual ~PartialResultSetReader() = default; + virtual void TryCancel() = 0; + virtual bool Read(absl::optional const& resume_token, + UnownedPartialResultSet& result) = 0; + virtual Status Finish() = 0; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc new file mode 100644 index 0000000000000..293645d9f2984 --- /dev/null +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -0,0 +1,225 @@ +// Copyright 2019 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/bigtable/internal/partial_result_set_source.h" +#include "google/cloud/bigtable/options.h" +#include "google/cloud/internal/make_status.h" +#include "google/cloud/log.h" +#include "absl/container/fixed_array.h" +#include "absl/types/optional.h" + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + + +StatusOr> +PartialResultSetSource::Create(std::unique_ptr reader) { + std::unique_ptr source( + new PartialResultSetSource(std::move(reader))); + + // Do an initial read from the stream to determine the fate of the factory. 
+ auto status = source->ReadFromStream(); + + // If the initial read finished the stream, and `Finish()` failed, then + // creating the `PartialResultSetSource` should fail with the same error. + if (source->state_ == kFinished && !status.ok()) return status; + + return {std::move(source)}; +} + +PartialResultSetSource::PartialResultSetSource( + std::unique_ptr reader) + : options_(internal::CurrentOptions()), + reader_(std::move(reader)), + values_(absl::make_optional( + google::protobuf::Arena::Create< + google::protobuf::RepeatedPtrField>( + &arena_))) { + if (options_.has()) { + values_space_limit_ = + options_.get(); + } +} + +PartialResultSetSource::~PartialResultSetSource() { + internal::OptionsSpan span(options_); + if (state_ == kReading) { + // Finish() can deadlock if there is still data in the streaming RPC, + // so before trying to read the final status we need to cancel. + reader_->TryCancel(); + state_ = kEndOfStream; + } + if (state_ == kEndOfStream) { + // The user didn't iterate over all the data, so finish the stream on + // their behalf, although we have no way to communicate error status. + auto status = reader_->Finish(); + if (!status.ok() && status.code() != StatusCode::kCancelled) { + GCP_LOG(WARNING) + << "PartialResultSetSource: Finish() failed in destructor: " + << status; + } + state_ = kFinished; + } +} + +StatusOr PartialResultSetSource::NextRow() { + if (usable_rows_ == 0 && rows_returned_ > 0) { + // There may be complete or partial rows in values_ that haven't been + // returned to the clients yet. Let's copy it over before we reset + // the arena. 
+ auto* values = *values_; + int partial_size = + static_cast(values->size() - rows_returned_ * columns_->size()); + absl::FixedArray tmp(partial_size); + if (!tmp.empty()) { + values->ExtractSubrange(values->size() - partial_size, partial_size, + tmp.data()); + } + values_.reset(); + values_space_.Clear(); + arena_.Reset(); + values_.emplace( + google::protobuf::Arena::Create< + google::protobuf::RepeatedPtrField>( + &arena_)); + values = *values_; + for (auto* elem : tmp) { + values->Add(std::move(*elem)); + delete elem; + } + rows_returned_ = 0; + } + while (usable_rows_ == 0) { + if (state_ == kFinished) return bigtable::QueryRow(); + internal::OptionsSpan span(options_); + auto status = ReadFromStream(); + if (!status.ok()) return status; + } + ++rows_returned_; + --usable_rows_; + std::vector values; + return QueryRowFriend::MakeQueryRow(std::move(values), columns_); +} + +Status PartialResultSetSource::ReadFromStream() { + if (state_ == kFinished || usable_rows_ != 0 || rows_returned_ != 0) { + return internal::InternalError("PartialResultSetSource state error", + GCP_ERROR_INFO()); + } + auto* values = *values_; + auto* raw_result_set = + google::protobuf::Arena::Create( + &arena_); + auto result_set = + UnownedPartialResultSet::FromPartialResultSet(*raw_result_set); + if (state_ == kReading) { + if (!reader_->Read(resume_token_, result_set)) state_ = kEndOfStream; + } + if (state_ == kEndOfStream) { + // If we have no buffered data, we're done. + if (values->empty()) { + state_ = kFinished; + return reader_->Finish(); + } + // Otherwise, proceed with a `PartialResultSet` using a fake resume + // token to flush the buffer. The service does not appear to yield + // a resume token in its final response, despite it completing a row. + result_set.result.set_resume_token(""); + } + + // ExecuteQueryResponse contains metadata and partialResult +// // Copy the column names into a vector that will be shared with +// // every Row object returned from NextRow(). 
+ // columns_ = std::make_shared>(); + // columns_->reserve(result_set.result.proto_rows_batch.fields_size()); + // for (auto const& field : metadata_->row_type().fields()) { + // columns_->push_back(field.name()); + // } +// } +// } + + // If reader_->Read() resulted in a new PartialResultSetReader (i.e., it + // used the token to resume an interrupted stream), then we must discard + // any buffered data as it will be replayed. + if (result_set.resumption) { + if (!resume_token_) { + // The reader claims to have resumed the stream even though we said it + // should not. That leaves us in the untenable position of possibly + // having returned data that will be replayed, so fail the stream now. + return internal::InternalError( + "PartialResultSetSource reader resumed the stream" + " despite our having asked it not to", + GCP_ERROR_INFO()); + } + values_back_incomplete_ = false; + values->Clear(); + values_space_.Clear(); + } + + // Deliver whatever rows we can muster. + auto const n_values = values->size() - (values_back_incomplete_ ? 1 : 0); + auto const n_columns = columns_ ? static_cast(columns_->size()) : 0; + auto n_rows = n_columns ? n_values / n_columns : 0; + if (n_columns == 0 && !values->empty()) { + return internal::InternalError( + "PartialResultSetSource metadata is missing row type", + GCP_ERROR_INFO()); + } + + // If we didn't receive a resume token, and have not exceeded our buffer + // limit, then we choose to `Read()` again so as to maintain resumability. + if (result_set.result.resume_token().empty() && values_space_limit_ > 0) { + for (auto it = values->begin() + values_space_.index; it != values->end(); + ++it) { + values_space_.space_used += it->SpaceUsedLong(); + } + values_space_.index = values->size(); + if (values_space_.space_used < values_space_limit_) { + return {}; // OK + } + } + + // If we did receive a resume token then everything should be deliverable, + // and we'll be able to resume the stream at this point after a breakage. 
+ // Otherwise, if we deliver anything at all, we must disable resumability. + if (!result_set.result.resume_token().empty()) { + resume_token_ = result_set.result.resume_token(); + if (n_rows * n_columns != values->size()) { + if (state_ != kEndOfStream) { + return internal::InternalError( + "PartialResultSetSource reader produced a resume token" + " that is not on a row boundary", + GCP_ERROR_INFO()); + } + if (n_rows == 0) { + return internal::InternalError( + "PartialResultSetSource stream ended at a point" + " that is not on a row boundary", + GCP_ERROR_INFO()); + } + } + } else if (n_rows != 0) { + resume_token_ = absl::nullopt; + } + + usable_rows_ = n_rows; + return {}; // OK +} + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h new file mode 100644 index 0000000000000..06ef7102af051 --- /dev/null +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -0,0 +1,131 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H + +#include "google/cloud/bigtable/internal/partial_result_set_reader.h" +#include "google/cloud/bigtable/results.h" +#include "google/cloud/bigtable/value.h" +#include "google/cloud/bigtable/version.h" +#include "google/cloud/options.h" +#include "google/cloud/status.h" +#include "google/cloud/status_or.h" +#include "absl/types/optional.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +class PartialResultSourceInterface : public bigtable::ResultSourceInterface { +}; + +/** + * Class used to iterate over the rows returned from a read operation. + */ +class PartialResultSetSource : public PartialResultSourceInterface { + public: + /// Factory method to create a PartialResultSetSource. + static StatusOr> Create( + std::unique_ptr reader); + + ~PartialResultSetSource() override; + + StatusOr NextRow() override; + + private: + explicit PartialResultSetSource( + std::unique_ptr reader); + + Status ReadFromStream(); + + // Arena for the values_ field. + google::protobuf::Arena arena_; + + Options options_; + std::unique_ptr reader_; + + std::shared_ptr> columns_; + + // Number of rows returned to the client. + int rows_returned_ = 0; + + // Number of rows that can be created from `values_` in NextRow(). Note there + // may be more data in values_ but it's not ready to be returned to the + // client. + int usable_rows_ = 0; + + // Values that can be assembled into `QueryRow`s ready to be returned by + // `NextRow()`. This is a pointer to an arena-allocated RepeatedPtrField. + absl::optional*> + values_; + + // `space_used` is the sum of the SpaceUsedLong() by the values at indexes [0, + // index) in `values_`. 
+ struct PrecomputedSpaceUsed { + void Clear() { + space_used = 0; + index = 0; + } + + std::size_t space_used = 0; + int index = 0; + }; + PrecomputedSpaceUsed values_space_; + + // When engaged, the token we can use to resume the stream immediately after + // any data in (or previously in) `rows_`. When disengaged, we have already + // delivered data that would be replayed, so resumption is disabled until we + // see a new token. + absl::optional resume_token_ = ""; + + // Should the space used by `values_` get larger than this limit, we will + // move complete rows into `rows_` and disable resumption until we see a + // new token. During this time, an error in the stream will be returned by + // `NextRow()`. No individual row in a result set can exceed 100 MiB, so we + // set the default limit to twice that. + std::size_t values_space_limit_ = 2 * 100 * (std::size_t{1} << 20); + + // `*values_.rbegin()` exists, but it is incomplete. The rest of the value + // will be sent in subsequent `PartialResultSet` messages. + bool values_back_incomplete_ = false; + + // The state of our PartialResultSetReader. + enum : char { + // `Read()` has yet to return false. + kReading, + // `Read()` has returned false, but we are yet to call `Finish()`. + kEndOfStream, + // `Finish()` has been called, which means `NextRow()` has returned + // either an empty row or an error status. 
+ kFinished, + } state_ = kReading; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc new file mode 100644 index 0000000000000..c5a95456fa7a0 --- /dev/null +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -0,0 +1,137 @@ + +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "google/cloud/bigtable/internal/partial_result_set_source.h" +#include "google/cloud/bigtable/mocks/mock_query_row.h" +#include "google/cloud/bigtable/options.h" +#include "google/cloud/bigtable/query_row.h" +#include "google/cloud/bigtable/testing/mock_partial_result_set_reader.h" +#include "google/cloud/bigtable/value.h" +#include "google/cloud/options.h" +#include "google/cloud/testing_util/is_proto_equal.h" +#include "google/cloud/testing_util/status_matchers.h" +#include +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::google::cloud::testing_util::StatusIs; +using ::google::protobuf::TextFormat; +using ::testing::_; +using ::testing::Return; +using ::testing::UnitTest; + +std::string CurrentTestName() { + return UnitTest::GetInstance()->current_test_info()->name(); +} + +struct StringOption { + using Type = std::string; +}; + +// Create the `PartialResultSetSource` within an `OptionsSpan` that has its +// `StringOption` set to the current test name, so that we might check that +// all `PartialResultSetReader` calls happen within a matching span. +StatusOr> +CreatePartialResultSetSource(std::unique_ptr reader, + Options opts = {}) { + internal::OptionsSpan span(internal::MergeOptions( + std::move(opts.set(CurrentTestName())), + internal::CurrentOptions())); + return PartialResultSetSource::Create(std::move(reader)); +} + +// // Returns a functor that expects the current `StringOption` to match the +// test +// // name. +// std::function VoidMock() { +// return [] { +// EXPECT_EQ(internal::CurrentOptions().get(), +// CurrentTestName()); +// }; +// } + +// Returns a functor that will return the argument after expecting that the +// current `StringOption` matches the test name. 
+template +std::function ResultMock(T const& result) { + return [result]() { + EXPECT_EQ(internal::CurrentOptions().get(), + CurrentTestName()); + return result; + }; +} + +MATCHER_P(IsValidAndEquals, expected, + "Verifies that a StatusOr contains the given QueryRow") { + return arg && *arg == expected; +} + +/// @test Verify the behavior when the initial `Read()` fails. +TEST(PartialResultSetSourceTest, InitialReadFailure) { + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([](absl::optional const&, + UnownedPartialResultSet&) { return false; }); + EXPECT_CALL(*grpc_reader, Finish()) + .WillOnce(ResultMock(Status(StatusCode::kInvalidArgument, "invalid"))); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + + internal::OptionsSpan overlay(Options{}.set("uh-oh")); + auto reader = CreatePartialResultSetSource(std::move(grpc_reader)); + EXPECT_THAT(reader, StatusIs(StatusCode::kInvalidArgument, "invalid")); +} + +/** + * @test Verify the behavior when the response does not contain data. 
+ */ +TEST(PartialResultSetSourceTest, MissingRowTypeNoData) { + auto constexpr kText = R"pb( + )pb"; + google::bigtable::v2::PartialResultSet response; + ASSERT_TRUE(TextFormat::ParseFromString(kText, &response)); + + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = response; + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + + internal::OptionsSpan overlay(Options{}.set("uh-oh")); + auto reader = CreatePartialResultSetSource(std::move(grpc_reader)); + ASSERT_STATUS_OK(reader); + EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/bigtable/options.h b/google/cloud/bigtable/options.h index a49d08ce226b0..dc1cd00abacc8 100644 --- a/google/cloud/bigtable/options.h +++ b/google/cloud/bigtable/options.h @@ -260,6 +260,18 @@ using DataPolicyOptionList = IdempotentMutationPolicyOption, EnableMetricsOption, MetricsPeriodOption>; +/** + * Option for `google::cloud::Options` to set a limit on how much data will + * be buffered to guarantee resumability of a streaming read or SQL query. + * If the limit is exceeded, and the stream is subsequently interrupted before + * a new resumption point can be established, the read/query will fail. 
+ * + * @ingroup google-cloud-bigtable-options + */ +struct StreamingResumabilityBufferSizeOption { + using Type = std::size_t; +}; + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable } // namespace cloud diff --git a/google/cloud/bigtable/results.h b/google/cloud/bigtable/results.h index 3e2af0bde80ea..0325c8c1135af 100644 --- a/google/cloud/bigtable/results.h +++ b/google/cloud/bigtable/results.h @@ -44,12 +44,6 @@ class ResultSourceInterface { * row_key() to indicate end-of-stream. */ virtual StatusOr NextRow() = 0; - - /** - * Returns metadata about the result set. - */ - virtual absl::optional - Metadata() = 0; }; /** diff --git a/google/cloud/bigtable/testing/mock_partial_result_set_reader.h b/google/cloud/bigtable/testing/mock_partial_result_set_reader.h new file mode 100644 index 0000000000000..0751a3d2d540d --- /dev/null +++ b/google/cloud/bigtable/testing/mock_partial_result_set_reader.h @@ -0,0 +1,43 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_TESTING_MOCK_PARTIAL_RESULT_SET_READER_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_TESTING_MOCK_PARTIAL_RESULT_SET_READER_H + +#include "google/cloud/bigtable/internal/partial_result_set_reader.h" +#include "google/cloud/bigtable/version.h" +#include + +namespace google { +namespace cloud { +namespace bigtable_testing { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +class MockPartialResultSetReader + : public bigtable_internal::PartialResultSetReader { + public: + MOCK_METHOD(void, TryCancel, (), (override)); + MOCK_METHOD(bool, Read, + (absl::optional const& resume_token, + bigtable_internal::UnownedPartialResultSet& result), + (override)); + MOCK_METHOD(Status, Finish, (), (override)); +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_testing +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_TESTING_MOCK_PARTIAL_RESULT_SET_READER_H From b1fc24849213e041b4660627639e04886c826b65 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Thu, 9 Oct 2025 05:16:06 +0000 Subject: [PATCH 02/26] fix headers --- google/cloud/bigtable/internal/partial_result_set_reader.h | 2 +- google/cloud/bigtable/internal/partial_result_set_source.cc | 2 +- google/cloud/bigtable/internal/partial_result_set_source.h | 2 +- .../cloud/bigtable/internal/partial_result_set_source_test.cc | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_reader.h b/google/cloud/bigtable/internal/partial_result_set_reader.h index 5710984f50dbe..6e502a2dd2ed5 100644 --- a/google/cloud/bigtable/internal/partial_result_set_reader.h +++ b/google/cloud/bigtable/internal/partial_result_set_reader.h @@ -1,4 +1,4 @@ -// Copyright 2019 Google LLC +// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index 293645d9f2984..e5ea4990442ec 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -1,4 +1,4 @@ -// Copyright 2019 Google LLC +// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h index 06ef7102af051..420c18a9743fa 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.h +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -1,4 +1,4 @@ -// Copyright 2022 Google LLC +// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc index c5a95456fa7a0..f6a131a1b654b 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source_test.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -1,5 +1,5 @@ -// Copyright 2022 Google LLC +// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
From 2cc07a463e26f3bcfab171ec6546923519ae9e35 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Thu, 9 Oct 2025 05:17:17 +0000 Subject: [PATCH 03/26] clean up --- .../internal/partial_result_set_source_test.cc | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc index f6a131a1b654b..eff9c81af2919 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source_test.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -61,16 +61,6 @@ CreatePartialResultSetSource(std::unique_ptr reader, return PartialResultSetSource::Create(std::move(reader)); } -// // Returns a functor that expects the current `StringOption` to match the -// test -// // name. -// std::function VoidMock() { -// return [] { -// EXPECT_EQ(internal::CurrentOptions().get(), -// CurrentTestName()); -// }; -// } - // Returns a functor that will return the argument after expecting that the // current `StringOption` matches the test name. template From 99e00b8b323d9113491b272fa808811ba52a6640 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Thu, 9 Oct 2025 05:27:10 +0000 Subject: [PATCH 04/26] clean up --- .../bigtable/internal/partial_result_set_source.cc | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index e5ea4990442ec..07ad07c70ab1a 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -140,17 +140,6 @@ Status PartialResultSetSource::ReadFromStream() { result_set.result.set_resume_token(""); } - // ExecuteQueryResponse contains metadata and partialResult -// // Copy the column names into a vector that will be shared with -// // every Row object returned from NextRow(). 
- // columns_ = std::make_shared>(); - // columns_->reserve(result_set.result.proto_rows_batch.fields_size()); - // for (auto const& field : metadata_->row_type().fields()) { - // columns_->push_back(field.name()); - // } -// } -// } - // If reader_->Read() resulted in a new PartialResultSetReader (i.e., it // used the token to resume an interrupted stream), then we must discard // any buffered data as it will be replayed. From 697e99f533e1fb5bc3ab82458d0df007aabb11cf Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Fri, 10 Oct 2025 20:46:22 +0000 Subject: [PATCH 05/26] add testing; fix data retrieval --- .../bigtable/bigtable_client_unit_tests.bzl | 1 + .../internal/partial_result_set_source.cc | 156 ++++------ .../internal/partial_result_set_source.h | 23 +- .../partial_result_set_source_test.cc | 277 +++++++++++++++++- google/cloud/bigtable/results.h | 9 + 5 files changed, 364 insertions(+), 102 deletions(-) diff --git a/google/cloud/bigtable/bigtable_client_unit_tests.bzl b/google/cloud/bigtable/bigtable_client_unit_tests.bzl index 116f5657ffda3..2094210979264 100644 --- a/google/cloud/bigtable/bigtable_client_unit_tests.bzl +++ b/google/cloud/bigtable/bigtable_client_unit_tests.bzl @@ -65,6 +65,7 @@ bigtable_client_unit_tests = [ "internal/operation_context_factory_test.cc", "internal/operation_context_test.cc", "internal/prefix_range_end_test.cc", + "internal/partial_result_set_source_test.cc", "internal/rate_limiter_test.cc", "internal/retry_traits_test.cc", "internal/traced_row_reader_test.cc", diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index 07ad07c70ab1a..19902ddea1be4 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -24,11 +24,12 @@ namespace cloud { namespace bigtable_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN - StatusOr> 
-PartialResultSetSource::Create(std::unique_ptr reader) { +PartialResultSetSource::Create( + absl::optional metadata, + std::unique_ptr reader) { std::unique_ptr source( - new PartialResultSetSource(std::move(reader))); + new PartialResultSetSource(std::move(metadata), std::move(reader))); // Do an initial read from the stream to determine the fate of the factory. auto status = source->ReadFromStream(); @@ -41,13 +42,22 @@ PartialResultSetSource::Create(std::unique_ptr reader) { } PartialResultSetSource::PartialResultSetSource( + absl::optional metadata, std::unique_ptr reader) : options_(internal::CurrentOptions()), reader_(std::move(reader)), + metadata_(std::move(metadata)), values_(absl::make_optional( google::protobuf::Arena::Create< google::protobuf::RepeatedPtrField>( &arena_))) { + if (metadata_.has_value()) { + columns_ = std::make_shared>(); + columns_->reserve(metadata_->proto_schema().columns_size()); + for (auto const& c : metadata_->proto_schema().columns()) { + columns_->push_back(c.name()); + } + } if (options_.has()) { values_space_limit_ = options_.get(); @@ -76,53 +86,28 @@ PartialResultSetSource::~PartialResultSetSource() { } StatusOr PartialResultSetSource::NextRow() { - if (usable_rows_ == 0 && rows_returned_ > 0) { - // There may be complete or partial rows in values_ that haven't been - // returned to the clients yet. Let's copy it over before we reset - // the arena. 
- auto* values = *values_; - int partial_size = - static_cast(values->size() - rows_returned_ * columns_->size()); - absl::FixedArray tmp(partial_size); - if (!tmp.empty()) { - values->ExtractSubrange(values->size() - partial_size, partial_size, - tmp.data()); - } - values_.reset(); - values_space_.Clear(); - arena_.Reset(); - values_.emplace( - google::protobuf::Arena::Create< - google::protobuf::RepeatedPtrField>( - &arena_)); - values = *values_; - for (auto* elem : tmp) { - values->Add(std::move(*elem)); - delete elem; - } - rows_returned_ = 0; - } - while (usable_rows_ == 0) { + while (rows_.empty()) { if (state_ == kFinished) return bigtable::QueryRow(); internal::OptionsSpan span(options_); + // Continue fetching if there are more rows in the stream. auto status = ReadFromStream(); if (!status.ok()) return status; } - ++rows_returned_; - --usable_rows_; - std::vector values; - return QueryRowFriend::MakeQueryRow(std::move(values), columns_); + // Returns the row at the front of the queue + auto row = std::move(rows_.front()); + rows_.pop_front(); + return row; } Status PartialResultSetSource::ReadFromStream() { - if (state_ == kFinished || usable_rows_ != 0 || rows_returned_ != 0) { + if (state_ == kFinished || !rows_.empty()) { return internal::InternalError("PartialResultSetSource state error", GCP_ERROR_INFO()); } - auto* values = *values_; auto* raw_result_set = google::protobuf::Arena::Create( &arena_); + auto* values = *values_; auto result_set = UnownedPartialResultSet::FromPartialResultSet(*raw_result_set); if (state_ == kReading) { @@ -140,71 +125,56 @@ Status PartialResultSetSource::ReadFromStream() { result_set.result.set_resume_token(""); } - // If reader_->Read() resulted in a new PartialResultSetReader (i.e., it - // used the token to resume an interrupted stream), then we must discard - // any buffered data as it will be replayed. 
- if (result_set.resumption) { - if (!resume_token_) { - // The reader claims to have resumed the stream even though we said it - // should not. That leaves us in the untenable position of possibly - // having returned data that will be replayed, so fail the stream now. - return internal::InternalError( - "PartialResultSetSource reader resumed the stream" - " despite our having asked it not to", - GCP_ERROR_INFO()); - } - values_back_incomplete_ = false; - values->Clear(); - values_space_.Clear(); - } - - // Deliver whatever rows we can muster. - auto const n_values = values->size() - (values_back_incomplete_ ? 1 : 0); - auto const n_columns = columns_ ? static_cast(columns_->size()) : 0; - auto n_rows = n_columns ? n_values / n_columns : 0; - if (n_columns == 0 && !values->empty()) { - return internal::InternalError( - "PartialResultSetSource metadata is missing row type", - GCP_ERROR_INFO()); - } + return ProcessDataFromStream(result_set.result); +} - // If we didn't receive a resume token, and have not exceeded our buffer - // limit, then we choose to `Read()` again so as to maintain resumability. - if (result_set.result.resume_token().empty() && values_space_limit_ > 0) { - for (auto it = values->begin() + values_space_.index; it != values->end(); - ++it) { - values_space_.space_used += it->SpaceUsedLong(); - } - values_space_.index = values->size(); - if (values_space_.space_used < values_space_limit_) { - return {}; // OK +Status PartialResultSetSource::ProcessDataFromStream( + google::bigtable::v2::PartialResultSet& result) { + // Gather results from returned from `ProtoRowsBatch` which is a field within + // the PartialResultSet message. 
+ if (result.estimated_batch_size() > 0) { + if (read_buffer_.empty()) { + read_buffer_.reserve(result.estimated_batch_size()); } } + if (result.has_proto_rows_batch()) { + absl::StrAppend(&read_buffer_, result.proto_rows_batch().batch_data()); + } - // If we did receive a resume token then everything should be deliverable, - // and we'll be able to resume the stream at this point after a breakage. - // Otherwise, if we deliver anything at all, we must disable resumability. - if (!result_set.result.resume_token().empty()) { - resume_token_ = result_set.result.resume_token(); - if (n_rows * n_columns != values->size()) { - if (state_ != kEndOfStream) { - return internal::InternalError( - "PartialResultSetSource reader produced a resume token" - " that is not on a row boundary", - GCP_ERROR_INFO()); - } - if (n_rows == 0) { - return internal::InternalError( - "PartialResultSetSource stream ended at a point" - " that is not on a row boundary", - GCP_ERROR_INFO()); + if (result.has_batch_checksum() && !read_buffer_.empty()) { + google::bigtable::v2::ProtoRows proto_rows; + if (proto_rows.ParseFromString(read_buffer_)) { + read_buffer_.clear(); + if (metadata_.has_value()) { + auto columns = metadata_.value().proto_schema().columns_size(); + auto parsed_value = proto_rows.values().begin(); + std::vector values; + values.reserve(columns); + + while (parsed_value != proto_rows.values().end()) { + for (auto const& column : metadata_->proto_schema().columns()) { + auto value = FromProto(column.type(), *parsed_value); + values.push_back(std::move(value)); + ++parsed_value; + } + uncommitted_rows_.push_back( + QueryRowFriend::MakeQueryRow(std::move(values), columns_)); + values.clear(); + } } } - } else if (n_rows != 0) { - resume_token_ = absl::nullopt; } - usable_rows_ = n_rows; + // If reader_->Read() resulted in a new PartialResultSetReader (i.e., it + // used the token to resume an interrupted stream), then we must discard + // any buffered data as it will be replayed. 
+ if (!result.resume_token().empty()) { + // Commit completed rows into rows_ + rows_.insert(rows_.end(), uncommitted_rows_.begin(), + uncommitted_rows_.end()); + uncommitted_rows_.clear(); + resume_token_ = result.resume_token(); + } return {}; // OK } diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h index 420c18a9743fa..29e8209f4845c 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.h +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H #include "google/cloud/bigtable/internal/partial_result_set_reader.h" +// #include "google/cloud/bigtable/query_row.cc" #include "google/cloud/bigtable/results.h" #include "google/cloud/bigtable/value.h" #include "google/cloud/bigtable/version.h" @@ -23,10 +24,10 @@ #include "google/cloud/status.h" #include "google/cloud/status_or.h" #include "absl/types/optional.h" +#include #include #include #include -#include #include #include #include @@ -38,27 +39,34 @@ namespace cloud { namespace bigtable_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -class PartialResultSourceInterface : public bigtable::ResultSourceInterface { -}; +class PartialResultSourceInterface : public bigtable::ResultSourceInterface {}; /** - * Class used to iterate over the rows returned from a read operations. + * Class used to iterate over the rows returned from a read operations. */ class PartialResultSetSource : public PartialResultSourceInterface { public: /// Factory method to create a PartialResultSetSource. 
static StatusOr> Create( + absl::optional metadata, std::unique_ptr reader); ~PartialResultSetSource() override; StatusOr NextRow() override; + absl::optional Metadata() override { + return metadata_; + } + private: explicit PartialResultSetSource( + absl::optional metadata, std::unique_ptr reader); Status ReadFromStream(); + Status ProcessDataFromStream(google::bigtable::v2::PartialResultSet& result); + std::string read_buffer_; // Arena for the values_ field. google::protobuf::Arena arena_; @@ -66,8 +74,15 @@ class PartialResultSetSource : public PartialResultSourceInterface { Options options_; std::unique_ptr reader_; + // The ResultSetMetadata is received in the first response. It is received + // from ExecuteQueryResponse + absl::optional metadata_; + std::shared_ptr> columns_; + std::deque rows_; + std::vector uncommitted_rows_; + // Number of rows returned to the client. int rows_returned_ = 0; diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc index eff9c81af2919..59d0e1323dee7 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source_test.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -22,6 +22,8 @@ #include "google/cloud/options.h" #include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/status_matchers.h" +#include "absl/strings/substitute.h" +#include #include #include #include @@ -38,6 +40,7 @@ namespace { using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; using ::testing::_; +using ::testing::ElementsAre; using ::testing::Return; using ::testing::UnitTest; @@ -53,12 +56,13 @@ struct StringOption { // `StringOption` set to the current test name, so that we might check that // all `PartialResultSetReader` calls happen within a matching span. 
StatusOr> -CreatePartialResultSetSource(std::unique_ptr reader, - Options opts = {}) { +CreatePartialResultSetSource( + absl::optional metadata, + std::unique_ptr reader, Options opts = {}) { internal::OptionsSpan span(internal::MergeOptions( std::move(opts.set(CurrentTestName())), internal::CurrentOptions())); - return PartialResultSetSource::Create(std::move(reader)); + return PartialResultSetSource::Create(std::move(metadata), std::move(reader)); } // Returns a functor that will return the argument after expecting that the @@ -89,7 +93,8 @@ TEST(PartialResultSetSourceTest, InitialReadFailure) { EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); internal::OptionsSpan overlay(Options{}.set("uh-oh")); - auto reader = CreatePartialResultSetSource(std::move(grpc_reader)); + auto reader = + CreatePartialResultSetSource(absl::nullopt, std::move(grpc_reader)); EXPECT_THAT(reader, StatusIs(StatusCode::kInvalidArgument, "invalid")); } @@ -98,6 +103,7 @@ TEST(PartialResultSetSourceTest, InitialReadFailure) { */ TEST(PartialResultSetSourceTest, MissingRowTypeNoData) { auto constexpr kText = R"pb( + proto_rows_batch: {} )pb"; google::bigtable::v2::PartialResultSet response; ASSERT_TRUE(TextFormat::ParseFromString(kText, &response)); @@ -115,11 +121,272 @@ TEST(PartialResultSetSourceTest, MissingRowTypeNoData) { EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); internal::OptionsSpan overlay(Options{}.set("uh-oh")); - auto reader = CreatePartialResultSetSource(std::move(grpc_reader)); + auto reader = CreatePartialResultSetSource( + google::bigtable::v2::ResultSetMetadata{}, std::move(grpc_reader)); ASSERT_STATUS_OK(reader); EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); } +/// @test Verify a single response is handled correctly. 
+TEST(PartialResultSetSourceTest, SingleResponse) { + auto constexpr kResultMetadataText = R"pb( + proto_schema { + columns { + name: "user_id" + type { string_type {} } + } + columns { + name: "email" + type { string_type {} } + } + columns { + name: "name" + type { string_type {} } + } + } + )pb"; + google::bigtable::v2::ResultSetMetadata metadata; + ASSERT_TRUE(TextFormat::ParseFromString(kResultMetadataText, &metadata)); + auto constexpr kProtoRowsText = R"pb( + values { string_value: "r1" } + values { string_value: "f1" } + values { string_value: "q1" } + )pb"; + google::bigtable::v2::ProtoRows proto_rows; + ASSERT_TRUE(TextFormat::ParseFromString(kProtoRowsText, &proto_rows)); + + std::string binary_batch_data = proto_rows.SerializeAsString(); + + std::string partial_result_set_text = + absl::Substitute(R"pb( + proto_rows_batch: { + batch_data: "$0", + }, + resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", + reset: true, + estimated_batch_size: 31, + batch_checksum: 3400346059 + )pb", + binary_batch_data); + google::bigtable::v2::PartialResultSet response; + ASSERT_TRUE(TextFormat::ParseFromString(partial_result_set_text, &response)); + + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = response; + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + auto reader = CreatePartialResultSetSource(metadata, std::move(grpc_reader)); + ASSERT_STATUS_OK(reader); + auto row = (*reader)->NextRow(); + ASSERT_STATUS_OK(row); + auto first_row = *row; + ASSERT_EQ(first_row.values().size(), 3); + EXPECT_EQ(*row->values().at(0).get(), "r1"); + EXPECT_EQ(*row->values().at(1).get(), "f1"); + EXPECT_EQ(*row->values().at(2).get(), "q1"); + + EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); +} + +/** + * @test Verify 
the behavior when we get an incomplete Row. + */ +TEST(PartialResultSetSourceTest, MultipleResponses) { + auto constexpr kResultMetadataText = R"pb( + proto_schema { + columns { + name: "user_id" + type { string_type {} } + } + columns { + name: "email" + type { string_type {} } + } + columns { + name: "name" + type { string_type {} } + } + } + )pb"; + google::bigtable::v2::ResultSetMetadata metadata; + ASSERT_TRUE(TextFormat::ParseFromString(kResultMetadataText, &metadata)); + + std::array proto_rows_text{{ + R"pb( + values { string_value: "a1" } + values { string_value: "a2" } + values { string_value: "a3" } + )pb", + R"pb( + values { string_value: "b1" } + values { string_value: "b2" } + values { string_value: "b3" } + )pb", + R"pb( + values { string_value: "c1" } + values { string_value: "c2" } + values { string_value: "c3" } + )pb", + }}; + + std::vector responses; + for (auto const* text : proto_rows_text) { + google::bigtable::v2::ProtoRows proto_rows; + ASSERT_TRUE(TextFormat::ParseFromString(text, &proto_rows)); + std::string binary_batch_data = proto_rows.SerializeAsString(); + std::string partial_result_set_text = absl::Substitute( + R"pb( + proto_rows_batch: { + batch_data: "$0", + }, + resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", + reset: true, + estimated_batch_size: 31, + batch_checksum: 3400346059 + )pb", + binary_batch_data); + google::bigtable::v2::PartialResultSet response; + ASSERT_TRUE( + TextFormat::ParseFromString(partial_result_set_text, &response)); + responses.push_back(std::move(response)); + } + + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[0]; + return true; + }) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[1]; + return true; + }) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + 
result.result = responses[2]; + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + + auto reader = CreatePartialResultSetSource(metadata, std::move(grpc_reader)); + ASSERT_STATUS_OK(reader); + + auto row0 = (*reader)->NextRow(); + ASSERT_STATUS_OK(row0); + ASSERT_EQ(row0->values().size(), 3); + EXPECT_EQ(*row0->values().at(0).get(), "a1"); + EXPECT_EQ(*row0->values().at(1).get(), "a2"); + EXPECT_EQ(*row0->values().at(2).get(), "a3"); + + auto row1 = (*reader)->NextRow(); + ASSERT_STATUS_OK(row1); + ASSERT_EQ(row1->values().size(), 3); + EXPECT_EQ(*row1->values().at(0).get(), "b1"); + EXPECT_EQ(*row1->values().at(1).get(), "b2"); + EXPECT_EQ(*row1->values().at(2).get(), "b3"); + + auto row2 = (*reader)->NextRow(); + ASSERT_STATUS_OK(row2); + ASSERT_EQ(row2->values().size(), 3); + EXPECT_EQ(*row2->values().at(0).get(), "c1"); + EXPECT_EQ(*row2->values().at(1).get(), "c2"); + EXPECT_EQ(*row2->values().at(2).get(), "c3"); + + EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); +} + +/** + * @test Verify the behavior when we get an incomplete Row. 
+ */ +TEST(PartialResultSetSourceTest, ResponseWithNoValues) { + auto constexpr kResultMetadataText = R"pb( + proto_schema { + columns { + name: "user_id" + type { string_type {} } + } + } + )pb"; + google::bigtable::v2::ResultSetMetadata metadata; + ASSERT_TRUE(TextFormat::ParseFromString(kResultMetadataText, &metadata)); + + std::array proto_rows_text{{ + R"pb( + values { string_value: "a1" } + )pb", + R"pb( + )pb", + R"pb( + )pb"}}; + + std::vector responses; + for (auto const* text : proto_rows_text) { + google::bigtable::v2::ProtoRows proto_rows; + ASSERT_TRUE(TextFormat::ParseFromString(text, &proto_rows)); + std::string binary_batch_data = proto_rows.SerializeAsString(); + std::string partial_result_set_text = absl::Substitute( + R"pb( + proto_rows_batch: { + batch_data: "$0", + }, + resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", + reset: true, + estimated_batch_size: 31, + batch_checksum: 3400346059 + )pb", + binary_batch_data); + google::bigtable::v2::PartialResultSet response; + ASSERT_TRUE( + TextFormat::ParseFromString(partial_result_set_text, &response)); + responses.push_back(std::move(response)); + } + + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[0]; + return true; + }) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[1]; + return true; + }) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[2]; + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + internal::OptionsSpan overlay(Options{}.set("uh-oh")); + auto reader = CreatePartialResultSetSource(metadata, std::move(grpc_reader)); + ASSERT_STATUS_OK(reader); + + // Verify the returned row is correct. 
+ EXPECT_THAT((*reader)->NextRow(), + IsValidAndEquals(bigtable_mocks::MakeQueryRow( + {{"user_id", bigtable::Value("a1")}}))); + + // At end of stream, we get an 'ok' response with an empty row. + EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal diff --git a/google/cloud/bigtable/results.h b/google/cloud/bigtable/results.h index 0325c8c1135af..30f3e266629ab 100644 --- a/google/cloud/bigtable/results.h +++ b/google/cloud/bigtable/results.h @@ -44,6 +44,15 @@ class ResultSourceInterface { * row_key() to indicate end-of-stream. */ virtual StatusOr NextRow() = 0; + + /** + * Returns metadata about the result set, such as the column names and types + * + * @see https://github.com/googleapis/googleapis/blob/master/google/bigtable/v2/data.proto + * for more information. + */ + virtual absl::optional + Metadata() = 0; }; /** From 14e13465596a38a8e21b735d5c58d66e2b15734d Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Fri, 10 Oct 2025 22:20:16 +0000 Subject: [PATCH 06/26] fix ordering --- google/cloud/bigtable/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 90808467d9322..556c8070c07d4 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -392,8 +392,8 @@ if (BUILD_TESTING) testing/mock_data_client.h testing/mock_mutate_rows_limiter.h testing/mock_mutate_rows_reader.h - testing/mock_policies.h testing/mock_partial_result_set_reader.h + testing/mock_policies.h testing/mock_read_rows_reader.h testing/mock_response_reader.h testing/mock_sample_row_keys_reader.h From d9d5362a4fdf3e0a30567cdc56e46e38035bdecb Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Sun, 12 Oct 2025 23:08:29 +0000 Subject: [PATCH 07/26] clean up resumption logic --- .../internal/partial_result_set_reader.h | 6 -- 
.../internal/partial_result_set_source.cc | 89 ++++++++++++------- .../internal/partial_result_set_source.h | 27 ------ .../partial_result_set_source_test.cc | 19 ++-- google/cloud/bigtable/options.h | 2 +- 5 files changed, 70 insertions(+), 73 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_reader.h b/google/cloud/bigtable/internal/partial_result_set_reader.h index 6e502a2dd2ed5..3cecf01de9884 100644 --- a/google/cloud/bigtable/internal/partial_result_set_reader.h +++ b/google/cloud/bigtable/internal/partial_result_set_reader.h @@ -15,14 +15,8 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H -#include "google/cloud/bigtable/version.h" #include "google/cloud/status.h" -#include "google/cloud/status_or.h" -#include "absl/types/optional.h" #include -#include -#include -#include namespace google { namespace cloud { diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index 19902ddea1be4..ed295a4561744 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -22,6 +22,14 @@ namespace google { namespace cloud { namespace bigtable_internal { + +namespace { +bool isValidChecksum(std::string data, uint32_t expected_checksum) { + absl::crc32c_t computed_crc = absl::ComputeCrc32c(data); + return static_cast(computed_crc) == expected_checksum; +} + +} // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN StatusOr> @@ -38,7 +46,9 @@ PartialResultSetSource::Create( // creating the `PartialResultSetSource` should fail with the same error. 
if (source->state_ == kFinished && !status.ok()) return status; - return {std::move(source)}; + std::unique_ptr interface_ptr = + std::move(source); + return interface_ptr; } PartialResultSetSource::PartialResultSetSource( @@ -100,56 +110,70 @@ StatusOr PartialResultSetSource::NextRow() { } Status PartialResultSetSource::ReadFromStream() { - if (state_ == kFinished || !rows_.empty()) { - return internal::InternalError("PartialResultSetSource state error", + if (state_ == kFinished) { + return internal::InternalError("PartialResultSetSource already finished", GCP_ERROR_INFO()); } + // The application should consume rows_ before calling ReadFromStream again. + if (!rows_.empty()) { + return internal::InternalError("PartialResultSetSource has unconsumed rows", + GCP_ERROR_INFO()); + } + auto* raw_result_set = google::protobuf::Arena::Create( &arena_); - auto* values = *values_; auto result_set = UnownedPartialResultSet::FromPartialResultSet(*raw_result_set); - if (state_ == kReading) { - if (!reader_->Read(resume_token_, result_set)) state_ = kEndOfStream; - } - if (state_ == kEndOfStream) { - // If we have no buffered data, we're done. - if (values->empty()) { - state_ = kFinished; - return reader_->Finish(); + + // The resume_token_ member holds the token from the previous + // PartialResultSet. It's empty on the first call. + if (reader_->Read(resume_token_, result_set)) { + return ProcessDataFromStream(result_set.result); + } else { + state_ = kFinished; + // The uncommitted_rows_ is expected to be empty because the last successful + // read would have had a sentinel resume_token, causing + // ProcessDataFromStream to commit them. + if (!uncommitted_rows_.empty()) { + return internal::InternalError("Stream ended with uncommitted rows.", + GCP_ERROR_INFO()); } - // Otherwise, proceed with a `PartialResultSet` using a fake resume - // token to flush the buffer. 
The service does not appear to yield - // a resume token in its final response, despite it completing a row. - result_set.result.set_resume_token(""); + return reader_->Finish(); } - - return ProcessDataFromStream(result_set.result); } Status PartialResultSetSource::ProcessDataFromStream( google::bigtable::v2::PartialResultSet& result) { - // Gather results from returned from `ProtoRowsBatch` which is a field within - // the PartialResultSet message. - if (result.estimated_batch_size() > 0) { - if (read_buffer_.empty()) { - read_buffer_.reserve(result.estimated_batch_size()); - } + // If the `reset` is true then all the data buffered since the last + // resume_token should be discared. + if (result.reset()) { + read_buffer_.clear(); + uncommitted_rows_.clear(); } + + // Reserve space of the buffer at the start of a new batch of data. + if (result.estimated_batch_size() > 0 && read_buffer_.empty()) { + read_buffer_.reserve(result.estimated_batch_size()); + } + if (result.has_proto_rows_batch()) { absl::StrAppend(&read_buffer_, result.proto_rows_batch().batch_data()); } if (result.has_batch_checksum() && !read_buffer_.empty()) { + if (!isValidChecksum(read_buffer_, result.batch_checksum())) { + read_buffer_.clear(); + uncommitted_rows_.clear(); + return internal::InternalError("Batch checksum mismatch"); + } google::bigtable::v2::ProtoRows proto_rows; if (proto_rows.ParseFromString(read_buffer_)) { - read_buffer_.clear(); if (metadata_.has_value()) { - auto columns = metadata_.value().proto_schema().columns_size(); + auto columns_size = metadata_.value().proto_schema().columns_size(); auto parsed_value = proto_rows.values().begin(); std::vector values; - values.reserve(columns); + values.reserve(columns_size); while (parsed_value != proto_rows.values().end()) { for (auto const& column : metadata_->proto_schema().columns()) { @@ -162,14 +186,16 @@ Status PartialResultSetSource::ProcessDataFromStream( values.clear(); } } + } else { + read_buffer_.clear(); + 
uncommitted_rows_.clear(); + return internal::InternalError("Failed to parse ProtoRows from buffer"); } } - // If reader_->Read() resulted in a new PartialResultSetReader (i.e., it - // used the token to resume an interrupted stream), then we must discard - // any buffered data as it will be replayed. + // Buffered rows in uncommitted_rows_ are ready to be committed into rows_ + // once the resume_token is received. if (!result.resume_token().empty()) { - // Commit completed rows into rows_ rows_.insert(rows_.end(), uncommitted_rows_.begin(), uncommitted_rows_.end()); uncommitted_rows_.clear(); @@ -177,7 +203,6 @@ Status PartialResultSetSource::ProcessDataFromStream( } return {}; // OK } - GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal } // namespace cloud diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h index 29e8209f4845c..773a731220847 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.h +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -16,7 +16,6 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H #include "google/cloud/bigtable/internal/partial_result_set_reader.h" -// #include "google/cloud/bigtable/query_row.cc" #include "google/cloud/bigtable/results.h" #include "google/cloud/bigtable/value.h" #include "google/cloud/bigtable/version.h" @@ -79,36 +78,14 @@ class PartialResultSetSource : public PartialResultSourceInterface { absl::optional metadata_; std::shared_ptr> columns_; - std::deque rows_; std::vector uncommitted_rows_; - // Number of rows returned to the client. - int rows_returned_ = 0; - - // Number of rows that can be created from `values_` in NextRow(). Note there - // may be more data in values_ but it's not ready to be returned to the - // client. - int usable_rows_ = 0; - // Values that can be assembled into `QueryRow`s ready to be returned by // `NextRow()`. 
This is a pointer to an arena-allocated RepeatedPtrField. absl::optional*> values_; - // `space_used` is the sum of the SpaceUsedLong() by the values at indexes [0, - // index) in `values_`. - struct PrecomputedSpaceUsed { - void Clear() { - space_used = 0; - index = 0; - } - - std::size_t space_used = 0; - int index = 0; - }; - PrecomputedSpaceUsed values_space_; - // When engaged, the token we can use to resume the stream immediately after // any data in (or previously in) `rows_`. When disengaged, we have already // delivered data that would be replayed, so resumption is disabled until we @@ -122,10 +99,6 @@ class PartialResultSetSource : public PartialResultSourceInterface { // set the default limit to twice that. std::size_t values_space_limit_ = 2 * 100 * (std::size_t{1} << 20); - // `*values_.rbegin()` exists, but it is incomplete. The rest of the value - // will be sent in subsequent `PartialResultSet` messages. - bool values_back_incomplete_ = false; - // The state of our PartialResultSetReader. enum : char { // `Read()` has yet to return nullopt. 
diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc index 59d0e1323dee7..1f95edeedff02 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source_test.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -156,7 +156,8 @@ TEST(PartialResultSetSourceTest, SingleResponse) { ASSERT_TRUE(TextFormat::ParseFromString(kProtoRowsText, &proto_rows)); std::string binary_batch_data = proto_rows.SerializeAsString(); - + uint32_t correct_checksum = + static_cast(absl::ComputeCrc32c(binary_batch_data)); std::string partial_result_set_text = absl::Substitute(R"pb( proto_rows_batch: { @@ -165,9 +166,9 @@ TEST(PartialResultSetSourceTest, SingleResponse) { resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", reset: true, estimated_batch_size: 31, - batch_checksum: 3400346059 + batch_checksum: $1 )pb", - binary_batch_data); + binary_batch_data, correct_checksum); google::bigtable::v2::PartialResultSet response; ASSERT_TRUE(TextFormat::ParseFromString(partial_result_set_text, &response)); @@ -241,6 +242,8 @@ TEST(PartialResultSetSourceTest, MultipleResponses) { google::bigtable::v2::ProtoRows proto_rows; ASSERT_TRUE(TextFormat::ParseFromString(text, &proto_rows)); std::string binary_batch_data = proto_rows.SerializeAsString(); + uint32_t correct_checksum = + static_cast(absl::ComputeCrc32c(binary_batch_data)); std::string partial_result_set_text = absl::Substitute( R"pb( proto_rows_batch: { @@ -249,9 +252,9 @@ TEST(PartialResultSetSourceTest, MultipleResponses) { resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", reset: true, estimated_batch_size: 31, - batch_checksum: 3400346059 + batch_checksum: $1 )pb", - binary_batch_data); + binary_batch_data, correct_checksum); google::bigtable::v2::PartialResultSet response; ASSERT_TRUE( TextFormat::ParseFromString(partial_result_set_text, &response)); @@ -336,6 +339,8 @@ TEST(PartialResultSetSourceTest, ResponseWithNoValues) { 
google::bigtable::v2::ProtoRows proto_rows; ASSERT_TRUE(TextFormat::ParseFromString(text, &proto_rows)); std::string binary_batch_data = proto_rows.SerializeAsString(); + uint32_t correct_checksum = + static_cast(absl::ComputeCrc32c(binary_batch_data)); std::string partial_result_set_text = absl::Substitute( R"pb( proto_rows_batch: { @@ -344,9 +349,9 @@ TEST(PartialResultSetSourceTest, ResponseWithNoValues) { resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", reset: true, estimated_batch_size: 31, - batch_checksum: 3400346059 + batch_checksum: $1 )pb", - binary_batch_data); + binary_batch_data, correct_checksum); google::bigtable::v2::PartialResultSet response; ASSERT_TRUE( TextFormat::ParseFromString(partial_result_set_text, &response)); diff --git a/google/cloud/bigtable/options.h b/google/cloud/bigtable/options.h index dc1cd00abacc8..027c1db175f5e 100644 --- a/google/cloud/bigtable/options.h +++ b/google/cloud/bigtable/options.h @@ -266,7 +266,7 @@ using DataPolicyOptionList = * If the limit is exceeded, and the stream is subsequently interrupted before * a new resumption point can be established, the read/query will fail. 
* - * @ingroup google-cloud-spanner-options + * @ingroup google-cloud-bigtable-options */ struct StreamingResumabilityBufferSizeOption { using Type = std::size_t; From 10d16d1c2162cc2ce9e30ffccc184c80609afcb6 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Sun, 12 Oct 2025 23:59:00 +0000 Subject: [PATCH 08/26] fix typo --- google/cloud/bigtable/internal/partial_result_set_source.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index ed295a4561744..bfea48913ab7d 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -146,7 +146,7 @@ Status PartialResultSetSource::ReadFromStream() { Status PartialResultSetSource::ProcessDataFromStream( google::bigtable::v2::PartialResultSet& result) { // If the `reset` is true then all the data buffered since the last - // resume_token should be discared. + // resume_token should be discarded. 
if (result.reset()) { read_buffer_.clear(); uncommitted_rows_.clear(); From 08fb956f04dda7b98f8476570bb2dc67b47bfe98 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 00:24:24 +0000 Subject: [PATCH 09/26] apply suggestions --- .../internal/partial_result_set_source.cc | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index bfea48913ab7d..58d43ac3caa43 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -24,7 +24,7 @@ namespace cloud { namespace bigtable_internal { namespace { -bool isValidChecksum(std::string data, uint32_t expected_checksum) { +bool isValidChecksum(std::string const& data, uint32_t expected_checksum) { absl::crc32c_t computed_crc = absl::ComputeCrc32c(data); return static_cast(computed_crc) == expected_checksum; } @@ -130,17 +130,16 @@ Status PartialResultSetSource::ReadFromStream() { // PartialResultSet. It's empty on the first call. if (reader_->Read(resume_token_, result_set)) { return ProcessDataFromStream(result_set.result); - } else { - state_ = kFinished; - // The uncommitted_rows_ is expected to be empty because the last successful - // read would have had a sentinel resume_token, causing - // ProcessDataFromStream to commit them. - if (!uncommitted_rows_.empty()) { - return internal::InternalError("Stream ended with uncommitted rows.", - GCP_ERROR_INFO()); - } - return reader_->Finish(); } + state_ = kFinished; + // The uncommitted_rows_ is expected to be empty because the last successful + // read would have had a sentinel resume_token, causing + // ProcessDataFromStream to commit them. 
+ if (!uncommitted_rows_.empty()) { + return internal::InternalError("Stream ended with uncommitted rows.", + GCP_ERROR_INFO()); + } + return reader_->Finish(); } Status PartialResultSetSource::ProcessDataFromStream( From dcd8f62888f66acc7954917d5d0b6d67c656f3ed Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 00:59:58 +0000 Subject: [PATCH 10/26] fix default size for bigtable --- .../bigtable/internal/partial_result_set_source.cc | 1 + .../bigtable/internal/partial_result_set_source.h | 11 +++++------ google/cloud/bigtable/options.h | 3 ++- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index 58d43ac3caa43..d9f30143944df 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -17,6 +17,7 @@ #include "google/cloud/internal/make_status.h" #include "google/cloud/log.h" #include "absl/container/fixed_array.h" +#include "absl/crc/crc32c.h" #include "absl/types/optional.h" namespace google { diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h index 773a731220847..d67e5f05411fd 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.h +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -92,12 +92,11 @@ class PartialResultSetSource : public PartialResultSourceInterface { // see a new token. absl::optional resume_token_ = ""; - // Should the space used by `values_` get larger than this limit, we will - // move complete rows into `rows_` and disable resumption until we see a - // new token. During this time, an error in the stream will be returned by - // `NextRow()`. No individual row in a result set can exceed 100 MiB, so we - // set the default limit to twice that. 
- std::size_t values_space_limit_ = 2 * 100 * (std::size_t{1} << 20); + // The default value is set to 2*256 MiB. A single row in Bigtable can't + // exceed 256 MiB so setting the limit to twice that size to provide a safe + // upper bound for the buffer. + std::size_t values_space_limit_ = + 2 * 256 * (std::size_t{1} << 20); // 512 MiB // The state of our PartialResultSetReader. enum : char { diff --git a/google/cloud/bigtable/options.h b/google/cloud/bigtable/options.h index 027c1db175f5e..76381e1303380 100644 --- a/google/cloud/bigtable/options.h +++ b/google/cloud/bigtable/options.h @@ -264,7 +264,8 @@ using DataPolicyOptionList = * Option for `google::cloud::Options` to set a limit on how much data will * be buffered to guarantee resumability of a streaming read or SQL query. * If the limit is exceeded, and the stream is subsequently interrupted before - * a new resumption point can be established, the read/query will fail. + * a new resumption point can be established, the read/query will fail. The + * default is 512 MiB (2*256). 
* * @ingroup google-cloud-bigtable-options */ From 2f090e6c19119356b2db3228e453dda36f193124 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 01:11:46 +0000 Subject: [PATCH 11/26] add absl:crc to CMake --- google/cloud/bigtable/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 556c8070c07d4..dce58790596e6 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -417,6 +417,7 @@ if (BUILD_TESTING) GTest::gtest gRPC::grpc++ gRPC::grpc + absl::crc absl::memory absl::str_format protobuf::libprotobuf) From 2094ed1da949193c016db0ebf4b5853ef9f3e7df Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 02:11:47 +0000 Subject: [PATCH 12/26] add absl::crc to Cmake --- google/cloud/bigtable/CMakeLists.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index dce58790596e6..e7e5033c94e9b 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -271,7 +271,9 @@ add_library( wait_for_consistency.h) target_link_libraries( google_cloud_cpp_bigtable - PUBLIC absl::memory + PUBLIC + absl::crc + absl::memory google-cloud-cpp::bigtable_protos google-cloud-cpp::common google-cloud-cpp::grpc_utils @@ -417,7 +419,6 @@ if (BUILD_TESTING) GTest::gtest gRPC::grpc++ gRPC::grpc - absl::crc absl::memory absl::str_format protobuf::libprotobuf) From 6064779b92af23f73be854522d36e260196fc493 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 02:25:24 +0000 Subject: [PATCH 13/26] remove crc from Cmake --- google/cloud/bigtable/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index e7e5033c94e9b..d4e61d3a80f92 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ 
-272,7 +272,6 @@ add_library( target_link_libraries( google_cloud_cpp_bigtable PUBLIC - absl::crc absl::memory google-cloud-cpp::bigtable_protos google-cloud-cpp::common From e2a57047726597c2720e5a131ece41f1abb20075 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 03:05:08 +0000 Subject: [PATCH 14/26] fix formatting --- google/cloud/bigtable/CMakeLists.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index d4e61d3a80f92..556c8070c07d4 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -271,8 +271,7 @@ add_library( wait_for_consistency.h) target_link_libraries( google_cloud_cpp_bigtable - PUBLIC - absl::memory + PUBLIC absl::memory google-cloud-cpp::bigtable_protos google-cloud-cpp::common google-cloud-cpp::grpc_utils From 99b9402964b4e448cb2a6f67589c072d944fc89f Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 20:19:49 +0000 Subject: [PATCH 15/26] address comments; remove limit; remove checksum verification --- .../internal/partial_result_set_source.cc | 82 ++++++++++--------- .../internal/partial_result_set_source.h | 10 +-- google/cloud/bigtable/options.h | 13 --- 3 files changed, 47 insertions(+), 58 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index d9f30143944df..c07db2496f8cf 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -16,21 +16,12 @@ #include "google/cloud/bigtable/options.h" #include "google/cloud/internal/make_status.h" #include "google/cloud/log.h" -#include "absl/container/fixed_array.h" -#include "absl/crc/crc32c.h" #include "absl/types/optional.h" namespace google { namespace cloud { namespace bigtable_internal { -namespace { -bool isValidChecksum(std::string const& data, uint32_t 
expected_checksum) { - absl::crc32c_t computed_crc = absl::ComputeCrc32c(data); - return static_cast(computed_crc) == expected_checksum; -} - -} // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN StatusOr> @@ -47,9 +38,7 @@ PartialResultSetSource::Create( // creating the `PartialResultSetSource` should fail with the same error. if (source->state_ == kFinished && !status.ok()) return status; - std::unique_ptr interface_ptr = - std::move(source); - return interface_ptr; + return {std::move(source)}; } PartialResultSetSource::PartialResultSetSource( @@ -69,10 +58,6 @@ PartialResultSetSource::PartialResultSetSource( columns_->push_back(c.name()); } } - if (options_.has()) { - values_space_limit_ = - options_.get(); - } } PartialResultSetSource::~PartialResultSetSource() { @@ -161,31 +146,12 @@ Status PartialResultSetSource::ProcessDataFromStream( absl::StrAppend(&read_buffer_, result.proto_rows_batch().batch_data()); } + // TODO(#15617): Validate that the checksum matches the contents of `buffer`. 
if (result.has_batch_checksum() && !read_buffer_.empty()) { - if (!isValidChecksum(read_buffer_, result.batch_checksum())) { - read_buffer_.clear(); - uncommitted_rows_.clear(); - return internal::InternalError("Batch checksum mismatch"); - } google::bigtable::v2::ProtoRows proto_rows; if (proto_rows.ParseFromString(read_buffer_)) { - if (metadata_.has_value()) { - auto columns_size = metadata_.value().proto_schema().columns_size(); - auto parsed_value = proto_rows.values().begin(); - std::vector values; - values.reserve(columns_size); - - while (parsed_value != proto_rows.values().end()) { - for (auto const& column : metadata_->proto_schema().columns()) { - auto value = FromProto(column.type(), *parsed_value); - values.push_back(std::move(value)); - ++parsed_value; - } - uncommitted_rows_.push_back( - QueryRowFriend::MakeQueryRow(std::move(values), columns_)); - values.clear(); - } - } + auto status = BufferProtoRows(proto_rows); + if (!status.ok()) return status; } else { read_buffer_.clear(); uncommitted_rows_.clear(); @@ -199,10 +165,50 @@ Status PartialResultSetSource::ProcessDataFromStream( rows_.insert(rows_.end(), uncommitted_rows_.begin(), uncommitted_rows_.end()); uncommitted_rows_.clear(); + read_buffer_.clear(); resume_token_ = result.resume_token(); } return {}; // OK } + +Status PartialResultSetSource::BufferProtoRows( + google::bigtable::v2::ProtoRows const& proto_rows) { + if (metadata_.has_value()) { + auto const& proto_schema = metadata_->proto_schema(); + auto const columns_size = proto_schema.columns_size(); + auto const& proto_values = proto_rows.values(); + + if (columns_size == 0) { + if (!proto_values.empty()) { + return internal::InternalError( + "ProtoRows has values but the schema has no columns.", + GCP_ERROR_INFO()); + } + } else if (proto_values.size() % columns_size != 0) { + return internal::InternalError( + "The number of values in ProtoRows is not a multiple of the " + "number of columns in the schema.", + GCP_ERROR_INFO()); + } + 
+ auto parsed_value = proto_values.begin(); + std::vector values; + values.reserve(columns_size); + + while (parsed_value != proto_values.end()) { + for (auto const& column : proto_schema.columns()) { + auto value = FromProto(column.type(), *parsed_value); + values.push_back(std::move(value)); + ++parsed_value; + } + uncommitted_rows_.push_back( + QueryRowFriend::MakeQueryRow(std::move(values), columns_)); + values.clear(); + } + } + return {}; +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal } // namespace cloud diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h index d67e5f05411fd..503999644913d 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.h +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -65,6 +65,8 @@ class PartialResultSetSource : public PartialResultSourceInterface { Status ReadFromStream(); Status ProcessDataFromStream(google::bigtable::v2::PartialResultSet& result); + Status BufferProtoRows( + google::bigtable::v2::ProtoRows const& proto_rows); std::string read_buffer_; // Arena for the values_ field. @@ -91,13 +93,7 @@ class PartialResultSetSource : public PartialResultSourceInterface { // delivered data that would be replayed, so resumption is disabled until we // see a new token. absl::optional resume_token_ = ""; - - // The default value is set to 2*256 MiB. A single row in Bigtable can't - // exceed 256 MiB so setting the limit to twice that size to provide a safe - // upper bound for the buffer. - std::size_t values_space_limit_ = - 2 * 256 * (std::size_t{1} << 20); // 512 MiB - + // The state of our PartialResultSetReader. enum : char { // `Read()` has yet to return nullopt. 
diff --git a/google/cloud/bigtable/options.h b/google/cloud/bigtable/options.h index 76381e1303380..a49d08ce226b0 100644 --- a/google/cloud/bigtable/options.h +++ b/google/cloud/bigtable/options.h @@ -260,19 +260,6 @@ using DataPolicyOptionList = IdempotentMutationPolicyOption, EnableMetricsOption, MetricsPeriodOption>; -/** - * Option for `google::cloud::Options` to set a limit on how much data will - * be buffered to guarantee resumability of a streaming read or SQL query. - * If the limit is exceeded, and the stream is subsequently interrupted before - * a new resumption point can be established, the read/query will fail. The - * default is 512 MiB (2*256). - * - * @ingroup google-cloud-bigtable-options - */ -struct StreamingResumabilityBufferSizeOption { - using Type = std::size_t; -}; - GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable } // namespace cloud From 9182efd02c2380754937bfa3c5c4fc08b2b583de Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 20:26:03 +0000 Subject: [PATCH 16/26] fix formatting --- google/cloud/bigtable/internal/partial_result_set_source.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h index 503999644913d..34d1e3ea29aa3 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.h +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -65,8 +65,7 @@ class PartialResultSetSource : public PartialResultSourceInterface { Status ReadFromStream(); Status ProcessDataFromStream(google::bigtable::v2::PartialResultSet& result); - Status BufferProtoRows( - google::bigtable::v2::ProtoRows const& proto_rows); + Status BufferProtoRows(google::bigtable::v2::ProtoRows const& proto_rows); std::string read_buffer_; // Arena for the values_ field. 
@@ -93,7 +92,7 @@ class PartialResultSetSource : public PartialResultSourceInterface { // delivered data that would be replayed, so resumption is disabled until we // see a new token. absl::optional resume_token_ = ""; - + // The state of our PartialResultSetReader. enum : char { // `Read()` has yet to return nullopt. From 9f560a3736a4e43c4e610bc4b2bfc885bf306c2e Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 21:11:03 +0000 Subject: [PATCH 17/26] remove unused variables --- google/cloud/bigtable/internal/partial_result_set_source.cc | 6 +----- google/cloud/bigtable/internal/partial_result_set_source.h | 5 ----- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index c07db2496f8cf..35a677a5556c0 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -46,11 +46,7 @@ PartialResultSetSource::PartialResultSetSource( std::unique_ptr reader) : options_(internal::CurrentOptions()), reader_(std::move(reader)), - metadata_(std::move(metadata)), - values_(absl::make_optional( - google::protobuf::Arena::Create< - google::protobuf::RepeatedPtrField>( - &arena_))) { + metadata_(std::move(metadata)) { if (metadata_.has_value()) { columns_ = std::make_shared>(); columns_->reserve(metadata_->proto_schema().columns_size()); diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h index 34d1e3ea29aa3..0750c9c4ce521 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.h +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -82,11 +82,6 @@ class PartialResultSetSource : public PartialResultSourceInterface { std::deque rows_; std::vector uncommitted_rows_; - // Values that can be assembled into `QueryRow`s ready to be returned by - // `NextRow()`. 
This is a pointer to an arena-allocated RepeatedPtrField. - absl::optional*> - values_; - // When engaged, the token we can use to resume the stream immediately after // any data in (or previously in) `rows_`. When disengaged, we have already // delivered data that would be replayed, so resumption is disabled until we From bd2fb2276cd6c65e40cc325ae343f437997e40ce Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Mon, 13 Oct 2025 22:22:15 +0000 Subject: [PATCH 18/26] address warnings --- google/cloud/bigtable/internal/partial_result_set_source.cc | 1 + google/cloud/bigtable/results.h | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index 35a677a5556c0..f484e06b39e23 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -14,6 +14,7 @@ #include "google/cloud/bigtable/internal/partial_result_set_source.h" #include "google/cloud/bigtable/options.h" +#include "google/cloud/internal/absl_str_cat_quiet.h" #include "google/cloud/internal/make_status.h" #include "google/cloud/log.h" #include "absl/types/optional.h" diff --git a/google/cloud/bigtable/results.h b/google/cloud/bigtable/results.h index 30f3e266629ab..6f4d2b16e61ed 100644 --- a/google/cloud/bigtable/results.h +++ b/google/cloud/bigtable/results.h @@ -76,7 +76,7 @@ class RowStream { } /// Returns a `RowStreamIterator` defining the end of this range. 
- RowStreamIterator end() { return {}; } + static RowStreamIterator end() { return {}; } private: std::unique_ptr source_; From cec45533e48d6da5e20feb41dc14bec8be1b44fa Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Tue, 14 Oct 2025 00:08:11 +0000 Subject: [PATCH 19/26] fix ordering in bzl file --- google/cloud/bigtable/bigtable_client_testing.bzl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/bigtable_client_testing.bzl b/google/cloud/bigtable/bigtable_client_testing.bzl index c1a2666637d63..865460e3aead3 100644 --- a/google/cloud/bigtable/bigtable_client_testing.bzl +++ b/google/cloud/bigtable/bigtable_client_testing.bzl @@ -25,8 +25,8 @@ bigtable_client_testing_hdrs = [ "testing/mock_data_client.h", "testing/mock_mutate_rows_limiter.h", "testing/mock_mutate_rows_reader.h", - "testing/mock_policies.h", "testing/mock_partial_result_set_reader.h", + "testing/mock_policies.h", "testing/mock_read_rows_reader.h", "testing/mock_response_reader.h", "testing/mock_sample_row_keys_reader.h", From bc4944d8f4dfac1ad363b36a8a8204e52a0cd842 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Tue, 14 Oct 2025 00:28:54 +0000 Subject: [PATCH 20/26] include test in CMake --- google/cloud/bigtable/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 556c8070c07d4..da051de87c799 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -474,6 +474,7 @@ if (BUILD_TESTING) internal/mutate_rows_limiter_test.cc internal/operation_context_factory_test.cc internal/operation_context_test.cc + internal/partial_result_set_reader_test.cc internal/prefix_range_end_test.cc internal/rate_limiter_test.cc internal/retry_traits_test.cc From 13b7b721e5f1c32fbff579c97bec7dd6d09f2338 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Tue, 14 Oct 2025 00:39:11 +0000 Subject: [PATCH 21/26] follow clang-tidy-pr suggestions --- 
google/cloud/bigtable/CMakeLists.txt | 1 - google/cloud/bigtable/bigtable_client_unit_tests.bzl | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index da051de87c799..556c8070c07d4 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -474,7 +474,6 @@ if (BUILD_TESTING) internal/mutate_rows_limiter_test.cc internal/operation_context_factory_test.cc internal/operation_context_test.cc - internal/partial_result_set_reader_test.cc internal/prefix_range_end_test.cc internal/rate_limiter_test.cc internal/retry_traits_test.cc diff --git a/google/cloud/bigtable/bigtable_client_unit_tests.bzl b/google/cloud/bigtable/bigtable_client_unit_tests.bzl index 2094210979264..50053a4d71994 100644 --- a/google/cloud/bigtable/bigtable_client_unit_tests.bzl +++ b/google/cloud/bigtable/bigtable_client_unit_tests.bzl @@ -64,8 +64,8 @@ bigtable_client_unit_tests = [ "internal/mutate_rows_limiter_test.cc", "internal/operation_context_factory_test.cc", "internal/operation_context_test.cc", + "internal/partial_result_set_reader_test.cc", "internal/prefix_range_end_test.cc", - "internal/partial_result_set_source_test.cc", "internal/rate_limiter_test.cc", "internal/retry_traits_test.cc", "internal/traced_row_reader_test.cc", From 9ab575e907e2ed34d104af0c7ce24ec138dda9a7 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Tue, 14 Oct 2025 01:44:04 +0000 Subject: [PATCH 22/26] fix typo in cmake file --- google/cloud/bigtable/CMakeLists.txt | 1 + google/cloud/bigtable/bigtable_client_unit_tests.bzl | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 556c8070c07d4..76c121af6b604 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -474,6 +474,7 @@ if (BUILD_TESTING) internal/mutate_rows_limiter_test.cc 
internal/operation_context_factory_test.cc internal/operation_context_test.cc + internal/partial_result_set_source_test.cc internal/prefix_range_end_test.cc internal/rate_limiter_test.cc internal/retry_traits_test.cc diff --git a/google/cloud/bigtable/bigtable_client_unit_tests.bzl b/google/cloud/bigtable/bigtable_client_unit_tests.bzl index 50053a4d71994..c34b2a728217e 100644 --- a/google/cloud/bigtable/bigtable_client_unit_tests.bzl +++ b/google/cloud/bigtable/bigtable_client_unit_tests.bzl @@ -64,7 +64,7 @@ bigtable_client_unit_tests = [ "internal/mutate_rows_limiter_test.cc", "internal/operation_context_factory_test.cc", "internal/operation_context_test.cc", - "internal/partial_result_set_reader_test.cc", + "internal/partial_result_set_source_test.cc", "internal/prefix_range_end_test.cc", "internal/rate_limiter_test.cc", "internal/retry_traits_test.cc", From da8ded08b4ea42c2cf8f85609c66cf5ab393b208 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Tue, 14 Oct 2025 02:07:16 +0000 Subject: [PATCH 23/26] remove checksum verification from testing --- .../internal/partial_result_set_source_test.cc | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc index 1f95edeedff02..d90624e710a9d 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source_test.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -156,8 +156,6 @@ TEST(PartialResultSetSourceTest, SingleResponse) { ASSERT_TRUE(TextFormat::ParseFromString(kProtoRowsText, &proto_rows)); std::string binary_batch_data = proto_rows.SerializeAsString(); - uint32_t correct_checksum = - static_cast(absl::ComputeCrc32c(binary_batch_data)); std::string partial_result_set_text = absl::Substitute(R"pb( proto_rows_batch: { @@ -166,9 +164,9 @@ TEST(PartialResultSetSourceTest, SingleResponse) { resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", 
reset: true, estimated_batch_size: 31, - batch_checksum: $1 + batch_checksum: 123456 )pb", - binary_batch_data, correct_checksum); + binary_batch_data); google::bigtable::v2::PartialResultSet response; ASSERT_TRUE(TextFormat::ParseFromString(partial_result_set_text, &response)); @@ -242,8 +240,6 @@ TEST(PartialResultSetSourceTest, MultipleResponses) { google::bigtable::v2::ProtoRows proto_rows; ASSERT_TRUE(TextFormat::ParseFromString(text, &proto_rows)); std::string binary_batch_data = proto_rows.SerializeAsString(); - uint32_t correct_checksum = - static_cast(absl::ComputeCrc32c(binary_batch_data)); std::string partial_result_set_text = absl::Substitute( R"pb( proto_rows_batch: { @@ -252,9 +248,9 @@ TEST(PartialResultSetSourceTest, MultipleResponses) { resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", reset: true, estimated_batch_size: 31, - batch_checksum: $1 + batch_checksum: 123456 )pb", - binary_batch_data, correct_checksum); + binary_batch_data); google::bigtable::v2::PartialResultSet response; ASSERT_TRUE( TextFormat::ParseFromString(partial_result_set_text, &response)); @@ -339,8 +335,6 @@ TEST(PartialResultSetSourceTest, ResponseWithNoValues) { google::bigtable::v2::ProtoRows proto_rows; ASSERT_TRUE(TextFormat::ParseFromString(text, &proto_rows)); std::string binary_batch_data = proto_rows.SerializeAsString(); - uint32_t correct_checksum = - static_cast(absl::ComputeCrc32c(binary_batch_data)); std::string partial_result_set_text = absl::Substitute( R"pb( proto_rows_batch: { @@ -349,9 +343,9 @@ TEST(PartialResultSetSourceTest, ResponseWithNoValues) { resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", reset: true, estimated_batch_size: 31, - batch_checksum: $1 + batch_checksum: 123456 )pb", - binary_batch_data, correct_checksum); + binary_batch_data); google::bigtable::v2::PartialResultSet response; ASSERT_TRUE( TextFormat::ParseFromString(partial_result_set_text, &response)); From 39d82df6258fdb38f067aed44f1ad6d51232cac4 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Tue, 
14 Oct 2025 02:16:16 +0000 Subject: [PATCH 24/26] remove ElementsAre --- google/cloud/bigtable/internal/partial_result_set_source_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc index d90624e710a9d..929a04e28d256 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source_test.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -40,7 +40,6 @@ namespace { using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; using ::testing::_; -using ::testing::ElementsAre; using ::testing::Return; using ::testing::UnitTest; From 038fd5ef506633d51573d5d94d80a1cccb873fe5 Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Tue, 14 Oct 2025 18:31:08 +0000 Subject: [PATCH 25/26] address feedback --- .../internal/partial_result_set_source.cc | 59 +++++++++---------- .../internal/partial_result_set_source.h | 19 +++--- .../partial_result_set_source_test.cc | 3 +- 3 files changed, 37 insertions(+), 44 deletions(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index f484e06b39e23..a8830e7c973ad 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -25,7 +25,7 @@ namespace bigtable_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -StatusOr> +StatusOr> PartialResultSetSource::Create( absl::optional metadata, std::unique_ptr reader) { @@ -37,7 +37,7 @@ PartialResultSetSource::Create( // If the initial read finished the stream, and `Finish()` failed, then // creating the `PartialResultSetSource` should fail with the same error. 
- if (source->state_ == kFinished && !status.ok()) return status; + if (source->state_ == State::kFinished && !status.ok()) return status; return {std::move(source)}; } @@ -59,13 +59,13 @@ PartialResultSetSource::PartialResultSetSource( PartialResultSetSource::~PartialResultSetSource() { internal::OptionsSpan span(options_); - if (state_ == kReading) { + if (state_ == State::kReading) { // Finish() can deadlock if there is still data in the streaming RPC, // so before trying to read the final status we need to cancel. reader_->TryCancel(); - state_ = kEndOfStream; + state_ = State::kEndOfStream; } - if (state_ == kEndOfStream) { + if (state_ == State::kEndOfStream) { // The user didn't iterate over all the data, so finish the stream on // their behalf, although we have no way to communicate error status. auto status = reader_->Finish(); @@ -74,13 +74,13 @@ PartialResultSetSource::~PartialResultSetSource() { << "PartialResultSetSource: Finish() failed in destructor: " << status; } - state_ = kFinished; + state_ = State::kFinished; } } StatusOr PartialResultSetSource::NextRow() { while (rows_.empty()) { - if (state_ == kFinished) return bigtable::QueryRow(); + if (state_ == State::kFinished) return bigtable::QueryRow(); internal::OptionsSpan span(options_); // Continue fetching if there are more rows in the stream. 
auto status = ReadFromStream(); @@ -93,7 +93,7 @@ StatusOr PartialResultSetSource::NextRow() { } Status PartialResultSetSource::ReadFromStream() { - if (state_ == kFinished) { + if (state_ == State::kFinished) { return internal::InternalError("PartialResultSetSource already finished", GCP_ERROR_INFO()); } @@ -114,11 +114,11 @@ Status PartialResultSetSource::ReadFromStream() { if (reader_->Read(resume_token_, result_set)) { return ProcessDataFromStream(result_set.result); } - state_ = kFinished; - // The uncommitted_rows_ is expected to be empty because the last successful + state_ = State::kFinished; + // The buffered_rows_ is expected to be empty because the last successful // read would have had a sentinel resume_token, causing // ProcessDataFromStream to commit them. - if (!uncommitted_rows_.empty()) { + if (!buffered_rows_.empty()) { return internal::InternalError("Stream ended with uncommitted rows.", GCP_ERROR_INFO()); } @@ -131,7 +131,7 @@ Status PartialResultSetSource::ProcessDataFromStream( // resume_token should be discarded. if (result.reset()) { read_buffer_.clear(); - uncommitted_rows_.clear(); + buffered_rows_.clear(); } // Reserve space of the buffer at the start of a new batch of data. @@ -145,43 +145,40 @@ Status PartialResultSetSource::ProcessDataFromStream( // TODO(#15617): Validate that the checksum matches the contents of `buffer`. 
if (result.has_batch_checksum() && !read_buffer_.empty()) { - google::bigtable::v2::ProtoRows proto_rows; - if (proto_rows.ParseFromString(read_buffer_)) { - auto status = BufferProtoRows(proto_rows); + if (proto_rows_.ParseFromString(read_buffer_)) { + auto status = BufferProtoRows(); if (!status.ok()) return status; + proto_rows_.Clear(); } else { read_buffer_.clear(); - uncommitted_rows_.clear(); + buffered_rows_.clear(); return internal::InternalError("Failed to parse ProtoRows from buffer"); } } - // Buffered rows in uncommitted_rows_ are ready to be committed into rows_ + // Buffered rows in buffered_rows_ are ready to be committed into rows_ // once the resume_token is received. if (!result.resume_token().empty()) { - rows_.insert(rows_.end(), uncommitted_rows_.begin(), - uncommitted_rows_.end()); - uncommitted_rows_.clear(); + rows_.insert(rows_.end(), buffered_rows_.begin(), buffered_rows_.end()); + buffered_rows_.clear(); read_buffer_.clear(); resume_token_ = result.resume_token(); } return {}; // OK } -Status PartialResultSetSource::BufferProtoRows( - google::bigtable::v2::ProtoRows const& proto_rows) { +Status PartialResultSetSource::BufferProtoRows() { if (metadata_.has_value()) { auto const& proto_schema = metadata_->proto_schema(); auto const columns_size = proto_schema.columns_size(); - auto const& proto_values = proto_rows.values(); + auto const& proto_values = proto_rows_.values(); - if (columns_size == 0) { - if (!proto_values.empty()) { - return internal::InternalError( - "ProtoRows has values but the schema has no columns.", - GCP_ERROR_INFO()); - } - } else if (proto_values.size() % columns_size != 0) { + if (columns_size == 0 && !proto_values.empty()) { + return internal::InternalError( + "ProtoRows has values but the schema has no columns.", + GCP_ERROR_INFO()); + } + if (proto_values.size() % columns_size != 0) { return internal::InternalError( "The number of values in ProtoRows is not a multiple of the " "number of columns in the schema.", 
@@ -198,7 +195,7 @@ Status PartialResultSetSource::BufferProtoRows( values.push_back(std::move(value)); ++parsed_value; } - uncommitted_rows_.push_back( + buffered_rows_.push_back( QueryRowFriend::MakeQueryRow(std::move(values), columns_)); values.clear(); } diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h index 0750c9c4ce521..98928430033af 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.h +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -38,15 +38,10 @@ namespace cloud { namespace bigtable_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -class PartialResultSourceInterface : public bigtable::ResultSourceInterface {}; - -/** - * Class used to iterate over the rows returned from a read operations. - */ -class PartialResultSetSource : public PartialResultSourceInterface { +class PartialResultSetSource : public bigtable::ResultSourceInterface { public: /// Factory method to create a PartialResultSetSource. - static StatusOr> Create( + static StatusOr> Create( absl::optional metadata, std::unique_ptr reader); @@ -65,7 +60,7 @@ class PartialResultSetSource : public PartialResultSourceInterface { Status ReadFromStream(); Status ProcessDataFromStream(google::bigtable::v2::PartialResultSet& result); - Status BufferProtoRows(google::bigtable::v2::ProtoRows const& proto_rows); + Status BufferProtoRows(); std::string read_buffer_; // Arena for the values_ field. @@ -80,7 +75,8 @@ class PartialResultSetSource : public PartialResultSourceInterface { std::shared_ptr> columns_; std::deque rows_; - std::vector uncommitted_rows_; + std::vector buffered_rows_; + google::bigtable::v2::ProtoRows proto_rows_; // When engaged, the token we can use to resume the stream immediately after // any data in (or previously in) `rows_`. 
When disengaged, we have already @@ -89,7 +85,7 @@ class PartialResultSetSource : public PartialResultSourceInterface { absl::optional resume_token_ = ""; // The state of our PartialResultSetReader. - enum : char { + enum class State { // `Read()` has yet to return nullopt. kReading, // `Read()` has returned nullopt, but we are yet to call `Finish()`. @@ -97,7 +93,8 @@ class PartialResultSetSource : public PartialResultSourceInterface { // `Finish()` has been called, which means `NextRow()` has returned // either an empty row or an error status. kFinished, - } state_ = kReading; + }; + State state_ = State::kReading; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc index 929a04e28d256..2f60759db03eb 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source_test.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -1,4 +1,3 @@ - // Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -54,7 +53,7 @@ struct StringOption { // Create the `PartialResultSetSource` within an `OptionsSpan` that has its // `StringOption` set to the current test name, so that we might check that // all `PartialResultSetReader` calls happen within a matching span. 
-StatusOr> +StatusOr> CreatePartialResultSetSource( absl::optional metadata, std::unique_ptr reader, Options opts = {}) { From 4d2c757891c4860765da15f91ad953aab54e9d6d Mon Sep 17 00:00:00 2001 From: mpeddada1 Date: Tue, 14 Oct 2025 19:26:14 +0000 Subject: [PATCH 26/26] clear before return --- google/cloud/bigtable/internal/partial_result_set_source.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc index a8830e7c973ad..58d0a6df89534 100644 --- a/google/cloud/bigtable/internal/partial_result_set_source.cc +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -147,8 +147,8 @@ Status PartialResultSetSource::ProcessDataFromStream( if (result.has_batch_checksum() && !read_buffer_.empty()) { if (proto_rows_.ParseFromString(read_buffer_)) { auto status = BufferProtoRows(); - if (!status.ok()) return status; proto_rows_.Clear(); + if (!status.ok()) return status; } else { read_buffer_.clear(); buffered_rows_.clear();