From 4af1de4d1a944a8b13adc7327d6dfa842936985a Mon Sep 17 00:00:00 2001 From: Jonathan Gramain Date: Mon, 9 Mar 2026 12:15:47 +0100 Subject: [PATCH] S3UTILS-226 MPU orphan cleanup script Implement cleanupMpuOrphans.js to delete orphaned parts and sproxyd keys. Orphaned parts are part metadata which do not correspond to an overview key with the same upload ID. Sproxyd keys are only removed if no completed MPU object has a matching upload ID in the same bucket. --- cleanupMpuOrphans.js | 577 ++++++++++++++++++++++++++++ tests/unit/cleanupMpuOrphans.js | 648 ++++++++++++++++++++++++++++++++ 2 files changed, 1225 insertions(+) create mode 100644 cleanupMpuOrphans.js create mode 100644 tests/unit/cleanupMpuOrphans.js diff --git a/cleanupMpuOrphans.js b/cleanupMpuOrphans.js new file mode 100644 index 00000000..57c87ac6 --- /dev/null +++ b/cleanupMpuOrphans.js @@ -0,0 +1,577 @@ +/* eslint-disable no-console */ +const async = require('async'); + +const werelogs = require('werelogs'); + +const httpRequest = require('./utils/async/httpRequest'); +const listVersions = require('./utils/async/bucketd/listVersions'); + +const DEFAULT_LISTING_PAGE_SIZE = 1000; +const DEFAULT_LOG_PROGRESS_INTERVAL = 10; +const RETRY_PARAMS = { times: 100, interval: 5000 }; + +const { + BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, + BUCKETS, RAFT_SESSIONS, +} = process.env; + +const VERBOSE = process.env.VERBOSE === '1'; +const TRACE = process.env.TRACE === '1'; + +let logLevel; +if (TRACE) { + logLevel = 'trace'; +} else if (VERBOSE) { + logLevel = 'debug'; +} else { + logLevel = 'info'; +} +werelogs.configure({ level: logLevel, dump: 'error' }); + +const LISTING_PAGE_SIZE = ( + process.env.LISTING_PAGE_SIZE + && Number.parseInt(process.env.LISTING_PAGE_SIZE, 10)) + || DEFAULT_LISTING_PAGE_SIZE; + +const LOG_PROGRESS_INTERVAL = ( + process.env.LOG_PROGRESS_INTERVAL + && Number.parseInt(process.env.LOG_PROGRESS_INTERVAL, 10)) + || DEFAULT_LOG_PROGRESS_INTERVAL; + +const USAGE = ` +cleanupMpuOrphans.js + 
+This script cleans up orphaned multipart upload (MPU) data and +metadata from S3 buckets. + +Context: + There can be S3 metadata keys from internal MPU shadow buckets, + possibly associated with orphaned RING objects, that are left + behind in some rare cases of duplicate and concurrent "complete + MPU" or "abort MPU" requests on the same MPU object. They take up + storage space and may cause listing slowdowns, until they are + manually removed. + + Technically, orphaned parts are defined as part metadata which do + not have a corresponding overview key with the same upload ID + (overview keys are present for incomplete, but visible, MPUs). + +Principle: + This script takes care of cleaning up those orphaned parts data and + metadata keys with a two-phase process for each target bucket: + + - Discovery phase: builds a map of orphaned MPU upload IDs present + in the bucket + + - Cleanup phase: scans the bucket with a versioned listing, + matches any completed MPU with their orphaned counterpart to + detect used sproxyd keys, and only deletes the unused ones along + with the orphaned metadata. Any orphaned upload IDs that were not + matched to any completed object version are also deleted + unconditionally. + +Usage: + node cleanupMpuOrphans.js + +Mandatory environment variables: + BUCKETD_HOSTPORT: ip:port of bucketd endpoint + SPROXYD_HOSTPORT: ip:port of sproxyd endpoint + One of: + BUCKETS: comma-separated list of buckets to scan + or: + RAFT_SESSIONS: comma-separated list of raft sessions to scan + +Optional environment variables: + VERBOSE: set to 1 for more verbose output + TRACE: set to 1 to trace every request to bucketd and sproxyd + LISTING_PAGE_SIZE: number of keys to list per listing request (default ${DEFAULT_LISTING_PAGE_SIZE}) + LOG_PROGRESS_INTERVAL: interval in seconds between progress update log lines (default ${DEFAULT_LOG_PROGRESS_INTERVAL}) + +Logs: + Logs are output to stdout in JSON format using the standard + werelogs formatting. 
/**
 * Emit one info-level log line carrying the counters relevant to the phase
 * currently recorded in the module-level `status` object.
 *
 * The phase and bucket are always logged. The orphaned upload ID count is
 * added while in the 'discovery' phase; the scanned versions and deletion
 * counters are added while in the 'cleanup' phase.
 *
 * @param {string} message - log message to emit
 * @returns {undefined}
 */
function logProgress(message) {
    const { phase, bucket } = status;
    let phaseCounters;
    switch (phase) {
    case 'discovery':
        phaseCounters = { orphanedUploadIds: status.orphanedUploadIds };
        break;
    case 'cleanup':
        phaseCounters = {
            versionsScanned: status.versionsScanned,
            orphanPartsDeleted: status.orphanPartsDeleted,
            sproxydKeysDeleted: status.sproxydKeysDeleted,
        };
        break;
    default:
        // no phase-specific counters before the first phase starts
        phaseCounters = {};
    }
    log.info(message, { phase, bucket, ...phaseCounters });
}
/**
 * Fetch the sproxyd configuration from `/.conf` and store the alias of its
 * "ring_driver:0" entry in the module-level `sproxydAlias` variable, which is
 * later used to build sproxyd object URLs.
 *
 * `sproxydAlias` is only assigned once a non-empty string alias has been
 * found, so a malformed configuration can never lead to DELETE requests
 * being sent to `/undefined/<key>`.
 *
 * @returns {Promise<void>} resolves once the alias has been stored
 * @throws {Error} if the request does not return 200, if the body is not
 * valid JSON, or if no "ring_driver:0" alias is present
 */
async function getSproxydAlias() {
    const url = `http://${SPROXYD_HOSTPORT}/.conf`;
    const res = await httpRequest('GET', url, RETRY_PARAMS);
    if (res.statusCode !== 200) {
        throw new Error(`GET ${url} returned status ${res.statusCode}`);
    }
    let conf;
    try {
        conf = JSON.parse(res.body);
    } catch (err) {
        throw new Error(`GET ${url} returned an invalid JSON body: ${err.message}`);
    }
    const driver = conf && conf['ring_driver:0'];
    const alias = driver && driver.alias;
    if (typeof alias !== 'string' || alias === '') {
        throw new Error(`GET ${url} did not return a "ring_driver:0" alias`);
    }
    sproxydAlias = alias;
}

