diff --git a/README.md b/README.md index dd8e96c3541..94d732e1e18 100644 --- a/README.md +++ b/README.md @@ -388,7 +388,7 @@ a pubsub Connection with your Google Developers Console project ID. ~~~~ js var gcloud = require('gcloud'), - conn = new gcloud.pubsub.Connection({ projectId: YOUR_PROJECT_ID }); + pubsub = new gcloud.PubSub({ projectId: YOUR_PROJECT_ID }); ~~~~ Elsewhere, construct with project ID, service account's email @@ -396,7 +396,7 @@ and private key downloaded from Developer's Console. ~~~~ js var gcloud = require('gcloud'), - conn = new gcloud.pubsub.Connection({ + pubsub = new gcloud.PubSub({ projectId: YOUR_PROJECT_ID, keyFilename: '/path/to/the/key.json' }); @@ -404,89 +404,110 @@ var gcloud = require('gcloud'), #### Topics and Subscriptions -List, get, create and delete topics. +##### Topics +*Create, get, and delete topics.* + +Create a topic by name. ~~~ js -// lists topics. -conn.listTopics({ maxResults: 5 }, function(err, topics, nextQuery) { - // if more results, nextQuery will be non-null. -}); +pubsub.topic('new-topic'); +~~~ -// retrieves an existing topic by name. -conn.getTopic('topic1', function(err, topic) { - // deletes this topic. - topic.del(callback); -}); +The `topic` and `getTopic` methods extend Node's event emitter. For any command you run against the returned Topic object, it will first assure the topic has been created or retrieved. If there is an error during creation or retrieval of the topic, you can catch this by adding an `error` event listener, as demonstrated below. -// creates a new topic named topic2. -conn.createTopic('topic2', callback); +~~~ js +var newTopic = pubsub.topic('new-topic'); +newTopic.on('error', function(err) {}); +newTopic.delete(function(err) {}); ~~~ -List, get, create and delete subscriptions. +Retrieve an existing topic by name. +~~~ js +var newTopic = pubsub.getTopic('new-topic'); +newTopic.on('error', function(err) {}); +newTopic.delete(function(err) {}); +~~~ +Get a list of the topics registered to your project. ~~~ js -var query = { - maxResults: 5, - filterByTopicName: 'topic1' -}; -// list 5 subscriptions that are subscribed to topic1. -conn.listSubscriptions(query, function(err, subs, nextQuery) { - // if there are more results, nextQuery will be non-null. +pubsub.getTopics({ maxResults: 5 }, function(err, topics, nextQuery) { + // if more results exist, nextQuery will be non-null. }); +~~~ -// get subscription named sub1 -conn.getSubscription('sub1', function(err, sub) { - // delete this subscription. - sub.del(callback); -}); +##### Subscriptions -// create a new subsription named sub2, listens to topic1. -conn.createSubscription({ - topic: 'topic1', - name: 'sub2', - ackDeadlineSeconds: 60 -}, callback); +*Create, get, and delete subscriptions.* + +Create a new subsription by name. +~~~ js +newTopic.subscribe('new-sub'); ~~~ -#### Publishing a message +As with the Topic examples above, `subscribe` and `getSubscription` also extend Node's event emitter. Assign an error handler to catch any errors the subscription may throw. +~~~ js +var newSub = newTopic.subscribe({ name: 'new-sub', ackDeadlineSeconds: 60 }); +newSub.on('error', function(err) {}); +newSub.delete(function(err) {}); +~~~ -You need to retrieve or create a topic to publish a message. -You can either publish simple string messages or a raw Pub/Sub -message object. +Get a subscription by name. +~~~ js +var newSub = newTopic.getSubscription('new-sub'); +newSub.on('error', function(err) {}); +newSub.delete(function(err) {}); +~~~ +Get 5 subscriptions that are subscribed to "new-topic". ~~~ js -conn.getTopic('topic1', function(err, topic) { - // publishes "hello world" to to topic1 subscribers. - topic.publish('hello world', callback); - topic.publishMessage({ - data: 'Some text here...', - label: [ - { key: 'priority', numValue: 0 }, - { key: 'foo', stringValue: 'bar' } - ] - }, callback); +newTopic.getSubscriptions( + { maxResults: 5 }, function(err, subscriptions, nextQuery) { + // if more results exist, nextQuery will be non-null. }); ~~~ -#### Listening for messages +#### Publishing Messages -You can either pull messages one by one via a subscription, or -let the client to open a long-lived request to poll them. +You need to retrieve or create a topic to publish a message. +~~~ js +newTopic.publish('hello world', function(err) {}); +~~~ +To publish a raw Pub/Sub message object, use the `publishRaw` method. ~~~ js -// allow client to poll messages from sub1 -// autoAck automatically acknowledges the messages. by default, false. -var sub = conn.subscribe('sub1', { autoAck: true }); -sub.on('ready', function() { - console.log('listening messages...'); -}); -sub.on('message', function(msg) { - console.log('message retrieved:', msg); -}); -sub.on('error', function(err) { - console.log('error occured:', err); +newTopic.publishRaw({ + data: 'Some text here...', + label: [ + { key: 'priority', numValue: 0 }, + { key: 'foo', stringValue: 'bar' } + ] +}, function(err) {}); +~~~ + +#### Listening for Messages + +You can either pull messages one by one via a subscription, or let the client open a long-lived request to poll them. +~~~ js +var newerSub = newTopic.subscribe({ + name: 'newer-sub', + + // automatically acknowledges the messages (default: false) + autoAck: true }); -sub.close(); // closes the connection, stops listening for messages. + +// Listening for messages... +newerSub.on('ready', function() {}); + +// Received a message... +newerSub.on('message', function(msg) {}); + +// Received an error... +newerSub.on('error', function(err) {}); +~~~ + +To close the connection and stop listening for messages, call `close`. +~~~ js +newerSub.close(); ~~~ ## Contributing diff --git a/lib/common/util.js b/lib/common/util.js index 28636425d52..c93b529caba 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -88,3 +88,29 @@ module.exports.handleResp = function(err, resp, body, callback) { } callback(null, body, resp); }; + +function LazyCreate(options) { + if (typeof this.onReady !== 'function' || typeof this.get !== 'function' || + typeof this.create !== 'function') { + throw new Error( + 'You must implement all methods: `create`, `get, and `onReady`'); + } + + this.on('ready', this.onReady.bind(this)); + + if (typeof options.callback === 'function') { + this.on('error', options.callback); + this.on('ready', options.callback.bind(null, null, this)); + } + + if (typeof options.autoCreate === 'undefined') { + // By default, see if Resource already exists. + this.get(); + } else if (options.autoCreate === true) { + this.create(); + } else if (options.autoCreate === false) { + this.emit('ready'); + } +} + +module.exports.LazyCreate = LazyCreate; diff --git a/lib/index.js b/lib/index.js index 32783a9066b..376a879e1ef 100644 --- a/lib/index.js +++ b/lib/index.js @@ -18,6 +18,6 @@ module.exports = { datastore: require('./datastore'), - pubsub: require('./pubsub'), + PubSub: require('./pubsub'), storage: require('./storage') }; diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 2df6d23329f..82640999fd0 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -16,12 +16,12 @@ 'use strict'; -var events = require('events'); -var nodeutil = require('util'); - var conn = require('../common/connection.js'); var util = require('../common/util.js'); +var Subscription = require('./subscription.js'); +var Topic = require('./topic.js'); + /** * Base URL for Pub/Sub API. * @type {String} @@ -37,183 +37,6 @@ var SCOPES = [ 'https://www.googleapis.com/auth/cloud-platform' ]; -function Subscription(conn, name) { - this.conn = conn; - this.name = name; - - this.autoAck = false; - this.pullIntervalInMs = 10; - this.closed = false; -} - -nodeutil.inherits(Subscription, events.EventEmitter); - -/** - * Acknowledges the backend that message is retrieved. - * @param {Array} ids A list of message IDs. - * @param {Function} callback Callback function. - */ -Subscription.prototype.ack = function(ids, callback) { - ids = util.arrayize(ids); - var body = { - subscription: this.name, - ackId: ids - }; - this.conn.makeReq('POST', 'subscriptions/acknowledge', null, body, callback); -}; - -/** - * Pulls from the subscribed topic. - * @param {Boolean} opts.returnImmediately If set, the system will respond - * immediately. Otherwise, wait - * until new messages are available. - * Returns if timeout is reached. - * @param {Function} callback Callback. - */ -Subscription.prototype.pull = function(opts, callback) { - var that = this; - // TODO(jbd): Should not be racing with other pull. - // TOOD(jbd): Make opts optional. - var body = { - subscription: this.name, - returnImmediately: !!opts.returnImmediately - }; - this.conn.makeReq( - 'POST', 'subscriptions/pull', null, body, function(err, message) { - // TODO(jbd): Fix API to return a list of messages. - if (err) { - callback(err); - return; - } - if (!that.autoAck) { - that.emitMessage_(message); - callback(); - return; - } - that.ack(message.ackId, function(err) { - if (err) { - callback(err); - return; - } - that.emitMessage_(message); - callback(); - }); - }); -}; - -/** - * Polls the backend for new messages. - */ -Subscription.prototype.startPulling_ = function() { - var that = this; - var pullFn = function() { - if (that.closed) { - return; - } - that.pull({ returnImmediately: false }, function(err) { - // TODO(jbd): Fix API to return a more explicit error code or message. - if (err && err.message.indexOf('has no more messages') < 0) { - that.emitError_(err); - } - setTimeout(function() { - pullFn(); - }, that.pullIntervalInMs); - }); - }; - pullFn(); -}; - -/** - * Deletes the current subscription. Pull requests from the current - * subscription will be errored once unsubscription is done. - * @param {Function} callback Optional callback. - */ -Subscription.prototype.del = function(callback) { - callback = callback || util.noop; - var that = this; - var path = util.format('subscriptions/{fullName}', { - fullName: this.name - }); - this.conn.makeReq('DELETE', path, null, true, function(err) { - if (err) { - return callback(err); - } - that.closed = true; - callback(err); - }); -}; - -/** - * Closes the subscription. - */ -Subscription.prototype.close = function() { - this.closed = true; -}; - -/** - * Emits a 'message' event with the provided message. - */ -Subscription.prototype.emitMessage_ = function(msg) { - if (msg.pubsubEvent && msg.pubsubEvent.message) { - var data = msg.pubsubEvent.message.data; - msg.pubsubEvent.message.data = new Buffer(data, 'base64').toString('utf-8'); - } - this.emit('message', msg); -}; - -/** - * Emits an error with the provided error. - */ -Subscription.prototype.emitError_ = function(err) { - this.emit('error', err); -}; - -/** - * Represents a Google Cloud Pub/Sub API topic. - * @param {Connection} conn Authorized connection. - * @param {string} name Full name of the topic. - */ -function Topic(conn, name) { - this.conn = conn; - this.name = name; -} - -/** - * Publishes the provided string message. - * @param {string} data String message to publish. - * @param {Function} callback Optional callback. - */ -Topic.prototype.publish = function(data, callback) { - callback = callback || util.noop; - this.publishMessage({ - topic: this.name, - message: { - data: new Buffer(data).toString('base64') - } - }, callback); -}; - -/** - * Publishes a raw message. - * @param {message} message Raw message to publish. - * @param {Function} callback Optional callback. - */ -Topic.prototype.publishMessage = function(message, callback) { - callback = callback || util.noop; - message.topic = this.name; - this.conn.makeReq('POST', 'topics/publish', null, message, callback); -}; - -/** - * Deletes a topic. - * @param {Function} callback Optional callback. - */ -Topic.prototype.del = function(callback) { - callback = callback || util.noop; - var path = 'topics/' + this.name; - this.conn.makeReq('DELETE', path, null, true, callback); -}; - /** * Represents connection to Google Cloud Pub/Sub API. * @param {string} opts.projectId Google Developers Console Project ID. @@ -221,11 +44,12 @@ Topic.prototype.del = function(callback) { * @param {string} opts.pemFilePath Path to the pem file that contains your * private key. */ -function Connection(opts) { +function PubSub(opts) { opts = opts || {}; - var id = opts.projectId; - this.id = id; + this.id = opts.projectId; + this.name = '/projects/' + this.id; + this.conn = new conn.Connection({ keyFilename: opts.keyFilename, scopes: SCOPES @@ -233,185 +57,110 @@ function Connection(opts) { } /** - * Lists subscriptions. - * @param {string} query.filterByTopic Returns subscriptions that are - * subscribed to the topic provided. + * Create/retrieve a Topic object. + * @param {object} options + * @param {function=} callback - optional callback + */ +PubSub.prototype.topic = function(options, callback) { + if (typeof options === 'string') { + options = { + name: options + }; + } + options = options || {}; + options.callback = callback; + options.name = options.name.replace(/.*\//, ''); + options.pubsub = this; + return new Topic(options); +}; + +/** + * Create a topic. + * @param {object} options + * @param {function=} optional callback + */ +PubSub.prototype.createTopic = function(options, callback) { + if (typeof options === 'string') { + options = { + name: options + }; + } + options = options || {}; + options.autoCreate = true; + return this.topic(options, callback); +}; + +/** + * Returns all topics registered to this project. * @param {string} query.pageToken Page token. * @param {Number} query.maxResults Max number of results to return. * @param {Function} callback Callback function. */ -Connection.prototype.listSubscriptions = function(query, callback) { +PubSub.prototype.getTopics = function(query, callback) { var that = this; if (arguments.length < 2) { callback = query; query = {}; } - var q = util.extend({}, query); - if (q.filterByTopic) { - q.query = - 'pubsub.googleapis.com/topic in (' + - this.fullTopicName_(q.filterByTopic) + ')'; - } else { - q.query = - 'cloud.googleapis.com/project in (' + this.fullProjectName_() + ')'; - } - delete q.filterByTopic; - - this.makeReq('GET', 'subscriptions', q, true, function(err, result) { + query = query || {}; + query.query = 'cloud.googleapis.com/project in (' + this.name + ')'; + this.makeReq('GET', 'topics', query, true, function(err, result) { if (err) { return callback(err); } - var items = result.subscription || []; - var subscriptions = items.map(function(item) { - return new Subscription(that, item.name); + var topics = (result.topic || []).map(function(item) { + return that.topic({ + autoCreate: false, + name: item.name + }); }); var nextQuery = null; if (result.nextPageToken) { - nextQuery = q; + nextQuery = query; nextQuery.pageToken = result.nextPageToken; } - callback(null, subscriptions, nextQuery); - }); -}; - -/** - * Gets a subscription. - * @param {string} name Name of the subscription. - * @param {Function} callback Callback. - */ -Connection.prototype.getSubscription = function(name, callback) { - var that = this; - var fullName = '/subscriptions/' + this.id + '/' + name; - this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) { - if (err) { - callback(err); - return; - } - callback(null, new Subscription(that, fullName)); - }); -}; - -Connection.prototype.createSubscription = function(opts, callback) { - var that = this; - var subscription = { - topic:'/topics/' + this.id + '/' + opts.topic, - name: '/subscriptions/' + this.id + '/' + opts.name, - ackDeadlineSeconds: opts.ackDeadlineSeconds - }; - this.makeReq('POST', 'subscriptions', null, subscription, function(err) { - if (err) { - callback(err); - return; - } - callback(null, new Subscription(that, subscription.name)); - }); -}; - -/** - * Subscribe with the provided options. - * @param {string} name Name of the subscription. - * @param {Boolean} opts.autoAck Automatically acknowledges the - * message once it's pulled. - * @return {Subscription} - */ -Connection.prototype.subscribe = function(name, opts) { - opts = opts || {}; - - var fullName = '/subscriptions/' + this.id + '/' + name; - var sub = new Subscription(this, fullName); - sub.autoAck = !!opts.autoAck; - this.getSubscription(name, function(err) { - if (err) { - sub.emitError_(err); - return; - } - sub.emit('ready'); - sub.startPulling_(); + callback(null, topics, nextQuery); }); - return sub; }; /** - * Lists topics. + * Gets subscriptions. * @param {string} query.pageToken Page token. * @param {Number} query.maxResults Max number of results to return. * @param {Function} callback Callback function. */ -Connection.prototype.listTopics = function(query, callback) { +PubSub.prototype.getSubscriptions = function(query, callback) { var that = this; if (arguments.length < 2) { callback = query; query = {}; } - var q = util.extend({}, query); - q.query = 'cloud.googleapis.com/project in (' + this.fullProjectName_() + ')'; - this.makeReq('GET', 'topics', q, true, function(err, result) { - if (err) { return callback(err); } - var items = result.topic || []; - var topics = items.map(function(item) { - return new Topic(that, item.name); + query = query || {}; + if (!query.query) { + query.query = 'cloud.googleapis.com/project in (' + this.name + ')'; + } + this.makeReq('GET', 'subscriptions', query, true, function(err, result) { + if (err) { + return callback(err); + } + var subscriptions = (result.subscription || []).map(function(item) { + return new Subscription({ + autoCreate: false, + autoPull: false, + name: item.name, + pubsub: that + }); }); var nextQuery = null; if (result.nextPageToken) { - nextQuery = q; + nextQuery = query; nextQuery.pageToken = result.nextPageToken; } - callback(null, topics, nextQuery); - }); -}; - -/** - * Gets a topic. - * @param {string} name Name of the topic to get. - * @param {Function} callback Optional callback. - */ -Connection.prototype.getTopic = function(name, callback) { - var that = this; - var cb = callback || util.noop; - var fullName = this.fullTopicName_(name); - this.makeReq('GET', 'topics/' + fullName, null, true, function(err) { - if (err) { - return cb(err); - } - cb(null, new Topic(that, fullName)); - }); -}; - -/** - * Creates a topic with the given name. - * @param {string} name Name of the topic. - * @param {Function} callback Optional callback. - */ -Connection.prototype.createTopic = function(name, callback) { - var that = this; - var cb = callback || util.noop; - var fullName = this.fullTopicName_(name); - this.makeReq('POST', 'topics', null, { name: fullName }, function(err) { - cb(err, new Topic(that, fullName)); - }); -}; - -/** - * Returns the full name of a topic. - * Full name is in /topics// form. - */ -Connection.prototype.fullTopicName_ = function(name) { - return util.format('/topics/{projectId}/{name}', { - projectId: this.id, name: name - }); -}; - -/** - * Returns the fully qualified project name. - * Full name is in /projects/ form. - */ -Connection.prototype.fullProjectName_ = function() { - return util.format('/projects/{projectId}', { - projectId: this.id + callback(null, subscriptions, nextQuery); }); }; -Connection.prototype.makeReq = function(method, path, q, body, callback) { +PubSub.prototype.makeReq = function(method, path, q, body, callback) { var reqOpts = { method: method, qs: q, @@ -429,16 +178,6 @@ Connection.prototype.makeReq = function(method, path, q, body, callback) { }; /** - * Exports Connection. - */ -module.exports.Connection = Connection; - -/** - * Exports Topic. - */ -module.exports.Topic = Topic; - -/** - * Exports Subscription. + * Exports PubSub. */ -module.exports.Subscription = Subscription; +module.exports = PubSub; diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js new file mode 100644 index 00000000000..4537157a9b4 --- /dev/null +++ b/lib/pubsub/subscription.js @@ -0,0 +1,233 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +var events = require('events'); +var nodeutil = require('util'); +var util = require('../common/util.js'); + +function Subscription(options) { + events.EventEmitter.call(this); + + if (!options.name) { + throw new Error('A name must be specified to register a subscription.'); + } + + this.pubsub = options.pubsub; + this.topic = options.topic; + + this.name = options.name; + if (this.name.substr(0, '/subscriptions/'.length) !== '/subscriptions/') { + this.name = '/subscriptions/' + this.pubsub.id + '/' + this.name; + } + + this.ackDeadlineSeconds = options.ackDeadlineSeconds; + this.autoAck = options.autoAck || false; + this.autoPull = true; + this.closed = false; + this.pullIntervalInMs = options.pullIntervalInMs || 10; + + if (typeof options.autoPull === 'boolean') { + this.autoPull = options.autoPull; + } + + util.LazyCreate.call(this, options); +} + +nodeutil.inherits(Subscription, events.EventEmitter); + +Subscription.prototype.onReady = function() { + this.exists_ = true; + if (this.autoPull === true) { + this.startPulling(); + } +}; + +Subscription.prototype.create = function() { + var subscription = { + name: this.name, + topic: this.topic.name, + ackDeadlineSeconds: this.ackDeadlineSeconds + }; + this.pubsub.makeReq( + 'POST', 'subscriptions', null, subscription, function(err) { + if (err) { + this.emit('error', err); + return; + } + this.emit('ready'); + }.bind(this)); +}; + +Subscription.prototype.get = function() { + this.pubsub.makeReq( + 'GET', 'subscriptions/' + this.name, null, true, function(err, res) { + if (err) { + if (err.code === 404) { + // Subscription doesn't exist. Try to create the Subscription instead. + this.create(); + } else { + this.emit('error', err); + } + return; + } + if (this.ackDeadlineSeconds && + res.ackDeadlineSeconds !== this.ackDeadlineSeconds) { + this.exists_ = true; + this.delete(this.create.bind(this)); + return; + } + this.emit('ready'); + }.bind(this)); +}; + +/** + * Acknowledges the backend that message is retrieved. + * @param {Array} ids A list of message IDs. + * @param {Function} callback Callback function. + */ +Subscription.prototype.ack = function(ids, callback) { + ids = util.arrayize(ids); + var body = { + subscription: this.name, + ackId: ids + }; + this.makeReq('POST', 'subscriptions/acknowledge', null, body, callback); +}; + +/** + * Pulls from the subscribed topic. + * @param {Boolean} opts.returnImmediately If set, the system will respond + * immediately. Otherwise, wait + * until new messages are available. + * Returns if timeout is reached. + * @param {Function} callback Callback. + */ +Subscription.prototype.pull = function(opts, callback) { + // TODO(jbd): Should not be racing with other pull. + // TOOD(jbd): Make opts optional. + opts = opts || {}; + callback = callback || util.noop; + var that = this; + var body = { + subscription: this.name, + returnImmediately: !!opts.returnImmediately + }; + this.makeReq( + 'POST', 'subscriptions/pull', null, body, function(err, message) { + // TODO(jbd): Fix API to return a list of messages. + if (err) { + that.emit('error', err); + callback(err); + return; + } + if (!that.autoAck) { + that.emitMessage_(message); + callback(); + return; + } + that.ack(message.ackId, function(err) { + if (err) { + that.emit('error', err); + callback(err); + return; + } + that.emitMessage_(message); + callback(); + }); + }); +}; + +/** + * Polls the backend for new messages. + */ +Subscription.prototype.startPulling = function() { + var that = this; + var pullFn = function() { + if (that.closed) { + return; + } + that.pull({ returnImmediately: false }, function(err) { + // TODO(jbd): Fix API to return a more explicit error code or message. + if (err && err.message.indexOf('has no more messages') === -1) { + that.emit('error', err); + } + setTimeout(pullFn, that.pullIntervalInMs); + }); + }; + pullFn(); +}; + +/** + * Deletes the current subscription. Pull requests from the current + * subscription will be errored once unsubscription is done. + * @param {Function} callback Optional callback. + */ +Subscription.prototype.delete = function(callback) { + callback = callback || util.noop; + var path = util.format('subscriptions/{fullName}', { + fullName: this.name + }); + this.makeReq('DELETE', path, null, true, function(err) { + if (err) { + this.emit('error', err); + callback(err); + return; + } + this.close(); + this.exists_ = false; + callback(); + }.bind(this)); +}; + +/** + * Closes the subscription. + */ +Subscription.prototype.close = function() { + this.closed = true; +}; + +/** + * Emits a 'message' event with the provided message. + */ +Subscription.prototype.emitMessage_ = function(msg) { + if (msg.pubsubEvent && msg.pubsubEvent.message) { + var data = msg.pubsubEvent.message.data; + msg.pubsubEvent.message.data = new Buffer(data, 'base64').toString('utf-8'); + } + this.emit('message', msg); +}; + +/** + * Overwrite makeReq method to be sure the Subscription exists + * before making any requests. + */ + Subscription.prototype.makeReq = function(method, path, q, body, callback) { + var that = this; + if (!this.exists_) { + // Queue the request until we know the Subscription exists. + this.on('error', callback); + this.on('ready', makeReq); + return; + } + function makeReq() { + that.pubsub.makeReq(method, path, q, body, callback); + } + makeReq(); +}; + +module.exports = Subscription; diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js new file mode 100644 index 00000000000..ec195a4c492 --- /dev/null +++ b/lib/pubsub/topic.js @@ -0,0 +1,179 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +var events = require('events'); +var nodeutil = require('util'); +var util = require('../common/util.js'); +var Subscription = require('./subscription.js'); + +/** + * Represents a Google Cloud Pub/Sub API topic. + * @param {object} options + * @param {Function=} callback + */ +function Topic(options) { + events.EventEmitter.call(this); + + this.pubsub = options.pubsub; + + this.name = options.name; + if (this.name.substr(0, '/topics/'.length) !== '/topics/') { + this.name = '/topics/' + this.pubsub.id + '/' + this.name; + } + + util.LazyCreate.call(this, options); +} + +nodeutil.inherits(Topic, events.EventEmitter); + +Topic.prototype.onReady = function() { + this.exists_ = true; +}; + +Topic.prototype.create = function() { + this.pubsub.makeReq( + 'POST', 'topics', null, { name: this.name }, function(err) { + if (err) { + this.emit('error', err); + return; + } + this.emit('ready'); + }.bind(this)); +}; + +Topic.prototype.get = function() { + this.pubsub.makeReq( + 'GET', 'topics/' + this.name, null, true, function(err) { + if (err) { + if (err.code === 404) { + // Subscription doesn't exist. Try to create the Subscription instead. + this.create(); + } else { + this.emit('error', err); + } + return; + } + this.emit('ready'); + }.bind(this)); +}; + +/** + * Publishes the provided string message. + * @param {string} message String message to publish. + * @param {Function} callback Optional callback. + */ +Topic.prototype.publish = function(message, callback) { + callback = callback || util.noop; + this.publishRaw({ + message: { + data: new Buffer(message).toString('base64') + } + }, callback); +}; + +/** + * Publishes a raw message. + * @param {message} message Raw message to publish. + * @param {Function} callback Optional callback. + */ +Topic.prototype.publishRaw = function(message, callback) { + callback = callback || util.noop; + message.topic = this.name; + this.makeReq('POST', 'topics/publish', null, message, callback); +}; + +/** + * Deletes a topic. + * @param {Function} callback Optional callback. + */ +Topic.prototype.delete = function(callback) { + callback = callback || {}; + this.makeReq('DELETE', 'topics/' + this.name, null, true, callback); +}; + +/** + * Create/retrieve a Topic object. + * @param {object} options + * @param {function=} callback + */ +Topic.prototype.subscribe = function(options, callback) { + if (typeof options === 'string') { + options = { + name: options + }; + } + options = options || {}; + options.callback = callback; + options.name = options.name.replace(/.*\//, ''); + options.topic = this; + options.pubsub = this.pubsub; + return new Subscription(options); +}; + +/** + * Create a subscription. + * @param {object} options + * @param {function=} callback + */ +Topic.prototype.createSubscription = function(options, callback) { + if (typeof options === 'string') { + options = { + name: options + }; + } + options = options || {}; + options.autoCreate = true; + return this.topic(options, callback); +}; + +/** + * Gets subscriptions. + * @param {string} query.pageToken Page token. + * @param {Number} query.maxResults Max number of results to return. + * @param {Function} callback Callback function. + */ +Topic.prototype.getSubscriptions = function(query, callback) { + if (arguments.length < 2) { + callback = query; + query = {}; + } + query = query || {}; + query.query = query.query || + 'pubsub.googleapis.com/topic in (' + this.name + ')'; + this.pubsub.getSubscriptions(query, callback); +}; + +/** + * Overwrite makeReq method to be sure the Topic exists + * before making any requests. + */ +Topic.prototype.makeReq = function(method, path, q, body, callback) { + var that = this; + if (!this.exists_) { + // Queue the request until we know the Topic exists. + this.on('error', callback); + this.on('ready', makeReq); + return; + } + function makeReq() { + that.pubsub.makeReq(method, path, q, body, callback); + } + makeReq(); +}; + +module.exports = Topic; diff --git a/regression/pubsub.js b/regression/pubsub.js index 736fcef2d31..f93e33bd11c 100644 --- a/regression/pubsub.js +++ b/regression/pubsub.js @@ -24,142 +24,154 @@ var async = require('async'); var env = require('./env.js'); var gcloud = require('../lib'); -var topicNames = ['topic1', 'topic2', 'topic3']; -var subscriptions = [ - { - name: 'sub1', - ackDeadlineSeconds: 30 - }, - { - name: 'sub2', - ackDeadlineSeconds: 60 - } -]; - -var conn = new gcloud.pubsub.Connection(env); - -before(function(done) { - // TODO: Handle pagination. - var createFn = function(name, callback) { - conn.createTopic(name, callback); - }; - conn.listTopics(function(err, topics) { - assert.ifError(err); - var fns = topics.map(function(t) { - return function(cb) { - t.del(cb); - }; - }); - async.parallel(fns, function(err) { +var Topic = require('../lib/pubsub/topic.js'); +var Subscription = require('../lib/pubsub/subscription.js'); + +describe('PubSub', function() { + var pubsub; + + before(function(done) { + pubsub = new gcloud.PubSub(env); + var topicNames = ['topic1', 'topic2', 'topic3']; + // TODO: Handle pagination. + pubsub.getTopics(function(err, topics) { assert.ifError(err); - async.map(topicNames, createFn, done); - }); - }); -}); -describe('Topic', function() { - it('should be listed', function(done) { - conn.listTopics(function(err, topics) { - assert(topics.length, 3); - done(err); + async.series( + // delete existing topics. + topics.map(function(topic) { + return topic.delete.bind(topic); + }), + function(err) { + assert.ifError(err); + // create new topics. + async.map(topicNames, function(topic, callback) { + pubsub.topic(topic).on('ready', callback); + }, done); + }); }); }); - it('should return a nextQuery if there are more results', function(done) { - conn.listTopics({ maxResults: 2 }, function(err, topics, next) { - assert(topics.length, 2); - assert(next.maxResults, 2); - assert(!!next.pageToken, true); - done(err); + describe('Topic', function() { + it('should return Topic instances for existing topics', function(done) { + pubsub.getTopics(function(err, topics) { + assert(topics.length, 3); + topics.forEach(function(topic) { + assert(topic instanceof Topic); + }); + done(err); + }); }); - }); - - it('should be created', function(done) { - conn.createTopic('topic-new', done); - }); - it('should be gettable', function(done) { - conn.getTopic('topic1', done); - }); + it('should return a nextQuery if there are more results', function(done) { + pubsub.getTopics({ maxResults: 2 }, function(err, topics, next) { + assert.ifError(err); + assert.equal(topics.length, 2); + assert.equal(next.maxResults, 2); + assert.equal(!!next.pageToken, true); + done(); + }); + }); - it('should publish a message', function(done) { - conn.getTopic('topic1', function(err, topic) { - topic.publish('message from me', done); + it('should be created', function(done) { + var topic = pubsub.topic('topic-new'); + topic.on('error', assert.ifError); + topic.on('ready', done); }); - }); - it('should be deleted', function(done) { - conn.getTopic('topic3', function(err, topic) { - topic.del(done); + it('should be gettable', function(done) { + var topic = pubsub.topic('topic1'); + topic.on('error', assert.ifError); + topic.on('ready', done); }); - }); -}); -describe('Subscription', function() { - before(function(done) { - var createFn = function(item, callback) { - conn.createSubscription({ - name: item.name, - topic: 'topic1', - ackDeadlineSeconds: item.ackDeadlineSeconds - }, callback); - }; - conn.listSubscriptions(function(err, subs) { - assert.ifError(err); - var fns = subs.map(function(sub) { - return function(cb) { - sub.del(cb); - }; - }); - async.series(fns, function(err) { + it('should publish a message', function(done) { + pubsub.topic('topic1').publish('message from me', function(err) { assert.ifError(err); - async.map(subscriptions, createFn, done); + done(); }); }); - }); - it('should be listed', function(done) { - conn.listSubscriptions(function(err, subs) { - assert.strictEqual(subs.length, 2); - done(err); + it('should be deleted', function(done) { + pubsub.topic('topic1').delete(function (err) { + assert.ifError(err); + done(); + }); }); }); - it('should be gettable', function(done) { - conn.getSubscription('sub1', function(err, sub) { - assert.ifError(err); - assert.strictEqual(sub.name, '/subscriptions/' + env.projectId + '/sub1'); - done(); + describe('Subscription', function() { + var subscriptionObjects = [ + { name: 'sub1', ackDeadlineSeconds: 30, autoPull: false }, + { name: 'sub2', ackDeadlineSeconds: 60, autoPull: false } + ]; + var topic; + + before(function(done) { + topic = pubsub.topic('topic1'); + topic.on('error', assert.ifError); + + pubsub.getSubscriptions(function(err, subscriptions) { + assert.ifError(err); + + async.series( + // delete existing subscriptions. + subscriptions.map(function(subscription) { + return subscription.delete.bind(subscription); + }), + function(err) { + assert.ifError(err); + // re-subscribe. + async.map(subscriptionObjects, function(subscriptionObj, callback) { + topic.subscribe(subscriptionObj).on('ready', callback); + }, done); + }); + }); }); - }); - it('should error while getting a non-existent subscription', function(done){ - conn.getSubscription('sub-nothing-is-here', function(err) { - assert.strictEqual(err.code, 404); - done(); + it('should return Subscription instances for existing subscriptions', + function(done) { + topic.getSubscriptions(function(err, subscriptions) { + assert.ifError(err); + assert.strictEqual(subscriptions.length, subscriptionObjects.length); + subscriptions.forEach(function(subscription) { + assert(subscription instanceof Subscription); + }); + done(); + }); }); - }); - it('should be created', function(done) { - conn.createSubscription({ - topic: 'topic1', - name: 'new-sub' - }, done); - }); + it('should be gettable', function(done) { + var sub = topic.subscribe(subscriptionObjects[0]); + sub.on('error', assert.ifError); + sub.on('ready', function() { + assert.equal(sub.name, '/subscriptions/' + env.projectId + '/sub1'); + sub.close(); + done(); + }); + }); - it('should be able to pull and ack', function(done) { - conn.getTopic('topic1', function(err, topic) { - assert.ifError(err); - topic.publish('hello', function(err) { - assert.ifError(err); + it('should create a subscription', function(done) { + var newSub = topic.subscribe('new-sub'); + newSub.on('error', assert.ifError); + newSub.on('ready', function() { + newSub.close(); + done(); }); }); - conn.getSubscription('sub1', function(err, sub) { - assert.ifError(err); + + it('should be able to pull and ack', function(done) { + topic.publish('hello'); + var sub = topic.subscribe(subscriptionObjects[0].name); + sub.on('error', assert.ifError); sub.on('message', function(msg) { - sub.ack(msg.ackId, done); + sub.ack(msg.ackId, function(err) { + assert.ifError(err); + sub.close(); + done(); + }); }); - sub.pull({}, function() {}); + sub.pull(); }); }); }); diff --git a/test/pubsub/index.js b/test/pubsub/index.js index 8b54dd8523c..3d43540c02e 100644 --- a/test/pubsub/index.js +++ b/test/pubsub/index.js @@ -19,81 +19,96 @@ 'use strict'; var assert = require('assert'); -var pubsub = require('../../lib/pubsub'); +var util = require('../../lib/common/util.js'); +var PubSub = require('../../lib/pubsub/index.js'); +var Topic = require('../../lib/pubsub/topic.js'); -describe('Subscription', function() { - it('should ack messages if autoAck is set', function(done) { - var sub = new pubsub.Subscription({}, 'sub1'); - sub.autoAck = true; - sub.conn.makeReq = function(method, path, qs, body, callback) { - if (path === 'subscriptions/pull') { - callback(null, { ackId: 'ackd-id' }); - return; - } - if (path === 'subscriptions/acknowledge') { - done(); - } - }; - sub.pull({}, function() {}); - }); +describe('PubSub', function() { + function getPubSub(config, makeReqOverride) { + if (typeof config === 'string' || !config) { + config = { projectId: config || 'test-project' }; + } + var pubsub = new PubSub(config); + pubsub.makeReq = makeReqOverride || util.noop; + return pubsub; + } - it('should be closed', function(done) { - var sub = new pubsub.Subscription({}, 'sub1'); - sub.close(); - assert.strictEqual(sub.closed, true); - done(); - }); + describe('topics', function() { + var topicNames = ['topic-name-1', 'topic-name-2', 'topic-name-3']; - it('should pull messages', function(done) { - var conn = new pubsub.Connection({ - projectId: 'test-project' + function createPubSubAndRegisterTopics(makeReqOverride) { + var pubsub = getPubSub('test-project', makeReqOverride); + topicNames.forEach(function(topicName) { + pubsub.topic(topicName); + }); + return pubsub; + } + + it('should accept a string', function() { + var topicName = 'topic-name'; + var topic = getPubSub().topic(topicName); + assert.equal(topic.name, '/topics/test-project/' + topicName); }); - conn.makeReq = function(method, path, qs, body, callback) { - switch (path) { - case 'subscriptions//subscriptions/test-project/sub1': - callback(null, {}); - return; - case 'subscriptions/pull': - callback(null, { ackId: 123 }); - return; - } - }; - var sub = conn.subscribe('sub1', { autoAck: false }); - var doneCalled = false; - sub.on('message', function() { - if (!doneCalled) { - done(); - } - doneCalled = true; + + it('should accept an object', function() { + var topicName = 'topic-name'; + var topic = getPubSub().topic({ name: topicName }); + assert.equal(topic.name, '/topics/test-project/' + topicName); }); - }); - it('should pull and ack messages', function(done) { - var conn = new pubsub.Connection({ - projectId: 'test-project' + it('should create a new Topic object', function() { + assert(getPubSub().topic('topic-name') instanceof Topic); }); - conn.makeReq = function(method, path, qs, body, callback) { - switch (path) { - case 'subscriptions//subscriptions/test-project/sub1': - callback(null, {}); - return; - case 'subscriptions/pull': - setTimeout(function() { - callback(null, { ackId: 123 }); - }, 500); - return; - case 'subscriptions/acknowledge': - callback(null, true); - return; + + it('should return an individual topic', function() { + var topicName = '/topics/test-project/topic-name-1'; + function makeReqOverride(method, path, qs, body, callback) { + if (path === 'topics' || path === 'topics/' + topicName) { + return callback(); + } } - }; - var sub = conn.subscribe('sub1', { autoAck: true }); - var doneCalled = false; - sub.on('message', function() { - if (!doneCalled) { - done(); + var pubsub = createPubSubAndRegisterTopics(makeReqOverride); + pubsub.topic(topicNames[0], function(err, topic) { + assert.ifError(err); + assert(topic instanceof Topic); + assert.equal(topic.name, topicName); + }); + }); + + it('should return all topics', function() { + function makeReqOverride(method, path, qs, body, callback) { + if (path === 'topics') { + if (method === 'POST') { + return callback(); + } + if (method === 'GET') { + return callback(null, { + topic: topicNames.map(function(topic) { + return { name: topic }; + }) + }); + } + } } - doneCalled = true; + var pubsub = createPubSubAndRegisterTopics(makeReqOverride); + pubsub.getTopics(function(err, subscriptions) { + assert.ifError(err); + assert.equal(subscriptions.length, topicNames.length); + subscriptions.forEach(function(sub, index) { + // ignore the fullName formatting + assert.equal(sub.name.split('/')[3], topicNames[index]); + }); + }); }); }); + + it('should pass network requests to the connection object', function() { + var called = false; + var pubsub = new PubSub({ projectId: 'test-project' }); + pubsub.conn.req = function () { + called = true; + }; + pubsub.makeReq(); + assert.strictEqual(called, true); + }); }); diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js new file mode 100644 index 00000000000..1647256fb3b --- /dev/null +++ b/test/pubsub/subscription.js @@ -0,0 +1,181 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*global describe, it */ + +'use strict'; + +var assert = require('assert'); +var util = require('../../lib/common/util.js'); +var Subscription = require('../../lib/pubsub/subscription.js'); + +describe('PubSub/Subscription', function() { + function getSubscription(config, makeReqOverride) { + if (typeof config === 'string') { + config = { name: config }; + } + config.pubsub = { + id: 'test-project', + makeReq: makeReqOverride || util.noop + }; + config.topic = { + name: 'topic' + }; + return new Subscription(config); + } + + it('should ack messages if autoAck is set', function() { + var acked = false; + function makeReqOverride(method, path, qs, body, callback) { + if (path === 'subscriptions') { + callback(null); + return; + } + if (path === 'subscriptions/pull') { + callback(null, { ackId: 'ackd-id' }); + return; + } + if (path === 'subscriptions/acknowledge') { + acked = true; + } + } + var config = { + name: 'sub', + autoAck: true, + autoCreate: false + }; + var sub = getSubscription(config, makeReqOverride); + sub.pull({}, function() {}); + assert.strictEqual(acked, true); + }); + + it('should be closed', function() { + var sub = getSubscription('sub'); + sub.close(); + assert.strictEqual(sub.closed, true); + }); + + it('should pull messages', function(done) { + var message = { ackId: 123 }; + function makeReqOverride(method, path, qs, body, callback) { + if ('subscriptions/pull') { + callback(null, message); + return; + } + } + var config = { name: 'sub', autoAck: false, exists: true }; + var sub = getSubscription(config, makeReqOverride); + sub.once('message', function(messageObj) { + assert.equal(messageObj, message); + done(); + }); + }); + + it('should pull and ack messages', function(done) { + var message = { ackId: 123 }; + function makeReqOverride(method, path, qs, body, callback) { + if (path === 'subscriptions/pull') { + setTimeout(callback.bind(null, null, message), 1); + return; + } + if (path === 'subscriptions/acknowledge') { + callback(null, true); + return; + } + } + var config = { + name: 'sub', + autoAck: true, + autoCreate: false + }; + var sub = getSubscription(config, makeReqOverride); + sub.once('message', function() { + done(); + }); + }); + + it('should attempt to get by default', function() { + function makeReqOverride(method, path, qs, body, callback) { + if (method === 'GET' && + path === 'subscriptions//subscriptions/test-project/hi') { + callback(null); + } + } + var sub = getSubscription({ name: 'hi' }, makeReqOverride); + assert.strictEqual(sub.exists_, true); + }); + + it('should not attempt to create or get when specified not to', function() { + var attemptedToCreate = false; + var attemptedToGet = false; + function makeReqOverride(method, path, qs, body, callback) { + if (method === 'GET' && + path === 'subscriptions//subscriptions/test-project/hi') { + attemptedToGet = true; + callback(); + return; + } + if (method === 'POST' && path === 'subscriptions') { + attemptedToCreate = true; + callback(); + return; + } + } + var config = { + name: '/subscriptions/test-project/hi', + autoCreate: false + }; + var sub = getSubscription(config, makeReqOverride); + assert.strictEqual(attemptedToCreate, false); + assert.strictEqual(attemptedToGet, false); + assert.strictEqual(sub.exists_, true); + }); + + it('should delete and re-create when ackDeadlineSeconds do not match', + function() { + var attemptedToRecreate = false; + var attemptedToDelete = false; + var attemptedToGet = false; + function makeReqOverride(method, path, qs, body, callback) { + if (method === 'GET' && + path === 'subscriptions//subscriptions/test-project/hi') { + attemptedToGet = true; + callback(null, { ackDeadlineSeconds: 60 }); + return; + } + if (method === 'DELETE') { + attemptedToDelete = true; + callback(); + return; + } + if (method === 'POST' && path === 'subscriptions' && + body.ackDeadlineSeconds === 30) { + attemptedToRecreate = true; + callback(); + return; + } + } + var config = { + name: '/subscriptions/test-project/hi', + ackDeadlineSeconds: 30 + }; + var sub = getSubscription(config, makeReqOverride); + assert.strictEqual(attemptedToRecreate, true); + assert.strictEqual(attemptedToDelete, true); + assert.strictEqual(attemptedToGet, true); + assert.strictEqual(sub.exists_, true); + }); +}); diff --git a/test/pubsub/topic.js b/test/pubsub/topic.js new file mode 100644 index 00000000000..8241a174b26 --- /dev/null +++ b/test/pubsub/topic.js @@ -0,0 +1,262 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*global describe, it */ + +'use strict'; + +var assert = require('assert'); +var util = require('../../lib/common/util.js'); +var Topic = require('../../lib/pubsub/topic.js'); +var Subscription = require('../../lib/pubsub/subscription.js'); + +describe('PubSub/Topic', function() { + function getTopic(config, makeReqOverride) { + if (typeof config === 'string') { + config = { name: config }; + } + config.pubsub = { + id: 'test-project', + makeReq: makeReqOverride || util.noop + }; + return new Topic(config); + } + + it('should throw error if the topic could not be created', function(done) { + function makeReqOverride(method, path, qs, body, callback) { + if (path === 'topics') { + setTimeout(callback.bind(null, error), 1); + } + } + var error = new Error('No way, Jose.'); + var topic = getTopic({ + name: 'topic-wont-work', + autoCreate: true + }, makeReqOverride); + topic.on('error', function(err) { + assert.equal(err, error); + done(); + }); + }); + + it('should attempt to get by default', function() { + function makeReqOverride(method, path, qs, body, callback) { + if (method === 'GET' && path === 'topics//topics/test-project/hi') { + callback(null); + } + } + var topic = getTopic({ name: 'hi' }, makeReqOverride); + assert.strictEqual(topic.exists_, true); + }); + + it('should attempt to create with `autoCreate: true`', function() { + var attemptedToCreate = false; + var attemptedToGet = false; + function makeReqOverride(method, path, qs, body, callback) { + if (method === 'GET' && path === 'topics//topics/test-project/hi') { + attemptedToGet = true; + callback(); + return; + } + if (method === 'POST' && path === 'topics') { + attemptedToCreate = true; + callback(); + return; + } + } + var config = { + name: '/topics/test-project/hi', + autoCreate: true + }; + var topic = getTopic(config, makeReqOverride); + assert.strictEqual(attemptedToCreate, true); + assert.strictEqual(attemptedToGet, false); + assert.strictEqual(topic.exists_, true); + }); + + it('should not attempt to create or get when specified not to', function() { + var attemptedToCreate = false; + var attemptedToGet = false; + function makeReqOverride(method, path, qs, body, callback) { + if (method === 'GET' && path === 'topics//topics/test-project/hi') { + attemptedToGet = true; + callback(); + return; + } + if (method === 'POST' && path === 'topics') { + attemptedToCreate = true; + callback(); + return; + } + } + var config = { + name: '/topics/test-project/hi', + autoCreate: false + }; + var topic = getTopic(config, makeReqOverride); + assert.strictEqual(attemptedToCreate, false); + assert.strictEqual(attemptedToGet, false); + assert.strictEqual(topic.exists_, true); + }); + + describe('publishing', function() { + var message = 'howdy'; + var messageBuffer = new Buffer(message).toString('base64'); + var messageRaw = { + topic: 'lazy-confirm', + message: { + data: messageBuffer + } + }; + + it('should convert simple messages into raw messages', function() { + function makeReqOverride(method, path, qs, body, callback) { + if (path === 'topics') { + // confirm existence. + callback(null); + return; + } + if (path === 'topics/publish') { + assert.equal(body.message.data, messageBuffer); + } + } + var topic = getTopic('lazy-confirm', makeReqOverride); + topic.on('error', assert.ifError); + topic.publish(message, assert.ifError); + }); + + it('should accept raw messages', function() { + function makeReqOverride(method, path, qs, body, callback) { + if (path === 'topics') { + // confirm existence. + callback(null); + return; + } + if (path === 'topics/publish') { + assert.deepEqual(body, messageRaw); + } + } + getTopic('lazy-confirm', makeReqOverride).publishRaw(messageRaw); + }); + + it('should publish a message once existence is confirmed', function() { + function makeReqOverride(method, path, qs, body, callback) { + if (method === 'GET' && path === 'topics//topics/test-project/hi') { + // confirm existence. + timesCheckedForExistence++; + callback(null); + return; + } + if (path === 'topics/publish') { + timesAttemptedToPublish++; + return; + } + } + var timesCheckedForExistence = 0; + var timesAttemptedToPublish = 0; + var topic = getTopic('hi', makeReqOverride); + topic.on('error', assert.ifError); + + topic.publish('Hi', assert.ifError); + assert.equal(timesAttemptedToPublish, 1); + topic.publishRaw(messageRaw, assert.ifError); + assert.equal(timesAttemptedToPublish, 2); + + assert.equal(timesCheckedForExistence, 1); + }); + + it('should not try to publish a message if Topic does not exist', + function(done) { + function makeReqOverride(method, path, qs, body, callback) { + if (method === 'GET' && path === 'topics//topics/test-project/hi') { + // confirm non-existence. + timesCheckedForExistence++; + setTimeout(callback.bind(null, { code: 404 }), 1); + return; + } + if (method === 'POST' && path === 'topics') { + // deny creation of the topic. + timesAttemptedToCreate++; + callback(error); + return; + } + if (path === 'topics/publish') { + timesAttemptedToPublish++; + return; + } + } + var timesAttemptedToCreate = 0; + var timesAttemptedToPublish = 0; + var timesCheckedForExistence = 0; + var error = new Error('No way, Jose.'); + var topic = getTopic('hi', makeReqOverride); + topic.on('error', function() {/*ignore failed creation error.*/}); + topic.publish(message, function() { + assert.equal(timesAttemptedToPublish, 0); + assert.equal(timesCheckedForExistence, 1); + assert.equal(timesAttemptedToCreate, 1); + }); + topic.publishRaw(messageRaw, function() { + assert.equal(timesAttemptedToPublish, 0); + assert.equal(timesCheckedForExistence, 1); + assert.equal(timesAttemptedToCreate, 1); + done(); + }); + }); + }); + + describe('subscribing', function() { + var subNames = ['sub-name-1', 'sub-name-2', 'sub-name-3']; + + function createTopicAndRegisterSubscriptions(makeReqOverride) { + var topic = getTopic('topic', makeReqOverride); + subNames.forEach(function(subName) { + topic.subscribe(subName); + }); + return topic; + } + + it('should create a new Subscription object', function() { + assert(getTopic('topic').subscribe('sub-name') instanceof Subscription); + }); + + it('should return an individual subscription', function() { + var subName = '/subscriptions/test-project/sub-name-1'; + function makeReqOverride(method, path, qs, body, callback) { + if (path === 'topics' || + path === 'subscriptions' || + path === 'subscriptions/' + subName) { + return callback(); + } + } + var topic = createTopicAndRegisterSubscriptions(makeReqOverride); + topic.subscribe(subNames[0], function(err, sub) { + assert.ifError(err); + assert(sub instanceof Subscription); + assert.equal(sub.name, subName); + }); + }); + + it('should return all subscriptions to the topic', function() { + var topic = createTopicAndRegisterSubscriptions(); + topic.pubsub.getSubscriptions = function(query) { + assert.equal(query.query, + 'pubsub.googleapis.com/topic in (/topics/test-project/topic)'); + }; + topic.getSubscriptions(); + }); + }); +});