Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 54 additions & 46 deletions packages/storage/src/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ File.prototype.createReadStream = function(options) {
var self = this;
var rangeRequest = is.number(options.start) || is.number(options.end);
var tailRequest = options.end < 0;

var validateStream;
var throughStream = streamEvents(through());

var crc32c = true;
Expand All @@ -468,12 +470,24 @@ File.prototype.createReadStream = function(options) {
md5 = false;
}

var shouldRunValidation = !rangeRequest && (crc32c || md5);

if (shouldRunValidation) {
validateStream = hashStreamValidation({
crc32c: crc32c,
md5: md5
});
}

// Authenticate the request, then pipe the remote API request to the stream
// returned to the user.
function makeRequest() {
var reqOpts = {
forever: false,
uri: '',
gzip: true,
headers: {
'Accept-Encoding': 'gzip'
},
qs: {
alt: 'media'
}
Expand All @@ -491,63 +505,77 @@ File.prototype.createReadStream = function(options) {
var start = is.number(options.start) ? options.start : '0';
var end = is.number(options.end) ? options.end : '';

reqOpts.headers = {
Range: 'bytes=' + (tailRequest ? end : start + '-' + end)
};
reqOpts.headers.Range =
'bytes=' + (tailRequest ? end : start + '-' + end);
}

var requestStream = self.requestStream(reqOpts);
var validateStream;
var requestStream = self.requestStream(reqOpts)
.on('error', function(err) {
throughStream.destroy(err);
})
.on('response', function(res) {
throughStream.emit('response', res);
common.util.handleResp(null, res, null, onResponse);
})
.on('complete', function(res) {
common.util.handleResp(null, res, null, onComplete);
})
.resume();

throughStream.on('error', function() {
// An error can occur before the request stream has been created
// (during authentication).
if (requestStream.abort) {
requestStream.abort();
}

requestStream.destroy();
});

// We listen to the response event from the request stream so that we can...
//
// 1) Intercept any data from going to the user if an error occurred.
// 2) Calculate the hashes from the http.IncomingMessage response stream,
// which will return the bytes from the source without decompressing
// gzip'd content. The request stream will do the decompression so the
// user receives the expected content.
function onResponse(err, body, res) {
// gzip'd content. We then send it through decompressed, if applicable,
// to the user.
function onResponse(err, body, rawResponseStream) {
if (err) {
requestStream.unpipe(throughStream);

// Get error message from the body.
res.pipe(concat(function(body) {
rawResponseStream.pipe(concat(function(body) {
err.message = body.toString();
throughStream.destroy(err);
}));

return;
}

if (!rangeRequest) {
validateStream = hashStreamValidation({
crc32c: crc32c,
md5: md5
});
var headers = rawResponseStream.toJSON().headers;
var isCompressed = headers['content-encoding'] === 'gzip';

res.pipe(validateStream).on('data', common.util.noop);
}
rawResponseStream
.pipe(shouldRunValidation ? validateStream : through())
.pipe(isCompressed ? zlib.createGunzip() : through())

This comment was marked as spam.

This comment was marked as spam.

.pipe(throughStream, { end: false });
}

// This is hooked to the `complete` event from the request stream. This is
// our chance to validate the data and let the user know if anything went
// wrong.
function onComplete(err) {
if (err) {
throughStream.destroy(err);
return;
}

if (rangeRequest) {
throughStream.end();
return;
}

if (!refreshedMetadata) {
refreshedMetadata = true;

self.getMetadata(function() {
onComplete(err);
});

self.getMetadata(onComplete);
return;
}

Expand Down Expand Up @@ -589,30 +617,10 @@ File.prototype.createReadStream = function(options) {
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';

throughStream.destroy(mismatchError);
} else {
throughStream.end();
}
}

requestStream
.on('error', function(err) {
throughStream.destroy(err);
})
.on('response', function(res) {
throughStream.emit('response', res);
common.util.handleResp(null, res, null, onResponse);
})
.on('complete', function(res) {
common.util.handleResp(null, res, null, onComplete);
})
.pipe(throughStream)
.on('error', function() {
// An error can occur before the request stream has been created (during
// authentication).
if (requestStream.abort) {
requestStream.abort();
}

requestStream.destroy();
});
}

throughStream.on('reading', makeRequest);
Expand Down
8 changes: 7 additions & 1 deletion packages/storage/system-test/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,8 @@ describe('storage', function() {
it('should upload a gzipped file and download it', function(done) {
var options = {
metadata: {
contentEncoding: 'gzip'
contentEncoding: 'gzip',
contentType: 'text/html'
}
};

Expand Down Expand Up @@ -1417,9 +1418,14 @@ describe('storage', function() {
});

it('should allow changing the storage class', function(done) {
var bucket = storage.bucket(generateName());
var file = bucket.file(generateName());

async.series([
function(next) {
bucket.create(next);
},

function(next) {
bucket.upload(FILES.logo.path, { destination: file }, next);
},
Expand Down
99 changes: 52 additions & 47 deletions packages/storage/test/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ fakeRequest.defaults = function(defaultConfiguration) {
return fakeRequest;
};

var hashStreamValidationOverride;
var hashStreamValidation = require('hash-stream-validation');
function fakeHashStreamValidation() {
return (hashStreamValidationOverride || hashStreamValidation)
.apply(null, arguments);
}

var resumableUploadOverride;
var resumableUpload = require('gcs-resumable-upload');
function fakeResumableUpload() {
Expand Down Expand Up @@ -105,6 +112,7 @@ describe('File', function() {
before(function() {
File = proxyquire('../src/file.js', {
'gcs-resumable-upload': fakeResumableUpload,
'hash-stream-validation': fakeHashStreamValidation,
request: fakeRequest,
'@google-cloud/common': {
ServiceObject: FakeServiceObject,
Expand Down Expand Up @@ -139,6 +147,7 @@ describe('File', function() {
directoryFile.request = util.noop;

handleRespOverride = null;
hashStreamValidationOverride = null;
makeWritableStreamOverride = null;
requestOverride = null;
resumableUploadOverride = null;
Expand Down Expand Up @@ -521,6 +530,14 @@ describe('File', function() {
}

beforeEach(function() {
handleRespOverride = function(err, res, body, callback) {
var rawResponseStream = through();
rawResponseStream.toJSON = function() {
return { headers: {} };
};
callback(null, null, rawResponseStream);
};

requestOverride = function() {
return through();
};
Expand Down Expand Up @@ -587,8 +604,8 @@ describe('File', function() {
file.requestStream = getFakeSuccessfulRequest('body', { body: null });

var readStream = file.createReadStream();

readStream.resume();
readStream.emit('reading');
readStream.emit('response');

// Let the error handler from createReadStream assign.
setImmediate(function() {
Expand All @@ -607,7 +624,8 @@ describe('File', function() {
};

var readStream = file.createReadStream();
readStream.resume();
readStream.emit('reading');
readStream.emit('response');

setImmediate(function() {
assert.doesNotThrow(function() {
Expand All @@ -621,8 +639,11 @@ describe('File', function() {
it('should create an authenticated request', function(done) {
file.requestStream = function(opts) {
assert.deepEqual(opts, {
forever: false,
uri: '',
gzip: true,
headers: {
'Accept-Encoding': 'gzip'
},
qs: {
alt: 'media'
}
Expand All @@ -636,18 +657,6 @@ describe('File', function() {
file.createReadStream().resume();
});

it('should accept gzip encoding', function(done) {
file.requestStream = function(opts) {
assert.strictEqual(opts.gzip, true);
setImmediate(function() {
done();
});
return duplexify();
};

file.createReadStream().resume();
});

describe('errors', function() {
var ERROR = new Error('Error.');

Expand Down Expand Up @@ -723,37 +732,6 @@ describe('File', function() {
file.createReadStream().resume();
});

it('should unpipe stream from an error on the response', function(done) {
var rawResponseStream = through();
var requestStream = through();
var readStream = file.createReadStream();

file.requestStream = function() {
setImmediate(function() {
// Must be a stream. Doesn't matter for the tests, though.
requestStream.emit('response', through());
});

return requestStream;
};

handleRespOverride = function(err, resp, body, callback) {
assert.strictEqual(requestStream._readableState.pipesCount, 1);
assert.strictEqual(requestStream._readableState.pipes, readStream);

// Triggers the unpipe.
callback(new Error(), null, rawResponseStream);

setImmediate(function() {
assert.strictEqual(requestStream._readableState.pipesCount, 0);
assert.strictEqual(requestStream._readableState.pipes, null);
done();
});
};

readStream.resume();
});

it('should let handleResp handle the completed request', function(done) {
var response = { a: 'b', c: 'd' };

Expand Down Expand Up @@ -824,6 +802,7 @@ describe('File', function() {

describe('validation', function() {
var data = 'test';
var fakeValidationStream;

beforeEach(function() {
file.metadata.mediaLink = 'http://uri';
Expand All @@ -835,6 +814,14 @@ describe('File', function() {
};
callback();
};

fakeValidationStream = through();
fakeValidationStream.test = function() {
return true;
};
hashStreamValidationOverride = function() {
return fakeValidationStream;
};
});

it('should destroy the stream on error', function(done) {
Expand Down Expand Up @@ -876,6 +863,10 @@ describe('File', function() {
it('should emit an error if crc32c validation fails', function(done) {
file.requestStream = getFakeSuccessfulRequest('bad-data', {});

fakeValidationStream.test = function() {
return false;
};

file.createReadStream({ validation: 'crc32c' })
.on('error', function(err) {
assert.strictEqual(err.code, 'CONTENT_DOWNLOAD_MISMATCH');
Expand All @@ -896,6 +887,10 @@ describe('File', function() {
it('should emit an error if md5 validation fails', function(done) {
file.requestStream = getFakeSuccessfulRequest('bad-data', {});

fakeValidationStream.test = function() {
return false;
};

file.createReadStream({ validation: 'md5' })
.on('error', function(err) {
assert.strictEqual(err.code, 'CONTENT_DOWNLOAD_MISMATCH');
Expand Down Expand Up @@ -925,13 +920,23 @@ describe('File', function() {
it('should ignore a data mismatch if validation: false', function(done) {
file.requestStream = getFakeSuccessfulRequest(data, {});

fakeValidationStream.test = function() {
return false;
};

file.createReadStream({ validation: false })
.resume()
.on('error', done)
.on('end', done);
});

describe('destroying the through stream', function() {
beforeEach(function() {
fakeValidationStream.test = function() {
return false;
};
});

it('should destroy after failed validation', function(done) {
file.requestStream = getFakeSuccessfulRequest('bad-data', {});

Expand Down