/**
 * Resolve the raft sessions listed in RAFT_SESSIONS into bucket names and
 * append them to the module-level `remainingBuckets` list. Does nothing when
 * RAFT_SESSIONS is not set.
 *
 * Buckets whose name starts with 'mpuShadowBucket' and the 'users..bucket'
 * bucket are filtered out. Sessions are queried concurrently, but buckets
 * are appended in the order the sessions are listed in RAFT_SESSIONS, so the
 * processing order is the same from one run to the next.
 *
 * @returns {Promise<void>} resolves once all sessions have been resolved
 * @throws {Error} if a session listing does not return 200 or does not
 * contain an array
 */
async function raftSessionsToBuckets() {
    if (!RAFT_SESSIONS) {
        return;
    }
    // tolerate spaces around ids and stray commas ("1, 2,")
    const rsList = RAFT_SESSIONS.split(',')
        .map(rs => rs.trim())
        .filter(rs => rs !== '');
    // Promise.all keeps the input order, whatever the response order is
    const bucketsPerSession = await Promise.all(rsList.map(async rs => {
        const url = `http://${BUCKETD_HOSTPORT}/_/raft_sessions/${rs}/bucket`;
        const res = await httpRequest('GET', url, RETRY_PARAMS);
        if (res.statusCode !== 200) {
            throw new Error(`GET ${url} returned status ${res.statusCode}`);
        }
        const resp = JSON.parse(res.body);
        if (!Array.isArray(resp)) {
            throw new Error(`GET ${url} did not return a list of buckets`);
        }
        return resp.filter(
            bucket => !bucket.startsWith('mpuShadowBucket')
                && bucket !== 'users..bucket',
        );
    }));
    remainingBuckets = remainingBuckets.concat(...bucketsPerSession);
}

const OVERVIEW_KEY_PREFIX = 'overview..|..';
/**
 * Populate uploadIds with all upload IDs that have an overview key in the
 * given MPU shadow bucket.
 *
 * The shadow bucket is listed page by page with the overview key prefix,
 * using the last key of each page as the marker of the next request. A 404
 * response (no shadow bucket) ends the scan without error.
 *
 * A listing flagged as truncated that carries no key cannot be continued:
 * it is reported as an error rather than ignored, because an incomplete set
 * of overview keys would make live uploads look orphaned.
 *
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} shadowBucket - name of the MPU shadow bucket to list
 * @param {Set<string>} uploadIds - set to populate with found upload IDs
 * @param {object} [options] - listing options
 * @param {number} [options.pageSize=1000] - number of keys per listing page
 * @param {object} [options.retry] - retry parameters forwarded to httpRequest
 *   (e.g. { times: 100, interval: 5000 })
 * @returns {Promise<void>} resolves when all overview keys have been scanned
 * @throws {Error} on a status other than 200/404 or on a malformed listing
 */
async function getUploadIdsWithOverview(bucketdHostport, shadowBucket, uploadIds, { pageSize = 1000, retry } = {}) {
    let marker = '';
    let isTruncated = true;
    while (isTruncated) {
        const url = `http://${bucketdHostport}/default/bucket/${shadowBucket}`
            + `?prefix=${encodeURIComponent(OVERVIEW_KEY_PREFIX)}&maxKeys=${pageSize}`
            + `&marker=${encodeURIComponent(marker)}`;

        const res = await httpRequest('GET', url, retry);
        if (res.statusCode === 404) {
            break;
        }
        if (res.statusCode !== 200) {
            throw new Error(`GET ${url} returned status ${res.statusCode}`);
        }
        const { Contents, IsTruncated } = JSON.parse(res.body);
        if (!Array.isArray(Contents)) {
            throw new Error(`GET ${url} returned a listing without Contents`);
        }
        for (const item of Contents) {
            // overview key format: overview..|..<objectKey>..|..<uploadId>
            // the upload ID is the last segment, so separators embedded in
            // the object key do not matter
            const keyParts = item.key.split('..|..');
            uploadIds.add(keyParts[keyParts.length - 1]);
        }
        if (IsTruncated) {
            if (Contents.length === 0) {
                throw new Error(`GET ${url} returned a truncated listing with no keys`);
            }
            marker = Contents[Contents.length - 1].key;
        }
        isTruncated = IsTruncated;
    }
}
/**
 * Extract the sproxyd keys referenced by one part entry of the shadow
 * bucket listing. Unusable metadata is logged as a warning and yields no
 * key; it never throws.
 *
 * @param {string} shadowBucket - MPU shadow bucket name (for logging)
 * @param {string} uploadId - upload ID the part belongs to (for logging)
 * @param {{key: string, value: string}} item - listing entry of the part
 * @returns {string[]} sproxyd keys found in the part's partLocations
 */
function listPartSproxydKeys(shadowBucket, uploadId, item) {
    let md;
    try {
        md = JSON.parse(item.value);
    } catch (e) {
        log.warn('failed to parse part key metadata', {
            shadowBucket,
            key: item.key,
            error: { message: e.message },
        });
        return [];
    }
    // md may legitimately parse to null or a non-object ("null", "42")
    const partLocations = md && md.partLocations;
    if (!Array.isArray(partLocations) || partLocations.length === 0) {
        log.warn('part key has no partLocations', {
            shadowBucket, uploadId, key: item.key,
        });
        return [];
    }
    const sproxydKeys = [];
    for (const loc of partLocations) {
        if (loc && typeof loc.key === 'string' && loc.key !== '') {
            sproxydKeys.push(loc.key);
        } else {
            // never record `undefined`: it would later be turned into a
            // DELETE request on ".../undefined"
            log.warn('part location has no sproxyd key', {
                shadowBucket, uploadId, key: item.key,
            });
        }
    }
    return sproxydKeys;
}

