diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 6b0fb282d60e6..76c121af6b604 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -205,6 +205,9 @@ add_library( internal/operation_context.h internal/operation_context_factory.cc internal/operation_context_factory.h + internal/partial_result_set_reader.h + internal/partial_result_set_source.cc + internal/partial_result_set_source.h internal/prefix_range_end.cc internal/prefix_range_end.h internal/rate_limiter.cc @@ -389,6 +392,7 @@ if (BUILD_TESTING) testing/mock_data_client.h testing/mock_mutate_rows_limiter.h testing/mock_mutate_rows_reader.h + testing/mock_partial_result_set_reader.h testing/mock_policies.h testing/mock_read_rows_reader.h testing/mock_response_reader.h @@ -470,6 +474,7 @@ if (BUILD_TESTING) internal/mutate_rows_limiter_test.cc internal/operation_context_factory_test.cc internal/operation_context_test.cc + internal/partial_result_set_source_test.cc internal/prefix_range_end_test.cc internal/rate_limiter_test.cc internal/retry_traits_test.cc diff --git a/google/cloud/bigtable/bigtable_client_testing.bzl b/google/cloud/bigtable/bigtable_client_testing.bzl index d3e8a11cba1cd..865460e3aead3 100644 --- a/google/cloud/bigtable/bigtable_client_testing.bzl +++ b/google/cloud/bigtable/bigtable_client_testing.bzl @@ -25,6 +25,7 @@ bigtable_client_testing_hdrs = [ "testing/mock_data_client.h", "testing/mock_mutate_rows_limiter.h", "testing/mock_mutate_rows_reader.h", + "testing/mock_partial_result_set_reader.h", "testing/mock_policies.h", "testing/mock_read_rows_reader.h", "testing/mock_response_reader.h", diff --git a/google/cloud/bigtable/bigtable_client_unit_tests.bzl b/google/cloud/bigtable/bigtable_client_unit_tests.bzl index 116f5657ffda3..c34b2a728217e 100644 --- a/google/cloud/bigtable/bigtable_client_unit_tests.bzl +++ b/google/cloud/bigtable/bigtable_client_unit_tests.bzl @@ -64,6 +64,7 @@ 
bigtable_client_unit_tests = [ "internal/mutate_rows_limiter_test.cc", "internal/operation_context_factory_test.cc", "internal/operation_context_test.cc", + "internal/partial_result_set_source_test.cc", "internal/prefix_range_end_test.cc", "internal/rate_limiter_test.cc", "internal/retry_traits_test.cc", diff --git a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl index 1b5b8675775cb..c264bc654d06d 100644 --- a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl +++ b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl @@ -101,6 +101,8 @@ google_cloud_cpp_bigtable_hdrs = [ "internal/mutate_rows_limiter.h", "internal/operation_context.h", "internal/operation_context_factory.h", + "internal/partial_result_set_reader.h", + "internal/partial_result_set_source.h", "internal/prefix_range_end.h", "internal/rate_limiter.h", "internal/readrowsparser.h", @@ -211,6 +213,7 @@ google_cloud_cpp_bigtable_srcs = [ "internal/mutate_rows_limiter.cc", "internal/operation_context.cc", "internal/operation_context_factory.cc", + "internal/partial_result_set_source.cc", "internal/prefix_range_end.cc", "internal/rate_limiter.cc", "internal/readrowsparser.cc", diff --git a/google/cloud/bigtable/internal/partial_result_set_reader.h b/google/cloud/bigtable/internal/partial_result_set_reader.h new file mode 100644 index 0000000000000..3cecf01de9884 --- /dev/null +++ b/google/cloud/bigtable/internal/partial_result_set_reader.h @@ -0,0 +1,66 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H + +#include "google/cloud/status.h" +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +/** + * The result of a successful `PartialResultSetReader::Read()`, which may + * be the next partial result of a stream, or a resumption of an interrupted + * stream from the `resume_token` if it was engaged. In the latter case, the + * caller should discard any pending state not covered by the token, as that + * data will be replayed. + */ +struct UnownedPartialResultSet { + // Wraps `result` as an ordinary (non-resumption) partial result. + static UnownedPartialResultSet FromPartialResultSet( + google::bigtable::v2::PartialResultSet& result) { + return UnownedPartialResultSet{result, false}; + } + + // Non-owning reference; the referenced message must outlive this struct. + google::bigtable::v2::PartialResultSet& result; + // True when this result is a resumption of an interrupted stream. + bool resumption; +}; + +/** + * Wrap `grpc::ClientReaderInterface`. + * + * This defines an interface to handle a streaming RPC returning a sequence + * of `google::bigtable::v2::PartialResultSet`. Its main purpose is to + * simplify memory management, as each streaming RPC requires two separate + * `std::unique_ptr<>`. As a side-effect, it is also easier to mock as it + * has a narrower interface. 
+ */ +class PartialResultSetReader { + public: + virtual ~PartialResultSetReader() = default; + // Requests cancellation of the stream. `PartialResultSetSource` calls this + // before `Finish()` when it is destroyed while still reading. + virtual void TryCancel() = 0; + // Reads the next response into `result`. `PartialResultSetSource` passes the + // token of the previously received `PartialResultSet` as `resume_token` + // (empty on the first call) and treats a `false` return as the end of the + // stream. + virtual bool Read(absl::optional const& resume_token, + UnownedPartialResultSet& result) = 0; + // Returns the final status of the stream. + virtual Status Finish() = 0; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H diff --git a/google/cloud/bigtable/internal/partial_result_set_source.cc b/google/cloud/bigtable/internal/partial_result_set_source.cc new file mode 100644 index 0000000000000..58d0a6df89534 --- /dev/null +++ b/google/cloud/bigtable/internal/partial_result_set_source.cc @@ -0,0 +1,209 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "google/cloud/bigtable/internal/partial_result_set_source.h" +#include "google/cloud/bigtable/options.h" +#include "google/cloud/internal/absl_str_cat_quiet.h" +#include "google/cloud/internal/make_status.h" +#include "google/cloud/log.h" +#include "absl/types/optional.h" + +namespace google { +namespace cloud { +namespace bigtable_internal { + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +StatusOr> +PartialResultSetSource::Create( + absl::optional metadata, + std::unique_ptr reader) { + std::unique_ptr source( + new PartialResultSetSource(std::move(metadata), std::move(reader))); + + // Do an initial read from the stream to determine the fate of the factory. + auto status = source->ReadFromStream(); + + // If the initial read finished the stream, and `Finish()` failed, then + // creating the `PartialResultSetSource` should fail with the same error. + if (source->state_ == State::kFinished && !status.ok()) return status; + + return {std::move(source)}; +} + +PartialResultSetSource::PartialResultSetSource( + absl::optional metadata, + std::unique_ptr reader) + : options_(internal::CurrentOptions()), + reader_(std::move(reader)), + metadata_(std::move(metadata)) { + if (metadata_.has_value()) { + columns_ = std::make_shared>(); + columns_->reserve(metadata_->proto_schema().columns_size()); + for (auto const& c : metadata_->proto_schema().columns()) { + columns_->push_back(c.name()); + } + } +} + +PartialResultSetSource::~PartialResultSetSource() { + internal::OptionsSpan span(options_); + if (state_ == State::kReading) { + // Finish() can deadlock if there is still data in the streaming RPC, + // so before trying to read the final status we need to cancel. + reader_->TryCancel(); + state_ = State::kEndOfStream; + } + if (state_ == State::kEndOfStream) { + // The user didn't iterate over all the data, so finish the stream on + // their behalf, although we have no way to communicate error status. 
+ auto status = reader_->Finish(); + if (!status.ok() && status.code() != StatusCode::kCancelled) { + GCP_LOG(WARNING) + << "PartialResultSetSource: Finish() failed in destructor: " + << status; + } + state_ = State::kFinished; + } +} + +StatusOr PartialResultSetSource::NextRow() { + while (rows_.empty()) { + if (state_ == State::kFinished) return bigtable::QueryRow(); + internal::OptionsSpan span(options_); + // Continue fetching if there are more rows in the stream. + auto status = ReadFromStream(); + if (!status.ok()) return status; + } + // Returns the row at the front of the queue + auto row = std::move(rows_.front()); + rows_.pop_front(); + return row; +} + +Status PartialResultSetSource::ReadFromStream() { + if (state_ == State::kFinished) { + return internal::InternalError("PartialResultSetSource already finished", + GCP_ERROR_INFO()); + } + // The application should consume rows_ before calling ReadFromStream again. + if (!rows_.empty()) { + return internal::InternalError("PartialResultSetSource has unconsumed rows", + GCP_ERROR_INFO()); + } + + auto* raw_result_set = + google::protobuf::Arena::Create( + &arena_); + auto result_set = + UnownedPartialResultSet::FromPartialResultSet(*raw_result_set); + + // The resume_token_ member holds the token from the previous + // PartialResultSet. It's empty on the first call. + if (reader_->Read(resume_token_, result_set)) { + return ProcessDataFromStream(result_set.result); + } + state_ = State::kFinished; + // The buffered_rows_ is expected to be empty because the last successful + // read would have had a sentinel resume_token, causing + // ProcessDataFromStream to commit them. 
+ if (!buffered_rows_.empty()) { + return internal::InternalError("Stream ended with uncommitted rows.", + GCP_ERROR_INFO()); + } + return reader_->Finish(); +} + +Status PartialResultSetSource::ProcessDataFromStream( + google::bigtable::v2::PartialResultSet& result) { + // If the `reset` is true then all the data buffered since the last + // resume_token should be discarded. + if (result.reset()) { + read_buffer_.clear(); + buffered_rows_.clear(); + } + + // Reserve space of the buffer at the start of a new batch of data. + if (result.estimated_batch_size() > 0 && read_buffer_.empty()) { + read_buffer_.reserve(result.estimated_batch_size()); + } + + if (result.has_proto_rows_batch()) { + absl::StrAppend(&read_buffer_, result.proto_rows_batch().batch_data()); + } + + // TODO(#15617): Validate that the checksum matches the contents of `buffer`. + if (result.has_batch_checksum() && !read_buffer_.empty()) { + if (proto_rows_.ParseFromString(read_buffer_)) { + auto status = BufferProtoRows(); + proto_rows_.Clear(); + if (!status.ok()) return status; + } else { + read_buffer_.clear(); + buffered_rows_.clear(); + return internal::InternalError("Failed to parse ProtoRows from buffer"); + } + } + + // Buffered rows in buffered_rows_ are ready to be committed into rows_ + // once the resume_token is received. 
+ if (!result.resume_token().empty()) { + rows_.insert(rows_.end(), buffered_rows_.begin(), buffered_rows_.end()); + buffered_rows_.clear(); + read_buffer_.clear(); + resume_token_ = result.resume_token(); + } + return {}; // OK +} + +Status PartialResultSetSource::BufferProtoRows() { + if (metadata_.has_value()) { + auto const& proto_schema = metadata_->proto_schema(); + auto const columns_size = proto_schema.columns_size(); + auto const& proto_values = proto_rows_.values(); + + if (columns_size == 0 && !proto_values.empty()) { + return internal::InternalError( + "ProtoRows has values but the schema has no columns.", + GCP_ERROR_INFO()); + } + if (proto_values.size() % columns_size != 0) { + return internal::InternalError( + "The number of values in ProtoRows is not a multiple of the " + "number of columns in the schema.", + GCP_ERROR_INFO()); + } + + auto parsed_value = proto_values.begin(); + std::vector values; + values.reserve(columns_size); + + while (parsed_value != proto_values.end()) { + for (auto const& column : proto_schema.columns()) { + auto value = FromProto(column.type(), *parsed_value); + values.push_back(std::move(value)); + ++parsed_value; + } + buffered_rows_.push_back( + QueryRowFriend::MakeQueryRow(std::move(values), columns_)); + values.clear(); + } + } + return {}; +} + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/bigtable/internal/partial_result_set_source.h b/google/cloud/bigtable/internal/partial_result_set_source.h new file mode 100644 index 0000000000000..98928430033af --- /dev/null +++ b/google/cloud/bigtable/internal/partial_result_set_source.h @@ -0,0 +1,105 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H + +#include "google/cloud/bigtable/internal/partial_result_set_reader.h" +#include "google/cloud/bigtable/results.h" +#include "google/cloud/bigtable/value.h" +#include "google/cloud/bigtable/version.h" +#include "google/cloud/options.h" +#include "google/cloud/status.h" +#include "google/cloud/status_or.h" +#include "absl/types/optional.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +/** + * A `bigtable::ResultSourceInterface` that yields rows decoded from the + * `PartialResultSet` messages produced by a `PartialResultSetReader`. + */ +class PartialResultSetSource : public bigtable::ResultSourceInterface { + public: + /// Factory method to create a PartialResultSetSource. It performs an initial + /// read from `reader` and may return the resulting error instead of a source. + static StatusOr> Create( + absl::optional metadata, + std::unique_ptr reader); + + ~PartialResultSetSource() override; + + /// Returns the next row. Once the stream has finished and no rows remain it + /// returns a default-constructed `QueryRow`. + StatusOr NextRow() override; + + /// Returns the metadata given to `Create()`, if any. + absl::optional Metadata() override { + return metadata_; + } + + private: + explicit PartialResultSetSource( + absl::optional metadata, + std::unique_ptr reader); + + // Reads one response from `reader_` and processes it, or finishes the stream. + Status ReadFromStream(); + // Folds `result` into `read_buffer_` / `buffered_rows_`, committing the + // buffered rows to `rows_` when `result` carries a resume token. + Status ProcessDataFromStream(google::bigtable::v2::PartialResultSet& result); + // Converts `proto_rows_` into rows appended to `buffered_rows_`. + Status BufferProtoRows(); + // Raw `batch_data` bytes accumulated from the responses read so far. + std::string read_buffer_; + + // Arena used to allocate the `PartialResultSet` passed to each `Read()`. + google::protobuf::Arena arena_; + + Options options_; + std::unique_ptr reader_; + + // The ResultSetMetadata is received in the first response. 
It is received + // from ExecuteQueryResponse + absl::optional metadata_; + + // Column names taken from `metadata_`; handed to every row that is built. + std::shared_ptr> columns_; + // Committed rows, ready to be returned by `NextRow()`. + std::deque rows_; + // Rows parsed from completed batches but not yet committed by a resume token. + std::vector buffered_rows_; + // Scratch message used to parse `read_buffer_`. + google::bigtable::v2::ProtoRows proto_rows_; + + // When engaged, the token we can use to resume the stream immediately after + // any data in (or previously in) `rows_`. When disengaged, we have already + // delivered data that would be replayed, so resumption is disabled until we + // see a new token. + absl::optional resume_token_ = ""; + + // The state of our PartialResultSetReader. + enum class State { + // `Read()` has yet to return false. + kReading, + // No more `Read()` calls will be made, but `Finish()` has not been called. + kEndOfStream, + // The stream is done: `ReadFromStream()` saw `Read()` return false, or the + // destructor finished the stream. + kFinished, + }; + State state_ = State::kReading; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H diff --git a/google/cloud/bigtable/internal/partial_result_set_source_test.cc b/google/cloud/bigtable/internal/partial_result_set_source_test.cc new file mode 100644 index 0000000000000..2f60759db03eb --- /dev/null +++ b/google/cloud/bigtable/internal/partial_result_set_source_test.cc @@ -0,0 +1,391 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "google/cloud/bigtable/internal/partial_result_set_source.h" +#include "google/cloud/bigtable/mocks/mock_query_row.h" +#include "google/cloud/bigtable/options.h" +#include "google/cloud/bigtable/query_row.h" +#include "google/cloud/bigtable/testing/mock_partial_result_set_reader.h" +#include "google/cloud/bigtable/value.h" +#include "google/cloud/options.h" +#include "google/cloud/testing_util/is_proto_equal.h" +#include "google/cloud/testing_util/status_matchers.h" +#include "absl/strings/substitute.h" +#include +#include +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::google::cloud::testing_util::StatusIs; +using ::google::protobuf::TextFormat; +using ::testing::_; +using ::testing::Return; +using ::testing::UnitTest; + +// Returns the name of the currently running test. +std::string CurrentTestName() { + return UnitTest::GetInstance()->current_test_info()->name(); +} + +// Test-only option holding a string; used to tag the active `OptionsSpan`. +struct StringOption { + using Type = std::string; +}; + +// Create the `PartialResultSetSource` within an `OptionsSpan` that has its +// `StringOption` set to the current test name, so that we might check that +// all `PartialResultSetReader` calls happen within a matching span. +StatusOr> +CreatePartialResultSetSource( + absl::optional metadata, + std::unique_ptr reader, Options opts = {}) { + internal::OptionsSpan span(internal::MergeOptions( + std::move(opts.set(CurrentTestName())), + internal::CurrentOptions())); + return PartialResultSetSource::Create(std::move(metadata), std::move(reader)); +} + +// Returns a functor that will return the argument after expecting that the +// current `StringOption` matches the test name. 
+template +std::function ResultMock(T const& result) { + return [result]() { + EXPECT_EQ(internal::CurrentOptions().get(), + CurrentTestName()); + return result; + }; +} + +// Matches a `StatusOr` that holds a value equal to `expected`. +MATCHER_P(IsValidAndEquals, expected, + "Verifies that a StatusOr contains the given QueryRow") { + return arg && *arg == expected; +} + +/// @test Verify the behavior when the initial `Read()` fails. +TEST(PartialResultSetSourceTest, InitialReadFailure) { + auto grpc_reader = + std::make_unique(); + // The stream ends on the first `Read()`, so the status from `Finish()` is + // what the factory must return. + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([](absl::optional const&, + UnownedPartialResultSet&) { return false; }); + EXPECT_CALL(*grpc_reader, Finish()) + .WillOnce(ResultMock(Status(StatusCode::kInvalidArgument, "invalid"))); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + + // Options active in the caller; `ResultMock` expects the test name instead. + internal::OptionsSpan overlay(Options{}.set("uh-oh")); + auto reader = + CreatePartialResultSetSource(absl::nullopt, std::move(grpc_reader)); + EXPECT_THAT(reader, StatusIs(StatusCode::kInvalidArgument, "invalid")); +} + +/** + * @test Verify the behavior when the response does not contain data. 
+ */ +TEST(PartialResultSetSourceTest, MissingRowTypeNoData) { + auto constexpr kText = R"pb( + proto_rows_batch: {} + )pb"; + google::bigtable::v2::PartialResultSet response; + ASSERT_TRUE(TextFormat::ParseFromString(kText, &response)); + + // The only response has neither a checksum nor a resume token, so no row is + // ever committed before the stream ends. + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = response; + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + + internal::OptionsSpan overlay(Options{}.set("uh-oh")); + auto reader = CreatePartialResultSetSource( + google::bigtable::v2::ResultSetMetadata{}, std::move(grpc_reader)); + ASSERT_STATUS_OK(reader); + EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); +} + +/// @test Verify a single response is handled correctly. +TEST(PartialResultSetSourceTest, SingleResponse) { + auto constexpr kResultMetadataText = R"pb( + proto_schema { + columns { + name: "user_id" + type { string_type {} } + } + columns { + name: "email" + type { string_type {} } + } + columns { + name: "name" + type { string_type {} } + } + } + )pb"; + google::bigtable::v2::ResultSetMetadata metadata; + ASSERT_TRUE(TextFormat::ParseFromString(kResultMetadataText, &metadata)); + auto constexpr kProtoRowsText = R"pb( + values { string_value: "r1" } + values { string_value: "f1" } + values { string_value: "q1" } + )pb"; + google::bigtable::v2::ProtoRows proto_rows; + ASSERT_TRUE(TextFormat::ParseFromString(kProtoRowsText, &proto_rows)); + + // The response carries a complete batch (checksum) and a resume token, so + // its single three-column row is committed and returned. + std::string binary_batch_data = proto_rows.SerializeAsString(); + std::string partial_result_set_text = + absl::Substitute(R"pb( + proto_rows_batch: { + batch_data: "$0", + }, + resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", + reset: true, + estimated_batch_size: 31, + batch_checksum: 123456 + )pb", + binary_batch_data); + 
google::bigtable::v2::PartialResultSet response; + ASSERT_TRUE(TextFormat::ParseFromString(partial_result_set_text, &response)); + + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&response](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = response; + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + auto reader = CreatePartialResultSetSource(metadata, std::move(grpc_reader)); + ASSERT_STATUS_OK(reader); + auto row = (*reader)->NextRow(); + ASSERT_STATUS_OK(row); + auto first_row = *row; + ASSERT_EQ(first_row.values().size(), 3); + EXPECT_EQ(*row->values().at(0).get(), "r1"); + EXPECT_EQ(*row->values().at(1).get(), "f1"); + EXPECT_EQ(*row->values().at(2).get(), "q1"); + + // At end of stream, we get an 'ok' response with an empty row. + EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); +} + +/** + * @test Verify that rows from multiple responses are returned in order. 
+ */ +TEST(PartialResultSetSourceTest, MultipleResponses) { + auto constexpr kResultMetadataText = R"pb( + proto_schema { + columns { + name: "user_id" + type { string_type {} } + } + columns { + name: "email" + type { string_type {} } + } + columns { + name: "name" + type { string_type {} } + } + } + )pb"; + google::bigtable::v2::ResultSetMetadata metadata; + ASSERT_TRUE(TextFormat::ParseFromString(kResultMetadataText, &metadata)); + + // Each entry is the text form of one `ProtoRows` batch holding one row. + std::array proto_rows_text{{ + R"pb( + values { string_value: "a1" } + values { string_value: "a2" } + values { string_value: "a3" } + )pb", + R"pb( + values { string_value: "b1" } + values { string_value: "b2" } + values { string_value: "b3" } + )pb", + R"pb( + values { string_value: "c1" } + values { string_value: "c2" } + values { string_value: "c3" } + )pb", + }}; + + // Wrap every batch in its own `PartialResultSet` with a checksum and token. + std::vector responses; + for (auto const* text : proto_rows_text) { + google::bigtable::v2::ProtoRows proto_rows; + ASSERT_TRUE(TextFormat::ParseFromString(text, &proto_rows)); + std::string binary_batch_data = proto_rows.SerializeAsString(); + std::string partial_result_set_text = absl::Substitute( + R"pb( + proto_rows_batch: { + batch_data: "$0", + }, + resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", + reset: true, + estimated_batch_size: 31, + batch_checksum: 123456 + )pb", + binary_batch_data); + google::bigtable::v2::PartialResultSet response; + ASSERT_TRUE( + TextFormat::ParseFromString(partial_result_set_text, &response)); + responses.push_back(std::move(response)); + } + + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[0]; + return true; + }) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[1]; + return true; + }) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[2]; + return true; + }) + 
.WillOnce(Return(false)); + EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + + auto reader = CreatePartialResultSetSource(metadata, std::move(grpc_reader)); + ASSERT_STATUS_OK(reader); + + // The rows must come back in the order their responses were read. + auto row0 = (*reader)->NextRow(); + ASSERT_STATUS_OK(row0); + ASSERT_EQ(row0->values().size(), 3); + EXPECT_EQ(*row0->values().at(0).get(), "a1"); + EXPECT_EQ(*row0->values().at(1).get(), "a2"); + EXPECT_EQ(*row0->values().at(2).get(), "a3"); + + auto row1 = (*reader)->NextRow(); + ASSERT_STATUS_OK(row1); + ASSERT_EQ(row1->values().size(), 3); + EXPECT_EQ(*row1->values().at(0).get(), "b1"); + EXPECT_EQ(*row1->values().at(1).get(), "b2"); + EXPECT_EQ(*row1->values().at(2).get(), "b3"); + + auto row2 = (*reader)->NextRow(); + ASSERT_STATUS_OK(row2); + ASSERT_EQ(row2->values().size(), 3); + EXPECT_EQ(*row2->values().at(0).get(), "c1"); + EXPECT_EQ(*row2->values().at(1).get(), "c2"); + EXPECT_EQ(*row2->values().at(2).get(), "c3"); + + // At end of stream, we get an 'ok' response with an empty row. + EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); +} + +/** + * @test Verify that responses whose batches hold no values add no rows. 
+ */ +TEST(PartialResultSetSourceTest, ResponseWithNoValues) { + auto constexpr kResultMetadataText = R"pb( + proto_schema { + columns { + name: "user_id" + type { string_type {} } + } + } + )pb"; + google::bigtable::v2::ResultSetMetadata metadata; + ASSERT_TRUE(TextFormat::ParseFromString(kResultMetadataText, &metadata)); + + // Only the first batch holds a value; the other two are empty `ProtoRows`. + std::array proto_rows_text{{ + R"pb( + values { string_value: "a1" } + )pb", + R"pb( + )pb", + R"pb( + )pb"}}; + + std::vector responses; + for (auto const* text : proto_rows_text) { + google::bigtable::v2::ProtoRows proto_rows; + ASSERT_TRUE(TextFormat::ParseFromString(text, &proto_rows)); + std::string binary_batch_data = proto_rows.SerializeAsString(); + std::string partial_result_set_text = absl::Substitute( + R"pb( + proto_rows_batch: { + batch_data: "$0", + }, + resume_token: "AAAAAWVyZXN1bWVfdG9rZW4=", + reset: true, + estimated_batch_size: 31, + batch_checksum: 123456 + )pb", + binary_batch_data); + google::bigtable::v2::PartialResultSet response; + ASSERT_TRUE( + TextFormat::ParseFromString(partial_result_set_text, &response)); + responses.push_back(std::move(response)); + } + + auto grpc_reader = + std::make_unique(); + EXPECT_CALL(*grpc_reader, Read(_, _)) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[0]; + return true; + }) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[1]; + return true; + }) + .WillOnce([&responses](absl::optional const&, + UnownedPartialResultSet& result) { + result.result = responses[2]; + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status())); + EXPECT_CALL(*grpc_reader, TryCancel()).Times(0); + internal::OptionsSpan overlay(Options{}.set("uh-oh")); + auto reader = CreatePartialResultSetSource(metadata, std::move(grpc_reader)); + ASSERT_STATUS_OK(reader); + + // Verify the returned row is correct. 
+ EXPECT_THAT((*reader)->NextRow(), + IsValidAndEquals(bigtable_mocks::MakeQueryRow( + {{"user_id", bigtable::Value("a1")}}))); + + // At end of stream, we get an 'ok' response with an empty row. + EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(bigtable::QueryRow{})); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/bigtable/results.h b/google/cloud/bigtable/results.h index 3e2af0bde80ea..6f4d2b16e61ed 100644 --- a/google/cloud/bigtable/results.h +++ b/google/cloud/bigtable/results.h @@ -46,7 +46,10 @@ class ResultSourceInterface { virtual StatusOr NextRow() = 0; /** - * Returns metadata about the result set. + * Returns metadata about the result set, such as the column names and types. + * + * @see https://github.com/googleapis/googleapis/blob/master/google/bigtable/v2/data.proto + * for more information. */ virtual absl::optional Metadata() = 0; @@ -73,7 +76,7 @@ class RowStream { } /// Returns a `RowStreamIterator` defining the end of this range. - RowStreamIterator end() { return {}; } + static RowStreamIterator end() { return {}; } private: std::unique_ptr source_; diff --git a/google/cloud/bigtable/testing/mock_partial_result_set_reader.h b/google/cloud/bigtable/testing/mock_partial_result_set_reader.h new file mode 100644 index 0000000000000..0751a3d2d540d --- /dev/null +++ b/google/cloud/bigtable/testing/mock_partial_result_set_reader.h @@ -0,0 +1,43 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_TESTING_MOCK_PARTIAL_RESULT_SET_READER_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_TESTING_MOCK_PARTIAL_RESULT_SET_READER_H + +#include "google/cloud/bigtable/internal/partial_result_set_reader.h" +#include "google/cloud/bigtable/version.h" +#include + +namespace google { +namespace cloud { +namespace bigtable_testing { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +/// Mock of `bigtable_internal::PartialResultSetReader` for unit tests; each of +/// the interface's three virtual functions is declared with `MOCK_METHOD`. +class MockPartialResultSetReader + : public bigtable_internal::PartialResultSetReader { + public: + MOCK_METHOD(void, TryCancel, (), (override)); + MOCK_METHOD(bool, Read, + (absl::optional const& resume_token, + bigtable_internal::UnownedPartialResultSet& result), + (override)); + MOCK_METHOD(Status, Finish, (), (override)); +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_testing +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_TESTING_MOCK_PARTIAL_RESULT_SET_READER_H