Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ topic.subscribe('subscription-name', options, function(err, subscription) {
subscription.removeListener('message', onMessage);
subscription.removeListener('error', onError);
});

// Promises are also supported by omitting callbacks.
topic.publish('New message!').then(function(data) {
var messageIds = data[0];
});

// It's also possible to integrate with third-party Promise libraries.
var pubsub = require('@google-cloud/pubsub')({
promise: require('bluebird')
});
```


Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"pubsub"
],
"dependencies": {
"@google-cloud/common": "^0.6.0",
"@google-cloud/common": "^0.7.0",
"arrify": "^1.0.0",
"extend": "^3.0.0",
"google-gax": "^0.7.0",
Expand Down
31 changes: 31 additions & 0 deletions packages/pubsub/src/iam.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ util.inherits(IAM, common.GrpcService);
* topic.iam.getPolicy(function(err, policy, apiResponse) {});
*
* subscription.iam.getPolicy(function(err, policy, apiResponse) {});
*
* //-
* // If the callback is omitted, we'll return a Promise.
* //-
* topic.iam.getPolicy().then(function(data) {
* var policy = data[0];
* var apiResponse = data[1];
* });
*/
IAM.prototype.getPolicy = function(callback) {
var protoOpts = {
Expand Down Expand Up @@ -144,6 +152,14 @@ IAM.prototype.getPolicy = function(callback) {
* topic.iam.setPolicy(myPolicy, function(err, policy, apiResponse) {});
*
* subscription.iam.setPolicy(myPolicy, function(err, policy, apiResponse) {});
*
* //-
* // If the callback is omitted, we'll return a Promise.
* //-
* topic.iam.setPolicy(myPolicy).then(function(data) {
* var policy = data[0];
* var apiResponse = data[1];
* });
*/
IAM.prototype.setPolicy = function(policy, callback) {
if (!is.object(policy)) {
Expand Down Expand Up @@ -209,6 +225,14 @@ IAM.prototype.setPolicy = function(policy, callback) {
* // "pubsub.subscriptions.update": false
* // }
* });
*
* //-
* // If the callback is omitted, we'll return a Promise.
* //-
* topic.iam.testPermissions(test).then(function(data) {
* var permissions = data[0];
* var apiResponse = data[1];
* });
*/
IAM.prototype.testPermissions = function(permissions, callback) {
if (!is.array(permissions) && !is.string(permissions)) {
Expand Down Expand Up @@ -244,4 +268,11 @@ IAM.prototype.testPermissions = function(permissions, callback) {
});
};

/*! Developer Documentation
*
* All async methods (except for streams) will return a Promise in the event
* that a callback is omitted.
*/
common.util.promisifyAll(IAM);

module.exports = IAM;
133 changes: 94 additions & 39 deletions packages/pubsub/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ util.inherits(PubSub, common.GrpcService);
* // The topic was created successfully.
* }
* });
*
* //-
* // If the callback is omitted, we'll return a Promise.
* //-
* pubsub.createTopic('my-new-topic').then(function(data) {
* var topic = data[0];
* var apiResponse = data[1];
* });
*/
PubSub.prototype.createTopic = function(name, callback) {
var self = this;
Expand Down Expand Up @@ -178,25 +186,11 @@ PubSub.prototype.createTopic = function(name, callback) {
* }, callback);
*
* //-
* // Get the subscriptions as a readable object stream.
* //-
* pubsub.getSubscriptions()
* .on('error', console.error)
* .on('data', function(subscription) {
* // subscription is a Subscription object.
* })
* .on('end', function() {
* // All subscriptions retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* // If the callback is omitted, we'll return a Promise.
* //-
* pubsub.getSubscriptions()
* .on('data', function(topic) {
* this.end();
* });
* pubsub.getSubscriptions().then(function(data) {
* var subscriptions = data[0];
* });
*/
PubSub.prototype.getSubscriptions = function(options, callback) {
var self = this;
Expand Down Expand Up @@ -259,6 +253,36 @@ PubSub.prototype.getSubscriptions = function(options, callback) {
});
};

/**
* Get a list of the {module:pubsub/subscription} objects registered to all of
* your project's topics as a readable object stream.
*
* @param {object=} options - Configuration object. See
* {module:pubsub#getSubscriptions} for a complete list of options.
* @return {stream}
*
* @example
* pubsub.getSubscriptionsStream()
* .on('error', console.error)
* .on('data', function(subscription) {
* // subscription is a Subscription object.
* })
* .on('end', function() {
* // All subscriptions retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* pubsub.getSubscriptionsStream()
* .on('data', function(topic) {
* this.end();
* });
*/
PubSub.prototype.getSubscriptionsStream =
common.paginator.streamify('getSubscriptions');

/**
* Get a list of the topics registered to your project. You may optionally
* provide a query object as the first argument to customize the response.
Expand Down Expand Up @@ -308,25 +332,11 @@ PubSub.prototype.getSubscriptions = function(options, callback) {
* }, callback);
*
* //-
* // Get the topics as a readable object stream.
* //-
* pubsub.getTopics()
* .on('error', console.error)
* .on('data', function(topic) {
* // topic is a Topic object.
* })
* .on('end', function() {
* // All topics retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* // If the callback is omitted, we'll return a Promise.
* //-
* pubsub.getTopics()
* .on('data', function(topic) {
* this.end();
* });
* pubsub.getTopics().then(function(data) {
* var topics = data[0];
* });
*/
PubSub.prototype.getTopics = function(query, callback) {
var self = this;
Expand Down Expand Up @@ -367,6 +377,35 @@ PubSub.prototype.getTopics = function(query, callback) {
});
};

/**
* Get a list of the {module:pubsub/topic} objects registered to your project as
* a readable object stream.
*
* @param {object=} query - Configuration object. See
* {module:pubsub#getTopics} for a complete list of options.
* @return {stream}
*
* @example
* pubsub.getTopicsStream()
* .on('error', console.error)
* .on('data', function(topic) {
* // topic is a Topic object.
* })
* .on('end', function() {
* // All topics retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* pubsub.getTopicsStream()
* .on('data', function(topic) {
* this.end();
* });
*/
PubSub.prototype.getTopicsStream = common.paginator.streamify('getTopics');

/**
* Create a subscription to a topic. You may optionally provide an object to
* customize the subscription.
Expand Down Expand Up @@ -423,6 +462,14 @@ PubSub.prototype.getTopics = function(query, callback) {
* autoAck: true,
* interval: 30
* }, function(err, subscription, apiResponse) {});
*
* //-
* // If the callback is omitted, we'll return a Promise.
* //-
* pubsub.subscribe(topic, name).then(function(data) {
* var subscription = data[0];
* var apiResponse = data[1];
* });
*/
PubSub.prototype.subscribe = function(topic, subName, options, callback) {
if (!is.string(topic) && !(topic instanceof Topic)) {
Expand Down Expand Up @@ -564,10 +611,18 @@ PubSub.prototype.determineBaseUrl_ = function() {

/*! Developer Documentation
*
* These methods can be used with either a callback or as a readable object
* stream. `streamRouter` is used to add this dual behavior.
* These methods can be auto-paginated.
*/
common.paginator.extend(PubSub, ['getSubscriptions', 'getTopics']);

/*! Developer Documentation
*
* All async methods (except for streams) will return a Promise in the event
* that a callback is omitted.
*/
common.streamRouter.extend(PubSub, ['getSubscriptions', 'getTopics']);
common.util.promisifyAll(PubSub, {
exclude: ['subscription', 'topic']
});

PubSub.Subscription = Subscription;
PubSub.Topic = Topic;
Expand Down
Loading