/**
 * List all part keys in the shadow bucket and return a map of orphaned entries
 * whose upload ID has no corresponding overview key.
 *
 * Overview keys and keys without the '..|..' separator are skipped. A part
 * key is always recorded for an orphaned upload ID, even when its metadata
 * cannot be used to extract sproxyd keys. A 404 response (no shadow bucket)
 * ends the scan and returns what was collected so far.
 *
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} shadowBucket - MPU shadow bucket name to scan
 * @param {Set<string>} uploadIdsWithOverview - upload IDs that have an overview
 *   key and should be skipped
 * @param {object} [options] - listing options
 * @param {number} [options.pageSize=1000] - number of keys per listing page
 * @param {object} [options.retry] - retry parameters forwarded to httpRequest
 *   (e.g. { times: 100, interval: 5000 })
 * @returns {Promise<object>} map of the form
 *   { [uploadId]: { partKeys: string[], sproxydKeys: Set<string> } }
 * @throws {Error} on a status other than 200/404 or on a malformed listing
 */
async function collectOrphanParts(bucketdHostport, shadowBucket, uploadIdsWithOverview, { pageSize = 1000, retry } = {}) {
    const orphanMap = {};
    let partsMarker = '';
    let isTruncated = true;
    while (isTruncated) {
        const url = `http://${bucketdHostport}/default/bucket/${shadowBucket}`
            + `?maxKeys=${pageSize}`
            + `&marker=${encodeURIComponent(partsMarker)}`;

        const res = await httpRequest('GET', url, retry);
        if (res.statusCode === 404) {
            break;
        }
        if (res.statusCode !== 200) {
            throw new Error(`GET ${url} returned status ${res.statusCode}`);
        }
        const { Contents, IsTruncated } = JSON.parse(res.body);
        if (!Array.isArray(Contents)) {
            throw new Error(`GET ${url} returned a listing without Contents`);
        }
        for (const item of Contents) {
            if (item.key.startsWith(OVERVIEW_KEY_PREFIX)) {
                continue; // skip overview keys
            }
            // part key format: <uploadId>..|..<5-digit-index>
            const sepPos = item.key.indexOf('..|..');
            if (sepPos === -1) {
                log.warn('unexpected key format in MPU shadow bucket', {
                    shadowBucket, key: item.key,
                });
                continue;
            }
            const uploadId = item.key.slice(0, sepPos);
            if (uploadIdsWithOverview.has(uploadId)) {
                continue; // has a live overview key: not orphaned
            }
            if (!orphanMap[uploadId]) {
                orphanMap[uploadId] = { partKeys: [], sproxydKeys: new Set() };
            }
            const orphanEntry = orphanMap[uploadId];
            orphanEntry.partKeys.push(item.key);
            for (const sproxydKey of listPartSproxydKeys(shadowBucket, uploadId, item)) {
                orphanEntry.sproxydKeys.add(sproxydKey);
            }
        }
        if (IsTruncated) {
            if (Contents.length === 0) {
                // no key to use as the next marker: fail instead of
                // silently returning a partial map
                throw new Error(`GET ${url} returned a truncated listing with no keys`);
            }
            partsMarker = Contents[Contents.length - 1].key;
        }
        isTruncated = IsTruncated;
    }
    return orphanMap;
}
/**
 * Phase 1: builds a map of orphaned MPU upload IDs for a given bucket.
 *
 * An orphaned MPU has one or more part keys in the MPU shadow bucket but no
 * corresponding overview key. Overview keys are scanned once before and once
 * after the part listing, so that an upload that gained its overview key
 * while parts were being listed is not reported as orphaned.
 *
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} shadowBucket - MPU shadow bucket name to scan
 * @param {object} [options] - listing options
 * @param {number} [options.pageSize=1000] - number of keys per listing page
 * @param {object} [options.retry] - retry parameters forwarded to httpRequest
 *   (e.g. { times: 100, interval: 5000 })
 * @returns {Promise<object>} map of the form
 *   { [uploadId]: { partKeys: string[], sproxydKeys: Set<string> } }
 */
async function buildOrphanMap(bucketdHostport, shadowBucket, { pageSize = 1000, retry } = {}) {
    const liveUploadIds = new Set();
    // each call adds the upload IDs that currently have an overview key
    const scanOverviewKeys = () => getUploadIdsWithOverview(
        bucketdHostport,
        shadowBucket,
        liveUploadIds,
        { pageSize, retry },
    );

    // first pass over the overview keys
    await scanOverviewKeys();

    // list every part and keep the ones without a known overview key
    const orphanMap = await collectOrphanParts(
        bucketdHostport,
        shadowBucket,
        liveUploadIds,
        { pageSize, retry },
    );

    // second pass: drop uploads whose overview key showed up in between
    await scanOverviewKeys();
    Object.keys(orphanMap)
        .filter(uploadId => liveUploadIds.has(uploadId))
        .forEach(uploadId => {
            delete orphanMap[uploadId];
        });
    return orphanMap;
}
/**
 * Delete orphaned sproxyd keys (keysToDelete), then the part metadata entries
 * of the given orphaned upload ID. Failures are logged but never thrown.
 *
 * A 404 on a sproxyd key means the key is already gone (for instance removed
 * by an earlier, interrupted run) and is not treated as a failure, in the
 * same way as a 404 on a part metadata key.
 *
 * When at least one sproxyd key could not be deleted, the part metadata is
 * left in place: it is what lets a later run rediscover this upload ID and
 * retry. Deleting it would leave the remaining sproxyd keys unreferenced.
 *
 * Updates `status.sproxydKeysDeleted` and `status.orphanPartsDeleted`.
 *
 * @param {object} reqLogger - werelogs request logger with bucket and uploadId
 *   already set as default fields
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} sproxydHostport - host:port of the sproxyd endpoint
 * @param {string} shadowBucket - MPU shadow bucket name
 * @param {{partKeys: string[]}} orphanEntry - orphan entry with part keys to delete
 * @param {Set<string>} keysToDelete - sproxyd keys to delete
 * @param {object} [retry] - retry parameters forwarded to httpRequest
 * @returns {Promise<void>} resolves when all deletions have been attempted
 */
