diff --git a/packages/pubsub/package.json b/packages/pubsub/package.json index e3807d5fd04..f983af61fa8 100644 --- a/packages/pubsub/package.json +++ b/packages/pubsub/package.json @@ -51,7 +51,7 @@ ], "dependencies": { "@google-cloud/common": "^0.13.0", - "@google-cloud/common-grpc": "^0.3.0", + "@google-cloud/common-grpc": "^0.3.1", "arrify": "^1.0.0", "extend": "^3.0.0", "google-gax": "^0.13.0", diff --git a/packages/pubsub/src/index.js b/packages/pubsub/src/index.js index 85db7e0e4f4..9c51a7af586 100644 --- a/packages/pubsub/src/index.js +++ b/packages/pubsub/src/index.js @@ -27,6 +27,12 @@ var extend = require('extend'); var is = require('is'); var util = require('util'); +/** + * @type {module:pubsub/snapshot} + * @private + */ +var Snapshot = require('./snapshot.js'); + /** * @type {module:pubsub/subscription} * @private @@ -136,6 +142,120 @@ PubSub.prototype.createTopic = function(name, callback) { }); }; +/** + * Get a list of snapshots. + * + * @param {object=} options - Configuration object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: true. + * @param {number} options.maxApiCalls - Maximum number of API calls to make. + * @param {number} options.maxResults - Maximum number of results to return. + * @param {number} options.pageSize - Maximum number of results to return. + * @param {string} options.pageToken - Page token. + * @param {function} callback - The callback function. + * @param {?error} callback.err - An error from the API call, may be null. + * @param {module:pubsub/snapshot[]} callback.snapshots - The list of snapshots + * in your project. + * + * @example + * pubsub.getSnapshots(function(err, snapshots) { + * if (!err) { + * // snapshots is an array of Snapshot objects. + * } + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- + * var callback = function(err, snapshots, nextQuery, apiResponse) { + * if (nextQuery) { + * // More results exist. + * pubsub.getSnapshots(nextQuery, callback); + * } + * }; + * + * pubsub.getSnapshots({ + * autoPaginate: false + * }, callback); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * pubsub.getSnapshots().then(function(data) { + * var snapshots = data[0]; + * }); + */ +PubSub.prototype.getSnapshots = function(options, callback) { + var self = this; + + if (is.fn(options)) { + callback = options; + options = {}; + } + + var protoOpts = { + service: 'Subscriber', + method: 'listSnapshots' + }; + + var reqOpts = extend({}, options); + + reqOpts.project = 'projects/' + this.projectId; + + this.request(protoOpts, reqOpts, function(err, resp) { + if (err) { + callback(err, null, null, resp); + return; + } + + var snapshots = arrify(resp.snapshots).map(function(snapshot) { + var snapshotInstance = self.snapshot(snapshot.name); + snapshotInstance.metadata = snapshot; + return snapshotInstance; + }); + + var nextQuery = null; + + if (resp.nextPageToken) { + nextQuery = options; + nextQuery.pageToken = resp.nextPageToken; + } + + callback(null, snapshots, nextQuery, resp); + }); +}; + +/** + * Get a list of the {module:pubsub/snapshot} objects as a readable object + * stream. + * + * @param {object=} options - Configuration object. See + * {module:pubsub#getSnapshots} for a complete list of options. + * @return {stream} + * + * @example + * pubsub.getSnapshotsStream() + * .on('error', console.error) + * .on('data', function(snapshot) { + * // snapshot is a Snapshot object. + * }) + * .on('end', function() { + * // All snapshots retrieved. + * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * pubsub.getSnapshotsStream() + * .on('data', function(snapshot) { + * this.end(); + * }); + */ +PubSub.prototype.getSnapshotsStream = + common.paginator.streamify('getSnapshots'); + /** * Get a list of the subscriptions registered to all of your project's topics. * You may optionally provide a query object as the first argument to customize @@ -277,7 +397,7 @@ PubSub.prototype.getSubscriptions = function(options, callback) { * // unnecessary processing and API requests. * //- * pubsub.getSubscriptionsStream() - * .on('data', function(topic) { + * .on('data', function(subscription) { * this.end(); * }); */ @@ -540,6 +660,26 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) { }); }; +/** + * Create a Snapshot object. See {module:pubsub/subscription#createSnapshot} to + * create a snapshot. + * + * @throws {Error} If a name is not provided. + * + * @param {string} name - The name of the snapshot. + * @return {module:pubsub/snapshot} + * + * @example + * var snapshot = pubsub.snapshot('my-snapshot'); + */ +PubSub.prototype.snapshot = function(name) { + if (!is.string(name)) { + throw new Error('You must supply a valid name for the snapshot.'); + } + + return new Snapshot(this, name); +}; + /** * Create a Subscription object. This command by itself will not run any API * requests. You will receive a {module:pubsub/subscription} object, @@ -589,7 +729,7 @@ PubSub.prototype.subscription = function(name, options) { }; /** - * Create a Topic object. See {module:pubsub/createTopic} to create a topic. + * Create a Topic object. See {module:pubsub#createTopic} to create a topic. * * @throws {Error} If a name is not provided. * @@ -637,7 +777,11 @@ PubSub.prototype.determineBaseUrl_ = function() { * * These methods can be auto-paginated. */ -common.paginator.extend(PubSub, ['getSubscriptions', 'getTopics']); +common.paginator.extend(PubSub, [ + 'getSnapshots', + 'getSubscriptions', + 'getTopics' +]); /*! Developer Documentation * @@ -645,7 +789,7 @@ common.paginator.extend(PubSub, ['getSubscriptions', 'getTopics']); * that a callback is omitted. */ common.util.promisifyAll(PubSub, { - exclude: ['subscription', 'topic'] + exclude: ['snapshot', 'subscription', 'topic'] }); PubSub.Subscription = Subscription; diff --git a/packages/pubsub/src/snapshot.js b/packages/pubsub/src/snapshot.js new file mode 100644 index 00000000000..4a578191e49 --- /dev/null +++ b/packages/pubsub/src/snapshot.js @@ -0,0 +1,221 @@ +/*! + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*! + * @module pubsub/snapshot + */ + +'use strict'; + +var commonGrpc = require('@google-cloud/common-grpc'); +var is = require('is'); +var util = require('util'); + +/** + * A Snapshot object will give you access to your Cloud Pub/Sub snapshot. + * + * Snapshots are sometimes retrieved when using various methods: + * + * - {module:pubsub#getSnapshots} + * - {module:pubsub#getSnapshotsStream} + * - {module:pubsub#snapshot} + * + * Snapshots may be created with: + * + * - {module:pubsub/subscription#createSnapshot} + * + * You can use snapshots to seek a subscription to a specific point in time. + * + * - {module:pubsub/subscription#seek} + * + * @alias module:pubsub/snapshot + * @constructor + * + * @example + * //- + * // From {module:pubsub#getSnapshots}: + * //- + * pubsub.getSnapshots(function(err, snapshots) { + * // `snapshots` is an array of Snapshot objects. + * }); + * + * //- + * // From {module:pubsub#getSnapshotsStream}: + * //- + * pubsub.getSnapshotsStream() + * .on('error', console.error) + * .on('data', function(snapshot) { + * // `snapshot` is a Snapshot object. + * }); + * + * //- + * // From {module:pubsub#snapshot}: + * //- + * var snapshot = pubsub.snapshot('my-snapshot'); + * // snapshot is a Snapshot object. + * + * //- + * // Create a snapshot with {module:pubsub/subscription#createSnapshot}: + * //- + * var subscription = pubsub.subscription('my-subscription'); + * + * subscription.createSnapshot('my-snapshot', function(err, snapshot) { + * if (!err) { + * // `snapshot` is a Snapshot object. + * } + * }); + * + * //- + * // Seek to your snapshot: + * //- + * var subscription = pubsub.subscription('my-subscription'); + * + * subscription.seek('my-snapshot', function(err) { + * if (err) { + * // Error handling omitted. + * } + * }); + */ +function Snapshot(parent, name) { + var projectId = parent.projectId; + + if (!projectId && parent.parent) { + projectId = parent.parent.projectId; + } + + this.name = Snapshot.formatName_(projectId, name); + + var methods = { + /** + * Delete the snapshot. + * + * @param {function=} callback - The callback function. + * @param {?error} callback.err - An error returned while making this + * request. + * @param {object} callback.apiResponse - The full API response from the + * service. + * + * @example + * snapshot.delete(function(err, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * snapshot.delete().then(function(data) { + * var apiResponse = data[0]; + * }); + */ + delete: { + protoOpts: { + service: 'Subscriber', + method: 'deleteSnapshot' + }, + reqOpts: { + snapshot: this.name + } + } + }; + + var config = { + parent: parent, + id: this.name, + methods: methods + }; + + var isSubscription = is.fn(parent.createSnapshot); + + if (isSubscription) { + config.createMethod = parent.createSnapshot.bind(parent); + + /** + * Create a snapshot with the given name. + * + * **This is only available if you accessed this object through + * {module:pubsub/subscription#snapshot}.** + * + * @param {string} name - Name of the snapshot. + * @param {function=} callback - The callback function. + * @param {?error} callback.err - An error from the API call, may be null. + * @param {module:pubsub/snapshot} callback.snapshot - The newly created + * snapshot. + * @param {object} callback.apiResponse - The full API response from the + * service. + * + * @example + * var subscription = pubsub.subscription('my-subscription'); + * var snapshot = subscription.snapshot('my-snapshot'); + * + * var callback = function(err, snapshot, apiResponse) { + * if (!err) { + * // The snapshot was created successfully. + * } + * }; + * + * snapshot.create('my-snapshot', callback); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * snapshot.create('my-snapshot').then(function(data) { + * var snapshot = data[0]; + * var apiResponse = data[1]; + * }); + */ + methods.create = true; + + /** + * Seeks an existing subscription to the snapshot. + * + * **This is only available if you accessed this object through + * {module:pubsub/subscription#snapshot}.** + * + * @param {function} callback - The callback function. + * @param {?error} callback.err - An error from the API call, may be null. + * @param {object} callback.apiResponse - The full API response from the + * service. + * + * @example + * var subscription = pubsub.subscription('my-subscription'); + * var snapshot = subscription.snapshot('my-snapshot'); + * + * snapshot.seek(function(err, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * snapshot.seek().then(function(data) { + * var apiResponse = data[0]; + * }); + */ + this.seek = parent.seek.bind(parent, name); + } + + commonGrpc.ServiceObject.call(this, config); +} + +util.inherits(Snapshot, commonGrpc.ServiceObject); + +/** + * Format the name of a snapshot. A snapshot's full name is in the format of + * projects/{projectId}/snapshots/{snapshotName} + * + * @private + */ +Snapshot.formatName_ = function(projectId, name) { + return 'projects/' + projectId + '/snapshots/' + name.split('/').pop(); +}; + +module.exports = Snapshot; diff --git a/packages/pubsub/src/subscription.js b/packages/pubsub/src/subscription.js index 7d8e40b3ee7..69b564b726a 100644 --- a/packages/pubsub/src/subscription.js +++ b/packages/pubsub/src/subscription.js @@ -35,6 +35,12 @@ var uuid = require('uuid'); */ var IAM = require('./iam.js'); +/** + * @type {module:pubsub/snapshot} + * @private + */ +var Snapshot = require('./snapshot.js'); + /** * @const {number} - The amount of time a subscription pull HTTP connection to * Pub/Sub stays open. @@ -472,6 +478,64 @@ Subscription.prototype.ack = function(ackIds, options, callback) { }); }; +/** + * Create a snapshot with the given name. + * + * @param {string} name - Name of the snapshot. + * @param {function=} callback - The callback function. + * @param {?error} callback.err - An error from the API call, may be null. + * @param {module:pubsub/snapshot} callback.snapshot - The newly created + * snapshot. + * @param {object} callback.apiResponse - The full API response from the + * service. + * + * @example + * var callback = function(err, snapshot, apiResponse) { + * if (!err) { + * // The snapshot was created successfully. + * } + * }; + * + * subscription.createSnapshot('my-snapshot', callback); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * subscription.createSnapshot('my-snapshot').then(function(data) { + * var snapshot = data[0]; + * var apiResponse = data[1]; + * }); + */ +Subscription.prototype.createSnapshot = function(name, callback) { + var self = this; + + if (!is.string(name)) { + throw new Error('A name is required to create a snapshot.'); + } + + var protoOpts = { + service: 'Subscriber', + method: 'createSnapshot' + }; + + var reqOpts = { + name: Snapshot.formatName_(this.parent.projectId, name), + subscription: this.name + }; + + this.request(protoOpts, reqOpts, function(err, resp) { + if (err) { + callback(err, null, resp); + return; + } + + var snapshot = self.snapshot(name); + snapshot.metadata = resp; + + callback(null, snapshot, resp); + }); +}; + /** * Add functionality on top of a message returned from the API, including the * ability to `ack` and `skip` the message. @@ -667,6 +731,57 @@ Subscription.prototype.pull = function(options, callback) { }); }; +/** + * Seeks an existing subscription to a point in time or a given snapshot. + * + * @param {string|date} snapshot - The point to seek to. This will accept the + * name of the snapshot or a Date object. + * @param {function} callback - The callback function. + * @param {?error} callback.err - An error from the API call, may be null. + * @param {object} callback.apiResponse - The full API response from the + * service. + * + * @example + * var callback = function(err, resp) { + * if (!err) { + * // Seek was successful. + * } + * }; + * + * subscription.seek('my-snapshot', callback); + * + * //- + * // Alternatively, to specify a certain point in time, you can provide a Date + * // object. + * //- + * var date = new Date('October 21 2015'); + * + * subscription.seek(date, callback); + */ +Subscription.prototype.seek = function(snapshot, callback) { + var protoOpts = { + service: 'Subscriber', + method: 'seek' + }; + + var reqOpts = { + subscription: this.name + }; + + if (is.string(snapshot)) { + reqOpts.snapshot = Snapshot.formatName_(this.parent.projectId, snapshot); + } else if (is.date(snapshot)) { + reqOpts.time = { + seconds: Math.floor(snapshot.getTime() / 1000), + nanos: snapshot.getMilliseconds() * 1e6 + }; + } else { + throw new Error('Either a snapshot name or Date is needed to seek to.'); + } + + this.request(protoOpts, reqOpts, callback); +}; + /** * Modify the ack deadline for a specific message. This method is useful to * indicate that more time is needed to process a message by the subscriber, or @@ -715,6 +830,22 @@ Subscription.prototype.setAckDeadline = function(options, callback) { }); }; +/** + * Create a Snapshot object. See {module:pubsub/subscription#createSnapshot} to + * create a snapshot. + * + * @throws {Error} If a name is not provided. + * + * @param {string} name - The name of the snapshot. + * @return {module:pubsub/snapshot} + * + * @example + * var snapshot = subscription.snapshot('my-snapshot'); + */ +Subscription.prototype.snapshot = function(name) { + return this.parent.snapshot.call(this, name); +}; + /** * Begin listening for events on the subscription. This method keeps track of * how many message listeners are assigned, and then removed, making sure @@ -827,6 +958,8 @@ Subscription.prototype.startPulling_ = function() { * All async methods (except for streams) will return a Promise in the event * that a callback is omitted. */ -common.util.promisifyAll(Subscription); +common.util.promisifyAll(Subscription, { + exclude: ['snapshot'] +}); module.exports = Subscription; diff --git a/packages/pubsub/system-test/pubsub.js b/packages/pubsub/system-test/pubsub.js index 6bcaf82f227..ddc6ccdaa74 100644 --- a/packages/pubsub/system-test/pubsub.js +++ b/packages/pubsub/system-test/pubsub.js @@ -43,6 +43,10 @@ describe('pubsub', function() { TOPICS[2].name ]; + function generateSnapshotName() { + return 'test-snapshot-' + uuid.v4(); + } + function generateSubName() { return 'test-subscription-' + uuid.v4(); } @@ -471,4 +475,100 @@ describe('pubsub', function() { }); }); }); + + describe('Snapshot', function() { + var SNAPSHOT_NAME = generateSnapshotName(); + + var topic; + var subscription; + var snapshot; + + before(function(done) { + topic = pubsub.topic(TOPIC_NAMES[0]); + subscription = topic.subscription(generateSubName()); + snapshot = subscription.snapshot(SNAPSHOT_NAME); + subscription.create(done); + }); + + after(function() { + return pubsub.getSnapshots().then(function(data) { + return Promise.all(data[0].map(function(snapshot) { + return snapshot.delete(); + })); + }); + }); + + it('should create a snapshot', function(done) { + snapshot.create(done); + }); + + it('should get a list of snapshots', function(done) { + pubsub.getSnapshots(function(err, snapshots) { + assert.ifError(err); + assert.strictEqual(snapshots.length, 1); + assert.strictEqual(snapshots[0].name.split('/').pop(), SNAPSHOT_NAME); + done(); + }); + }); + + it('should get a list of snapshots as a stream', function(done) { + var snapshots = []; + + pubsub.getSnapshotsStream() + .on('error', done) + .on('data', function(snapshot) { + snapshots.push(snapshot); + }) + .on('end', function() { + assert.strictEqual(snapshots.length, 1); + assert.strictEqual(snapshots[0].name.split('/').pop(), SNAPSHOT_NAME); + done(); + }); + }); + + describe('seeking', function() { + var subscription; + var messageId; + + beforeEach(function() { + subscription = topic.subscription(); + + return subscription.create().then(function() { + return topic.publish('Hello, world!'); + }).then(function(data) { + messageId = data[0][0]; + }); + }); + + function checkMessage() { + return subscription.pull().then(function(data) { + var message = data[0][0]; + assert.strictEqual(message.id, messageId); + return message.ack(); + }); + } + + it('should seek to a snapshot', function() { + var snapshotName = generateSnapshotName(); + + return subscription.createSnapshot(snapshotName).then(function() { + return checkMessage(); + }).then(function() { + return subscription.seek(snapshotName); + }).then(function() { + return checkMessage(); + }); + }); + + it('should seek to a date', function() { + var date = new Date(); + + return checkMessage().then(function() { + return subscription.seek(date); + }).then(function() { + return checkMessage(); + }); + }); + }); + }); }); diff --git a/packages/pubsub/test/index.js b/packages/pubsub/test/index.js index b6f822ebc28..c5ad67e6e8e 100644 --- a/packages/pubsub/test/index.js +++ b/packages/pubsub/test/index.js @@ -40,7 +40,7 @@ var fakeUtil = extend({}, util, { } promisified = true; - assert.deepEqual(options.exclude, ['subscription', 'topic']); + assert.deepEqual(options.exclude, ['snapshot', 'subscription', 'topic']); } }); @@ -48,6 +48,10 @@ function FakeGrpcService() { this.calledWith_ = arguments; } +function FakeSnapshot() { + this.calledWith_ = arguments; +} + var extended = false; var fakePaginator = { extend: function(Class, methods) { @@ -57,7 +61,12 @@ var fakePaginator = { methods = arrify(methods); assert.equal(Class.name, 'PubSub'); - assert.deepEqual(methods, ['getSubscriptions', 'getTopics']); + assert.deepEqual(methods, [ + 'getSnapshots', + 'getSubscriptions', + 'getTopics' + ]); + extended = true; }, streamify: function(methodName) { @@ -83,6 +92,7 @@ describe('PubSub', function() { '@google-cloud/common-grpc': { Service: FakeGrpcService }, + './snapshot.js': FakeSnapshot, './subscription.js': Subscription, './topic.js': Topic }); @@ -106,6 +116,7 @@ describe('PubSub', function() { }); it('should streamify the correct methods', function() { + assert.strictEqual(pubsub.getSnapshotsStream, 'getSnapshots'); assert.strictEqual(pubsub.getSubscriptionsStream, 'getSubscriptions'); assert.strictEqual(pubsub.getTopicsStream, 'getTopics'); }); @@ -247,6 +258,102 @@ describe('PubSub', function() { }); }); + describe('getSnapshots', function() { + var SNAPSHOT_NAME = 'fake-snapshot'; + var apiResponse = { snapshots: [{ name: SNAPSHOT_NAME }]}; + + beforeEach(function() { + pubsub.request = function(protoOpts, reqOpts, callback) { + callback(null, apiResponse); + }; + }); + + it('should accept a query and a callback', function(done) { + pubsub.getSnapshots({}, done); + }); + + it('should accept just a callback', function(done) { + pubsub.getSnapshots(done); + }); + + it('should build the right request', function(done) { + var options = { a: 'b', c: 'd' }; + var originalOptions = extend({}, options); + var expectedOptions = extend({}, options, { + project: 'projects/' + pubsub.projectId + }); + + pubsub.request = function(protoOpts, reqOpts) { + assert.strictEqual(protoOpts.service, 'Subscriber'); + assert.strictEqual(protoOpts.method, 'listSnapshots'); + assert.deepEqual(reqOpts, expectedOptions); + assert.deepEqual(options, originalOptions); + done(); + }; + + pubsub.getSnapshots(options, assert.ifError); + }); + + it('should return Snapshot instances with metadata', function(done) { + var snapshot = {}; + + pubsub.snapshot = function(name) { + assert.strictEqual(name, SNAPSHOT_NAME); + return snapshot; + }; + + pubsub.getSnapshots(function(err, snapshots) { + assert.ifError(err); + assert.strictEqual(snapshots[0], snapshot); + assert.strictEqual(snapshots[0].metadata, apiResponse.snapshots[0]); + done(); + }); + }); + + it('should return a query if more results exist', function() { + var token = 'next-page-token'; + + pubsub.request = function(protoOpts, reqOpts, callback) { + callback(null, { nextPageToken: token }); + }; + + var query = { pageSize: 1 }; + + pubsub.getSnapshots(query, function(err, snapshots, nextQuery) { + assert.ifError(err); + assert.strictEqual(query.pageSize, nextQuery.pageSize); + assert.equal(query.pageToken, token); + }); + }); + + it('should pass error if api returns an error', function(done) { + var error = new Error('Error'); + + pubsub.request = function(protoOpts, reqOpts, callback) { + callback(error); + }; + + pubsub.getSnapshots(function(err) { + assert.equal(err, error); + done(); + }); + }); + + it('should pass apiResponse to callback', function(done) { + var resp = { success: true }; + + pubsub.request = function(protoOpts, reqOpts, callback) { + callback(null, resp); + }; + + pubsub.getSnapshots(function(err, snapshots, nextQuery, apiResponse) { + assert.ifError(err); + assert.equal(resp, apiResponse); + done(); + }); + }); + }); + describe('getSubscriptions', function() { beforeEach(function() { pubsub.request = function(protoOpts, reqOpts, callback) { @@ -699,6 +806,24 @@ describe('PubSub', function() { }); }); + describe('snapshot', function() { + it('should throw if a name is not provided', function() { + assert.throws(function() { + pubsub.snapshot(); + }, /You must supply a valid name for the snapshot\./); + }); + + it('should return a Snapshot object', function() { + var SNAPSHOT_NAME = 'new-snapshot'; + var snapshot = pubsub.snapshot(SNAPSHOT_NAME); + var args = snapshot.calledWith_; + + assert(snapshot instanceof FakeSnapshot); + assert.strictEqual(args[0], pubsub); + assert.strictEqual(args[1], SNAPSHOT_NAME); + }); + }); + describe('subscription', function() { var SUB_NAME = 'new-sub-name'; var CONFIG = { autoAck: true, interval: 90 }; diff --git a/packages/pubsub/test/snapshot.js b/packages/pubsub/test/snapshot.js new file mode 100644 index 00000000000..d97f76a6559 --- /dev/null +++ b/packages/pubsub/test/snapshot.js @@ -0,0 +1,149 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +var assert = require('assert'); +var proxyquire = require('proxyquire'); + +function FakeGrpcServiceObject() { + this.calledWith_ = arguments; +} + +describe('Snapshot', function() { + var Snapshot; + + var SNAPSHOT_NAME = 'a'; + var PROJECT_ID = 'grape-spaceship-123'; + + var PUBSUB = { + projectId: PROJECT_ID + }; + + var SUBSCRIPTION = { + parent: PUBSUB, + createSnapshot: function() {}, + seek: function() {} + }; + + before(function() { + Snapshot = proxyquire('../src/snapshot.js', { + '@google-cloud/common-grpc': { + ServiceObject: FakeGrpcServiceObject + } + }); + }); + + describe('initialization', function() { + var FULL_SNAPSHOT_NAME = 'a/b/c/d'; + var formatName_; + + before(function() { + formatName_ = Snapshot.formatName_; + Snapshot.formatName_ = function() { + return FULL_SNAPSHOT_NAME; + }; + }); + + after(function() { + Snapshot.formatName_ = formatName_; + }); + + describe('name', function() { + it('should create and cache the full name', function() { + Snapshot.formatName_ = function(projectId, name) { + assert.strictEqual(projectId, PROJECT_ID); + assert.strictEqual(name, SNAPSHOT_NAME); + return FULL_SNAPSHOT_NAME; + }; + + var snapshot = new Snapshot(PUBSUB, SNAPSHOT_NAME); + assert.strictEqual(snapshot.name, FULL_SNAPSHOT_NAME); + }); + + it('should pull the projectId from subscription parent', function() { + Snapshot.formatName_ = function(projectId, name) { + assert.strictEqual(projectId, PROJECT_ID); + assert.strictEqual(name, SNAPSHOT_NAME); + return FULL_SNAPSHOT_NAME; + }; + + var snapshot = new Snapshot(SUBSCRIPTION, SNAPSHOT_NAME); + assert.strictEqual(snapshot.name, FULL_SNAPSHOT_NAME); + }); + }); + + it('should inherit from GrpcServiceObject', function() { + var snapshot = new Snapshot(PUBSUB, SNAPSHOT_NAME); + var calledWith = snapshot.calledWith_[0]; + + assert(snapshot instanceof FakeGrpcServiceObject); + assert.strictEqual(calledWith.parent, PUBSUB); + assert.strictEqual(calledWith.id, FULL_SNAPSHOT_NAME); + assert.deepEqual(calledWith.methods, { + delete: { + protoOpts: { + service: 'Subscriber', + method: 'deleteSnapshot' + }, + reqOpts: { + snapshot: FULL_SNAPSHOT_NAME + } + } + }); + }); + + describe('with Subscription parent', function() { + it('should include the create method', function(done) { + SUBSCRIPTION.createSnapshot = function(callback) { + callback(); // The done function + }; + + var snapshot = new Snapshot(SUBSCRIPTION, SNAPSHOT_NAME); + var calledWith = snapshot.calledWith_[0]; + + assert(calledWith.methods.create); + calledWith.createMethod(done); + }); + + it('should create a seek method', function(done) { + SUBSCRIPTION.seek = function(name, callback) { + assert.strictEqual(name, SNAPSHOT_NAME); + callback(); // The done function + }; + + var snapshot = new Snapshot(SUBSCRIPTION, SNAPSHOT_NAME); + snapshot.seek(done); + }); + }); + }); + + describe('formatName_', function() { + var EXPECTED = 'projects/' + PROJECT_ID + '/snapshots/' + SNAPSHOT_NAME; + + it('should format the name', function() { + var name = Snapshot.formatName_(PROJECT_ID, SNAPSHOT_NAME); + + assert.strictEqual(name, EXPECTED); + }); + + it('should not re-format the name', function() { + var name = Snapshot.formatName_(PROJECT_ID, EXPECTED); + + assert.strictEqual(name, EXPECTED); + }); + }); +}); diff --git a/packages/pubsub/test/subscription.js b/packages/pubsub/test/subscription.js index 4f365cc4342..b635e6be055 100644 --- a/packages/pubsub/test/subscription.js +++ b/packages/pubsub/test/subscription.js @@ -43,6 +43,10 @@ function FakeIAM() { this.calledWith_ = [].slice.call(arguments); } +function FakeSnapshot() { + this.calledWith_ = arguments; +} + var formatMessageOverride; describe('Subscription', function() { @@ -87,7 +91,8 @@ describe('Subscription', function() { '@google-cloud/common-grpc': { ServiceObject: FakeGrpcServiceObject }, - './iam.js': FakeIAM + './iam.js': FakeIAM, + './snapshot.js': FakeSnapshot }); var formatMessage = Subscription.formatMessage_; @@ -504,6 +509,80 @@ describe('Subscription', function() { }); }); + describe('createSnapshot', function() { + var SNAPSHOT_NAME = 'a'; + + it('should throw if a name is not provided', function() { + assert.throws(function() { + subscription.createSnapshot(); + }, /A name is required to create a snapshot\./); + }); + + it('should make the correct api request', function(done) { + var FULL_SNAPSHOT_NAME = 'a/b/c/d'; + + FakeSnapshot.formatName_ = function(projectId, name) { + assert.strictEqual(projectId, PROJECT_ID); + assert.strictEqual(name, SNAPSHOT_NAME); + return FULL_SNAPSHOT_NAME; + }; + + subscription.request = function(protoOpts, reqOpts) { + assert.strictEqual(protoOpts.service, 'Subscriber'); + assert.strictEqual(protoOpts.method, 'createSnapshot'); + + assert.strictEqual(reqOpts.name, FULL_SNAPSHOT_NAME); + assert.strictEqual(reqOpts.subscription, subscription.name); + + done(); + }; + + subscription.createSnapshot(SNAPSHOT_NAME, assert.ifError); + }); + + it('should return an error to the callback', function(done) { + var error = new Error('err'); + var resp = {}; + + subscription.request = function(protoOpts, reqOpts, callback) { + callback(error, resp); + }; + + function callback(err, snapshot, apiResponse) { + assert.strictEqual(err, error); + assert.strictEqual(snapshot, null); + assert.strictEqual(apiResponse, resp); + done(); + } + + subscription.createSnapshot(SNAPSHOT_NAME, callback); + }); + + it('should return a snapshot object to the callback', function(done) { + var fakeSnapshot = {}; + var resp = {}; + + subscription.request = function(protoOpts, reqOpts, callback) { + callback(null, resp); + }; + + subscription.snapshot = function(name) { + assert.strictEqual(name, SNAPSHOT_NAME); + return fakeSnapshot; + }; + + function callback(err, snapshot, apiResponse) { + assert.strictEqual(err, null); + assert.strictEqual(snapshot, fakeSnapshot); + assert.strictEqual(snapshot.metadata, resp); + assert.strictEqual(apiResponse, resp); + done(); + } + + subscription.createSnapshot(SNAPSHOT_NAME, callback); + }); + }); + describe('delete', function() { it('should delete a subscription', function(done) { subscription.request = function(protoOpts, reqOpts) { @@ -817,6 +896,55 @@ describe('Subscription', function() { }); }); + describe('seek', function() { + it('should throw if a name or date is not provided', function() { + assert.throws(function() { + subscription.seek(); + }, /Either a snapshot name or Date is needed to seek to\./); + }); + + it('should make the correct api request', function(done) { + var FAKE_SNAPSHOT_NAME = 'a'; + var FAKE_FULL_SNAPSHOT_NAME = 'a/b/c/d'; + + FakeSnapshot.formatName_ = function(projectId, name) { + assert.strictEqual(projectId, PROJECT_ID); + assert.strictEqual(name, FAKE_SNAPSHOT_NAME); + return FAKE_FULL_SNAPSHOT_NAME; + }; + + subscription.request = function(protoOpts, reqOpts, callback) { + assert.strictEqual(protoOpts.service, 'Subscriber'); + assert.strictEqual(protoOpts.method, 'seek'); + + assert.strictEqual(reqOpts.subscription, subscription.name); + assert.strictEqual(reqOpts.snapshot, FAKE_FULL_SNAPSHOT_NAME); + + // done function + callback(); + }; + + subscription.seek(FAKE_SNAPSHOT_NAME, done); + }); + + it('should optionally accept a Date object', function(done) { + var date = new Date(); + + subscription.request = function(protoOpts, reqOpts, callback) { + var seconds = Math.floor(date.getTime() / 1000); + assert.strictEqual(reqOpts.time.seconds, seconds); + + var nanos = date.getMilliseconds() * 1e6; + assert.strictEqual(reqOpts.time.nanos, nanos); + + // done function + callback(); + }; + + subscription.seek(date, done); + }); + }); + describe('setAckDeadline', function() { it('should set the ack deadline', function(done) { subscription.request = function(protoOpts, reqOpts) { @@ -854,6 +982,22 @@ describe('Subscription', function() { }); }); + describe('snapshot', function() { + it('should call through to pubsub#snapshot', function() { + var FAKE_SNAPSHOT_NAME = 'a'; + var FAKE_SNAPSHOT = {}; + + PUBSUB.snapshot = function(name) { + assert.strictEqual(this, subscription); + assert.strictEqual(name, FAKE_SNAPSHOT_NAME); + return FAKE_SNAPSHOT; + }; + + var snapshot = subscription.snapshot(FAKE_SNAPSHOT_NAME); + assert.strictEqual(snapshot, FAKE_SNAPSHOT); + }); + }); + describe('decorateMessage_', function() { var message = { ackId: 'b'