Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ci/cloudbuild/builds/lib/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ function integration::bazel_args() {
"--test_env=GOOGLE_CLOUD_CPP_BIGTABLE_TEST_SERVICE_ACCOUNT=${GOOGLE_CLOUD_CPP_BIGTABLE_TEST_SERVICE_ACCOUNT}"
"--test_env=ENABLE_BIGTABLE_ADMIN_INTEGRATION_TESTS=${ENABLE_BIGTABLE_ADMIN_INTEGRATION_TESTS:-no}"

# Pubsub
"--test_env=GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT=${GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT}"

# Rest
"--test_env=GOOGLE_CLOUD_CPP_REST_TEST_SIGNING_SERVICE_ACCOUNT=${GOOGLE_CLOUD_CPP_REST_TEST_SIGNING_SERVICE_ACCOUNT}"

Expand Down
1 change: 1 addition & 0 deletions ci/etc/integration-tests-config.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ $env:GOOGLE_CLOUD_CPP_SPANNER_TEST_QUICKSTART_DATABASE="quickstart-db"

# Cloud Pub/Sub configuration parameters
$env:GOOGLE_CLOUD_CPP_PUBSUB_TEST_QUICKSTART_TOPIC="quickstart"
$env:GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT="pubsub-impersonate-test-sa@${env:GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com"

# Cloud Batch configuration parameters
$env:GOOGLE_CLOUD_CPP_BATCH_TEST_TEMPLATE_NAME="cloud-batch-sample-template"
Expand Down
1 change: 1 addition & 0 deletions ci/etc/integration-tests-config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export GOOGLE_CLOUD_CPP_SPANNER_TEST_QUICKSTART_DATABASE="quickstart-db"

# Cloud Pub/Sub configuration parameters
export GOOGLE_CLOUD_CPP_PUBSUB_TEST_QUICKSTART_TOPIC="quickstart"
export GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT="pubsub-impersonate-test-sa@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com"

# Cloud Batch configuration parameters
# Created using:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,22 @@ TEST_F(SubscriberIntegrationTest, PublishPullAck) {
ASSERT_NO_FATAL_FAILURE(TestRoundtrip(publisher, subscriber));
}

TEST_F(SubscriberIntegrationTest, PublishPullAckWithImpersonatedCredentials) {
std::string iam_service_account =
google::cloud::internal::GetEnv(
"GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT")
.value_or("");
ASSERT_FALSE(iam_service_account.empty());
auto google_default_credentials = MakeGoogleDefaultCredentials();
auto options = Options{}.set<UnifiedCredentialsOption>(
MakeImpersonateServiceAccountCredentials(google_default_credentials,
iam_service_account));
auto publisher = Publisher(MakePublisherConnection(topic_, options));
auto subscriber =
Subscriber(MakeSubscriberConnection(subscription_, options));
ASSERT_NO_FATAL_FAILURE(TestRoundtrip(publisher, subscriber));
}

TEST_F(SubscriberIntegrationTest, FireAndForget) {
std::mutex mu;
std::condition_variable cv;
Expand Down
5 changes: 3 additions & 2 deletions google/cloud/pubsub/internal/subscriber_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

SubscriberConnectionImpl::SubscriberConnectionImpl(
Options opts, std::shared_ptr<pubsub_internal::SubscriberStub> stub)
Options opts, std::shared_ptr<pubsub_internal::SubscriberStub> stub,
std::shared_ptr<BackgroundThreads> background)
: opts_(std::move(opts)),
stub_(std::move(stub)),
background_(internal::MakeBackgroundThreadsFactory(opts_)()),
background_(std::move(background)),
generator_(internal::MakeDefaultPRNG()) {}

SubscriberConnectionImpl::~SubscriberConnectionImpl() = default;
Expand Down
3 changes: 2 additions & 1 deletion google/cloud/pubsub/internal/subscriber_connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
class SubscriberConnectionImpl : public pubsub::SubscriberConnection {
public:
explicit SubscriberConnectionImpl(
Options opts, std::shared_ptr<pubsub_internal::SubscriberStub> stub);
Options opts, std::shared_ptr<pubsub_internal::SubscriberStub> stub,
std::shared_ptr<BackgroundThreads> background);

~SubscriberConnectionImpl() override;

Expand Down
55 changes: 38 additions & 17 deletions google/cloud/pubsub/internal/subscriber_connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ using ::testing::Contains;
using ::testing::HasSubstr;
using ::testing::Pair;
using ::testing::Property;
using ::testing::Return;
using ::testing::StartsWith;

class MockBackgroundThreads : public BackgroundThreads {
public:
MOCK_METHOD(CompletionQueue, cq, (), (override, const));
};