async function cleanupOrphanEntry(reqLogger, bucketdHostport, sproxydHostport, shadowBucket, orphanEntry, keysToDelete, retry) {
    let failedSproxydKeys = 0;
    for (const sproxydKey of keysToDelete) {
        const sproxydUrl = `http://${sproxydHostport}/${sproxydAlias}/${sproxydKey}`;
        try {
            const res = await httpRequest('DELETE', sproxydUrl, retry);
            if (res.statusCode === 200) {
                reqLogger.debug('deleted orphaned sproxyd key', { sproxydKey });
                status.sproxydKeysDeleted += 1;
            } else if (res.statusCode === 404) {
                // nothing left to delete: not counted, not a failure
                reqLogger.debug('orphaned sproxyd key already deleted', { sproxydKey });
            } else {
                failedSproxydKeys += 1;
                reqLogger.error('failed to delete orphaned sproxyd key', {
                    sproxydKey, error: { statusCode: res.statusCode },
                });
            }
        } catch (err) {
            failedSproxydKeys += 1;
            reqLogger.error('failed to delete orphaned sproxyd key', {
                sproxydKey, error: { message: err.message },
            });
        }
    }
    if (failedSproxydKeys > 0) {
        reqLogger.error('not deleting orphaned part metadata: sproxyd key deletion failed', {
            failedSproxydKeys, partCount: orphanEntry.partKeys.length,
        });
        return;
    }
    for (const partKey of orphanEntry.partKeys) {
        const partUrl = `http://${bucketdHostport}/default/bucket/${shadowBucket}/${
            encodeURIComponent(partKey)}`;
        try {
            const res = await httpRequest('DELETE', partUrl, retry);
            if (res.statusCode !== 200 && res.statusCode !== 404) {
                reqLogger.error('failed to delete orphaned part metadata', {
                    partKey, error: { statusCode: res.statusCode },
                });
            } else {
                reqLogger.debug('deleted orphaned part metadata', { partKey });
                status.orphanPartsDeleted += 1;
            }
        } catch (err) {
            reqLogger.error('failed to delete orphaned part metadata', {
                partKey, error: { message: err.message },
            });
        }
    }
}
/**
 * Phase 2: scan all object versions in the original bucket to find completed
 * MPU objects that share sproxyd keys with orphaned parts, then delete the
 * orphans, sparing the sproxyd keys still referenced by a completed object.
 *
 * The whole bucket is scanned before anything is deleted: several versions
 * may carry the same upload ID, and a key must be kept when any of them
 * references it, not only the first one met by the listing.
 *
 * Orphaned upload IDs that match no scanned version are deleted with all
 * their sproxyd keys. A version whose `location` is not an array does not
 * protect any key.
 *
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} sproxydHostport - host:port of the sproxyd endpoint
 * @param {string} bucket - original bucket name
 * @param {string} shadowBucket - MPU shadow bucket name
 * @param {object} orphanMap - map of orphaned upload IDs (mutated in place:
 *   every entry is removed once it has been processed)
 * @param {object} [options] - listing options
 * @param {number} [options.pageSize=1000] - number of keys per listing page
 * @param {object} [options.retry] - retry parameters forwarded to httpRequest
 *   (e.g. { times: 100, interval: 5000 })
 * @returns {Promise<void>} resolves when all orphans have been processed
 */
async function cleanupOrphans(bucketdHostport, sproxydHostport, bucket, shadowBucket, orphanMap, { pageSize = 1000, retry } = {}) {
    // uploadId => orphan sproxyd keys referenced by at least one version
    const referencedKeys = new Map();
    for await (const { value: resolvedMd } of listVersions(bucketdHostport, bucket, {
        pageSize,
        retry,
    })) {
        status.versionsScanned += 1;
        // Common case: skip version if not an MPU that has orphaned parts
        if (!resolvedMd.uploadId || !orphanMap[resolvedMd.uploadId]
            || !Array.isArray(resolvedMd.location)) {
            continue;
        }
        const { uploadId } = resolvedMd;
        let usedKeys = referencedKeys.get(uploadId);
        if (!usedKeys) {
            usedKeys = new Set();
            referencedKeys.set(uploadId, usedKeys);
        }
        // only remember keys that are candidates for deletion
        const { sproxydKeys } = orphanMap[uploadId];
        for (const loc of resolvedMd.location) {
            if (sproxydKeys.has(loc.key)) {
                usedKeys.add(loc.key);
            }
        }
    }

    for (const uploadId of Object.keys(orphanMap)) {
        const reqLogger = log.newRequestLogger();
        reqLogger.addDefaultFields({ bucket, uploadId });

        const orphanEntry = orphanMap[uploadId];
        const usedKeys = referencedKeys.get(uploadId);
        // Only delete sproxyd keys not referenced by a completed object
        const keysToDelete = new Set();
        for (const sproxydKey of orphanEntry.sproxydKeys) {
            if (usedKeys && usedKeys.has(sproxydKey)) {
                reqLogger.debug('not deleting sproxyd key used by completed MPU',
                    { sproxydKey });
            } else {
                keysToDelete.add(sproxydKey);
            }
        }

        await cleanupOrphanEntry(reqLogger, bucketdHostport, sproxydHostport, shadowBucket, orphanEntry, keysToDelete, retry);
        // eslint-disable-next-line no-param-reassign
        delete orphanMap[uploadId];
    }
}
/**
 * Run the discovery phase, then (if anything was found) the cleanup phase
 * for one bucket, keeping the module-level `status` object up to date so
 * that progress log lines reflect the bucket being processed.
 *
 * The MPU shadow bucket name is the bucket name prefixed with
 * 'mpuShadowBucket'. Nothing is scanned in the bucket itself when the
 * discovery phase finds no orphaned upload ID.
 *
 * @param {string} bucket - name of the bucket to process
 * @returns {Promise<void>} resolves when the bucket has been processed
 */
