Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 19 additions & 51 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ var request = require('request').defaults({
var retryRequest = require('retry-request');
var streamForward = require('stream-forward');
var through = require('through2');
var uuid = require('node-uuid');

/** @const {object} gcloud-node's package.json file. */
var PKG = require('../../package.json');
Expand Down Expand Up @@ -214,79 +213,50 @@ util.parseApiResp = parseApiResp;
*/
function makeWritableStream(dup, options, onComplete) {
onComplete = onComplete || noop;

options = options || {};
options.metadata = options.metadata || {};

var boundary = uuid.v4();
var writeStream = through();
dup.setWritable(writeStream);

var defaults = {
var defaultReqOpts = {
method: 'POST',
qs: {
uploadType: 'multipart'
}
};

// Extend the provided request object with common defaults.
//
// `options.request` takes precedence over the defaults, but not over the
// headers, as these set up the boundary.
var reqOpts = extend(true, defaults, options.request, {
headers: {
'Content-Type': 'multipart/related; boundary="' + boundary + '"'
}
var metadata = options.metadata || {};

var reqOpts = extend(true, defaultReqOpts, options.request, {
multipart: [
{
'Content-Type': 'application/json',
body: JSON.stringify(metadata)
},
{
'Content-Type': metadata.contentType || 'application/octet-stream',
body: writeStream
}
]
});

// With the provided connection, be sure we have a valid token before
// attempting to create a request.
options.makeAuthorizedRequest(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
dup.destroy(err);
return;
}

var streamType =
options.metadata.contentType || 'application/octet-stream';

var stream = request(authorizedReqOpts);
stream.callback = noop;

// Write the metadata to the request.
stream.write('--' + boundary + '\n');
stream.write('Content-Type: application/json\n\n');
stream.write(JSON.stringify(options.metadata));
stream.write('\n\n');
stream.write('--' + boundary + '\n');
stream.write('Content-Type: ' + streamType + '\n\n');

// Overwrite the `end` function, so we can close the boundary.
var oldEndFn = stream.end;
stream.end = function(data, encoding, callback) {
data = (data || '') + '\n--' + boundary + '--\n';
stream.write(data, encoding, callback);
oldEndFn.apply(this);
};

// When the request is complete, parse it. If everything went well, pass
// the parsed response data to the callback handler.
stream.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
request(authorizedReqOpts, function(err, resp, body) {
util.handleResp(err, resp, body, function(err, data) {
if (err) {
dup.destroy(err);
return;
}

onComplete(data);
});
});

// We have a writable stream - tell Duplexify about it, so it can resume
// processing incoming data.
dup.setWritable(stream);

// Keep part of the stream open to keep Request from closing the
// connection. Reference: http://goo.gl/zZVSif.
dup.pipe(stream);
}
});
}
Expand Down Expand Up @@ -332,8 +302,6 @@ util.shouldRetryRequest = shouldRetryRequest;
* response is related to rate limits or certain intermittent server errors.
* We will exponentially backoff subsequent requests by default. (default:
* true)
* @param {object=} config.authClient - AuthClient object. If not provided,
* it will be created and cached here.
* @param {object=} config.credentials - Credentials object.
* @param {boolean=} config.customEndpoint - If true, just return the provided
* request options. Default: false.
Expand Down
19 changes: 9 additions & 10 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -1588,36 +1588,35 @@ File.prototype.startResumableUpload_ = function(stream, metadata) {
* `startResumableUpload_`, which uses the resumable upload technique, this
* method uses a simple upload (all or nothing).
*
* @param {Duplexify} stream - Duplexify stream of data to pipe to the file.
* @param {Duplexify} dup - Duplexify stream of data to pipe to the file.
* @param {object=} metadata - Optional metadata to set on the file.
*
* @private
*/
File.prototype.startSimpleUpload_ = function(stream, metadata) {
var that = this;
File.prototype.startSimpleUpload_ = function(dup, metadata) {
var self = this;

var reqOpts = {
qs: {
name: that.name
name: self.name
},
uri: format('{base}/{bucket}/o', {
base: STORAGE_UPLOAD_BASE_URL,
bucket: that.bucket.name
bucket: self.bucket.name
})
};

if (this.generation) {
reqOpts.qs.ifGenerationMatch = this.generation;
}

util.makeWritableStream(stream, {
makeAuthorizedRequest: that.bucket.storage.makeAuthorizedRequest_,
util.makeWritableStream(dup, {
makeAuthorizedRequest: self.bucket.storage.makeAuthorizedRequest_,
metadata: metadata,
request: reqOpts
}, function(data) {
that.metadata = data;

stream.emit('complete', data);
self.metadata = data;
dup.emit('complete', data);
});
};

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
"is": "^3.0.1",
"methmeth": "^1.0.0",
"mime-types": "^2.0.8",
"node-uuid": "^1.4.2",
"once": "^1.3.1",
"prop-assign": "^1.0.0",
"propprop": "^0.3.0",
Expand All @@ -87,6 +86,7 @@
"mitm": "^1.1.0",
"mocha": "^2.1.0",
"mockery": "^1.4.0",
"node-uuid": "^1.4.3",
"tmp": "0.0.24"
},
"scripts": {
Expand Down
130 changes: 34 additions & 96 deletions test/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ var request = require('request');
var retryRequest = require('retry-request');
var stream = require('stream');
var streamForward = require('stream-forward');
var through = require('through2');

var googleAutoAuthOverride;
function fakeGoogleAutoAuth() {
Expand Down Expand Up @@ -268,13 +267,25 @@ describe('common/util', function() {
describe('makeWritableStream', function() {
it('should use defaults', function(done) {
var dup = duplexify();
var metadata = { a: 'b', c: 'd' };

util.makeWritableStream(dup, {
metadata: metadata,
makeAuthorizedRequest: function(request) {
assert.equal(request.method, 'POST');
assert.equal(request.qs.uploadType, 'multipart');

var contentType = request.headers['Content-Type'];
assert.equal(contentType.indexOf('multipart/related'), 0);
assert.strictEqual(Array.isArray(request.multipart), true);

var mp = request.multipart;

assert.strictEqual(mp[0]['Content-Type'], 'application/json');
assert.strictEqual(mp[0].body, JSON.stringify(metadata));

assert.strictEqual(mp[1]['Content-Type'], 'application/octet-stream');
// (is a writable stream:)
assert.strictEqual(typeof mp[1].body._writableState, 'object');

done();
}
});
Expand All @@ -292,10 +303,17 @@ describe('common/util', function() {
};

util.makeWritableStream(dup, {
metadata: {
contentType: 'application/json'
},
makeAuthorizedRequest: function(request) {
assert.equal(request.method, req.method);
assert.deepEqual(request.qs, req.qs);
assert.equal(request.something, req.something);

var mp = request.multipart;
assert.strictEqual(mp[1]['Content-Type'], 'application/json');

done();
},

Expand All @@ -306,7 +324,7 @@ describe('common/util', function() {
it('should emit an error', function(done) {
var error = new Error('Error.');

var ws = through();
var ws = duplexify();
ws.on('error', function(err) {
assert.equal(err, error);
done();
Expand All @@ -319,93 +337,15 @@ describe('common/util', function() {
});
});

it('should write request', function(done) {
var dup = duplexify();
var boundary;
var metadata = { a: 'b', c: 'd' };

requestOverride = function() {
var written = [];

var req = duplexify();

req.write = function(data) {
written.push(data);
};

req.end = function() {
var boundaryLine = '--' + boundary + '\n';

var startFirstBoundaryIdx = written.indexOf(boundaryLine);
var endFirstBoundaryIdx = written.lastIndexOf(boundaryLine);
var endBoundaryIdx = written.indexOf('\n--' + boundary + '--\n');

assert(startFirstBoundaryIdx > -1);
assert(endFirstBoundaryIdx > startFirstBoundaryIdx);
assert(endBoundaryIdx > -1);

assert(written.indexOf(JSON.stringify(metadata)) > -1);

done();
};

setImmediate(function() {
req.end();
});

return req;
};

util.makeWritableStream(dup, {
metadata: metadata,

makeAuthorizedRequest: function(request, opts) {
var contentType = request.headers['Content-Type'];
boundary = contentType.match(/boundary="([^"]*)/)[1];
opts.onAuthorized();
}
});
});

it('should set the writable stream', function(done) {
var dup = duplexify();
var ws = new stream.Writable();
ws.write = function() {};

requestOverride = function() {
return ws;
};

dup.setWritable = function(writable) {
assert.equal(writable, ws);
dup.setWritable = function() {
done();
};

util.makeWritableStream(dup, {
makeAuthorizedRequest: function(request, opts) {
opts.onAuthorized();
}
});
});

it('should keep the pipe open on the stream', function(done) {
var dup = duplexify();
var ws = new stream.Writable();
ws.write = function() {};

requestOverride = function() {
return ws;
};

dup.pipe = function(writable) {
assert.equal(writable, ws);
done();
};

util.makeWritableStream(dup, {
makeAuthorizedRequest: function(request, opts) {
opts.onAuthorized();
}
makeAuthorizedRequest: function() {}
});
});

Expand All @@ -421,23 +361,21 @@ describe('common/util', function() {
callback(error);
};

requestOverride = function() {
return fakeStream;
};

var options = {
makeAuthorizedRequest: function(request, opts) {
opts.onAuthorized();
}
requestOverride = function(reqOpts, callback) {
callback(error);
};

util.makeWritableStream(dup, options);

dup.on('error', function(err) {
assert.strictEqual(err, error);
done();
});

util.makeWritableStream(dup, {
makeAuthorizedRequest: function(request, opts) {
opts.onAuthorized();
}
});

setImmediate(function() {
fakeStream.emit('complete', {});
});
Expand All @@ -454,8 +392,8 @@ describe('common/util', function() {
callback(null, fakeResponse);
};

requestOverride = function() {
return fakeStream;
requestOverride = function(reqOpts, callback) {
callback();
};

var options = {
Expand Down