diff --git a/src/hash-stream-validator.ts b/src/hash-stream-validator.ts index 5f4a10e4e..0c3a2ca72 100644 --- a/src/hash-stream-validator.ts +++ b/src/hash-stream-validator.ts @@ -81,8 +81,18 @@ class HashStreamValidator extends Transform { return this.#crc32cHash?.toString(); } + /** + * Return the calculated MD5 value, if available. + */ + get md5Digest(): string | undefined { + if (this.#md5Hash && !this.#md5Digest) { + this.#md5Digest = this.#md5Hash.digest('base64'); + } + return this.#md5Digest; + } + _flush(callback: (error?: Error | null | undefined) => void) { - if (this.#md5Hash) { + if (this.#md5Hash && !this.#md5Digest) { this.#md5Digest = this.#md5Hash.digest('base64'); } diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts index e6a51fd14..507e3c1f8 100644 --- a/src/resumable-upload.ts +++ b/src/resumable-upload.ts @@ -36,10 +36,11 @@ import { getUserAgentString, } from './util.js'; import {GCCL_GCS_CMD_KEY} from './nodejs-common/util.js'; -import {FileMetadata} from './file.js'; +import {FileExceptionMessages, FileMetadata, RequestError} from './file.js'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore import {getPackageJSON} from './package-json-helper.cjs'; +import {HashStreamValidator} from './hash-stream-validator.js'; const NOT_FOUND_STATUS_CODE = 404; const RESUMABLE_INCOMPLETE_STATUS_CODE = 308; @@ -149,6 +150,19 @@ export interface UploadConfig extends Pick { */ isPartialUpload?: boolean; + clientCrc32c?: string; + clientMd5Hash?: string; + /** + * Enables CRC32C calculation on the client side. + * The calculated hash will be sent in the final PUT request if `clientCrc32c` is not provided. + */ + crc32c?: boolean; + /** + * Enables MD5 calculation on the client side. + * The calculated hash will be sent in the final PUT request if `clientMd5Hash` is not provided. + */ + md5?: boolean; + /** * A customer-supplied encryption key. See * https://cloud.google.com/storage/docs/encryption#customer-supplied. @@ -334,6 +348,11 @@ export class Upload extends Writable { */ private writeBuffers: Buffer[] = []; private numChunksReadInRequest = 0; + + #hashValidator?: HashStreamValidator; + #clientCrc32c?: string; + #clientMd5Hash?: string; + /** * An array of buffers used for caching the most recent upload chunk. * We should not assume that the server received all bytes sent in the request. @@ -428,6 +447,20 @@ export class Upload extends Writable { this.retryOptions = cfg.retryOptions; this.isPartialUpload = cfg.isPartialUpload ?? false; + this.#clientCrc32c = cfg.clientCrc32c; + this.#clientMd5Hash = cfg.clientMd5Hash; + + const calculateCrc32c = !cfg.clientCrc32c && cfg.crc32c; + const calculateMd5 = !cfg.clientMd5Hash && cfg.md5; + + if (calculateCrc32c || calculateMd5) { + this.#hashValidator = new HashStreamValidator({ + crc32c: calculateCrc32c, + md5: calculateMd5, + updateHashesOnly: true, + }); + } + if (cfg.key) { if (typeof cfg.key === 'string') { const base64Key = Buffer.from(cfg.key).toString('base64'); @@ -518,9 +551,19 @@ export class Upload extends Writable { // Backwards-compatible event this.emit('writing'); - this.writeBuffers.push( - typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk - ); + const bufferChunk = + typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk; + + if (this.#hashValidator) { + try { + this.#hashValidator.write(bufferChunk); + } catch (e) { + this.destroy(e as Error); + return; + } + } + + this.writeBuffers.push(bufferChunk); this.once('readFromChunkBuffer', readCallback); @@ -537,6 +580,63 @@ export class Upload extends Writable { this.localWriteCacheByteLength += buf.byteLength; } + /** + * Compares the client's calculated or provided hash against the server's + * returned hash for a specific checksum type. Destroys the stream on mismatch. + * @param clientHash The client's calculated or provided hash (Base64). + * @param serverHash The hash returned by the server (Base64). + * @param hashType The type of hash ('CRC32C' or 'MD5'). + */ + #validateChecksum( + clientHash: string | undefined, + serverHash: string | undefined, + hashType: 'CRC32C' | 'MD5' + ): boolean { + // Only validate if both client and server hashes are present. + if (clientHash && serverHash) { + if (clientHash !== serverHash) { + const detailMessage = `${hashType} checksum mismatch. Client calculated: ${clientHash}, Server returned: ${serverHash}`; + const code = 'FILE_NO_UPLOAD'; + const primaryMessage = FileExceptionMessages.UPLOAD_MISMATCH; + const detailError = new Error(detailMessage); + const error = new RequestError(primaryMessage); + error.code = code; + error.errors = [detailError]; + + this.destroy(error); + return true; + } + } + return false; + } + + /** + * Builds and applies the X-Goog-Hash header to the request options + * using either calculated hashes from #hashValidator or pre-calculated + * client-side hashes. This should only be called on the final request. + * + * @param headers The headers object to modify. + */ + #applyChecksumHeaders(headers: GaxiosOptions['headers']) { + const checksums: string[] = []; + + if (this.#hashValidator?.crc32cEnabled) { + checksums.push(`crc32c=${this.#hashValidator.crc32c!}`); + } else if (this.#clientCrc32c) { + checksums.push(`crc32c=${this.#clientCrc32c}`); + } + + if (this.#hashValidator?.md5Enabled) { + checksums.push(`md5=${this.#hashValidator.md5Digest!}`); + } else if (this.#clientMd5Hash) { + checksums.push(`md5=${this.#clientMd5Hash}`); + } + + if (checksums.length > 0) { + headers!['X-Goog-Hash'] = checksums.join(','); + } + } + /** * Prepends the local buffer to write buffer and resets it. * @@ -929,6 +1029,10 @@ export class Upload extends Writable { // unshifting data back into the queue. This way we will know if this is the last request or not. const isLastChunkOfUpload = !(await this.waitForNextChunk()); + if (isLastChunkOfUpload && this.#hashValidator) { + this.#hashValidator.end(); + } + // Important: put the data back in the queue for the actual upload this.prependLocalBufferToUpstream(); @@ -951,8 +1055,18 @@ export class Upload extends Writable { headers['Content-Length'] = bytesToUpload; headers['Content-Range'] = `bytes ${this.offset}-${endingByte}/${totalObjectSize}`; + + // Apply X-Goog-Hash header ONLY on the final chunk (WriteObject call) + if (isLastChunkOfUpload) { + this.#applyChecksumHeaders(headers); + } } else { headers['Content-Range'] = `bytes ${this.offset}-*/${this.contentLength}`; + + if (this.#hashValidator) { + this.#hashValidator.end(); + } + this.#applyChecksumHeaders(headers); } const reqOpts: GaxiosOptions = { @@ -1046,7 +1160,29 @@ export class Upload extends Writable { } this.destroy(err); - } else { + } else if (this.isSuccessfulResponse(resp.status)) { + const serverCrc32c = resp.data.crc32c; + const serverMd5 = resp.data.md5Hash; + + if (this.#hashValidator) { + this.#hashValidator.end(); + } + + const clientCrc32cToValidate = + this.#hashValidator?.crc32c || this.#clientCrc32c; + const clientMd5HashToValidate = + this.#hashValidator?.md5Digest || this.#clientMd5Hash; + if ( + this.#validateChecksum( + clientCrc32cToValidate, + serverCrc32c, + 'CRC32C' + ) || + this.#validateChecksum(clientMd5HashToValidate, serverMd5, 'MD5') + ) { + return; + } + // no need to keep the cache this.#resetLocalBuffersCache(); @@ -1058,6 +1194,11 @@ export class Upload extends Writable { // Allow the object (Upload) to continue naturally so the user's // "finish" event fires. this.emit('uploadFinished'); + } else { + // Handles the case where shouldContinueUploadInAnotherRequest is true + // and the response is not successful (e.g., 308 for a partial upload). + // This is the expected behavior for partial uploads that have finished their chunk. + this.emit('uploadFinished'); } } diff --git a/system-test/kitchen.ts b/system-test/kitchen.ts index fbdd139d1..e63615f7b 100644 --- a/system-test/kitchen.ts +++ b/system-test/kitchen.ts @@ -35,6 +35,7 @@ import { RETRYABLE_ERR_FN_DEFAULT, Storage, } from '../src/storage.js'; +import {CRC32C} from '../src/crc32c.js'; const bucketName = process.env.BUCKET_NAME || 'gcs-resumable-upload-test'; @@ -305,4 +306,66 @@ describe('resumable-upload', () => { ); assert.equal(results.size, FILE_SIZE); }); + + const KNOWN_CRC32C_OF_ZEROS = 'rthIWA=='; + describe('Validation of Client Checksums Against Server Response', () => { + let crc32c: string; + + before(async () => { + crc32c = (await CRC32C.fromFile(filePath)).toString(); + }); + it('should upload successfully when crc32c calculation is enabled', done => { + let uploadSucceeded = false; + + fs.createReadStream(filePath) + .on('error', done) + .pipe( + upload({ + bucket: bucketName, + file: filePath, + crc32c: true, + clientCrc32c: crc32c, + retryOptions: retryOptions, + }) + ) + .on('error', err => { + console.log(err); + done( + new Error( + `Upload failed unexpectedly on success path: ${err.message}` + ) + ); + }) + .on('response', resp => { + uploadSucceeded = resp.status === 200; + }) + .on('finish', () => { + assert.strictEqual(uploadSucceeded, true); + done(); + }); + }); + + it('should destroy the stream on a checksum mismatch (client-provided hash mismatch)', done => { + const EXPECTED_ERROR_MESSAGE_PART = `Provided CRC32C "${KNOWN_CRC32C_OF_ZEROS}" doesn't match calculated CRC32C`; + + fs.createReadStream(filePath) + .on('error', done) + .pipe( + upload({ + bucket: bucketName, + file: filePath, + clientCrc32c: KNOWN_CRC32C_OF_ZEROS, + crc32c: true, + retryOptions: retryOptions, + }) + ) + .on('error', (err: Error) => { + assert.ok( + err.message.includes(EXPECTED_ERROR_MESSAGE_PART), + `Expected error message part "${EXPECTED_ERROR_MESSAGE_PART}" not found in: ${err.message}` + ); + done(); + }); + }); + }); }); diff --git a/test/resumable-upload.ts b/test/resumable-upload.ts index ab0973aba..381044d64 100644 --- a/test/resumable-upload.ts +++ b/test/resumable-upload.ts @@ -33,10 +33,12 @@ import { ApiError, CreateUriCallback, PROTOCOL_REGEX, + UploadConfig, } from '../src/resumable-upload.js'; import {GaxiosOptions, GaxiosError, GaxiosResponse} from 'gaxios'; import {GCCL_GCS_CMD_KEY} from '../src/nodejs-common/util.js'; import {getDirName} from '../src/util.js'; +import {FileExceptionMessages} from '../src/file.js'; nock.disableNetConnect(); @@ -55,6 +57,10 @@ const queryPath = '/?userProject=user-project-id'; const X_GOOG_API_HEADER_REGEX = /^gl-node\/(?[^W]+) gccl\/(?[^W]+) gccl-invocation-id\/(?[^W]+) gccl-gcs-cmd\/(?[^W]+)$/; const USER_AGENT_REGEX = /^gcloud-node-storage\/(?[^W]+)$/; +const CORRECT_CLIENT_CRC32C = 'Q2hlY2tzdW0h'; +const INCORRECT_SERVER_CRC32C = 'Q2hlY2tzdVUa'; +const CORRECT_CLIENT_MD5 = 'CorrectMD5Hash'; +const INCORRECT_SERVER_MD5 = 'IncorrectMD5Hash'; function mockAuthorizeRequest( code = 200, @@ -117,6 +123,8 @@ describe('resumable-upload', () => { apiEndpoint: API_ENDPOINT, retryOptions: {...RETRY_OPTIONS}, [GCCL_GCS_CMD_KEY]: 'sample.command', + clientCrc32c: CORRECT_CLIENT_CRC32C, + clientMd5Hash: CORRECT_CLIENT_MD5, }); }); @@ -1286,6 +1294,265 @@ describe('resumable-upload', () => { }); }); }); + + describe('X-Goog-Hash header injection', () => { + const CALCULATED_CRC32C = 'bzKmHw=='; + const CALCULATED_MD5 = 'VpBzljOcorCZvRIkX5Nt3A=='; + const DUMMY_CONTENT = Buffer.alloc(512, 'a'); + const CHUNK_SIZE = 256; + + let requestCount: number; + + /** + * Creates a mocked HashValidator object with forced getters to return + * predefined hash values, bypassing internal stream calculation logic. + */ + function createMockHashValidator( + crc32cEnabled: boolean, + md5Enabled: boolean + ) { + const mockValidator = { + crc32cEnabled: crc32cEnabled, + md5Enabled: md5Enabled, + end: () => {}, // Mock the end method + write: () => {}, + }; + + Object.defineProperty(mockValidator, 'crc32c', { + get: () => CALCULATED_CRC32C, + configurable: true, + }); + Object.defineProperty(mockValidator, 'md5Digest', { + get: () => CALCULATED_MD5, + configurable: true, + }); + return mockValidator; + } + + const MOCK_AUTH_CLIENT = { + // Mock the request method to return a dummy response + request: async (opts: GaxiosOptions) => { + return { + status: 200, + data: {}, + headers: {}, + config: opts, + statusText: 'OK', + } as GaxiosResponse; + }, + getRequestHeaders: async () => ({}), + getRequestMetadata: async () => ({}), + getRequestMetadataAsync: async () => ({}), + getClient: async () => MOCK_AUTH_CLIENT, + }; + + /** + * Sets up the `up` instance for hash injection tests. + * @param configOptions Partial UploadConfig to apply. + */ + function setupHashUploadInstance( + configOptions: Partial & {crc32c?: boolean; md5?: boolean} + ) { + up = upload({ + bucket: BUCKET, + file: FILE, + authClient: MOCK_AUTH_CLIENT, + retryOptions: {...RETRY_OPTIONS, maxRetries: 0}, + metadata: { + contentLength: DUMMY_CONTENT.byteLength, + contentType: 'text/plain', + }, + ...configOptions, + }); + + // Manually inject the mock HashStreamValidator if needed + const calculateCrc32c = + !configOptions.clientCrc32c && configOptions.crc32c; + const calculateMd5 = !configOptions.clientMd5Hash && configOptions.md5; + + if (calculateCrc32c || calculateMd5) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (up as any)['#hashValidator'] = createMockHashValidator( + !!calculateCrc32c, + !!calculateMd5 + ); + } + } + + async function performUpload( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + uploadInstance: any, + data: Buffer, + isMultiChunk: boolean, + expectedCrc32c?: string, + expectedMd5?: string + ): Promise { + const capturedReqOpts: GaxiosOptions[] = []; + requestCount = 0; + + uploadInstance.makeRequestStream = async ( + requestOptions: GaxiosOptions + ) => { + requestCount++; + capturedReqOpts.push(requestOptions); + + await new Promise(resolve => { + requestOptions.body.on('data', () => {}); + requestOptions.body.on('end', resolve); + }); + + const serverCrc32c = expectedCrc32c || CALCULATED_CRC32C; + const serverMd5 = expectedMd5 || CALCULATED_MD5; + if ( + isMultiChunk && + requestCount < Math.ceil(DUMMY_CONTENT.byteLength / CHUNK_SIZE) + ) { + const lastByteReceived = requestCount * CHUNK_SIZE - 1; + return { + data: '', + status: RESUMABLE_INCOMPLETE_STATUS_CODE, + headers: {range: `bytes=0-${lastByteReceived}`}, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any; + } else { + return { + status: 200, + data: { + crc32c: serverCrc32c, + md5Hash: serverMd5, + name: FILE, + bucket: BUCKET, + size: DUMMY_CONTENT.byteLength.toString(), + }, + headers: {}, + config: {}, + statusText: 'OK', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any; + } + }; + + return new Promise((resolve, reject) => { + uploadInstance.on('error', reject); + uploadInstance.on('uploadFinished', () => { + resolve(capturedReqOpts); + }); + + const upstreamBuffer = new Readable({ + read() { + this.push(data); + this.push(null); + }, + }); + upstreamBuffer.pipe(uploadInstance); + }); + } + + describe('single chunk', () => { + it('should include X-Goog-Hash header with crc32c when crc32c is enabled (via validator)', async () => { + setupHashUploadInstance({crc32c: true}); + const reqOpts = await performUpload(up, DUMMY_CONTENT, false); + assert.strictEqual(reqOpts.length, 1); + assert.equal( + reqOpts[0].headers!['X-Goog-Hash'], + `crc32c=${CALCULATED_CRC32C}` + ); + }); + + it('should include X-Goog-Hash header with md5 when md5 is enabled (via validator)', async () => { + setupHashUploadInstance({md5: true}); + const reqOpts = await performUpload(up, DUMMY_CONTENT, false); + assert.strictEqual(reqOpts.length, 1); + assert.equal( + reqOpts[0].headers!['X-Goog-Hash'], + `md5=${CALCULATED_MD5}` + ); + }); + + it('should include both crc32c and md5 in X-Goog-Hash when both are enabled (via validator)', async () => { + setupHashUploadInstance({crc32c: true, md5: true}); + const reqOpts = await performUpload(up, DUMMY_CONTENT, false); + assert.strictEqual(reqOpts.length, 1); + const xGoogHash = reqOpts[0].headers!['X-Goog-Hash']; + assert.ok(xGoogHash); + const expectedHashes = [ + `crc32c=${CALCULATED_CRC32C}`, + `md5=${CALCULATED_MD5}`, + ]; + const actualHashes = xGoogHash + .split(',') + .map((s: string) => s.trim()); + assert.deepStrictEqual(actualHashes.sort(), expectedHashes.sort()); + }); + + it('should use clientCrc32c if provided (pre-calculated hash)', async () => { + const customCrc32c = 'CUSTOMCRC'; + setupHashUploadInstance({crc32c: true, clientCrc32c: customCrc32c}); + const reqOpts = await performUpload( + up, + DUMMY_CONTENT, + false, + customCrc32c + ); + assert.strictEqual(reqOpts.length, 1); + assert.strictEqual( + reqOpts[0].headers!['X-Goog-Hash'], + `crc32c=${customCrc32c}` + ); + }); + + it('should use clientMd5Hash if provided (pre-calculated hash)', async () => { + const customMd5 = 'CUSTOMMD5'; + setupHashUploadInstance({md5: true, clientMd5Hash: customMd5}); + const reqOpts = await performUpload( + up, + DUMMY_CONTENT, + false, + undefined, + customMd5 + ); + assert.strictEqual(reqOpts.length, 1); + assert.strictEqual( + reqOpts[0].headers!['X-Goog-Hash'], + `md5=${customMd5}` + ); + }); + + it('should not include X-Goog-Hash if neither crc32c nor md5 are enabled', async () => { + setupHashUploadInstance({}); + const reqOpts = await performUpload(up, DUMMY_CONTENT, false); + assert.strictEqual(reqOpts.length, 1); + assert.strictEqual(reqOpts[0].headers!['X-Goog-Hash'], undefined); + }); + }); + + describe('multiple chunk', () => { + beforeEach(() => { + setupHashUploadInstance({ + crc32c: true, + md5: true, + chunkSize: CHUNK_SIZE, + }); + }); + + it('should NOT include X-Goog-Hash header on intermediate multi-chunk requests', async () => { + const reqOpts = await performUpload(up, DUMMY_CONTENT, true); + assert.strictEqual(reqOpts.length, 2); + + assert.strictEqual(reqOpts[0].headers!['Content-Length'], CHUNK_SIZE); + assert.strictEqual(reqOpts[0].headers!['X-Goog-Hash'], undefined); + }); + + it('should include X-Goog-Hash header ONLY on the final multi-chunk request', async () => { + const expectedHashHeader = `crc32c=${CALCULATED_CRC32C},md5=${CALCULATED_MD5}`; + const reqOpts = await performUpload(up, DUMMY_CONTENT, true); + assert.strictEqual(reqOpts.length, 2); + + assert.strictEqual(reqOpts[1].headers!['Content-Length'], CHUNK_SIZE); + assert.equal(reqOpts[1].headers!['X-Goog-Hash'], expectedHashHeader); + }); + }); + }); }); describe('#responseHandler', () => { @@ -1340,6 +1607,63 @@ describe('resumable-upload', () => { up.responseHandler(RESP); }); + it('should destroy the stream on CRC32C checksum mismatch', done => { + const CLIENT_CRC = 'client_hash'; + const SERVER_CRC = 'server_hash'; + const RESP = { + data: { + crc32c: SERVER_CRC, + md5Hash: 'md5_match', + size: '100', + }, + status: 200, + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (up as any)['#hashValidator'] = { + crc32cEnabled: true, + md5Enabled: true, + crc32c: CLIENT_CRC, + md5Digest: 'md5_match', + }; + up.upstreamEnded = true; + + up.destroy = (err: Error) => { + assert.strictEqual(err.message, FileExceptionMessages.UPLOAD_MISMATCH); + done(); + }; + + up.responseHandler(RESP); + }); + + it('should destroy the stream on MD5 checksum mismatch', done => { + const CLIENT_MD5 = 'client_md5'; + const SERVER_MD5 = 'server_md5'; + const RESP = { + data: { + crc32c: 'crc32c_match', + md5Hash: SERVER_MD5, + size: '100', + }, + status: 200, + }; + + up.md5 = true; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (up as any)['#hashValidator'] = { + crc32c: 'crc32c_match', + md5Digest: CLIENT_MD5, + }; + up.upstreamEnded = true; + + up.destroy = (err: Error) => { + assert.strictEqual(err.message, FileExceptionMessages.UPLOAD_MISMATCH); + done(); + }; + + up.responseHandler(RESP); + }); + it('should continue with multi-chunk upload when incomplete', done => { const lastByteReceived = 9; @@ -2699,4 +3023,116 @@ describe('resumable-upload', () => { }); }); }); + + describe('Validation of Client Checksums Against Server Response', () => { + const DUMMY_CONTENT = Buffer.alloc(CHUNK_SIZE_MULTIPLE * 2); + let URI = ''; + beforeEach(() => { + up.contentLength = DUMMY_CONTENT.byteLength; + URI = 'uri'; + up.createURI = (callback: (error: Error | null, uri: string) => void) => { + up.uri = URI; + up.offset = 0; + callback(null, URI); + }; + }); + const checksumScenarios = [ + { + type: 'CRC32C', + match: true, + desc: 'successfully finish the upload if server-reported CRC32C matches client CRC32C', + serverCrc: CORRECT_CLIENT_CRC32C, + serverMd5: CORRECT_CLIENT_MD5, + }, + { + type: 'CRC32C', + match: false, + desc: 'fail and destroy the stream if server-reported CRC32C mismatches client CRC32C', + serverCrc: INCORRECT_SERVER_CRC32C, + serverMd5: CORRECT_CLIENT_MD5, + errorPart: 'CRC32C checksum mismatch.', + }, + { + type: 'MD5', + match: true, + desc: 'successfully finish the upload if server-reported MD5 matches client MD5', + serverCrc: CORRECT_CLIENT_CRC32C, + serverMd5: CORRECT_CLIENT_MD5, + }, + { + type: 'MD5', + match: false, + desc: 'fail and destroy the stream if server-reported MD5 mismatches client MD5', + serverCrc: CORRECT_CLIENT_CRC32C, + serverMd5: INCORRECT_SERVER_MD5, + errorPart: 'MD5 checksum mismatch.', + }, + ]; + + checksumScenarios.forEach(scenario => { + it(`should ${scenario.desc}`, done => { + up.makeRequestStream = async (opts: GaxiosOptions) => { + await new Promise(resolve => { + opts.body.on('data', () => {}); + opts.body.on('end', resolve); + }); + + return { + status: 200, + data: { + crc32c: scenario.serverCrc, + md5Hash: scenario.serverMd5, + name: up.file, + bucket: up.bucket, + size: DUMMY_CONTENT.byteLength.toString(), + }, + headers: {}, + config: opts, + statusText: 'OK', + }; + }; + + if (scenario.match) { + up.on('error', (err: Error) => { + done(new Error(`Upload failed unexpectedly: ${err.message}`)); + }); + up.on('finish', () => { + done(); + }); + } else { + up.on('error', (err: Error) => { + assert.strictEqual( + err.message, + FileExceptionMessages.UPLOAD_MISMATCH + ); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const detailError = (err as any).errors && (err as any).errors[0]; + assert.ok( + detailError && detailError.message.includes(scenario.errorPart!), + `Error message should contain: ${scenario.errorPart}` + ); + assert.strictEqual(up.uri, URI); + done(); + }); + + up.on('finish', () => { + done( + new Error( + `Upload should have failed due to ${scenario.type} mismatch, but emitted finish.` + ) + ); + }); + } + + const upstreamBuffer = new Readable({ + read() { + this.push(DUMMY_CONTENT); + this.push(null); + }, + }); + upstreamBuffer.pipe(up); + }); + }); + }); });