Skip to content
12 changes: 11 additions & 1 deletion src/hash-stream-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,18 @@ class HashStreamValidator extends Transform {
return this.#crc32cHash?.toString();
}

/**
* Return the calculated MD5 value, if available.
*/
get md5Digest(): string | undefined {
if (this.#md5Hash && !this.#md5Digest) {
this.#md5Digest = this.#md5Hash.digest('base64');
}
return this.#md5Digest;
}

_flush(callback: (error?: Error | null | undefined) => void) {
if (this.#md5Hash) {
if (this.#md5Hash && !this.#md5Digest) {
this.#md5Digest = this.#md5Hash.digest('base64');
}

Expand Down
151 changes: 146 additions & 5 deletions src/resumable-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ import {
getUserAgentString,
} from './util.js';
import {GCCL_GCS_CMD_KEY} from './nodejs-common/util.js';
import {FileMetadata} from './file.js';
import {FileExceptionMessages, FileMetadata, RequestError} from './file.js';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import {getPackageJSON} from './package-json-helper.cjs';
import {HashStreamValidator} from './hash-stream-validator.js';

const NOT_FOUND_STATUS_CODE = 404;
const RESUMABLE_INCOMPLETE_STATUS_CODE = 308;
Expand Down Expand Up @@ -149,6 +150,19 @@ export interface UploadConfig extends Pick<WritableOptions, 'highWaterMark'> {
*/
isPartialUpload?: boolean;

clientCrc32c?: string;
clientMd5Hash?: string;
/**
* Enables CRC32C calculation on the client side.
* The calculated hash will be sent in the final PUT request if `clientCrc32c` is not provided.
*/
crc32c?: boolean;
/**
* Enables MD5 calculation on the client side.
* The calculated hash will be sent in the final PUT request if `clientMd5Hash` is not provided.
*/
md5?: boolean;

/**
* A customer-supplied encryption key. See
* https://cloud.google.com/storage/docs/encryption#customer-supplied.
Expand Down Expand Up @@ -334,6 +348,11 @@ export class Upload extends Writable {
*/
private writeBuffers: Buffer[] = [];
private numChunksReadInRequest = 0;

#hashValidator?: HashStreamValidator;
#clientCrc32c?: string;
#clientMd5Hash?: string;

/**
* An array of buffers used for caching the most recent upload chunk.
* We should not assume that the server received all bytes sent in the request.
Expand Down Expand Up @@ -428,6 +447,20 @@ export class Upload extends Writable {
this.retryOptions = cfg.retryOptions;
this.isPartialUpload = cfg.isPartialUpload ?? false;

this.#clientCrc32c = cfg.clientCrc32c;
this.#clientMd5Hash = cfg.clientMd5Hash;

const calculateCrc32c = !cfg.clientCrc32c && cfg.crc32c;
const calculateMd5 = !cfg.clientMd5Hash && cfg.md5;

if (calculateCrc32c || calculateMd5) {
this.#hashValidator = new HashStreamValidator({
crc32c: calculateCrc32c,
md5: calculateMd5,
updateHashesOnly: true,
});
}

if (cfg.key) {
if (typeof cfg.key === 'string') {
const base64Key = Buffer.from(cfg.key).toString('base64');
Expand Down Expand Up @@ -518,9 +551,19 @@ export class Upload extends Writable {
// Backwards-compatible event
this.emit('writing');

this.writeBuffers.push(
typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk
);
const bufferChunk =
typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk;

if (this.#hashValidator) {
try {
this.#hashValidator.write(bufferChunk);
} catch (e) {
this.destroy(e as Error);
return;
}
}

this.writeBuffers.push(bufferChunk);

this.once('readFromChunkBuffer', readCallback);

Expand All @@ -537,6 +580,63 @@ export class Upload extends Writable {
this.localWriteCacheByteLength += buf.byteLength;
}

/**
* Compares the client's calculated or provided hash against the server's
* returned hash for a specific checksum type. Destroys the stream on mismatch.
* @param clientHash The client's calculated or provided hash (Base64).
* @param serverHash The hash returned by the server (Base64).
* @param hashType The type of hash ('CRC32C' or 'MD5').
*/
#validateChecksum(
clientHash: string | undefined,
serverHash: string | undefined,
hashType: 'CRC32C' | 'MD5'
): boolean {
// Only validate if both client and server hashes are present.
if (clientHash && serverHash) {
if (clientHash !== serverHash) {
const detailMessage = `${hashType} checksum mismatch. Client calculated: ${clientHash}, Server returned: ${serverHash}`;
const code = 'FILE_NO_UPLOAD';
const primaryMessage = FileExceptionMessages.UPLOAD_MISMATCH;
const detailError = new Error(detailMessage);
const error = new RequestError(primaryMessage);
error.code = code;
error.errors = [detailError];

this.destroy(error);
return true;
}
}
return false;
}

/**
* Builds and applies the X-Goog-Hash header to the request options
* using either calculated hashes from #hashValidator or pre-calculated
* client-side hashes. This should only be called on the final request.
*
* @param headers The headers object to modify.
*/
#applyChecksumHeaders(headers: GaxiosOptions['headers']) {
const checksums: string[] = [];

if (this.#hashValidator?.crc32cEnabled) {
checksums.push(`crc32c=${this.#hashValidator.crc32c!}`);
} else if (this.#clientCrc32c) {
checksums.push(`crc32c=${this.#clientCrc32c}`);
}

if (this.#hashValidator?.md5Enabled) {
checksums.push(`md5=${this.#hashValidator.md5Digest!}`);
} else if (this.#clientMd5Hash) {
checksums.push(`md5=${this.#clientMd5Hash}`);
}

if (checksums.length > 0) {
headers!['X-Goog-Hash'] = checksums.join(',');
}
}

/**
* Prepends the local buffer to write buffer and resets it.
*
Expand Down Expand Up @@ -929,6 +1029,10 @@ export class Upload extends Writable {
// unshifting data back into the queue. This way we will know if this is the last request or not.
const isLastChunkOfUpload = !(await this.waitForNextChunk());

if (isLastChunkOfUpload && this.#hashValidator) {
this.#hashValidator.end();
}

// Important: put the data back in the queue for the actual upload
this.prependLocalBufferToUpstream();

Expand All @@ -951,8 +1055,18 @@ export class Upload extends Writable {
headers['Content-Length'] = bytesToUpload;
headers['Content-Range'] =
`bytes ${this.offset}-${endingByte}/${totalObjectSize}`;

// Apply X-Goog-Hash header ONLY on the final chunk (WriteObject call)
if (isLastChunkOfUpload) {
this.#applyChecksumHeaders(headers);
}
} else {
headers['Content-Range'] = `bytes ${this.offset}-*/${this.contentLength}`;

if (this.#hashValidator) {
this.#hashValidator.end();
}
this.#applyChecksumHeaders(headers);
}

const reqOpts: GaxiosOptions = {
Expand Down Expand Up @@ -1046,7 +1160,29 @@ export class Upload extends Writable {
}

this.destroy(err);
} else {
} else if (this.isSuccessfulResponse(resp.status)) {
const serverCrc32c = resp.data.crc32c;
const serverMd5 = resp.data.md5Hash;

if (this.#hashValidator) {
this.#hashValidator.end();
}

const clientCrc32cToValidate =
this.#hashValidator?.crc32c || this.#clientCrc32c;
const clientMd5HashToValidate =
this.#hashValidator?.md5Digest || this.#clientMd5Hash;
if (
this.#validateChecksum(
clientCrc32cToValidate,
serverCrc32c,
'CRC32C'
) ||
this.#validateChecksum(clientMd5HashToValidate, serverMd5, 'MD5')
) {
return;
}

// no need to keep the cache
this.#resetLocalBuffersCache();

Expand All @@ -1058,6 +1194,11 @@ export class Upload extends Writable {
// Allow the object (Upload) to continue naturally so the user's
// "finish" event fires.
this.emit('uploadFinished');
} else {
// Handles the case where shouldContinueUploadInAnotherRequest is true
// and the response is not successful (e.g., 308 for a partial upload).
// This is the expected behavior for partial uploads that have finished their chunk.
this.emit('uploadFinished');
}
}

Expand Down
63 changes: 63 additions & 0 deletions system-test/kitchen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
RETRYABLE_ERR_FN_DEFAULT,
Storage,
} from '../src/storage.js';
import {CRC32C} from '../src/crc32c.js';

const bucketName = process.env.BUCKET_NAME || 'gcs-resumable-upload-test';

Expand Down Expand Up @@ -305,4 +306,66 @@ describe('resumable-upload', () => {
);
assert.equal(results.size, FILE_SIZE);
});

const KNOWN_CRC32C_OF_ZEROS = 'rthIWA==';
describe('Validation of Client Checksums Against Server Response', () => {
let crc32c: string;

before(async () => {
crc32c = (await CRC32C.fromFile(filePath)).toString();
});
it('should upload successfully when crc32c calculation is enabled', done => {
let uploadSucceeded = false;

fs.createReadStream(filePath)
.on('error', done)
.pipe(
upload({
bucket: bucketName,
file: filePath,
crc32c: true,
clientCrc32c: crc32c,
retryOptions: retryOptions,
})
)
.on('error', err => {
console.log(err);
done(
new Error(
`Upload failed unexpectedly on success path: ${err.message}`
)
);
})
.on('response', resp => {
uploadSucceeded = resp.status === 200;
})
.on('finish', () => {
assert.strictEqual(uploadSucceeded, true);
done();
});
});

it('should destroy the stream on a checksum mismatch (client-provided hash mismatch)', done => {
const EXPECTED_ERROR_MESSAGE_PART = `Provided CRC32C "${KNOWN_CRC32C_OF_ZEROS}" doesn't match calculated CRC32C`;

fs.createReadStream(filePath)
.on('error', done)
.pipe(
upload({
bucket: bucketName,
file: filePath,
clientCrc32c: KNOWN_CRC32C_OF_ZEROS,
crc32c: true,
retryOptions: retryOptions,
})
)
.on('error', (err: Error) => {
assert.ok(
err.message.includes(EXPECTED_ERROR_MESSAGE_PART),
`Expected error message part "${EXPECTED_ERROR_MESSAGE_PART}" not found in: ${err.message}`
);
done();
});
});
});
});
Loading