diff --git a/lib/common/util.js b/lib/common/util.js
index f12ca02f8bf..2bb256a5fab 100644
--- a/lib/common/util.js
+++ b/lib/common/util.js
@@ -147,6 +147,7 @@ function ApiError(errorBody) {
   this.errors = errorBody.errors;
   this.code = errorBody.code;
   this.message = errorBody.message;
+  this.response = errorBody.response;
 }
 
 util.inherits(ApiError, Error);
@@ -180,7 +181,8 @@ function handleResp(err, resp, body, callback) {
     callback(new ApiError({
       errors: [],
       code: resp.statusCode,
-      message: body || 'Error during request.'
+      message: body || 'Error during request.',
+      response: resp
     }));
     return;
   }
diff --git a/lib/storage/bucket.js b/lib/storage/bucket.js
index 35ca34d161b..2bfd1b0907b 100644
--- a/lib/storage/bucket.js
+++ b/lib/storage/bucket.js
@@ -49,6 +49,15 @@ var util = require('../common/util.js');
  */
 var STORAGE_BASE_URL = 'https://www.googleapis.com/storage/v1/b';
 
+/**
+ * The size of a file (in bytes) must be greater than this number to
+ * automatically trigger a resumable upload.
+ *
+ * @const {number}
+ * @private
+ */
+var RESUMABLE_THRESHOLD = 5000000;
+
 /**
  * Create a Bucket object to interact with a Google Cloud Storage bucket.
  *
@@ -311,14 +320,27 @@ Bucket.prototype.setMetadata = function(metadata, callback) {
 *
 * @param {string} localPath - The fully qualified path to the file you wish to
 *     upload to your bucket.
- * @param {string=|module:storage/file=} destination - The place to save your
- *     file. If given a string, the file will be uploaded to the bucket using
- *     the string as a filename. When given a File object, your local file will
- *     be uploaded to the File object's bucket and under the File object's name.
- *     Lastly, when this argument is omitted, the file is uploaded to your
+ * @param {object=} options - Configuration options.
+ * @param {string|module:storage/file} options.destination - The place to save
+ *     your file. If given a string, the file will be uploaded to the bucket
+ *     using the string as a filename. When given a File object, your local file
+ *     will be uploaded to the File object's bucket and under the File object's
+ *     name. Lastly, when this argument is omitted, the file is uploaded to your
 *     bucket using the name of the local file.
- * @param {object=} metadata - Metadata to set for your file.
+ * @param {object=} options.metadata - Metadata to set for your file.
+ * @param {boolean=} options.resumable - Force a resumable upload (default:
+ *     true for files larger than 5MB). Read more about resumable uploads
+ *     [here](http://goo.gl/1JWqCF). NOTE: This behavior is only possible with
+ *     this method, not {module:storage/file#createWriteStream}. When working
+ *     with streams, the file format and size are unknown until the stream is
+ *     completely consumed. Because of this, it's best for you to be explicit
+ *     about what makes sense given your input.
+ * @param {string|boolean=} options.validation - Possible values: `"md5"`,
+ *     `"crc32c"`, or `false`. By default, data integrity is validated with an
+ *     MD5 checksum for maximum reliability. CRC32c will provide better
+ *     performance with less reliability. You may also choose to skip
+ *     validation completely; however, this is **not recommended**.
 * @param {function} callback - The callback function.
 *
 * @example
 * //-
@@ -334,8 +356,19 @@ Bucket.prototype.setMetadata = function(metadata, callback) {
 * //-
 * // It's not always that easy. You will likely want to specify the filename
 * // used when your new file lands in your bucket.
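+ * // The `destination` option handles this, as the next example shows.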
+ * // + * // You may also want to set metadata or customize other options. * //- - * bucket.upload('/local/path/image.png', 'new-image.png', function(err, file) { + * var options = { + * destination: 'new-image.png', + * resumable: true, + * validation: 'crc32c', + * metadata: { + * event: 'Fall trip to the zoo' + * } + * }; + * + * bucket.upload('/local/path/image.png', options, function(err, file) { * // Your bucket now contains: * // - "new-image.png" (with the contents of `/local/path/image.png') * @@ -346,8 +379,12 @@ Bucket.prototype.setMetadata = function(metadata, callback) { * // You may also re-use a File object, {module:storage/file}, that references * // the file you wish to create or overwrite. * //- - * var file = bucket.file('existing-file.png'); - * bucket.upload('/local/path/image.png', file, function(err, newFile) { + * var options = { + * destination: bucket.file('existing-file.png'), + * resumable: false + * }; + * + * bucket.upload('/local/path/image.png', options, function(err, newFile) { * // Your bucket now contains: * // - "existing-file.png" (with the contents of `/local/path/image.png') * @@ -355,36 +392,24 @@ Bucket.prototype.setMetadata = function(metadata, callback) { * // The `newFile` parameter is equal to `file`. * }); */ -Bucket.prototype.upload = function(localPath, destination, metadata, callback) { - var name; - var newFile; - switch (arguments.length) { - case 4: - break; - case 3: - callback = metadata; - if (util.is(destination, 'object')) { - metadata = destination; - } else { - metadata = {}; - } - /* falls through */ - default: - callback = callback || destination; - name = path.basename(localPath); - break; +Bucket.prototype.upload = function(localPath, options, callback) { + if (util.is(options, 'function')) { + callback = options; + options = {}; } - metadata = metadata || {}; - callback = callback || util.noop; - if (util.is(destination, 'string')) { - name = destination; - } - if (destination instanceof File) { - name = destination.name; - newFile = destination; + + var newFile; + if (options.destination instanceof File) { + newFile = options.destination; + } else if (util.is(options.destination, 'string')) { + // Use the string as the name of the file. + newFile = this.file(options.destination); + } else { + // Resort to using the name of the incoming file. + newFile = this.file(path.basename(localPath)); } - newFile = newFile || this.file(name); + var metadata = options.metadata || {}; var contentType = mime.lookup(localPath); if (contentType && !metadata.contentType) { metadata.contentType = contentType; @@ -395,12 +420,36 @@ Bucket.prototype.upload = function(localPath, destination, metadata, callback) { metadata.contentType += '; charset=' + charset; } - fs.createReadStream(localPath) - .pipe(newFile.createWriteStream(metadata)) - .on('error', callback) - .on('complete', function() { - callback(null, newFile); + var resumable; + if (util.is(options.resumable, 'boolean')) { + resumable = options.resumable; + upload(); + } else { + // Determine if the upload should be resumable if it's over the threshold. 
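+    // For example, a 6,000,000 byte file is sent with the resumable
+    // technique, while a 1 KB file goes through a single simple upload.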
+    fs.stat(localPath, function(err, stats) {
+      if (err) {
+        callback(err);
+        return;
+      }
+
+      resumable = stats.size > RESUMABLE_THRESHOLD;
+
+      upload();
     });
+  }
+
+  function upload() {
+    fs.createReadStream(localPath)
+      .pipe(newFile.createWriteStream({
+        validation: options.validation,
+        resumable: resumable,
+        metadata: metadata
+      }))
+      .on('error', callback)
+      .on('complete', function() {
+        callback(null, newFile);
+      });
+  }
 };
 
 /**
diff --git a/lib/storage/file.js b/lib/storage/file.js
index e6da583b7d8..78004c31916 100644
--- a/lib/storage/file.js
+++ b/lib/storage/file.js
@@ -20,10 +20,14 @@
 
 'use strict';
 
+var bufferEqual = require('buffer-equal');
+var ConfigStore = require('configstore');
+var crc = require('fast-crc32c');
 var crypto = require('crypto');
 var duplexify = require('duplexify');
 var request = require('request');
 var streamEvents = require('stream-events');
+var through = require('through2');
 
 /**
  * @type module:storage/acl
@@ -202,7 +206,6 @@ File.prototype.copy = function(destination, callback) {
   });
 };
 
-
 /**
 * Create a readable stream to read the contents of the remote file. It can be
 * piped to a writable stream or listened to for 'data' events to read a file's
@@ -260,7 +263,18 @@ File.prototype.createReadStream = function() {
 *
 * A File object can also be used to create files for the first time.
 *
- * @param {object=} metadata - Set the metadata for this file.
+ * @param {object=} options - Configuration object.
+ * @param {object=} options.metadata - Set the metadata for this file.
+ * @param {boolean=} options.resumable - Force a resumable upload. NOTE: When
+ *     working with streams, the file format and size are unknown until the
+ *     stream is completely consumed. Because of this, it's best for you to be
+ *     explicit about what makes sense given your input. Read more about
+ *     resumable uploads [here](http://goo.gl/1JWqCF).
+ * @param {string|boolean=} options.validation - Possible values: `"md5"`,
+ *     `"crc32c"`, or `false`. By default, data integrity is validated with an
+ *     MD5 checksum for maximum reliability. CRC32c will provide better
+ *     performance with less reliability. You may also choose to skip
+ *     validation completely; however, this is **not recommended**.
 *
 * @example
 * //-
@@ -289,35 +303,142 @@ File.prototype.createReadStream = function() {
 * var image = myBucket.file('image.png');
 *
 * fs.createReadStream('/Users/stephen/Photos/birthday-at-the-zoo/panda.jpg')
- *   .pipe(image.createWriteStream({ contentType: 'image/jpeg' }))
+ *   .pipe(image.createWriteStream({
+ *     metadata: { contentType: 'image/jpeg' }
+ *   }))
 *   .on('error', function(err) {});
 */
-File.prototype.createWriteStream = function(metadata) {
+File.prototype.createWriteStream = function(options) {
+  options = options || {};
+
   var that = this;
+  var metadata = options.metadata || {};
+  var validations = ['crc32c', 'md5'];
+  var validation;
+
+  if (util.is(options.validation, 'string')) {
+    options.validation = options.validation.toLowerCase();
+
+    if (validations.indexOf(options.validation) > -1) {
+      validation = options.validation;
+    } else {
+      validation = 'all';
+    }
+  }
+
+  if (util.is(options.validation, 'undefined')) {
+    validation = 'all';
+  }
+
+  var crc32c = validation === 'crc32c' || validation === 'all';
+  var md5 = validation === 'md5' || validation === 'all';
+
+  // Collect data as it comes in, storing it in a running hash. This is
+  // compared to the checksum value on the metadata returned by the API.
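+  // (The CRC32c value is folded in chunk by chunk via fast-crc32c, and the
+  // MD5 via crypto's streaming Hash; both are base64-encoded once the stream
+  // ends.)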
+  var localCrc32cHash;
+  var localMd5Hash = crypto.createHash('md5');
+
   var dup = streamEvents(duplexify());
 
-  dup.once('writing', function() {
-    util.makeWritableStream(dup, {
-      makeAuthorizedRequest: that.bucket.storage.makeAuthorizedRequest_,
-      metadata: metadata,
-      request: {
-        qs: {
-          name: that.name
-        },
-        uri: util.format('{base}/{bucket}/o', {
-          base: STORAGE_UPLOAD_BASE_URL,
-          bucket: that.bucket.name
-        })
+  var throughStream = through(function(chunk, enc, next) {
+    if (crc32c) {
+      localCrc32cHash = crc.calculate(chunk, localCrc32cHash);
+    }
+
+    if (md5) {
+      localMd5Hash.update(chunk);
+    }
+
+    this.push(chunk);
+    next();
+  });
+
+  throughStream
+    .on('end', function() {
+      if (crc32c) {
+        localCrc32cHash = new Buffer([localCrc32cHash]).toString('base64');
       }
-    }, function(data) {
-      that.metadata = data;
-      dup.emit('complete', data);
-      dup.end();
+      if (md5) {
+        localMd5Hash = localMd5Hash.digest('base64');
+      }
+    })
+
+    .pipe(dup)
+
+    // Wait until we've received data to determine what upload technique to use.
+    .once('writing', function() {
+      if (util.is(options.resumable, 'boolean') && !options.resumable) {
+        that.startSimpleUpload_(dup, metadata);
+      } else {
+        that.startResumableUpload_(dup, metadata);
+      }
+    })
+
+    // Catch any errors from the writable stream and forward them upstream.
+    .on('error', function(err) {
+      throughStream.emit('error', err);
+      throughStream.end();
+    })
+
+    // Compare our hashed version with the completed upload's version.
+    .on('complete', function(metadata) {
+      var failed = false;
+
+      // We must remove the first four bytes from the returned checksum.
+      // http://stackoverflow.com/questions/25096737/
+      //   base64-encoding-of-crc32c-long-value
+
+      if (validation === 'all') {
+        if (metadata.md5Hash) {
+          failed = localMd5Hash !== metadata.md5Hash;
+        } else if (metadata.crc32c) {
+          failed = localCrc32cHash !== metadata.crc32c.substr(4);
+        }
+      } else if (md5) {
+        failed = localMd5Hash !== metadata.md5Hash;
+      } else if (crc32c) {
+        failed = localCrc32cHash !== metadata.crc32c.substr(4);
+      }
+
+      if (failed) {
+        that.delete(function(err) {
+          var code;
+          var message;
+
+          if (err) {
+            code = 'FILE_NO_UPLOAD_DELETE';
+            message = [
+              'The uploaded data did not match the data from the server. As a',
+              'precaution, we attempted to delete the file, but it was not',
+              'successful. To be sure the content is the same, you should try',
+              'removing the file manually, then uploading the file again.',
+              '\n\nThe delete attempt failed with this message:',
+              '\n\n  ' + err.message
+            ].join(' ');
+          } else {
+            code = 'FILE_NO_UPLOAD';
+            message = [
+              'The uploaded data did not match the data from the server. As a',
+              'precaution, the file has been deleted. To be sure the content',
+              'is the same, you should try uploading the file again.'
+            ].join(' ');
+          }
+
+          var error = new Error(message);
+          error.code = code;
+          error.errors = [err];
+
+          throughStream.emit('error', error);
+        });
+      } else {
+        throughStream.emit('complete', metadata);
+      }
+
+      throughStream.end();
     });
-  });
 
-  return dup;
+  return throughStream;
 };
 
 /**
@@ -456,4 +577,297 @@ File.prototype.setMetadata = function(metadata, callback) {
   }.bind(this));
 };
 
+/**
+ * `startResumableUpload_` uses the Resumable Upload API: http://goo.gl/jb0e9D.
+ *
+ * The process involves these steps:
+ *
+ *   1. POST the file's metadata. We get a resumable upload URI back, then cache
+ *      it with ConfigStore.
+ *   2. PUT data to that URI with a Content-Range header indicating the position
+ *      the data begins from.
We also cache, at most, the first 16 bytes + * of the data being uploaded. + * 3. Delete the ConfigStore cache after the upload completes. + * + * If the initial upload operation is interrupted, the next time the user + * uploads the file, these steps occur: + * + * 1. Detect the presence of a cached URI in ConfigStore. + * 2. Make an empty PUT request to that URI to get the last byte written to + * the remote file. + * 3. PUT data to the URI starting from the first byte after the last byte + * returned from the call above. + * + * If the user tries to upload entirely different data to the remote file: + * + * 1. -- same as above -- + * 2. -- same as above -- + * 3. -- same as above -- + * 4. Compare the first chunk of the new data with the chunk in cache. If it's + * different, start a new resumable upload (Step 1 of the first example). + * + * @param {Duplexify} stream - Duplexify stream of data to pipe to the file. + * @param {object=} metadata - Optional metadata to set on the file. + * + * @private + */ +File.prototype.startResumableUpload_ = function(stream, metadata) { + metadata = metadata || {}; + + var that = this; + var configStore = new ConfigStore('gcloud-node'); + var config = configStore.get(that.name); + var makeAuthorizedRequest = that.bucket.storage.makeAuthorizedRequest_; + + var numBytesWritten; + var resumableUri; + var RETRY_LIMIT = 5; + var retries = 0; + + // This is used to hold all data coming in from the user's readable stream. If + // we need to abort a resumable upload to start a new one, this will hold the + // data until we're ready again. + var bufferStream = through(); + + if (config && config.uri) { + resumableUri = config.uri; + resumeUpload(); + } else { + startUpload(); + } + + // Begin a new resumable upload. Send the metadata and cache the URI returned. + function startUpload() { + var headers = {}; + + if (metadata.contentType) { + headers['X-Upload-Content-Type'] = metadata.contentType; + } + + makeAuthorizedRequest({ + method: 'POST', + uri: util.format('{base}/{bucket}/o', { + base: STORAGE_UPLOAD_BASE_URL, + bucket: that.bucket.name + }), + qs: { + name: that.name, + uploadType: 'resumable' + }, + headers: headers, + json: metadata + }, function(err, res, body) { + if (err) { + handleError(err); + return; + } + + numBytesWritten = -1; + resumableUri = body.headers.location; + + configStore.set(that.name, { + uri: resumableUri + }); + + resumeUpload(); + }); + } + + // Given a byte offset, create an authorized request to the resumable URI. If + // resuming an upload, we first determine the last byte written, then create + // the authorized request. + function resumeUpload() { + if (util.is(numBytesWritten, 'number')) { + createUploadRequest(numBytesWritten); + } else { + getNumBytesWritten(createUploadRequest); + } + + function createUploadRequest(offset) { + makeAuthorizedRequest({ + method: 'PUT', + uri: resumableUri + }, { + onAuthorized: function (err, reqOpts) { + if (err) { + handleError(err); + return; + } + + sendFile(reqOpts, offset + 1); + } + }); + } + } + + // Given an authorized request and a byte offset, begin sending data to the + // resumable URI from where the upload last left off. + function sendFile(reqOpts, offset) { + reqOpts.headers['Content-Range'] = 'bytes ' + offset + '-*/*'; + + var bytesWritten = 0; + + var offsetStream = through(function(chunk, enc, next) { + // Determine if this is the same content uploaded previously. 
We do this
+      // by caching a slice of the first chunk, then comparing it with the
+      // first bytes of incoming data.
+      if (bytesWritten === 0) {
+        var cachedFirstChunk = configStore.get(that.name).firstChunk;
+        var firstChunk = chunk.slice(0, 16);
+
+        if (!cachedFirstChunk) {
+          // This is a new upload. Cache the first chunk.
+          configStore.set(that.name, {
+            uri: reqOpts.uri,
+            firstChunk: firstChunk
+          });
+        } else {
+          // This is a continuation of an upload. Make sure the first bytes are
+          // the same.
+          cachedFirstChunk = new Buffer(cachedFirstChunk);
+          firstChunk = new Buffer(firstChunk);
+
+          if (!bufferEqual(cachedFirstChunk, firstChunk)) {
+            // The data being uploaded now is different from the original data.
+            // Give the chunk back to the stream and create a new upload stream.
+            bufferStream.unshift(chunk);
+            bufferStream.unpipe(this);
+
+            configStore.del(that.name);
+
+            startUpload();
+            return;
+          }
+        }
+      }
+
+      var length = chunk.length;
+
+      if (util.is(chunk, 'string')) {
+        length = Buffer.byteLength(chunk, enc);
+      }
+
+      if (bytesWritten < offset) {
+        chunk = chunk.slice(offset - bytesWritten);
+      }
+
+      bytesWritten += length;
+
+      // Only push data to the request stream from the byte after the one we
+      // left off on.
+      if (bytesWritten > offset) {
+        this.push(chunk);
+      }
+
+      next();
+    });
+
+    var writeStream = request(reqOpts);
+    writeStream.callback = util.noop;
+
+    writeStream.on('complete', function(res) {
+      util.handleResp(null, res, res.body, function(err, data) {
+        if (err) {
+          handleError(err);
+          return;
+        }
+
+        that.metadata = data;
+
+        stream.emit('complete', that.metadata);
+
+        configStore.del(that.name);
+      });
+    });
+
+    bufferStream.pipe(offsetStream).pipe(writeStream);
+    stream.setWritable(bufferStream);
+  }
+
+  // If an upload to this file has previously started, this will return the last
+  // byte written to it.
+  function getNumBytesWritten(callback) {
+    makeAuthorizedRequest({
+      method: 'PUT',
+      uri: resumableUri,
+      headers: {
+        'Content-Length': 0,
+        'Content-Range': 'bytes */*'
+      }
+    }, function(err) {
+      var RESUME_INCOMPLETE_STATUS = 308;
+
+      if (err && err.code === RESUME_INCOMPLETE_STATUS) {
+        // headers.range format: ##-## (e.g. 0-4915200)
+        if (err.response.headers.range) {
+          callback(parseInt(err.response.headers.range.split('-')[1], 10));
+          return;
+        }
+      }
+
+      // Start from the first byte.
+      callback(-1);
+    });
+  }
+
+  // Handle an error from API calls following the recommended best practices:
+  // http://goo.gl/AajKku
+  function handleError(err) {
+    if (err.code === 404) {
+      startUpload();
+      return;
+    }
+
+    if (err.code > 499 && err.code < 600 && retries < RETRY_LIMIT) {
+      // Exponential backoff (http://goo.gl/CifIFy): wait 2^retries seconds,
+      // plus up to one second of random jitter.
+      var randomMs = Math.round(Math.random() * 1000);
+      var waitTime = Math.pow(2, retries) * 1000 + randomMs;
+
+      retries++;
+
+      // Reset `numBytesWritten` so we update this value by pinging the API.
+      numBytesWritten = null;
+
+      setTimeout(resumeUpload, waitTime);
+      return;
+    }
+
+    stream.emit('error', err);
+    stream.end();
+  }
+};
+
+/**
+ * Takes a readable stream and pipes it to a remote file. Unlike
+ * `startResumableUpload_`, which uses the resumable upload technique, this
+ * method uses a simple upload (all or nothing).
+ *
+ * @param {Duplexify} stream - Duplexify stream of data to pipe to the file.
+ * @param {object=} metadata - Optional metadata to set on the file.
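+ *
+ * Because the whole file is sent in a single request, an interrupted simple
+ * upload cannot be resumed; the next attempt starts over from the first byte.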
+ * + * @private + */ +File.prototype.startSimpleUpload_ = function(stream, metadata) { + var that = this; + + util.makeWritableStream(stream, { + makeAuthorizedRequest: that.bucket.storage.makeAuthorizedRequest_, + metadata: metadata, + request: { + qs: { + name: that.name + }, + uri: util.format('{base}/{bucket}/o', { + base: STORAGE_UPLOAD_BASE_URL, + bucket: that.bucket.name + }) + } + }, function(data) { + that.metadata = data; + + stream.emit('complete', data); + stream.end(); + }); +}; + module.exports = File; diff --git a/package.json b/package.json index b1768d92b31..84d25958e46 100644 --- a/package.json +++ b/package.json @@ -44,8 +44,11 @@ "google storage" ], "dependencies": { + "buffer-equal": "0.0.1", + "configstore": "^0.3.1", "duplexify": "^3.1.2", "extend": "^1.3.0", + "fast-crc32c": "^0.1.3", "google-service-account": "^1.0.3", "mime": "^1.2.11", "node-uuid": "^1.4.1", @@ -61,7 +64,7 @@ "dox": "^0.4.6", "istanbul": "^0.3.0", "jshint": "^2.5.2", - "mocha": "^1.21.3", + "mocha": "^2.0.1", "sandboxed-module": "^1.0.1", "tmp": "0.0.24" }, @@ -69,9 +72,9 @@ "docs": "./scripts/docs.sh", "lint": "jshint lib/ regression/ test/", "test": "mocha --recursive", - "regression-test": "mocha regression/* --timeout 15000", - "cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 15000 test/* regression/*", - "coveralls": "istanbul cover -x 'regression/*' _mocha --report lcovonly -- --timeout 15000 test/* regression/* -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" + "regression-test": "mocha regression/* --timeout 20000", + "cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 20000 test/* regression/*", + "coveralls": "istanbul cover -x 'regression/*' _mocha --report lcovonly -- --timeout 20000 test/* regression/* -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" }, "license": "Apache 2" } diff --git a/regression/data/five-mb-file.zip b/regression/data/five-mb-file.zip deleted file mode 100644 index 38da09a4f79..00000000000 Binary files a/regression/data/five-mb-file.zip and /dev/null differ diff --git a/regression/data/three-mb-file.tif b/regression/data/three-mb-file.tif new file mode 100644 index 00000000000..5aaa8a1745b Binary files /dev/null and b/regression/data/three-mb-file.tif differ diff --git a/regression/storage.js b/regression/storage.js index 4b2c6f60fa1..c132ba5c3e1 100644 --- a/regression/storage.js +++ b/regression/storage.js @@ -23,6 +23,7 @@ var async = require('async'); var crypto = require('crypto'); var fs = require('fs'); var request = require('request'); +var through = require('through2'); var tmp = require('tmp'); var uuid = require('node-uuid'); @@ -36,7 +37,7 @@ var files = { path: 'regression/data/CloudPlatform_128px_Retina.png' }, big: { - path: 'regression/data/five-mb-file.zip' + path: 'regression/data/three-mb-file.tif' } }; @@ -315,7 +316,7 @@ describe('storage', function() { var file = bucket.file('directory/file'); var contents = 'test'; - var writeStream = file.createWriteStream(); + var writeStream = file.createWriteStream({ resumable: false }); writeStream.write(contents); writeStream.end(); @@ -331,10 +332,10 @@ describe('storage', function() { }); describe('stream write', function() { - it('should stream write, then remove large file (5mb)', function(done) { + it('should stream write, then remove file (3mb)', function(done) { var file = bucket.file('LargeFile'); fs.createReadStream(files.big.path) - 
.pipe(file.createWriteStream()) + .pipe(file.createWriteStream({ resumable: false })) .on('error', done) .on('complete', function(fileObject) { assert.equal(fileObject.md5Hash, files.big.hash); @@ -343,26 +344,77 @@ describe('storage', function() { }); it('should write metadata', function(done) { - var myMetadata = { contentType: 'image/png' }; - bucket.upload(files.logo.path, myMetadata, function(err, file) { + var options = { + metadata: { contentType: 'image/png' }, + resumable: false + }; + + bucket.upload(files.logo.path, options, function(err, file) { assert.ifError(err); + file.getMetadata(function(err, metadata) { assert.ifError(err); - assert.equal(metadata.contentType, myMetadata.contentType); + assert.equal(metadata.contentType, options.metadata.contentType); file.delete(done); }); }); }); + it('should resume an upload after an interruption', function(done) { + fs.stat(files.big.path, function(err, metadata) { + assert.ifError(err); + + var fileSize = metadata.size; + var file = bucket.file('LargeFile'); + + upload({ interrupt: true }, function(err) { + assert.ifError(err); + + upload({ interrupt: false }, function(err, metadata) { + assert.ifError(err); + + assert.equal(metadata.size, fileSize); + file.delete(done); + }); + }); + + function upload(opts, callback) { + var sizeStreamed = 0; + + fs.createReadStream(files.big.path) + .pipe(through(function(chunk, enc, next) { + sizeStreamed += chunk.length; + + if (opts.interrupt && sizeStreamed >= fileSize / 2) { + // stop sending data half way through. + next(); + } else { + this.push(chunk); + next(); + } + })) + .pipe(file.createWriteStream()) + .on('error', callback) + .on('complete', function(fileObject) { + callback(null, fileObject); + }); + } + }); + }); + it('should write/read/remove from a buffer', function(done) { tmp.setGracefulCleanup(); tmp.file(function _tempFileCreated(err, tmpFilePath) { + assert.ifError(err); + var file = bucket.file('MyBuffer'); var fileContent = 'Hello World'; - assert.ifError(err); + var writable = file.createWriteStream(); + writable.write(fileContent); writable.end(); + writable.on('complete', function() { file.createReadStream() .pipe(fs.createWriteStream(tmpFilePath)) @@ -370,6 +422,7 @@ describe('storage', function() { .on('finish', function() { file.delete(function(err) { assert.ifError(err); + fs.readFile(tmpFilePath, function(err, data) { assert.equal(data, fileContent); done(); @@ -400,10 +453,8 @@ describe('storage', function() { before(function(done) { deleteFiles(bucket, function(err) { - if (err) { - done(err); - return; - } + assert.ifError(err); + var file = bucket.file(filenames[0]); fs.createReadStream(files.logo.path) .pipe(file.createWriteStream()) diff --git a/test/storage/bucket.js b/test/storage/bucket.js index 13e0c243819..0ec83e7ced6 100644 --- a/test/storage/bucket.js +++ b/test/storage/bucket.js @@ -27,8 +27,8 @@ function FakeFile(bucket, name, metadata) { this.bucket = bucket; this.name = name; this.metadata = metadata; - this.createWriteStream = function(metadata) { - this.metadata = metadata; + this.createWriteStream = function(options) { + this.metadata = options.metadata; var dup = duplexify(); dup._write = function() { dup.emit('complete'); @@ -278,112 +278,137 @@ describe('Bucket', function() { }; }); - describe('variable arity', function() { - it('should accept a path & cb', function(done) { - bucket.upload(filepath, function(err, file) { - assert.ifError(err); - assert.equal(file.bucket.name, bucket.name); - assert.equal(file.name, basename); - done(); - }); 
+ it('should accept a path & cb', function(done) { + bucket.upload(filepath, function(err, file) { + assert.ifError(err); + assert.equal(file.bucket.name, bucket.name); + assert.equal(file.name, basename); + done(); }); + }); - it('should accept a path, metadata, & cb', function(done) { - bucket.upload(filepath, metadata, function(err, file) { - assert.ifError(err); - assert.equal(file.bucket.name, bucket.name); - assert.deepEqual(file.metadata, metadata); - done(); - }); + it('should accept a path, metadata, & cb', function(done) { + var options = { metadata: metadata }; + bucket.upload(filepath, options, function(err, file) { + assert.ifError(err); + assert.equal(file.bucket.name, bucket.name); + assert.deepEqual(file.metadata, metadata); + done(); }); + }); - it('should accept a path, a string dest, & cb', function(done) { - var newFileName = 'new-file-name.png'; - bucket.upload(filepath, newFileName, function(err, file) { - assert.ifError(err); - assert.equal(file.bucket.name, bucket.name); - assert.equal(file.name, newFileName); - done(); - }); + it('should accept a path, a string dest, & cb', function(done) { + var newFileName = 'new-file-name.png'; + var options = { destination: newFileName }; + bucket.upload(filepath, options, function(err, file) { + assert.ifError(err); + assert.equal(file.bucket.name, bucket.name); + assert.equal(file.name, newFileName); + done(); }); + }); - it('should accept a path, a string dest, metadata, & cb', function(done) { - var newFileName = 'new-file-name.png'; - bucket.upload(filepath, newFileName, metadata, function(err, file) { - assert.ifError(err); - assert.equal(file.bucket.name, bucket.name); - assert.equal(file.name, newFileName); - assert.deepEqual(file.metadata, metadata); - done(); - }); + it('should accept a path, a string dest, metadata, & cb', function(done) { + var newFileName = 'new-file-name.png'; + var options = { destination: newFileName, metadata: metadata }; + bucket.upload(filepath, options, function(err, file) { + assert.ifError(err); + assert.equal(file.bucket.name, bucket.name); + assert.equal(file.name, newFileName); + assert.deepEqual(file.metadata, metadata); + done(); }); + }); - it('should accept a path, a File dest, & cb', function(done) { - var fakeFile = new FakeFile(bucket, 'file-name'); - fakeFile.isSameFile = function() { - done(); - }; - bucket.upload(filepath, fakeFile, function(err, file) { - assert.ifError(err); - file.isSameFile(); - }); + it('should accept a path, a File dest, & cb', function(done) { + var fakeFile = new FakeFile(bucket, 'file-name'); + fakeFile.isSameFile = function() { + return true; + }; + var options = { destination: fakeFile }; + bucket.upload(filepath, options, function(err, file) { + assert.ifError(err); + assert(file.isSameFile()); + done(); }); + }); - it('should accept a path, a File dest, metadata, & cb', function(done) { - var fakeFile = new FakeFile(bucket, 'file-name'); - fakeFile.isSameFile = function() { - done(); - }; - bucket.upload(filepath, fakeFile, metadata, function(err, file) { - assert.ifError(err); - file.isSameFile(); - }); + it('should accept a path, a File dest, metadata, & cb', function(done) { + var fakeFile = new FakeFile(bucket, 'file-name'); + fakeFile.isSameFile = function() { + return true; + }; + var options = { destination: fakeFile, metadata: metadata }; + bucket.upload(filepath, options, function(err, file) { + assert.ifError(err); + assert(file.isSameFile()); + assert.deepEqual(file.metadata, metadata); + done(); }); }); it('should guess at the content 
type', function(done) { var fakeFile = new FakeFile(bucket, 'file-name'); - fakeFile.createWriteStream = function(metadata) { + var options = { destination: fakeFile }; + fakeFile.createWriteStream = function(options) { var dup = duplexify(); setImmediate(function() { - assert.equal(metadata.contentType, 'application/json'); + assert.equal(options.metadata.contentType, 'application/json'); done(); }); return dup; }; - bucket.upload(filepath, fakeFile, assert.ifError); + bucket.upload(filepath, options, assert.ifError); }); it('should guess at the charset', function(done) { var fakeFile = new FakeFile(bucket, 'file-name'); - fakeFile.createWriteStream = function(metadata) { + var options = { destination: fakeFile }; + fakeFile.createWriteStream = function(options) { var dup = duplexify(); setImmediate(function() { - assert.equal(metadata.contentType, 'text/plain; charset=UTF-8'); + assert.equal( + options.metadata.contentType, 'text/plain; charset=UTF-8'); done(); }); return dup; }; - bucket.upload(textFilepath, fakeFile, assert.ifError); + bucket.upload(textFilepath, options, assert.ifError); }); it('should allow overriding content type', function(done) { var fakeFile = new FakeFile(bucket, 'file-name'); var metadata = { contentType: 'made-up-content-type' }; - fakeFile.createWriteStream = function(meta) { + var options = { destination: fakeFile, metadata: metadata }; + fakeFile.createWriteStream = function(options) { + var dup = duplexify(); + setImmediate(function() { + assert.equal(options.metadata.contentType, metadata.contentType); + done(); + }); + return dup; + }; + bucket.upload(filepath, options, assert.ifError); + }); + + it('should allow specifying options.resumable', function(done) { + var fakeFile = new FakeFile(bucket, 'file-name'); + var options = { destination: fakeFile, resumable: false }; + fakeFile.createWriteStream = function(options) { var dup = duplexify(); setImmediate(function() { - assert.equal(meta.contentType, metadata.contentType); + assert.strictEqual(options.resumable, false); done(); }); return dup; }; - bucket.upload(filepath, fakeFile, metadata, assert.ifError); + bucket.upload(filepath, options, assert.ifError); }); it('should execute callback on error', function(done) { var error = new Error('Error.'); var fakeFile = new FakeFile(bucket, 'file-name'); + var options = { destination: fakeFile }; fakeFile.createWriteStream = function() { var dup = duplexify(); setImmediate(function() { @@ -391,7 +416,7 @@ describe('Bucket', function() { }); return dup; }; - bucket.upload(filepath, fakeFile, function(err) { + bucket.upload(filepath, options, function(err) { assert.equal(err, error); done(); }); diff --git a/test/storage/file.js b/test/storage/file.js index 5f5837f1623..e81c34d7d08 100644 --- a/test/storage/file.js +++ b/test/storage/file.js @@ -20,10 +20,14 @@ var assert = require('assert'); var Bucket = require('../../lib/storage/bucket.js'); +var crc = require('fast-crc32c'); +var crypto = require('crypto'); var duplexify = require('duplexify'); var extend = require('extend'); var nodeutil = require('util'); var request = require('request'); +var stream = require('stream'); +var through = require('through2'); var url = require('url'); var util = require('../../lib/common/util'); @@ -62,9 +66,25 @@ function fakeRequest() { return results; } +var configStoreData = {}; +function FakeConfigStore() { + this.del = function(key) { + delete configStoreData[key]; + }; + + this.get = function(key) { + return configStoreData[key]; + }; + + this.set = function(key, 
value) { + configStoreData[key] = value; + }; +} + var File = require('sandboxed-module') .require('../../lib/storage/file.js', { requires: { + configstore: FakeConfigStore, duplexify: FakeDuplexify, request: fakeRequest, '../common/util': fakeUtil @@ -303,45 +323,229 @@ describe('File', function() { }); describe('createWriteStream', function() { - it('should get a writable stream', function(done) { - makeWritableStream_Override = function() { + var METADATA = { a: 'b', c: 'd' }; + + it('should return a stream', function() { + assert(file.createWriteStream() instanceof stream); + }); + + it('should emit errors', function(done) { + var error = new Error('Error.'); + + file.bucket.storage.makeAuthorizedRequest_ = function(reqOpts, cb) { + cb(error); + }; + + var writable = file.createWriteStream(); + + writable.on('error', function(err) { + assert.equal(err, error); + done(); + }); + + writable.write('data'); + }); + + it('should start a simple upload if specified', function(done) { + var writable = file.createWriteStream({ + metadata: METADATA, + resumable: false + }); + + file.startSimpleUpload_ = function(stream, metadata) { + assert.deepEqual(metadata, METADATA); done(); }; - file.createWriteStream().emit('writing'); + + writable.write('data'); }); - it('should emit complete event when writing is complete', function(done) { - var fakeResponse = { fake: 'data' }; + it('should start a resumable upload if specified', function(done) { + var writable = file.createWriteStream({ + metadata: METADATA, + resumable: true + }); - makeWritableStream_Override = function(stream, options, callback) { - callback(fakeResponse); + file.startResumableUpload_ = function(stream, metadata) { + assert.deepEqual(metadata, METADATA); + done(); }; - file.createWriteStream() - .on('complete', function(data) { - assert.deepEqual(data, fakeResponse); - assert.deepEqual(file.metadata, data); - done(); - }) - .emit('writing'); + writable.write('data'); }); - it('should pass the required arguments', function(done) { - var metadata = { a: 'b', c: 'd' }; + it('should default to a resumable upload', function(done) { + var writable = file.createWriteStream({ + metadata: METADATA + }); - makeWritableStream_Override = function(stream, options) { - assert.deepEqual(options.metadata, metadata); - assert.deepEqual(options.request, { - qs: { - name: file.name - }, - uri: 'https://www.googleapis.com/upload/storage/v1/b/' + - file.bucket.name + '/o' - }); + file.startResumableUpload_ = function(stream, metadata) { + assert.deepEqual(metadata, METADATA); done(); }; - file.createWriteStream(metadata).emit('writing'); + writable.write('data'); + }); + + describe('validation', function() { + var data = 'test'; + + var crc32cBase64 = new Buffer([crc.calculate(data)]).toString('base64'); + + var md5HashBase64 = crypto.createHash('md5'); + md5HashBase64.update(data); + md5HashBase64 = md5HashBase64.digest('base64'); + + var fakeMetadata = { + crc32c: { crc32c: '####' + crc32cBase64 }, + md5: { md5Hash: md5HashBase64 } + }; + + it('should validate with crc32c', function(done) { + var writable = file.createWriteStream({ validation: 'crc32c' }); + + file.startResumableUpload_ = function(stream) { + setImmediate(function() { + stream.emit('complete', fakeMetadata.crc32c); + }); + }; + + writable.write(data); + writable.end(); + + writable + .on('error', done) + .on('complete', function() { + done(); + }); + }); + + it('should emit an error if crc32c validation fails', function(done) { + var writable = file.createWriteStream({ validation: 
'crc32c' });
+
+        file.startResumableUpload_ = function(stream) {
+          setImmediate(function() {
+            stream.emit('complete', fakeMetadata.crc32c);
+          });
+        };
+
+        file.delete = function(cb) {
+          cb();
+        };
+
+        writable.write('bad-data');
+        writable.end();
+
+        writable.on('error', function(err) {
+          assert.equal(err.code, 'FILE_NO_UPLOAD');
+          done();
+        });
+      });
+
+      it('should validate with md5', function(done) {
+        var writable = file.createWriteStream({ validation: 'md5' });
+
+        file.startResumableUpload_ = function(stream) {
+          setImmediate(function() {
+            stream.emit('complete', fakeMetadata.md5);
+          });
+        };
+
+        writable.write(data);
+        writable.end();
+
+        writable
+          .on('error', done)
+          .on('complete', function() {
+            done();
+          });
+      });
+
+      it('should emit an error if md5 validation fails', function(done) {
+        var writable = file.createWriteStream({ validation: 'md5' });
+
+        file.startResumableUpload_ = function(stream) {
+          setImmediate(function() {
+            stream.emit('complete', fakeMetadata.md5);
+          });
+        };
+
+        file.delete = function(cb) {
+          cb();
+        };
+
+        writable.write('bad-data');
+        writable.end();
+
+        writable.on('error', function(err) {
+          assert.equal(err.code, 'FILE_NO_UPLOAD');
+          done();
+        });
+      });
+
+      it('should default to md5 validation', function(done) {
+        var writable = file.createWriteStream();
+
+        file.startResumableUpload_ = function(stream) {
+          setImmediate(function() {
+            stream.emit('complete', { md5Hash: 'bad-hash' });
+          });
+        };
+
+        file.delete = function(cb) {
+          cb();
+        };
+
+        writable.write(data);
+        writable.end();
+
+        writable.on('error', function(err) {
+          assert.equal(err.code, 'FILE_NO_UPLOAD');
+          done();
+        });
+      });
+
+      it('should delete the file if validation fails', function(done) {
+        var writable = file.createWriteStream();
+
+        file.startResumableUpload_ = function(stream) {
+          setImmediate(function() {
+            stream.emit('complete', { md5Hash: 'bad-hash' });
+          });
+        };
+
+        file.delete = function() {
+          done();
+        };
+
+        writable.write(data);
+        writable.end();
+      });
+
+      it('should emit a different error if delete fails', function(done) {
+        var writable = file.createWriteStream();
+
+        file.startResumableUpload_ = function(stream) {
+          setImmediate(function() {
+            stream.emit('complete', { md5Hash: 'bad-hash' });
+          });
+        };
+
+        var deleteErrorMessage = 'Delete error message.';
+        var deleteError = new Error(deleteErrorMessage);
+        file.delete = function(cb) {
+          cb(deleteError);
+        };
+
+        writable.write(data);
+        writable.end();
+
+        writable.on('error', function(err) {
+          assert.equal(err.code, 'FILE_NO_UPLOAD_DELETE');
+          assert(err.message.indexOf(deleteErrorMessage) > -1);
+          done();
+        });
+      });
+    });
   });
 
@@ -523,4 +727,201 @@ describe('File', function() {
       });
     });
   });
+
+  describe('startResumableUpload_', function() {
+    var RESUMABLE_URI = 'http://resume';
+
+    beforeEach(function() {
+      configStoreData = {};
+    });
+
+    describe('starting', function() {
+      it('should start a resumable upload', function(done) {
+        file.bucket.storage.makeAuthorizedRequest_ = function(reqOpts) {
+          var uri = 'https://www.googleapis.com/upload/storage/v1/b/' +
+            file.bucket.name + '/o';
+
+          assert.equal(reqOpts.method, 'POST');
+          assert.equal(reqOpts.uri, uri);
+          assert.equal(reqOpts.qs.name, file.name);
+          assert.equal(reqOpts.qs.uploadType, 'resumable');
+
+          assert.deepEqual(reqOpts.headers, {
+            'X-Upload-Content-Type': 'custom'
+          });
+          assert.deepEqual(reqOpts.json, { contentType: 'custom' });
+
+          done();
+        };
+
+        file.startResumableUpload_(duplexify(), { contentType: 'custom' });
+      });
+
+      it('should upload file',
function(done) { + var requestCount = 0; + file.bucket.storage.makeAuthorizedRequest_ = function(reqOpts, cb) { + requestCount++; + + // respond to creation POST. + if (requestCount === 1) { + cb(null, null, { headers: { location: RESUMABLE_URI }}); + assert.deepEqual(configStoreData[file.name].uri, RESUMABLE_URI); + return; + } + + // create an authorized request for the first PUT. + if (requestCount === 2) { + assert.equal(reqOpts.method, 'PUT'); + assert.equal(reqOpts.uri, RESUMABLE_URI); + cb.onAuthorized(null, { headers: {} }); + } + }; + + // respond to first upload PUT request. + var metadata = { a: 'b', c: 'd' }; + request_Override = function(reqOpts) { + assert.equal(reqOpts.headers['Content-Range'], 'bytes 0-*/*'); + + var stream = through(); + setImmediate(function() { + stream.emit('complete', { body: metadata }); + }); + return stream; + }; + + var stream = duplexify(); + + stream + .on('error', done) + .on('complete', function(data) { + assert.deepEqual(data, metadata); + + setImmediate(function() { + // cache deleted. + assert(!configStoreData[file.name]); + done(); + }); + }); + + file.startResumableUpload_(stream); + }); + }); + + describe('resuming', function() { + beforeEach(function() { + configStoreData[file.name] = { + uri: RESUMABLE_URI + }; + }); + + it('should resume uploading from last sent byte', function(done) { + var lastByte = 135; + + var requestCount = 0; + file.bucket.storage.makeAuthorizedRequest_ = function(reqOpts, cb) { + requestCount++; + + if (requestCount === 1) { + assert.equal(reqOpts.method, 'PUT'); + assert.equal(reqOpts.uri, RESUMABLE_URI); + assert.deepEqual(reqOpts.headers, { + 'Content-Length': 0, + 'Content-Range': 'bytes */*' + }); + + cb({ + code: 308, // resumable upload status code + response: { headers: { range: '0-' + lastByte } } + }); + + return; + } + + if (requestCount === 2) { + assert.equal(reqOpts.method, 'PUT'); + assert.equal(reqOpts.uri, RESUMABLE_URI); + + cb.onAuthorized(null, { headers: {} }); + } + }; + + var metadata = { a: 'b', c: 'd' }; + request_Override = function(reqOpts) { + var startByte = lastByte + 1; + assert.equal( + reqOpts.headers['Content-Range'], 'bytes ' + startByte + '-*/*'); + + var stream = through(); + setImmediate(function() { + stream.emit('complete', { body: metadata }); + }); + return stream; + }; + + var stream = duplexify(); + + stream + .on('error', done) + .on('complete', function(data) { + assert.deepEqual(data, metadata); + + setImmediate(function() { + // cache deleted. 
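+            // (startResumableUpload_ deletes its ConfigStore entry as soon
+            // as the API confirms the upload, so the cached URI is expected
+            // to be gone.)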
+ assert(!configStoreData[file.name]); + done(); + }); + }); + + file.startResumableUpload_(stream); + }); + }); + }); + + describe('startSimpleUpload_', function() { + it('should get a writable stream', function(done) { + makeWritableStream_Override = function() { + done(); + }; + + file.startSimpleUpload_(duplexify()); + }); + + it('should pass the required arguments', function(done) { + var metadata = { a: 'b', c: 'd' }; + + makeWritableStream_Override = function(stream, options) { + assert.deepEqual(options.metadata, metadata); + assert.deepEqual(options.request, { + qs: { + name: file.name + }, + uri: 'https://www.googleapis.com/upload/storage/v1/b/' + + file.bucket.name + '/o' + }); + done(); + }; + + file.startSimpleUpload_(duplexify(), metadata); + }); + + it('should finish stream and set metadata', function(done) { + var metadata = { a: 'b', c: 'd' }; + + makeWritableStream_Override = function(stream, options, callback) { + callback(metadata); + }; + + var stream = duplexify(); + + stream + .on('error', done) + .on('complete', function(meta) { + assert.deepEqual(meta, metadata); + assert.deepEqual(file.metadata, metadata); + done(); + }); + + file.startSimpleUpload_(stream, metadata); + }); + }); }); diff --git a/test/storage/index.js b/test/storage/index.js index 405886e6133..92ba69f0424 100644 --- a/test/storage/index.js +++ b/test/storage/index.js @@ -32,10 +32,8 @@ describe('Storage', function() { }); describe('initialization', function() { - it('should throw if a bucket name is not passed', function() { - assert.throws(function() { - storage.bucket(); - }, Error); + it('should set the project id', function() { + assert.equal(storage.projectId, 'project-id'); }); });
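Taken together, the new upload surface reads as follows. This is a minimal sketch, assuming a `bucket` object obtained from the storage API elsewhere in the library; the local and remote paths here are hypothetical.

var fs = require('fs');

// bucket.upload() now takes a single options object. Files larger than
// 5,000,000 bytes are sent with the resumable technique unless `resumable`
// says otherwise, and data integrity defaults to an MD5 check.
bucket.upload('/local/path/photo.jpg', {
  destination: 'photos/photo.jpg',
  validation: 'crc32c'
}, function(err, file) {
  // `file` is the module:storage/file object for 'photos/photo.jpg'.
});

// A stream's size can't be known up front, so tell createWriteStream()
// explicitly which technique to use.
fs.createReadStream('/local/path/photo.jpg')
  .pipe(bucket.file('photos/photo.jpg').createWriteStream({ resumable: false }))
  .on('error', function(err) {})
  .on('complete', function(metadata) {});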