async function processBucket(bucket) {
    const shadowBucket = `mpuShadowBucket${bucket}`;
    const listingOptions = () => ({ pageSize: LISTING_PAGE_SIZE, retry: RETRY_PARAMS });

    log.info('starting discovery phase: scanning MPU shadow bucket', { bucket, shadowBucket });

    // start from clean counters for this bucket
    Object.assign(status, {
        bucket,
        phase: 'discovery',
        orphanedUploadIds: 0,
        versionsScanned: 0,
        orphanPartsDeleted: 0,
        sproxydKeysDeleted: 0,
    });

    const orphanMap = await buildOrphanMap(BUCKETD_HOSTPORT, shadowBucket, listingOptions());
    const orphanedUploadIds = Object.keys(orphanMap);
    status.orphanedUploadIds = orphanedUploadIds.length;
    logProgress('discovery phase complete');
    if (orphanedUploadIds.length === 0) {
        return;
    }

    orphanedUploadIds.forEach(uploadId => {
        const { partKeys, sproxydKeys } = orphanMap[uploadId];
        log.info('orphaned MPU found', {
            bucket,
            uploadId,
            partCount: partKeys.length,
            sproxydKeyCount: sproxydKeys.size,
        });
    });

    log.info('starting cleanup phase: scanning bucket', { bucket });

    status.phase = 'cleanup';
    await cleanupOrphans(
        BUCKETD_HOSTPORT,
        SPROXYD_HOSTPORT,
        bucket,
        shadowBucket,
        orphanMap,
        listingOptions(),
    );
    logProgress('cleanup phase complete');
}
RAFT_SESSIONS environment ' + + 'variable must be defined'); + console.error(USAGE); + process.exit(1); + } + if (BUCKETS && RAFT_SESSIONS) { + console.error('ERROR: only one of BUCKETS or RAFT_SESSIONS environment ' + + 'variables can be defined'); + console.error(USAGE); + process.exit(1); + } + if (!BUCKETD_HOSTPORT) { + console.error('ERROR: BUCKETD_HOSTPORT not defined'); + console.error(USAGE); + process.exit(1); + } + if (!SPROXYD_HOSTPORT) { + console.error('ERROR: SPROXYD_HOSTPORT not defined'); + console.error(USAGE); + process.exit(1); + } + + progressInterval = setInterval( + () => logProgress('progress update'), + LOG_PROGRESS_INTERVAL * 1000, + ); + try { + await getSproxydAlias(); + await raftSessionsToBuckets(); + await async.eachSeries(remainingBuckets, processBucket); + clearInterval(progressInterval); + log.info('completed MPU orphan cleanup'); + process.exit(0); + } catch (err) { + clearInterval(progressInterval); + log.error('an error occurred during cleanup', { + error: { message: err.message }, + }); + process.exit(1); + } +} + +function stop() { + clearInterval(progressInterval); + log.info('stopping execution'); + process.exit(0); +} + +if (require.main === module) { + main(); + process.on('SIGINT', stop); + process.on('SIGHUP', stop); + process.on('SIGTERM', stop); + process.on('SIGQUIT', stop); +} + +module.exports = { getSproxydAlias, getUploadIdsWithOverview, collectOrphanParts, buildOrphanMap, cleanupOrphanEntry, cleanupOrphans }; diff --git a/tests/unit/cleanupMpuOrphans.js b/tests/unit/cleanupMpuOrphans.js new file mode 100644 index 00000000..36db274f --- /dev/null +++ b/tests/unit/cleanupMpuOrphans.js @@ -0,0 +1,648 @@ +jest.mock('../../utils/async/httpRequest'); +jest.mock('../../utils/async/bucketd/listVersions'); + +const httpRequest = require('../../utils/async/httpRequest'); +const listVersions = require('../../utils/async/bucketd/listVersions'); +const { + getSproxydAlias, + getUploadIdsWithOverview, + collectOrphanParts, + 
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

const BUCKETD_HOSTPORT = 'localhost:9000';
const SHADOW_BUCKET = 'mpuShadowBuckettest-bucket';
const RETRY_PARAMS = { times: 5, interval: 100 };

// Expected URL of an overview key listing request on the shadow bucket
function overviewUrl({ maxKeys = 1000, marker = '' } = {}) {
    const query = [
        `prefix=${encodeURIComponent('overview..|..')}`,
        `maxKeys=${maxKeys}`,
        `marker=${encodeURIComponent(marker)}`,
    ].join('&');
    return `http://${BUCKETD_HOSTPORT}/default/bucket/${SHADOW_BUCKET}?${query}`;
}

// Build an overview key in the format:
// overview..|..<objectKey>..|..<uploadId>
function overviewKey(objectKey, uploadId) {
    return ['overview', objectKey, uploadId].join('..|..');
}

// Response object as resolved by httpRequest; the body is JSON-encoded
// unless no body is given, in which case it is the empty string
function fakeResponse(statusCode, body) {
    if (body === undefined) {
        return { statusCode, body: '' };
    }
    return { statusCode, body: JSON.stringify(body) };
}

// Build a listing page body with the given overview key strings
function listingPage(keys, isTruncated = false) {
    const Contents = keys.map(key => ({ key }));
    return { Contents, IsTruncated: isTruncated };
}
beforeEach(() => {
    // Reset only httpRequest so each test starts with no queued responses,
    // while preserving async.retry's mock implementation.
    // NOTE(review): no async.retry mock is set up in the visible part of
    // this file - confirm the second half of this remark is still accurate.
    httpRequest.mockReset();
});

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

// Each test queues the httpRequest responses the function under test will
// consume, then checks the collected upload IDs and/or the requests made.
describe('getUploadIdsWithOverview', () => {
    test('adds nothing when shadow bucket returns 404', async () => {
        httpRequest.mockResolvedValueOnce(fakeResponse(404));

        const uploadIds = new Set();
        await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds);

        expect(uploadIds.size).toBe(0);
        expect(httpRequest).toHaveBeenCalledTimes(1);
        // no options given: the retry argument is forwarded as undefined
        expect(httpRequest).toHaveBeenCalledWith('GET', overviewUrl(), undefined);
    });

    test('populates uploadIds from a single page of overview keys', async () => {
        httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([
            overviewKey('object1', 'uploadId1'),
            overviewKey('object2', 'uploadId2'),
        ])));

        const uploadIds = new Set();
        await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds);

        expect(uploadIds).toEqual(new Set(['uploadId1', 'uploadId2']));
        expect(httpRequest).toHaveBeenCalledTimes(1);
        expect(httpRequest).toHaveBeenCalledWith('GET', overviewUrl(), undefined);
    });

    test('follows pagination and uses last key of each page as the next marker', async () => {
        const lastKeyPage1 = overviewKey('object1', 'uploadId1');
        // first page is truncated, second page ends the listing
        httpRequest
            .mockResolvedValueOnce(fakeResponse(200, {
                Contents: [{ key: lastKeyPage1 }],
                IsTruncated: true,
            }))
            .mockResolvedValueOnce(fakeResponse(200, listingPage([
                overviewKey('object2', 'uploadId2'),
            ])));

        const uploadIds = new Set();
        await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds);

        expect(uploadIds).toEqual(new Set(['uploadId1', 'uploadId2']));
        expect(httpRequest).toHaveBeenCalledTimes(2);
        expect(httpRequest).toHaveBeenNthCalledWith(2, 'GET', overviewUrl({ marker: lastKeyPage1 }), undefined);
    });

    test('pageSize option controls maxKeys in the URL', async () => {
        httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([])));

        const uploadIds = new Set();
        await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds, { pageSize: 42 });

        expect(httpRequest).toHaveBeenCalledWith('GET', overviewUrl({ maxKeys: 42 }), undefined);
    });

    test('retry option is forwarded to httpRequest', async () => {
        httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([])));
        const retry = { times: 5, interval: 1000 };

        const uploadIds = new Set();
        await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds, { retry });

        expect(httpRequest).toHaveBeenCalledWith('GET', overviewUrl(), retry);
    });

    test('overview key with object key containing the separator', async () => {
        // overview key format: overview..|..<objectKey>..|..<uploadId>
        // split('..|..') takes the LAST part as uploadId, so embedded
        // separators in the object key are handled correctly
        httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([
            overviewKey('dir..|..subdir/object', 'uploadId1'),
        ])));

        const uploadIds = new Set();
        await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds);

        expect(uploadIds).toEqual(new Set(['uploadId1']));
    });

    test('throws when the listing returns a non-200 non-404 status', async () => {
        httpRequest.mockResolvedValueOnce(fakeResponse(500));

        const uploadIds = new Set();
        await expect(getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds))
            .rejects.toThrow('returned status 500');
    });
});