Options MakeTestOptions(Subscription subscription, Options opts = {}) {
opts.set<pubsub::SubscriptionOption>(std::move(subscription));
opts.set<UnifiedCredentialsOption>(MakeInsecureCredentials());
Expand Down Expand Up @@ -163,8 +169,10 @@ TEST(SubscriberConnectionTest, Subscribe) {
.WillRepeatedly(FakeAsyncStreamingPull);

CompletionQueue cq;
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(subscription, cq), mock);
MakeTestOptions(subscription, cq), mock, mock_background_threads);
std::atomic_flag received_one{false};
promise<void> waiter;
auto handler = [&](Message const& m, AckHandler h) {
Expand Down Expand Up @@ -206,8 +214,10 @@ TEST(SubscriberConnectionTest, SubscribeOverrideSubscription) {
.WillRepeatedly(MakeAsyncStreamingPullMock(s2.FullName()));

CompletionQueue cq;
auto subscriber =
std::make_shared<SubscriberConnectionImpl>(MakeTestOptions(s1, cq), mock);
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(s1, cq), mock, mock_background_threads);
std::atomic_flag received_one{false};
promise<void> waiter;
auto handler = [&](Message const& m, AckHandler h) {
Expand Down Expand Up @@ -250,8 +260,10 @@ TEST(SubscriberConnectionTest, ExactlyOnce) {
.WillRepeatedly(FakeAsyncStreamingPull);

CompletionQueue cq;
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(subscription, cq), mock);
MakeTestOptions(subscription, cq), mock, mock_background_threads);
std::atomic_flag received_one{false};
promise<void> waiter;
auto callback = [&](Message const& m, ExactlyOnceAckHandler h) {
Expand Down Expand Up @@ -294,8 +306,10 @@ TEST(SubscriberConnectionTest, ExactlyOnceOverrideSubscription) {
.WillRepeatedly(MakeAsyncStreamingPullMock(s2.FullName()));

CompletionQueue cq;
auto subscriber =
std::make_shared<SubscriberConnectionImpl>(MakeTestOptions(s1, cq), mock);
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(s1, cq), mock, mock_background_threads);
std::atomic_flag received_one{false};
promise<void> waiter;
auto handler = [&](Message const& m, AckHandler h) {
Expand Down Expand Up @@ -359,7 +373,8 @@ TEST(SubscriberConnectionTest, StreamingPullFailure) {
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
pubsub_testing::MakeTestOptions(
Options{}.set<pubsub::SubscriptionOption>(subscription)),
mock);
mock,
std::make_shared<internal::AutomaticallyCreatedBackgroundThreads>());
auto handler = [&](Message const&, AckHandler const&) {};
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Subscribe({handler});
Expand Down Expand Up @@ -401,8 +416,10 @@ TEST(SubscriberConnectionTest, Pull) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(subscription, cq), mock);
MakeTestOptions(subscription, cq), mock, mock_background_threads);
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Pull();
ASSERT_STATUS_OK(response);
Expand Down Expand Up @@ -440,7 +457,8 @@ TEST(SubscriberConnectionTest, PullReturnsNoMessage) {

CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(
subscription,
Expand All @@ -450,7 +468,7 @@ TEST(SubscriberConnectionTest, PullReturnsNoMessage) {
google::cloud::pubsub::LimitedErrorCountRetryPolicy(
kNumRetries)
.clone())),
mock);
mock, mock_background_threads);
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Pull();
EXPECT_THAT(response, StatusIs(StatusCode::kUnavailable,
Expand Down Expand Up @@ -496,9 +514,10 @@ TEST(SubscriberConnectionTest, PullOverrideSubscription) {

CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

auto subscriber =
std::make_shared<SubscriberConnectionImpl>(MakeTestOptions(s1, cq), mock);
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(s1, cq), mock, mock_background_threads);
google::cloud::internal::OptionsSpan span(
subscriber->options().set<pubsub::SubscriptionOption>(s2));
auto response = subscriber->Pull();
Expand All @@ -523,9 +542,10 @@ TEST(SubscriberConnectionTest, PullPermanentFailure) {

CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(subscription, cq), mock);
MakeTestOptions(subscription, cq), mock, mock_background_threads);
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Pull();
EXPECT_THAT(response,
Expand All @@ -550,9 +570,10 @@ TEST(SubscriberConnectionTest, PullTooManyTransientFailures) {

CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
MakeTestOptions(subscription, cq), mock);
MakeTestOptions(subscription, cq), mock, mock_background_threads);
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Pull();
EXPECT_THAT(response,
Expand Down
12 changes: 7 additions & 5 deletions google/cloud/pubsub/subscriber_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

std::shared_ptr<pubsub::SubscriberConnection> ConnectionFromDecoratedStub(
std::shared_ptr<pubsub_internal::SubscriberStub> stub,
Options const& opts) {
std::shared_ptr<pubsub_internal::SubscriberStub> stub, Options const& opts,
std::shared_ptr<BackgroundThreads> background) {
auto tracing_enabled = google::cloud::internal::TracingEnabled(opts);
std::shared_ptr<SubscriberConnection> connection =
std::make_shared<pubsub_internal::SubscriberConnectionImpl>(
opts, std::move(stub));
opts, std::move(stub), std::move(background));
if (tracing_enabled) {
connection =
pubsub_internal::MakeSubscriberTracingConnection(std::move(connection));
Expand Down Expand Up @@ -92,7 +92,8 @@ std::shared_ptr<SubscriberConnection> MakeSubscriberConnection(
auto background = internal::MakeBackgroundThreadsFactory(opts)();
auto stub =
pubsub_internal::MakeRoundRobinSubscriberStub(background->cq(), opts);
return ConnectionFromDecoratedStub(std::move(stub), std::move(opts));
return ConnectionFromDecoratedStub(std::move(stub), std::move(opts),
std::move(background));
}

std::shared_ptr<SubscriberConnection> MakeSubscriberConnection(
Expand Down Expand Up @@ -126,7 +127,8 @@ std::shared_ptr<pubsub::SubscriberConnection> MakeTestSubscriberConnection(
auto stub = pubsub_internal::MakeTestSubscriberStub(background->cq(), opts,
std::move(stubs));
opts.set<pubsub::SubscriptionOption>(std::move(subscription));
return pubsub::ConnectionFromDecoratedStub(std::move(stub), std::move(opts));
return pubsub::ConnectionFromDecoratedStub(std::move(stub), std::move(opts),
std::move(background));
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down