Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
],
"dependencies": {
"@google-cloud/common": "^0.13.0",
"@google-cloud/common-grpc": "^0.3.0",
"@google-cloud/common-grpc": "^0.3.1",
"arrify": "^1.0.0",
"extend": "^3.0.0",
"google-gax": "^0.13.0",
Expand Down
152 changes: 148 additions & 4 deletions packages/pubsub/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ var extend = require('extend');
var is = require('is');
var util = require('util');

/**
* @type {module:pubsub/snapshot}
* @private
*/
var Snapshot = require('./snapshot.js');

/**
* @type {module:pubsub/subscription}
* @private
Expand Down Expand Up @@ -136,6 +142,120 @@ PubSub.prototype.createTopic = function(name, callback) {
});
};

/**
* Get a list of snapshots.
*
* @param {object=} options - Configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of results to return.
* @param {number} options.pageSize - Maximum number of results to return.
* @param {string} options.pageToken - Page token.
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error from the API call, may be null.
* @param {module:pubsub/snapshot[]} callback.snapshots - The list of snapshots
* in your project.
*
* @example
* pubsub.getSnapshots(function(err, snapshots) {
* if (!err) {
* // snapshots is an array of Snapshot objects.
* }
* });
*
* //-
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* var callback = function(err, snapshots, nextQuery, apiResponse) {
* if (nextQuery) {
* // More results exist.
* pubsub.getSnapshots(nextQuery, callback);
* }
* };
*
* pubsub.getSnapshots({
* autoPaginate: false
* }, callback);
*
* //-
* // If the callback is omitted, we'll return a Promise.
* //-
* pubsub.getSnapshots().then(function(data) {
* var snapshots = data[0];
* });
*/
PubSub.prototype.getSnapshots = function(options, callback) {
var self = this;

if (is.fn(options)) {
callback = options;
options = {};
}

var protoOpts = {
service: 'Subscriber',
method: 'listSnapshots'
};

var reqOpts = extend({}, options);

reqOpts.project = 'projects/' + this.projectId;

this.request(protoOpts, reqOpts, function(err, resp) {
if (err) {
callback(err, null, null, resp);
return;
}

var snapshots = arrify(resp.snapshots).map(function(snapshot) {
var snapshotInstance = self.snapshot(snapshot.name);
snapshotInstance.metadata = snapshot;
return snapshotInstance;
});

var nextQuery = null;

if (resp.nextPageToken) {
nextQuery = options;
nextQuery.pageToken = resp.nextPageToken;
}

callback(null, snapshots, nextQuery, resp);
});
};

/**
* Get a list of the {module:pubsub/snapshot} objects as a readable object
* stream.
*
* @param {object=} options - Configuration object. See
* {module:pubsub#getSnapshots} for a complete list of options.
* @return {stream}
*
* @example
* pubsub.getSnapshotsStream()
* .on('error', console.error)
* .on('data', function(snapshot) {
* // snapshot is a Snapshot object.
* })
* .on('end', function() {
* // All snapshots retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* pubsub.getSnapshotsStream()
* .on('data', function(snapshot) {
* this.end();
* });
*/
PubSub.prototype.getSnapshotsStream =
common.paginator.streamify('getSnapshots');

/**
* Get a list of the subscriptions registered to all of your project's topics.
* You may optionally provide a query object as the first argument to customize
Expand Down Expand Up @@ -277,7 +397,7 @@ PubSub.prototype.getSubscriptions = function(options, callback) {
* // unnecessary processing and API requests.
* //-
* pubsub.getSubscriptionsStream()
* .on('data', function(topic) {
* .on('data', function(subscription) {
* this.end();
* });
*/
Expand Down Expand Up @@ -540,6 +660,26 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
});
};

/**
* Create a Snapshot object. See {module:pubsub/subscription#createSnapshot} to
* create a snapshot.
*
* @throws {Error} If a name is not provided.
*
* @param {string} name - The name of the snapshot.
* @return {module:pubsub/snapshot}
*
* @example
* var snapshot = pubsub.snapshot('my-snapshot');
*/
PubSub.prototype.snapshot = function(name) {
if (!is.string(name)) {
throw new Error('You must supply a valid name for the snapshot.');
}

return new Snapshot(this, name);
};

/**
* Create a Subscription object. This command by itself will not run any API
* requests. You will receive a {module:pubsub/subscription} object,
Expand Down Expand Up @@ -589,7 +729,7 @@ PubSub.prototype.subscription = function(name, options) {
};

/**
* Create a Topic object. See {module:pubsub/createTopic} to create a topic.
* Create a Topic object. See {module:pubsub#createTopic} to create a topic.
*
* @throws {Error} If a name is not provided.
*
Expand Down Expand Up @@ -637,15 +777,19 @@ PubSub.prototype.determineBaseUrl_ = function() {
*
* These methods can be auto-paginated.
*/
common.paginator.extend(PubSub, ['getSubscriptions', 'getTopics']);
common.paginator.extend(PubSub, [
'getSnapshots',
'getSubscriptions',
'getTopics'
]);

/*! Developer Documentation
*
* All async methods (except for streams) will return a Promise in the event
* that a callback is omitted.
*/
common.util.promisifyAll(PubSub, {
exclude: ['subscription', 'topic']
exclude: ['snapshot', 'subscription', 'topic']
});

PubSub.Subscription = Subscription;
Expand Down
Loading