// ---------------------------------------------------------------------------
// collectOrphanParts helpers
// ---------------------------------------------------------------------------

// Expected URL of a part listing request on the shadow bucket (same as the
// overview listing URL, without the prefix parameter)
function partsUrl({ maxKeys = 1000, marker = '' } = {}) {
    return `http://${BUCKETD_HOSTPORT}/default/bucket/${SHADOW_BUCKET}`
        + `?maxKeys=${maxKeys}&marker=${encodeURIComponent(marker)}`;
}

// Build a part listing entry: key = "<uploadId>..|..<5-digit-index>",
// value = JSON with partLocations containing the given sproxyd keys.
function partEntry(uploadId, partIndex, sproxydKeys = []) {
    return {
        // the part index is left-padded with zeros to 5 digits
        key: `${uploadId}..|..${String(partIndex).padStart(5, '0')}`,
        value: JSON.stringify({ partLocations: sproxydKeys.map(key => ({ key })) }),
    };
}

// Build a listing page body with the given entry objects ({ key, value })
function partsPage(entries, isTruncated = false) {
    return {
        Contents: entries,
        IsTruncated: isTruncated,
    };
}
const e2 = partEntry('uploadId1', 2, ['key-B']); + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([e1, e2]))); + + const result = await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set()); + + expect(result.uploadId1.partKeys).toEqual([e1.key, e2.key]); + expect(result.uploadId1.sproxydKeys).toEqual(new Set(['key-A', 'key-B'])); + }); + + test('skips overview keys (keys starting with "overview..|..")', async () => { + const overviewEntry = { key: overviewKey('object1', 'uploadId1'), value: '{}' }; + const partEnt = partEntry('uploadId2', 1, ['key-X']); + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([overviewEntry, partEnt]))); + + const result = await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set()); + + expect(Object.keys(result)).toEqual(['uploadId2']); + }); + + test('skips upload IDs present in uploadIdsWithOverview', async () => { + const e1 = partEntry('uploadIdLive', 1, ['key-live']); + const e2 = partEntry('uploadIdOrphan', 1, ['key-orphan']); + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([e1, e2]))); + + const result = await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set(['uploadIdLive'])); + + expect(Object.keys(result)).toEqual(['uploadIdOrphan']); + }); + + test('skips parts with malformed key (no ..|.. 
separator)', async () => { + const badEntry = { key: 'no-separator-here', value: '{}' }; + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([badEntry]))); + + const result = await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set()); + + expect(result).toEqual({}); + }); + + test('records partKey but skips sproxydKeys when part metadata JSON is malformed', async () => { + const entry = { key: 'uploadId1..|..00001', value: 'not-valid-json' }; + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([entry]))); + + const result = await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set()); + + expect(result.uploadId1.partKeys).toEqual([entry.key]); + expect(result.uploadId1.sproxydKeys).toEqual(new Set()); + }); + + test('records partKey but skips sproxydKeys when partLocations is missing', async () => { + const entry = { key: 'uploadId1..|..00001', value: JSON.stringify({}) }; + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([entry]))); + + const result = await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set()); + + expect(result.uploadId1.partKeys).toEqual([entry.key]); + expect(result.uploadId1.sproxydKeys).toEqual(new Set()); + }); + + test('records partKey but skips sproxydKeys when partLocations is empty', async () => { + const entry = { key: 'uploadId1..|..00001', value: JSON.stringify({ partLocations: [] }) }; + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([entry]))); + + const result = await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set()); + + expect(result.uploadId1.partKeys).toEqual([entry.key]); + expect(result.uploadId1.sproxydKeys).toEqual(new Set()); + }); + + test('follows pagination using last key of each page as the next marker', async () => { + const e1 = partEntry('uploadId1', 1, ['key-A']); + const e2 = partEntry('uploadId2', 1, ['key-B']); + httpRequest + .mockResolvedValueOnce(fakeResponse(200, partsPage([e1], true))) + 
.mockResolvedValueOnce(fakeResponse(200, partsPage([e2]))); + + const result = await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set()); + + expect(Object.keys(result)).toEqual(expect.arrayContaining(['uploadId1', 'uploadId2'])); + expect(httpRequest).toHaveBeenCalledTimes(2); + expect(httpRequest).toHaveBeenNthCalledWith(2, 'GET', partsUrl({ marker: e1.key }), undefined); + }); + + test('pageSize option controls maxKeys in the URL', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([]))); + + await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set(), { pageSize: 42 }); + + expect(httpRequest).toHaveBeenCalledWith('GET', partsUrl({ maxKeys: 42 }), undefined); + }); + + test('retry option is forwarded to httpRequest', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([]))); + const retry = { times: 5, interval: 1000 }; + + await collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set(), { retry }); + + expect(httpRequest).toHaveBeenCalledWith('GET', partsUrl(), retry); + }); + + test('throws when the listing returns a non-200 non-404 status', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(500)); + + await expect(collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, new Set())) + .rejects.toThrow('returned status 500'); + }); +}); + +// --------------------------------------------------------------------------- +// buildOrphanMap tests +// --------------------------------------------------------------------------- + +describe('buildOrphanMap', () => { + test('makes HTTP calls in the correct three-step order', async () => { + // step 1: overview listing + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + // step 2: parts listing + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([]))); + // step 3: overview re-check + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + + await buildOrphanMap(BUCKETD_HOSTPORT, 
SHADOW_BUCKET, { retry: RETRY_PARAMS }); + + expect(httpRequest).toHaveBeenCalledTimes(3); + expect(httpRequest).toHaveBeenNthCalledWith(1, 'GET', overviewUrl(), RETRY_PARAMS); + expect(httpRequest).toHaveBeenNthCalledWith(2, 'GET', partsUrl(), RETRY_PARAMS); + expect(httpRequest).toHaveBeenNthCalledWith(3, 'GET', overviewUrl(), RETRY_PARAMS); + }); + + test('returns empty map when shadow bucket has no parts', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([]))); + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + + const result = await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET); + + expect(result).toEqual({}); + }); + + test('returns orphan map for upload IDs that have parts but no overview key', async () => { + const entry = partEntry('uploadId1', 1, ['sproxyd-key-A']); + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([entry]))); + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + + const result = await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET); + + expect(result).toEqual({ + uploadId1: { + partKeys: [entry.key], + sproxydKeys: new Set(['sproxyd-key-A']), + }, + }); + }); + + test('upload IDs present in the step 1 overview are excluded from the result', async () => { + const e1 = partEntry('uploadIdLive', 1, ['key-live']); + const e2 = partEntry('uploadIdOrphan', 1, ['key-orphan']); + // step 1: uploadIdLive already has an overview key + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([ + overviewKey('some-object', 'uploadIdLive'), + ]))); + // step 2: both IDs have parts; collectOrphanParts skips uploadIdLive + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([e1, e2]))); + // step 3: uploadIdLive still has an overview key + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([ 
+ overviewKey('some-object', 'uploadIdLive'), + ]))); + + const result = await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET); + + expect(Object.keys(result)).toEqual(['uploadIdOrphan']); + }); + + test('removes upload IDs that gained an overview key between step 1 and step 3 (race condition)', async () => { + const entry = partEntry('uploadId1', 1, ['sproxyd-key-A']); + // step 1: no overview keys yet + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + // step 2: uploadId1 appears as an orphan candidate + httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([entry]))); + // step 3: uploadId1 now has an overview key (MPU completed between step 1 and step 2) + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([ + overviewKey('some-object', 'uploadId1'), + ]))); + + const result = await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET); + + expect(result).toEqual({}); + }); +}); + +// --------------------------------------------------------------------------- +// cleanupOrphanEntry helpers and tests +// --------------------------------------------------------------------------- + +const SPROXYD_HOSTPORT = 'localhost:8181'; +const SPROXYD_ALIAS = 'test-alias'; + +function sproxydDeleteUrl(key) { + return `http://${SPROXYD_HOSTPORT}/${SPROXYD_ALIAS}/${key}`; +} + +function partDeleteUrl(partKey) { + return `http://${BUCKETD_HOSTPORT}/default/bucket/${SHADOW_BUCKET}/${encodeURIComponent(partKey)}`; +} + +function makeReqLogger() { + return { error: jest.fn(), debug: jest.fn() }; +} + +describe('cleanupOrphanEntry', () => { + beforeAll(async () => { + // Initialise the module-level sproxydAlias variable used to build URLs. 
+ const aliasBody = JSON.stringify({ 'ring_driver:0': { alias: SPROXYD_ALIAS } }); + httpRequest.mockResolvedValueOnce({ statusCode: 200, body: aliasBody }); + await getSproxydAlias(); + }); + + test('does nothing when keysToDelete and partKeys are both empty', async () => { + const reqLogger = makeReqLogger(); + await cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: [] }, [], undefined); + + expect(httpRequest).not.toHaveBeenCalled(); + }); + + test('sends DELETE for each sproxyd key', async () => { + httpRequest + .mockResolvedValueOnce({ statusCode: 200, body: '' }) + .mockResolvedValueOnce({ statusCode: 200, body: '' }); + const reqLogger = makeReqLogger(); + + await cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: [] }, ['key-A', 'key-B'], undefined); + + expect(httpRequest).toHaveBeenCalledTimes(2); + expect(httpRequest).toHaveBeenNthCalledWith(1, 'DELETE', sproxydDeleteUrl('key-A'), undefined); + expect(httpRequest).toHaveBeenNthCalledWith(2, 'DELETE', sproxydDeleteUrl('key-B'), undefined); + }); + + test('sends DELETE for each part key', async () => { + httpRequest + .mockResolvedValueOnce({ statusCode: 200, body: '' }) + .mockResolvedValueOnce({ statusCode: 200, body: '' }); + const reqLogger = makeReqLogger(); + + await cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: ['part-key-1', 'part-key-2'] }, [], undefined); + + expect(httpRequest).toHaveBeenCalledTimes(2); + expect(httpRequest).toHaveBeenNthCalledWith(1, 'DELETE', partDeleteUrl('part-key-1'), undefined); + expect(httpRequest).toHaveBeenNthCalledWith(2, 'DELETE', partDeleteUrl('part-key-2'), undefined); + }); + + test('deletes sproxyd keys before part keys', async () => { + httpRequest + .mockResolvedValueOnce({ statusCode: 200, body: '' }) + .mockResolvedValueOnce({ statusCode: 200, body: '' }); + const reqLogger = makeReqLogger(); + + await 
cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: ['part-key-1'] }, ['sproxyd-key-A'], undefined); + + expect(httpRequest).toHaveBeenNthCalledWith(1, 'DELETE', sproxydDeleteUrl('sproxyd-key-A'), undefined); + expect(httpRequest).toHaveBeenNthCalledWith(2, 'DELETE', partDeleteUrl('part-key-1'), undefined); + }); + + test('forwards retry param to httpRequest', async () => { + const retry = { times: 3, interval: 100 }; + httpRequest.mockResolvedValueOnce({ statusCode: 200, body: '' }); + const reqLogger = makeReqLogger(); + + await cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: [] }, ['key-A'], retry); + + expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('key-A'), retry); + }); + + test('logs error and continues when sproxyd DELETE returns non-200', async () => { + httpRequest + .mockResolvedValueOnce({ statusCode: 500, body: '' }) + .mockResolvedValueOnce({ statusCode: 200, body: '' }); + const reqLogger = makeReqLogger(); + + await cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: [] }, ['key-A', 'key-B'], undefined); + + expect(reqLogger.error).toHaveBeenCalledTimes(1); + expect(reqLogger.error).toHaveBeenCalledWith( + 'failed to delete orphaned sproxyd key', expect.objectContaining({ sproxydKey: 'key-A' }), + ); + expect(httpRequest).toHaveBeenCalledTimes(2); + }); + + test('logs error and continues when sproxyd DELETE throws', async () => { + httpRequest + .mockRejectedValueOnce(new Error('network error')) + .mockResolvedValueOnce({ statusCode: 200, body: '' }); + const reqLogger = makeReqLogger(); + + await cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: [] }, ['key-A', 'key-B'], undefined); + + expect(reqLogger.error).toHaveBeenCalledTimes(1); + expect(reqLogger.error).toHaveBeenCalledWith( + 'failed to delete orphaned sproxyd key', expect.objectContaining({ 
sproxydKey: 'key-A' }), + ); + expect(httpRequest).toHaveBeenCalledTimes(2); + }); + + test('accepts 404 on part DELETE without logging an error', async () => { + httpRequest.mockResolvedValueOnce({ statusCode: 404, body: '' }); + const reqLogger = makeReqLogger(); + + await cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: ['part-key-1'] }, [], undefined); + + expect(reqLogger.error).not.toHaveBeenCalled(); + }); + + test('logs error and continues when part DELETE returns non-200 non-404', async () => { + httpRequest + .mockResolvedValueOnce({ statusCode: 500, body: '' }) + .mockResolvedValueOnce({ statusCode: 200, body: '' }); + const reqLogger = makeReqLogger(); + + await cleanupOrphanEntry(reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET, + { partKeys: ['part-key-1', 'part-key-2'] }, [], undefined); + + expect(reqLogger.error).toHaveBeenCalledTimes(1); + expect(reqLogger.error).toHaveBeenCalledWith( + 'failed to delete orphaned part metadata', expect.objectContaining({ partKey: 'part-key-1' }), + ); + expect(httpRequest).toHaveBeenCalledTimes(2); + }); +}); + +// --------------------------------------------------------------------------- +// cleanupOrphans tests +// --------------------------------------------------------------------------- + +describe('cleanupOrphans', () => { + beforeEach(() => { + listVersions.mockReset(); + }); + + // Build an orphan map entry + function makeOrphanEntry(partKeys, sproxydKeyList) { + return { partKeys, sproxydKeys: new Set(sproxydKeyList) }; + } + + // Build a version listing entry as yielded by listVersions + function versionWithUploadId(uploadId, locationKeys = []) { + return { + key: 'some-object', + versionId: 'some-version-id', + value: { uploadId, location: locationKeys.map(key => ({ key })) }, + }; + } + + test('resolves without any DELETE calls when orphanMap is empty', async () => { + listVersions.mockImplementation(async function* () {}); + + await 
cleanupOrphans(BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, 'test-bucket', SHADOW_BUCKET, {}); + + expect(httpRequest).not.toHaveBeenCalled(); + }); + + test('passes bucketdHostport, bucket, pageSize and retry to listVersions', async () => { + listVersions.mockImplementation(async function* () {}); + const retry = { times: 3, interval: 100 }; + + await cleanupOrphans(BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, 'test-bucket', SHADOW_BUCKET, {}, + { pageSize: 42, retry }); + + expect(listVersions).toHaveBeenCalledWith( + BUCKETD_HOSTPORT, 'test-bucket', expect.objectContaining({ pageSize: 42, retry }), + ); + }); + + test('deletes all sproxyd keys and part keys for upload IDs not matched by any version', async () => { + listVersions.mockImplementation(async function* () {}); + httpRequest.mockResolvedValue({ statusCode: 200, body: '' }); + const orphanMap = { + uploadId1: makeOrphanEntry(['uploadId1..|..00001'], ['sproxyd-key-A', 'sproxyd-key-B']), + }; + + await cleanupOrphans(BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, 'test-bucket', SHADOW_BUCKET, orphanMap); + + expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-A'), undefined); + expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-B'), undefined); + expect(httpRequest).toHaveBeenCalledWith('DELETE', partDeleteUrl('uploadId1..|..00001'), undefined); + expect(Object.keys(orphanMap)).toHaveLength(0); + }); + + test('skips versions with no uploadId or with uploadId not in orphanMap', async () => { + listVersions.mockImplementation(async function* () { + yield { key: 'obj1', versionId: 'v1', value: {} }; // no uploadId + yield { key: 'obj2', versionId: 'v2', value: { uploadId: 'other-upload-id' } }; + }); + httpRequest.mockResolvedValue({ statusCode: 200, body: '' }); + const orphanMap = { + uploadId1: makeOrphanEntry([], ['sproxyd-key-A']), + }; + + await cleanupOrphans(BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, 'test-bucket', SHADOW_BUCKET, orphanMap); + + // uploadId1 is not matched by 
any version, so it's cleaned up in the remaining phase + expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-A'), undefined); + }); + + test('skips sproxyd keys referenced by a completed version, deletes only orphaned ones', async () => { + listVersions.mockImplementation(async function* () { + yield versionWithUploadId('uploadId1', ['sproxyd-key-A']); // key-A is referenced + }); + httpRequest.mockResolvedValue({ statusCode: 200, body: '' }); + const orphanMap = { + uploadId1: makeOrphanEntry([], ['sproxyd-key-A', 'sproxyd-key-B']), + }; + + await cleanupOrphans(BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, 'test-bucket', SHADOW_BUCKET, orphanMap); + + expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-B'), undefined); + expect(httpRequest).not.toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-A'), undefined); + }); + + test('upload ID matched by a version is not processed again in the remaining-orphans phase', async () => { + listVersions.mockImplementation(async function* () { + yield versionWithUploadId('uploadId1', []); + }); + httpRequest.mockResolvedValue({ statusCode: 200, body: '' }); + const orphanMap = { + uploadId1: makeOrphanEntry(['uploadId1..|..00001'], ['sproxyd-key-A']), + }; + + await cleanupOrphans(BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, 'test-bucket', SHADOW_BUCKET, orphanMap); + + // 2 calls: sproxyd-key-A + part-key; not 4 (which would happen if double-processed) + expect(httpRequest).toHaveBeenCalledTimes(2); + expect(Object.keys(orphanMap)).toHaveLength(0); + }); +});