diff --git a/lib/handlers/PatchHandler.js b/lib/handlers/PatchHandler.js index c43c0dee..f1163649 100644 --- a/lib/handlers/PatchHandler.js +++ b/lib/handlers/PatchHandler.js @@ -3,8 +3,11 @@ const BaseHandler = require('./BaseHandler'); const File = require('../models/File'); const { ERRORS, EVENTS } = require('../constants'); +const stream = require('stream'); const debug = require('debug'); +const { StreamDuplexLimiter: StreamLimiter } = require('../models/StreamLimiter'); const log = debug('tus-node-server:handlers:patch'); + class PatchHandler extends BaseHandler { /** * Write data to the DataStore and return the new offset. @@ -61,14 +64,25 @@ class PatchHandler extends BaseHandler { file.upload_length = upload_length; } - const new_offset = await this.store.write(req, file_id, offset); - if (new_offset === parseInt(file.upload_length, 10)) { + let max_bytes = Number.MAX_SAFE_INTEGER; + if (req.headers['content-length']) { + max_bytes = Math.min(max_bytes, req.headers['content-length']); + } + if (file.upload_length !== undefined) { + const remaining_bytes = parseInt(file.upload_length, 10) - file.size; + max_bytes = Math.min(max_bytes, remaining_bytes); + } + + const limiter = stream.pipeline(req, new StreamLimiter(max_bytes), () => {}); + + file.size = await this.store.write(limiter, file_id, offset); + if (file.size === parseInt(file.upload_length, 10)) { this.emit(EVENTS.EVENT_UPLOAD_COMPLETE, { file: new File(file_id, file.upload_length, file.upload_defer_length, file.upload_metadata) }); } // It MUST include the Upload-Offset header containing the new offset. const headers = { - 'Upload-Offset': new_offset, + 'Upload-Offset': file.size, }; // The Server MUST acknowledge successful PATCH requests with the 204 diff --git a/lib/handlers/PostHandler.js b/lib/handlers/PostHandler.js index 0b06228f..e6493314 100644 --- a/lib/handlers/PostHandler.js +++ b/lib/handlers/PostHandler.js @@ -5,7 +5,9 @@ const File = require('../models/File'); const Uid = require('../models/Uid'); const RequestValidator = require('../validators/RequestValidator'); const { EVENTS, ERRORS } = require('../constants'); +const stream = require('stream'); const debug = require('debug'); +const { StreamDuplexLimiter: StreamLimiter } = require('../models/StreamLimiter'); const log = debug('tus-node-server:handlers:post'); class PostHandler extends BaseHandler { @@ -70,12 +72,23 @@ class PostHandler extends BaseHandler { // The request MIGHT include a Content-Type header when using creation-with-upload extension if (!RequestValidator.isInvalidHeader('content-type', req.headers['content-type'])) { - const new_offset = await this.store.write(req, file.id, 0); - optional_headers['Upload-Offset'] = new_offset; + let max_bytes = Number.MAX_SAFE_INTEGER; + if (req.headers['content-length']) { + max_bytes = Math.min(max_bytes, req.headers['content-length']); + } + if (file.upload_length !== undefined) { + const remaining_bytes = parseInt(file.upload_length, 10) - (file.size ?? 0); + max_bytes = Math.min(max_bytes, remaining_bytes); + } - if (new_offset === parseInt(upload_length, 10)) { + const limiter = stream.pipeline(req, new StreamLimiter(max_bytes), () => {}); + + file.size = await this.store.write(limiter, file.id, 0); + if (file.size === parseInt(file.upload_length, 10)) { this.emit(EVENTS.EVENT_UPLOAD_COMPLETE, { file: new File(file_id, file.upload_length, file.upload_defer_length, file.upload_metadata) }); } + + optional_headers['Upload-Offset'] = file.size; } return this.write(res, 201, { Location: url, ...optional_headers }); diff --git a/lib/models/StreamLimiter.js b/lib/models/StreamLimiter.js new file mode 100644 index 00000000..da7c3c01 --- /dev/null +++ b/lib/models/StreamLimiter.js @@ -0,0 +1,96 @@ +const stream = require('stream'); + +class StreamTransformLimiter extends stream.Transform { + constructor(limitBytes, options = {}) { + super(options); + + this.remainingBytes = limitBytes; + + if (options.objectMode) { + throw new Error('Object mode is not supported'); + } + } + + _transform(chunk, encoding, callback) { + if (!Buffer.isBuffer(chunk)) { + callback(new Error('Only buffers are supported')); + return; + } + + const bytes = Math.min(chunk.length, this.remainingBytes); + const data = chunk.subarray(0, bytes); + + this.remainingBytes -= data.length; + + callback(null, data); + } +} + + +class StreamDuplexLimiter extends stream.Duplex { + constructor(limitBytes, options = {}) { + super(options); + + this.receivedBytes = 0; + this.remainingBytes = limitBytes; + + if (options.objectMode) { + throw new Error('Object mode is not supported'); + } + } + + _write(chunk, encoding, callback) { + if (!Buffer.isBuffer(chunk)) { + callback(new Error('Only buffers are supported')); + return; + } + + this.receivedBytes += chunk.length; + + if (this.remainingBytes === 0) { + callback(); + return; + } + + const bytes = Math.min(chunk.length, this.remainingBytes); + const data = chunk.subarray(0, bytes); + + this.remainingBytes -= data.length; + + const final = this.remainingBytes === 0; + + if (final) { + this.push(data, encoding); + this.push(null); + + callback(); + } + else if (this.push(data, encoding)) { + callback(); + } + else { + this.callback = callback; + } + } + + _read(size) { + if (this.callback) { + const callback = this.callback; + this.callback = null; + callback(); + } + } + + _final(callback) { + if (this.readable) { + this.push(null); + } + + callback(); + } + _destroy(err, callback) { + callback(err); + } +} + +module.exports = { StreamTransformLimiter, StreamDuplexLimiter }; diff --git a/package-lock.json b/package-lock.json index 72c13401..161e1d1f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,8 @@ "@google-cloud/storage": "^6.2.2", "aws-sdk": "^2.1064.0", "configstore": "^5.0.1", - "debug": "^4.3.3" + "debug": "^4.3.3", + "ftp": "^0.3.10" }, "devDependencies": { "@vimeo/eslint-config-player": "^5.0.1", @@ -1132,6 +1133,11 @@ "integrity": "sha512-JxbCBUdrfr6AQjOXrxoTvAMJO4HBTUIlBzslcJPAz+/KT8yk53fXun51u+RenNYvad/+Vc2DIz5o9UxlCDymFQ==", "dev": true }, + "node_modules/core-util-is": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz", + "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==" + }, "node_modules/cross-env": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.3.tgz", @@ -2084,6 +2090,39 @@ "node": "^8.16.0 || ^10.6.0 || >=11.0.0" } }, + "node_modules/ftp": { + "version": "0.3.10", + "resolved": "https://registry.npmjs.org/ftp/-/ftp-0.3.10.tgz", + "integrity": "sha512-faFVML1aBx2UoDStmLwv2Wptt4vw5x03xxX172nhA5Y5HBshW5JweqQ2W4xL4dezQTG8inJsuYcpPHHU3X5OTQ==", + "dependencies": { + "readable-stream": "1.1.x", + "xregexp": "2.0.0" + }, + "engines": { + "node": ">=0.8.0" + } + }, + "node_modules/ftp/node_modules/isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==" + }, + "node_modules/ftp/node_modules/readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ==", + "dependencies": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "node_modules/ftp/node_modules/string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" + }, "node_modules/function-bind": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz", @@ -4260,7 +4299,11 @@ "node_modules/proper-lockfile": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/proper-lockfile/-/proper-lockfile-2.0.1.tgz", +<<<<<<< Updated upstream "integrity": "sha1-FZ+wYZPTIAP0s2kd0uwaY0qoDR0=", +======= + "integrity": "sha512-rjaeGbsmhNDcDInmwi4MuI6mRwJu6zq8GjYCLuSuE7GF+4UjgzkL69sVKKJ2T2xH61kK7rXvGYpvaTu909oXaQ==", +>>>>>>> Stashed changes "dev": true, "dependencies": { "graceful-fs": "^4.1.2", @@ -4438,7 +4481,11 @@ "node_modules/retry": { "version": "0.10.1", "resolved": "https://registry.npmjs.org/retry/-/retry-0.10.1.tgz", +<<<<<<< Updated upstream "integrity": "sha1-52OI0heZLCUnUCQdPTlW/tmNj/Q=", +======= + "integrity": "sha512-ZXUSQYTHdl3uS7IuCehYfMzKyIDBNoAuUblvy5oGO5UJSUTmStUUVPXbA9Qxd173Bgre53yCQczQuHgRWAdvJQ==", +>>>>>>> Stashed changes "dev": true, "engines": { "node": "*" @@ -4976,9 +5023,15 @@ "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==" }, "node_modules/tus-js-client": { +<<<<<<< Updated upstream "version": "2.3.1", "resolved": "https://registry.npmjs.org/tus-js-client/-/tus-js-client-2.3.1.tgz", "integrity": "sha512-QEM7ySnthWT+wwePLTXVSQP8vBLCy0ZoJNDGFzNlsU+YVoK2WevIZwcRnKyo962xhYMiABe3aMvXvk4Ln+VRzQ==", +======= + "version": "2.3.2", + "resolved": "https://registry.npmjs.org/tus-js-client/-/tus-js-client-2.3.2.tgz", + "integrity": "sha512-5a2rm7gp+G7Z+ZB0AO4PzD/dwczB3n1fZeWO5W8AWLJ12RRk1rY4Aeb2VAYX9oKGE+/rGPrdxoFPA/vDSVKnpg==", +>>>>>>> Stashed changes "dev": true, "dependencies": { "buffer-from": "^0.1.1", @@ -5335,6 +5388,14 @@ "node": ">=4.0" } }, + "node_modules/xregexp": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xregexp/-/xregexp-2.0.0.tgz", + "integrity": "sha512-xl/50/Cf32VsGq/1R8jJE5ajH1yMCQkpmoS10QbFZWl2Oor4H0Me64Pu2yxvsRWK3m6soJbmGfzSR7BYmDcWAA==", + "engines": { + "node": "*" + } + }, "node_modules/y18n": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.1.tgz", @@ -6342,6 +6403,11 @@ "integrity": "sha512-JxbCBUdrfr6AQjOXrxoTvAMJO4HBTUIlBzslcJPAz+/KT8yk53fXun51u+RenNYvad/+Vc2DIz5o9UxlCDymFQ==", "dev": true }, + "core-util-is": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz", + "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==" + }, "cross-env": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.3.tgz", @@ -7049,6 +7115,38 @@ "dev": true, "optional": true }, + "ftp": { + "version": "0.3.10", + "resolved": "https://registry.npmjs.org/ftp/-/ftp-0.3.10.tgz", + "integrity": "sha512-faFVML1aBx2UoDStmLwv2Wptt4vw5x03xxX172nhA5Y5HBshW5JweqQ2W4xL4dezQTG8inJsuYcpPHHU3X5OTQ==", + "requires": { + "readable-stream": "1.1.x", + "xregexp": "2.0.0" + }, + "dependencies": { + "isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==" + }, + "readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ==", + "requires": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" + } + } + }, "function-bind": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz", @@ -8686,7 +8784,11 @@ "proper-lockfile": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/proper-lockfile/-/proper-lockfile-2.0.1.tgz", +<<<<<<< Updated upstream "integrity": "sha1-FZ+wYZPTIAP0s2kd0uwaY0qoDR0=", +======= + "integrity": "sha512-rjaeGbsmhNDcDInmwi4MuI6mRwJu6zq8GjYCLuSuE7GF+4UjgzkL69sVKKJ2T2xH61kK7rXvGYpvaTu909oXaQ==", +>>>>>>> Stashed changes "dev": true, "requires": { "graceful-fs": "^4.1.2", @@ -8819,7 +8921,11 @@ "retry": { "version": "0.10.1", "resolved": "https://registry.npmjs.org/retry/-/retry-0.10.1.tgz", +<<<<<<< Updated upstream "integrity": "sha1-52OI0heZLCUnUCQdPTlW/tmNj/Q=", +======= + "integrity": "sha512-ZXUSQYTHdl3uS7IuCehYfMzKyIDBNoAuUblvy5oGO5UJSUTmStUUVPXbA9Qxd173Bgre53yCQczQuHgRWAdvJQ==", +>>>>>>> Stashed changes "dev": true }, "retry-request": { @@ -9258,9 +9364,15 @@ "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==" }, "tus-js-client": { +<<<<<<< Updated upstream "version": "2.3.1", "resolved": "https://registry.npmjs.org/tus-js-client/-/tus-js-client-2.3.1.tgz", "integrity": "sha512-QEM7ySnthWT+wwePLTXVSQP8vBLCy0ZoJNDGFzNlsU+YVoK2WevIZwcRnKyo962xhYMiABe3aMvXvk4Ln+VRzQ==", +======= + "version": "2.3.2", + "resolved": "https://registry.npmjs.org/tus-js-client/-/tus-js-client-2.3.2.tgz", + "integrity": "sha512-5a2rm7gp+G7Z+ZB0AO4PzD/dwczB3n1fZeWO5W8AWLJ12RRk1rY4Aeb2VAYX9oKGE+/rGPrdxoFPA/vDSVKnpg==", +>>>>>>> Stashed changes "dev": true, "requires": { "buffer-from": "^0.1.1", @@ -9554,6 +9666,11 @@ "resolved": "https://registry.npmjs.org/xmlbuilder/-/xmlbuilder-9.0.7.tgz", "integrity": "sha1-Ey7mPS7FVlxVfiD0wi35rKaGsQ0=" }, + "xregexp": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xregexp/-/xregexp-2.0.0.tgz", + "integrity": "sha512-xl/50/Cf32VsGq/1R8jJE5ajH1yMCQkpmoS10QbFZWl2Oor4H0Me64Pu2yxvsRWK3m6soJbmGfzSR7BYmDcWAA==" + }, "y18n": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.1.tgz",