diff --git a/packages/pubsub/src/connection-pool.js b/packages/pubsub/src/connection-pool.js index 4899f953054..8684544b257 100644 --- a/packages/pubsub/src/connection-pool.js +++ b/packages/pubsub/src/connection-pool.js @@ -56,6 +56,8 @@ var RETRY_CODES = [ */ function ConnectionPool(subscription) { this.subscription = subscription; + this.projectId = subscription.projectId; + this.connections = new Map(); this.isPaused = false; @@ -230,7 +232,8 @@ ConnectionPool.prototype.createConnection = function() { } connection.write({ - subscription: self.subscription.name, + subscription: common.util.replaceProjectIdToken( + self.subscription.name, self.projectId), streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 }); @@ -302,6 +305,10 @@ ConnectionPool.prototype.getClient = function(callback) { grpc.credentials.createFromGoogleCredential(authClient) ); + if (!self.projectId || self.projectId === '{{projectId}}') { + self.projectId = pubsub.auth.projectId; + } + var Subscriber = v1(pubsub.options).Subscriber; self.client = new Subscriber(v1.SERVICE_ADDRESS, credentials, { diff --git a/packages/pubsub/system-test/pubsub.js b/packages/pubsub/system-test/pubsub.js index 895eb6d6443..fdc3a59c4b9 100644 --- a/packages/pubsub/system-test/pubsub.js +++ b/packages/pubsub/system-test/pubsub.js @@ -535,6 +535,7 @@ describe('pubsub', function() { return deleteAllSnapshots() .then(wait(2500)) .then(subscription.create.bind(subscription)) + .then(wait(2500)) .then(snapshot.create.bind(snapshot)) .then(wait(2500)); }); diff --git a/packages/pubsub/test/connection-pool.js b/packages/pubsub/test/connection-pool.js index e610035c22b..b2ff7297c03 100644 --- a/packages/pubsub/test/connection-pool.js +++ b/packages/pubsub/test/connection-pool.js @@ -66,9 +66,12 @@ describe('ConnectionPool', function() { var pool; var FAKE_PUBSUB_OPTIONS = {}; + var PROJECT_ID = 'grapce-spacheship-123'; var PUBSUB = { + projectId: PROJECT_ID, auth: { + projectId: PROJECT_ID, getAuthClient: fakeUtil.noop }, options: FAKE_PUBSUB_OPTIONS @@ -76,6 +79,7 @@ describe('ConnectionPool', function() { var SUB_NAME = 'test-subscription'; var SUBSCRIPTION = { + projectId: PROJECT_ID, name: SUB_NAME, pubsub: PUBSUB, request: fakeUtil.noop @@ -344,6 +348,7 @@ describe('ConnectionPool', function() { }); describe('connection', function() { + var TOKENIZED_SUB_NAME = 'project/p/subscriptions/' + SUB_NAME; var fakeId; beforeEach(function() { @@ -352,12 +357,20 @@ describe('ConnectionPool', function() { fakeUuid.v4 = function() { return fakeId; }; + + fakeUtil.replaceProjectIdToken = common.util.replaceProjectIdToken; }); it('should create a connection', function(done) { + fakeUtil.replaceProjectIdToken = function(subName, projectId) { + assert.strictEqual(subName, SUB_NAME); + assert.strictEqual(projectId, PROJECT_ID); + return TOKENIZED_SUB_NAME; + }; + fakeConnection.write = function(reqOpts) { assert.deepEqual(reqOpts, { - subscription: SUB_NAME, + subscription: TOKENIZED_SUB_NAME, streamAckDeadlineSeconds: pool.settings.ackDeadline / 1000 }); }; @@ -662,6 +675,7 @@ describe('ConnectionPool', function() { }); describe('getClient', function() { + var AUTH_PROJECT_ID = 'auth-project-id-123'; var fakeAuthClient = {}; function FakeSubscriber(address, creds, options) { @@ -671,6 +685,7 @@ describe('ConnectionPool', function() { } beforeEach(function() { + PUBSUB.auth.projectId = AUTH_PROJECT_ID; PUBSUB.auth.getAuthClient = function(callback) { callback(null, fakeAuthClient); }; @@ -734,6 +749,26 @@ describe('ConnectionPool', function() { }); }); + it('should capture the projectId when falsey', function(done) { + delete pool.projectId; + + pool.getClient(function(err) { + assert.ifError(err); + assert.strictEqual(pool.projectId, AUTH_PROJECT_ID); + done(); + }); + }); + + it('should capture the projectId if it needs tokenization', function(done) { + pool.projectId = '{{projectId}}'; + + pool.getClient(function(err) { + assert.ifError(err); + assert.strictEqual(pool.projectId, AUTH_PROJECT_ID); + done(); + }); + }); + it('should pass the pubsub options into the gax fn', function(done) { v1Override = function(options) { assert.strictEqual(options, FAKE_PUBSUB_OPTIONS);