diff --git a/packages/storage/src/file.js b/packages/storage/src/file.js index 7563ccfcd0a..358205b122c 100644 --- a/packages/storage/src/file.js +++ b/packages/storage/src/file.js @@ -444,6 +444,8 @@ File.prototype.createReadStream = function(options) { var self = this; var rangeRequest = is.number(options.start) || is.number(options.end); var tailRequest = options.end < 0; + + var validateStream; var throughStream = streamEvents(through()); var crc32c = true; @@ -468,12 +470,24 @@ File.prototype.createReadStream = function(options) { md5 = false; } + var shouldRunValidation = !rangeRequest && (crc32c || md5); + + if (shouldRunValidation) { + validateStream = hashStreamValidation({ + crc32c: crc32c, + md5: md5 + }); + } + // Authenticate the request, then pipe the remote API request to the stream // returned to the user. function makeRequest() { var reqOpts = { + forever: false, uri: '', - gzip: true, + headers: { + 'Accept-Encoding': 'gzip' + }, qs: { alt: 'media' } @@ -491,27 +505,44 @@ File.prototype.createReadStream = function(options) { var start = is.number(options.start) ? options.start : '0'; var end = is.number(options.end) ? options.end : ''; - reqOpts.headers = { - Range: 'bytes=' + (tailRequest ? end : start + '-' + end) - }; + reqOpts.headers.Range = + 'bytes=' + (tailRequest ? end : start + '-' + end); } - var requestStream = self.requestStream(reqOpts); - var validateStream; + var requestStream = self.requestStream(reqOpts) + .on('error', function(err) { + throughStream.destroy(err); + }) + .on('response', function(res) { + throughStream.emit('response', res); + common.util.handleResp(null, res, null, onResponse); + }) + .on('complete', function(res) { + common.util.handleResp(null, res, null, onComplete); + }) + .resume(); + + throughStream.on('error', function() { + // An error can occur before the request stream has been created + // (during authentication). + if (requestStream.abort) { + requestStream.abort(); + } + + requestStream.destroy(); + }); // We listen to the response event from the request stream so that we can... // // 1) Intercept any data from going to the user if an error occurred. // 2) Calculate the hashes from the http.IncomingMessage response stream, // which will return the bytes from the source without decompressing - // gzip'd content. The request stream will do the decompression so the - // user receives the expected content. - function onResponse(err, body, res) { + // gzip'd content. We then send it through decompressed, if applicable, + // to the user. + function onResponse(err, body, rawResponseStream) { if (err) { - requestStream.unpipe(throughStream); - // Get error message from the body. - res.pipe(concat(function(body) { + rawResponseStream.pipe(concat(function(body) { err.message = body.toString(); throughStream.destroy(err); })); @@ -519,14 +550,13 @@ File.prototype.createReadStream = function(options) { return; } - if (!rangeRequest) { - validateStream = hashStreamValidation({ - crc32c: crc32c, - md5: md5 - }); + var headers = rawResponseStream.toJSON().headers; + var isCompressed = headers['content-encoding'] === 'gzip'; - res.pipe(validateStream).on('data', common.util.noop); - } + rawResponseStream + .pipe(shouldRunValidation ? validateStream : through()) + .pipe(isCompressed ? zlib.createGunzip() : through()) + .pipe(throughStream, { end: false }); } // This is hooked to the `complete` event from the request stream. This is @@ -534,20 +564,18 @@ File.prototype.createReadStream = function(options) { // wrong. function onComplete(err) { if (err) { + throughStream.destroy(err); return; } if (rangeRequest) { + throughStream.end(); return; } if (!refreshedMetadata) { refreshedMetadata = true; - - self.getMetadata(function() { - onComplete(err); - }); - + self.getMetadata(onComplete); return; } @@ -589,30 +617,10 @@ File.prototype.createReadStream = function(options) { mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH'; throughStream.destroy(mismatchError); + } else { + throughStream.end(); } } - - requestStream - .on('error', function(err) { - throughStream.destroy(err); - }) - .on('response', function(res) { - throughStream.emit('response', res); - common.util.handleResp(null, res, null, onResponse); - }) - .on('complete', function(res) { - common.util.handleResp(null, res, null, onComplete); - }) - .pipe(throughStream) - .on('error', function() { - // An error can occur before the request stream has been created (during - // authentication). - if (requestStream.abort) { - requestStream.abort(); - } - - requestStream.destroy(); - }); } throughStream.on('reading', makeRequest); diff --git a/packages/storage/system-test/storage.js b/packages/storage/system-test/storage.js index 1c0a2cffdb7..9366288801e 100644 --- a/packages/storage/system-test/storage.js +++ b/packages/storage/system-test/storage.js @@ -1156,7 +1156,8 @@ describe('storage', function() { it('should upload a gzipped file and download it', function(done) { var options = { metadata: { - contentEncoding: 'gzip' + contentEncoding: 'gzip', + contentType: 'text/html' } }; @@ -1417,9 +1418,14 @@ describe('storage', function() { }); it('should allow changing the storage class', function(done) { + var bucket = storage.bucket(generateName()); var file = bucket.file(generateName()); async.series([ + function(next) { + bucket.create(next); + }, + function(next) { bucket.upload(FILES.logo.path, { destination: file }, next); }, diff --git a/packages/storage/test/file.js b/packages/storage/test/file.js index 8efd7050020..346b9d8b9e8 100644 --- a/packages/storage/test/file.js +++ b/packages/storage/test/file.js @@ -70,6 +70,13 @@ fakeRequest.defaults = function(defaultConfiguration) { return fakeRequest; }; +var hashStreamValidationOverride; +var hashStreamValidation = require('hash-stream-validation'); +function fakeHashStreamValidation() { + return (hashStreamValidationOverride || hashStreamValidation) + .apply(null, arguments); +} + var resumableUploadOverride; var resumableUpload = require('gcs-resumable-upload'); function fakeResumableUpload() { @@ -105,6 +112,7 @@ describe('File', function() { before(function() { File = proxyquire('../src/file.js', { 'gcs-resumable-upload': fakeResumableUpload, + 'hash-stream-validation': fakeHashStreamValidation, request: fakeRequest, '@google-cloud/common': { ServiceObject: FakeServiceObject, @@ -139,6 +147,7 @@ describe('File', function() { directoryFile.request = util.noop; handleRespOverride = null; + hashStreamValidationOverride = null; makeWritableStreamOverride = null; requestOverride = null; resumableUploadOverride = null; @@ -521,6 +530,14 @@ describe('File', function() { } beforeEach(function() { + handleRespOverride = function(err, res, body, callback) { + var rawResponseStream = through(); + rawResponseStream.toJSON = function() { + return { headers: {} }; + }; + callback(null, null, rawResponseStream); + }; + requestOverride = function() { return through(); }; @@ -587,8 +604,8 @@ describe('File', function() { file.requestStream = getFakeSuccessfulRequest('body', { body: null }); var readStream = file.createReadStream(); - - readStream.resume(); + readStream.emit('reading'); + readStream.emit('response'); // Let the error handler from createReadStream assign. setImmediate(function() { @@ -607,7 +624,8 @@ describe('File', function() { }; var readStream = file.createReadStream(); - readStream.resume(); + readStream.emit('reading'); + readStream.emit('response'); setImmediate(function() { assert.doesNotThrow(function() { @@ -621,8 +639,11 @@ describe('File', function() { it('should create an authenticated request', function(done) { file.requestStream = function(opts) { assert.deepEqual(opts, { + forever: false, uri: '', - gzip: true, + headers: { + 'Accept-Encoding': 'gzip' + }, qs: { alt: 'media' } @@ -636,18 +657,6 @@ describe('File', function() { file.createReadStream().resume(); }); - it('should accept gzip encoding', function(done) { - file.requestStream = function(opts) { - assert.strictEqual(opts.gzip, true); - setImmediate(function() { - done(); - }); - return duplexify(); - }; - - file.createReadStream().resume(); - }); - describe('errors', function() { var ERROR = new Error('Error.'); @@ -723,37 +732,6 @@ describe('File', function() { file.createReadStream().resume(); }); - it('should unpipe stream from an error on the response', function(done) { - var rawResponseStream = through(); - var requestStream = through(); - var readStream = file.createReadStream(); - - file.requestStream = function() { - setImmediate(function() { - // Must be a stream. Doesn't matter for the tests, though. - requestStream.emit('response', through()); - }); - - return requestStream; - }; - - handleRespOverride = function(err, resp, body, callback) { - assert.strictEqual(requestStream._readableState.pipesCount, 1); - assert.strictEqual(requestStream._readableState.pipes, readStream); - - // Triggers the unpipe. - callback(new Error(), null, rawResponseStream); - - setImmediate(function() { - assert.strictEqual(requestStream._readableState.pipesCount, 0); - assert.strictEqual(requestStream._readableState.pipes, null); - done(); - }); - }; - - readStream.resume(); - }); - it('should let handleResp handle the completed request', function(done) { var response = { a: 'b', c: 'd' }; @@ -824,6 +802,7 @@ describe('File', function() { describe('validation', function() { var data = 'test'; + var fakeValidationStream; beforeEach(function() { file.metadata.mediaLink = 'http://uri'; @@ -835,6 +814,14 @@ describe('File', function() { }; callback(); }; + + fakeValidationStream = through(); + fakeValidationStream.test = function() { + return true; + }; + hashStreamValidationOverride = function() { + return fakeValidationStream; + }; }); it('should destroy the stream on error', function(done) { @@ -876,6 +863,10 @@ describe('File', function() { it('should emit an error if crc32c validation fails', function(done) { file.requestStream = getFakeSuccessfulRequest('bad-data', {}); + fakeValidationStream.test = function() { + return false; + }; + file.createReadStream({ validation: 'crc32c' }) .on('error', function(err) { assert.strictEqual(err.code, 'CONTENT_DOWNLOAD_MISMATCH'); @@ -896,6 +887,10 @@ describe('File', function() { it('should emit an error if md5 validation fails', function(done) { file.requestStream = getFakeSuccessfulRequest('bad-data', {}); + fakeValidationStream.test = function() { + return false; + }; + file.createReadStream({ validation: 'md5' }) .on('error', function(err) { assert.strictEqual(err.code, 'CONTENT_DOWNLOAD_MISMATCH'); @@ -925,6 +920,10 @@ describe('File', function() { it('should ignore a data mismatch if validation: false', function(done) { file.requestStream = getFakeSuccessfulRequest(data, {}); + fakeValidationStream.test = function() { + return false; + }; + file.createReadStream({ validation: false }) .resume() .on('error', done) @@ -932,6 +931,12 @@ describe('File', function() { }); describe('destroying the through stream', function() { + beforeEach(function() { + fakeValidationStream.test = function() { + return false; + }; + }); + it('should destroy after failed validation', function(done) { file.requestStream = getFakeSuccessfulRequest('bad-data', {});