// Patch header of this file, kept verbatim from the whitespace-collapsed diff it was reconstructed from:
// diff --git a/cleanupMpuOrphans.js b/cleanupMpuOrphans.js new file mode 100644 index 00000000..57c87ac6 --- /dev/null +++ b/cleanupMpuOrphans.js @@ -0,0 +1,577 @@
/* eslint-disable no-console */
const async = require('async');

const werelogs = require('werelogs');

const httpRequest = require('./utils/async/httpRequest');
const listVersions = require('./utils/async/bucketd/listVersions');

const DEFAULT_LISTING_PAGE_SIZE = 1000;
const DEFAULT_LOG_PROGRESS_INTERVAL = 10;
const RETRY_PARAMS = { times: 100, interval: 5000 };

const {
    BUCKETD_HOSTPORT, SPROXYD_HOSTPORT,
    BUCKETS, RAFT_SESSIONS,
} = process.env;

const VERBOSE = process.env.VERBOSE === '1';
const TRACE = process.env.TRACE === '1';

let logLevel;
if (TRACE) {
    logLevel = 'trace';
} else if (VERBOSE) {
    logLevel = 'debug';
} else {
    logLevel = 'info';
}
werelogs.configure({ level: logLevel, dump: 'error' });

/**
 * Parse a strictly positive integer from an environment variable value.
 *
 * @param {string|undefined} rawValue - raw value (may be unset)
 * @param {number} defaultValue - value returned when rawValue is unset,
 *   not a number, zero or negative
 * @returns {number} the parsed value or defaultValue
 */
function parsePositiveInt(rawValue, defaultValue) {
    const parsed = Number.parseInt(rawValue, 10);
    // zero and negative values are rejected: a page size <= 0 makes no
    // sense and a negative timer delay makes the progress log fire
    // almost continuously
    if (Number.isInteger(parsed) && parsed > 0) {
        return parsed;
    }
    return defaultValue;
}

/**
 * Split a comma-separated environment variable value into its items,
 * ignoring surrounding whitespace and empty items.
 *
 * @param {string|undefined} rawValue - raw value (may be unset)
 * @returns {string[]} list of non-empty, trimmed items
 */
function parseList(rawValue) {
    if (!rawValue) {
        return [];
    }
    return rawValue
        .split(',')
        .map(item => item.trim())
        .filter(item => item.length > 0);
}

const LISTING_PAGE_SIZE = parsePositiveInt(
    process.env.LISTING_PAGE_SIZE, DEFAULT_LISTING_PAGE_SIZE,
);

const LOG_PROGRESS_INTERVAL = parsePositiveInt(
    process.env.LOG_PROGRESS_INTERVAL, DEFAULT_LOG_PROGRESS_INTERVAL,
);

const USAGE = `
cleanupMpuOrphans.js

This script cleans up orphaned multipart upload (MPU) data and
metadata from S3 buckets.

Context:
    There can be S3 metadata keys from internal MPU shadow buckets,
    possibly associated with orphaned RING objects, that are left
    behind in some rare cases of duplicate and concurrent "complete
    MPU" or "abort MPU" requests on the same MPU object. They take up
    storage space and may cause listing slowdowns, until they are
    manually removed.

    Technically, orphaned parts are defined as part metadata which do
    not have a corresponding overview key with the same upload ID
    (overview keys are present for incomplete, but visible, MPUs).

Principle:
    This script takes care of cleaning up those orphaned parts data and
    metadata keys with a two-phase process for each target bucket:

    - Discovery phase: builds a map of orphaned MPU upload IDs present
      in the bucket

    - Cleanup phase: scans the bucket with a versioned listing,
      matches any completed MPU with their orphaned counterpart to
      detect used sproxyd keys, and only deletes the unused ones along
      with the orphaned metadata. Any orphaned upload IDs that were not
      matched to any completed object version are also deleted
      unconditionally.

Usage:
    node cleanupMpuOrphans.js

Mandatory environment variables:
    BUCKETD_HOSTPORT: ip:port of bucketd endpoint
    SPROXYD_HOSTPORT: ip:port of sproxyd endpoint
    One of:
        BUCKETS: comma-separated list of buckets to scan
    or:
        RAFT_SESSIONS: comma-separated list of raft sessions to scan

Optional environment variables:
    VERBOSE: set to 1 for more verbose output
    TRACE: set to 1 to trace every request to bucketd and sproxyd
    LISTING_PAGE_SIZE: number of keys to list per listing request (default ${DEFAULT_LISTING_PAGE_SIZE})
    LOG_PROGRESS_INTERVAL: interval in seconds between progress update log lines (default ${DEFAULT_LOG_PROGRESS_INTERVAL})

Logs:
    Logs are output to stdout in JSON format using the standard
    werelogs formatting. The main log messages are:

    starting discovery phase: scanning MPU shadow bucket
        logged at the beginning of the discovery phase for a given bucket
    orphaned MPU found
        logged during discovery phase for each orphaned MPU discovered
    discovery phase complete
        logged when the discovery phase has completed for a given bucket
    starting cleanup phase: scanning bucket
        logged at the beginning of the cleanup phase for a given bucket
    deleted orphaned sproxyd key
        [VERBOSE or TRACE] logged for each orphaned sproxyd key deleted
    orphaned sproxyd key already deleted
        [VERBOSE or TRACE] logged for each orphaned sproxyd key that no longer exists
    not deleting sproxyd key used by completed MPU
        [VERBOSE or TRACE] logged for each orphaned sproxyd key still used, NOT deleted
    deleted orphaned part metadata
        [VERBOSE or TRACE] logged for each orphaned part metadata key deleted
    cleanup phase complete
        logged when the cleanup phase has completed for a given bucket
    completed MPU orphan cleanup
        logged when cleanup is complete for all target buckets/RAFT sessions
    progress update
        logged every LOG_PROGRESS_INTERVAL seconds with progress stats
`;

const log = new werelogs.Logger('s3utils:cleanupMpuOrphans');

// Progress counters for the bucket currently being processed; they are
// reset by processBucket() and reported by logProgress().
const status = {
    phase: null,
    bucket: null,
    orphanedUploadIds: 0,
    versionsScanned: 0,
    orphanPartsDeleted: 0,
    sproxydKeysDeleted: 0,
};

/**
 * Log the given message at info level together with the counters that
 * are relevant to the current phase.
 *
 * @param {string} message - log message
 * @returns {undefined}
 */
function logProgress(message) {
    const fields = {
        phase: status.phase,
        bucket: status.bucket,
    };
    if (status.phase === 'discovery') {
        fields.orphanedUploadIds = status.orphanedUploadIds;
    }
    if (status.phase === 'cleanup') {
        fields.versionsScanned = status.versionsScanned;
        fields.orphanPartsDeleted = status.orphanPartsDeleted;
        fields.sproxydKeysDeleted = status.sproxydKeysDeleted;
    }
    log.info(message, fields);
}

let progressInterval;

let remainingBuckets = parseList(BUCKETS);

// Path prefix used in sproxyd DELETE URLs, set by getSproxydAlias()
let sproxydAlias;

/**
 * Fetch the sproxyd configuration and remember the alias of the first
 * ring driver, which is used to build sproxyd object URLs.
 *
 * @returns {Promise<void>} resolves once the alias has been stored
 * @throws {Error} if the request does not return 200 or if the
 *   configuration does not contain an alias for "ring_driver:0"
 */
async function getSproxydAlias() {
    const url = `http://${SPROXYD_HOSTPORT}/.conf`;
    const res = await httpRequest('GET', url, RETRY_PARAMS);
    if (res.statusCode !== 200) {
        throw new Error(`GET ${url} returned status ${res.statusCode}`);
    }
    const resp = JSON.parse(res.body);
    const driverConf = resp && resp['ring_driver:0'];
    // fail early: an undefined alias would otherwise silently end up in
    // every sproxyd DELETE URL
    if (!driverConf || !driverConf.alias) {
        throw new Error(`GET ${url} returned no alias for ring_driver:0`);
    }
    sproxydAlias = driverConf.alias;
}

/**
 * Append to remainingBuckets the buckets hosted on the raft sessions
 * listed in RAFT_SESSIONS, skipping MPU shadow buckets and the
 * "users..bucket" internal bucket.
 *
 * @returns {Promise<void>} resolves once all raft sessions are resolved
 * @throws {Error} if any raft session listing does not return 200
 */
async function raftSessionsToBuckets() {
    const rsList = parseList(RAFT_SESSIONS);
    if (rsList.length === 0) {
        return;
    }
    const bucketLists = await Promise.all(rsList.map(async rs => {
        const url = `http://${BUCKETD_HOSTPORT}/_/raft_sessions/${rs}/bucket`;
        const res = await httpRequest('GET', url, RETRY_PARAMS);
        if (res.statusCode !== 200) {
            throw new Error(`GET ${url} returned status ${res.statusCode}`);
        }
        const resp = JSON.parse(res.body);
        return resp.filter(
            bucket => !bucket.startsWith('mpuShadowBucket')
                && bucket !== 'users..bucket',
        );
    }));
    // assemble in RAFT_SESSIONS order so that the processing order does
    // not depend on which request completed first
    remainingBuckets = remainingBuckets.concat(...bucketLists);
}

const OVERVIEW_KEY_PREFIX = 'overview..|..';

/**
 * Return the marker to use for the page following a truncated listing
 * page, i.e. the last key of the current page.
 *
 * @param {object[]} contents - "Contents" array of the current page
 * @param {string} url - URL of the current page (for error reporting)
 * @returns {string} key of the last entry of the page
 * @throws {Error} if the page is empty: there is then no way to resume
 *   the listing, and ignoring the rest of it could make live uploads
 *   look orphaned
 */
function nextListingMarker(contents, url) {
    if (!Array.isArray(contents) || contents.length === 0) {
        throw new Error(`GET ${url} returned a truncated listing without any entry`);
    }
    return contents[contents.length - 1].key;
}

/**
 * Populate uploadIds with all upload IDs that have an overview key in the
 * given MPU shadow bucket. A 404 on the shadow bucket ends the listing
 * without error.
 *
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} shadowBucket - name of the MPU shadow bucket to list
 * @param {Set<string>} uploadIds - set to populate with found upload IDs
 * @param {object} [options] - listing options
 * @param {number} [options.pageSize=1000] - number of keys per listing page
 * @param {object} [options.retry] - retry parameters forwarded to httpRequest
 *   (e.g. { times: 100, interval: 5000 })
 * @returns {Promise<void>} resolves when all overview keys have been scanned
 * @throws {Error} if a listing request returns neither 200 nor 404
 */
async function getUploadIdsWithOverview(bucketdHostport, shadowBucket, uploadIds, { pageSize = 1000, retry } = {}) {
    let marker = '';
    let isTruncated = true;
    while (isTruncated) {
        const url = `http://${bucketdHostport}/default/bucket/${shadowBucket}`
            + `?prefix=${encodeURIComponent(OVERVIEW_KEY_PREFIX)}&maxKeys=${pageSize}`
            + `&marker=${encodeURIComponent(marker)}`;

        const res = await httpRequest('GET', url, retry);
        if (res.statusCode === 404) {
            break;
        }
        if (res.statusCode !== 200) {
            throw new Error(`GET ${url} returned status ${res.statusCode}`);
        }
        const { Contents, IsTruncated } = JSON.parse(res.body);
        for (const item of Contents) {
            // overview key format: overview..|..<objectKey>..|..<uploadId>
            // the upload ID is always the last component, even when the
            // object key itself contains the separator
            const keyParts = item.key.split('..|..');
            uploadIds.add(keyParts[keyParts.length - 1]);
        }
        if (IsTruncated) {
            marker = nextListingMarker(Contents, url);
        }
        isTruncated = IsTruncated;
    }
}

/**
 * List all part keys in the shadow bucket and return a map of orphaned entries
 * whose upload ID has no corresponding overview key.
 *
 * Part keys whose metadata cannot be parsed or has no part locations are
 * still recorded in partKeys (with a warning), but contribute no sproxyd key.
 *
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} shadowBucket - MPU shadow bucket name to scan
 * @param {Set<string>} uploadIdsWithOverview - upload IDs that have an overview
 *   key and should be skipped
 * @param {object} [options] - listing options
 * @param {number} [options.pageSize=1000] - number of keys per listing page
 * @param {object} [options.retry] - retry parameters forwarded to httpRequest
 *   (e.g. { times: 100, interval: 5000 })
 * @returns {Promise<object>} map of the form
 *   { [uploadId]: { partKeys: string[], sproxydKeys: Set<string> } }
 * @throws {Error} if a listing request returns neither 200 nor 404
 */
async function collectOrphanParts(bucketdHostport, shadowBucket, uploadIdsWithOverview, { pageSize = 1000, retry } = {}) {
    const orphanMap = {};
    let partsMarker = '';
    let isTruncated = true;
    while (isTruncated) {
        const url = `http://${bucketdHostport}/default/bucket/${shadowBucket}`
            + `?maxKeys=${pageSize}`
            + `&marker=${encodeURIComponent(partsMarker)}`;

        const res = await httpRequest('GET', url, retry);
        if (res.statusCode === 404) {
            break;
        }
        if (res.statusCode !== 200) {
            throw new Error(`GET ${url} returned status ${res.statusCode}`);
        }
        const { Contents, IsTruncated } = JSON.parse(res.body);
        for (const item of Contents) {
            if (item.key.startsWith(OVERVIEW_KEY_PREFIX)) {
                continue; // skip overview keys
            }
            // part key format: <uploadId>..|..<5-digit-index>
            const sepPos = item.key.indexOf('..|..');
            if (sepPos === -1) {
                log.warn('unexpected key format in MPU shadow bucket', {
                    shadowBucket, key: item.key,
                });
                continue;
            }
            const uploadId = item.key.slice(0, sepPos);
            if (uploadIdsWithOverview.has(uploadId)) {
                continue; // has a live overview key: not orphaned
            }
            if (!orphanMap[uploadId]) {
                orphanMap[uploadId] = { partKeys: [], sproxydKeys: new Set() };
            }
            orphanMap[uploadId].partKeys.push(item.key);

            let md;
            try {
                md = JSON.parse(item.value);
            } catch (e) {
                log.warn('failed to parse part key metadata', {
                    shadowBucket,
                    key: item.key,
                    error: { message: e.message },
                });
                continue;
            }
            // md may legitimately parse to null or to a non-object value
            const partLocations = md ? md.partLocations : undefined;
            if (!Array.isArray(partLocations) || partLocations.length === 0) {
                log.warn('part key has no partLocations', {
                    shadowBucket, uploadId, key: item.key,
                });
                continue;
            }
            for (const loc of partLocations) {
                orphanMap[uploadId].sproxydKeys.add(loc.key);
            }
        }
        if (IsTruncated) {
            partsMarker = nextListingMarker(Contents, url);
        }
        isTruncated = IsTruncated;
    }
    return orphanMap;
}

/**
 * Phase 1: builds a map of orphaned MPU upload IDs for a given bucket.
 *
 * An orphaned MPU has one or more part keys in the MPU shadow bucket but no
 * corresponding overview key.
 *
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} shadowBucket - MPU shadow bucket name to scan
 * @param {object} [options] - listing options
 * @param {number} [options.pageSize=1000] - number of keys per listing page
 * @param {object} [options.retry] - retry parameters forwarded to httpRequest
 *   (e.g. { times: 100, interval: 5000 })
 * @returns {Promise<object>} map of the form
 *   { [uploadId]: { partKeys: string[], sproxydKeys: Set<string> } }
 */
async function buildOrphanMap(bucketdHostport, shadowBucket, { pageSize = 1000, retry } = {}) {
    const uploadIdsWithOverview = new Set();

    // Step 1: collect upload IDs that have an overview key
    await getUploadIdsWithOverview(
        bucketdHostport,
        shadowBucket,
        uploadIdsWithOverview,
        { pageSize, retry },
    );

    // Step 2: list all parts, build orphan map
    const orphanMap = await collectOrphanParts(
        bucketdHostport,
        shadowBucket,
        uploadIdsWithOverview,
        { pageSize, retry },
    );

    // Step 3: re-check overview keys to eliminate upload IDs that gained an
    // overview key between step 1 and step 2 (race condition)
    await getUploadIdsWithOverview(
        bucketdHostport,
        shadowBucket,
        uploadIdsWithOverview,
        { pageSize, retry },
    );
    for (const uploadId of Object.keys(orphanMap)) {
        if (uploadIdsWithOverview.has(uploadId)) {
            delete orphanMap[uploadId];
        }
    }
    return orphanMap;
}

/**
 * Delete orphaned sproxyd keys (keysToDelete) and all part metadata entries
 * for the given orphaned upload ID. Failures are logged but do not abort.
 *
 * A 404 is not treated as a failure for either kind of deletion, so that
 * the script can be re-run after a partial cleanup.
 *
 * @param {object} reqLogger - werelogs request logger with bucket and uploadId
 *   already set as default fields
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} sproxydHostport - host:port of the sproxyd endpoint
 * @param {string} shadowBucket - MPU shadow bucket name
 * @param {{partKeys: string[]}} orphanEntry - orphan entry with part keys to delete
 * @param {Set<string>} keysToDelete - sproxyd keys to delete
 * @param {object} [retry] - retry parameters forwarded to httpRequest
 * @returns {Promise<void>} resolves when all deletions have been attempted
 */
async function cleanupOrphanEntry(reqLogger, bucketdHostport, sproxydHostport, shadowBucket, orphanEntry, keysToDelete, retry) {
    for (const sproxydKey of keysToDelete) {
        const sproxydUrl = `http://${sproxydHostport}/${sproxydAlias}/${sproxydKey}`;
        try {
            const res = await httpRequest('DELETE', sproxydUrl, retry);
            if (res.statusCode === 200) {
                reqLogger.debug('deleted orphaned sproxyd key', { sproxydKey });
                status.sproxydKeysDeleted += 1;
            } else if (res.statusCode === 404) {
                // nothing left to delete, e.g. removed by a previous run
                reqLogger.debug('orphaned sproxyd key already deleted', { sproxydKey });
            } else {
                reqLogger.error('failed to delete orphaned sproxyd key', {
                    sproxydKey, error: { statusCode: res.statusCode },
                });
            }
        } catch (err) {
            reqLogger.error('failed to delete orphaned sproxyd key', {
                sproxydKey, error: { message: err.message },
            });
        }
    }
    for (const partKey of orphanEntry.partKeys) {
        const partUrl = `http://${bucketdHostport}/default/bucket/${shadowBucket}/${
            encodeURIComponent(partKey)}`;
        try {
            const res = await httpRequest('DELETE', partUrl, retry);
            if (res.statusCode !== 200 && res.statusCode !== 404) {
                reqLogger.error('failed to delete orphaned part metadata', {
                    partKey, error: { statusCode: res.statusCode },
                });
            } else {
                reqLogger.debug('deleted orphaned part metadata', { partKey });
                status.orphanPartsDeleted += 1;
            }
        } catch (err) {
            reqLogger.error('failed to delete orphaned part metadata', {
                partKey, error: { message: err.message },
            });
        }
    }
}

/**
 * Phase 2: scan all object versions in the original bucket to find completed
 * MPU objects that share sproxyd keys with orphaned parts, delete only the
 * orphaned keys (those not referenced by any completed object version), then
 * delete all remaining orphans unconditionally.
 *
 * Nothing is deleted before the version listing has completed: several
 * versions can carry the same upload ID, and a key is only safe to delete
 * once it is known that none of them references it.
 *
 * @param {string} bucketdHostport - host:port of the bucketd endpoint
 * @param {string} sproxydHostport - host:port of the sproxyd endpoint
 * @param {string} bucket - original bucket name
 * @param {string} shadowBucket - MPU shadow bucket name
 * @param {object} orphanMap - map of orphaned upload IDs (mutated in place:
 *   entries are removed once processed)
 * @param {object} [options] - listing options
 * @param {number} [options.pageSize=1000] - number of keys per listing page
 * @param {object} [options.retry] - retry parameters forwarded to httpRequest
 *   (e.g. { times: 100, interval: 5000 })
 * @returns {Promise<void>} resolves when all orphans have been processed
 */
async function cleanupOrphans(bucketdHostport, sproxydHostport, bucket, shadowBucket, orphanMap, { pageSize = 1000, retry } = {}) {
    // uploadId -> sproxyd keys referenced by completed versions; a Map
    // iterates in insertion order, i.e. in the order upload IDs were
    // first matched during the scan
    const referencedKeysByUploadId = new Map();

    for await (const { value: resolvedMd } of listVersions(bucketdHostport, bucket, {
        pageSize,
        retry,
    })) {
        status.versionsScanned += 1;
        // Common case: skip version if not an MPU that has orphaned parts
        if (!resolvedMd.uploadId || !orphanMap[resolvedMd.uploadId]
            || !Array.isArray(resolvedMd.location)) {
            continue;
        }
        const { uploadId } = resolvedMd;
        let referencedKeys = referencedKeysByUploadId.get(uploadId);
        if (!referencedKeys) {
            referencedKeys = new Set();
            referencedKeysByUploadId.set(uploadId, referencedKeys);
        }
        for (const loc of resolvedMd.location) {
            referencedKeys.add(loc.key);
        }
    }

    // Orphans matched to a completed object first (in scan order), then
    // the ones not referenced by any completed object
    const uploadIdsToClean = [
        ...referencedKeysByUploadId.keys(),
        ...Object.keys(orphanMap).filter(
            uploadId => !referencedKeysByUploadId.has(uploadId),
        ),
    ];
    for (const uploadId of uploadIdsToClean) {
        const reqLogger = log.newRequestLogger();
        reqLogger.addDefaultFields({ bucket, uploadId });

        const orphanEntry = orphanMap[uploadId];
        const referencedKeys = referencedKeysByUploadId.get(uploadId);

        let keysToDelete = orphanEntry.sproxydKeys;
        if (referencedKeys) {
            // Only delete sproxyd keys not referenced by a completed object
            keysToDelete = new Set();
            for (const sproxydKey of orphanEntry.sproxydKeys) {
                if (referencedKeys.has(sproxydKey)) {
                    reqLogger.debug('not deleting sproxyd key used by completed MPU',
                        { sproxydKey });
                } else {
                    keysToDelete.add(sproxydKey);
                }
            }
        }

        await cleanupOrphanEntry(reqLogger, bucketdHostport, sproxydHostport, shadowBucket, orphanEntry, keysToDelete, retry);
        // eslint-disable-next-line no-param-reassign
        delete orphanMap[uploadId];
    }
}

/**
 * Run the discovery phase then, if any orphan was found, the cleanup
 * phase for one bucket, updating the progress status along the way.
 *
 * @param {string} bucket - name of the bucket to process
 * @returns {Promise<void>} resolves when the bucket has been processed
 */
async function processBucket(bucket) {
    const shadowBucket = `mpuShadowBucket${bucket}`;

    log.info('starting discovery phase: scanning MPU shadow bucket', { bucket, shadowBucket });

    status.bucket = bucket;
    status.phase = 'discovery';
    status.orphanedUploadIds = 0;
    status.versionsScanned = 0;
    status.orphanPartsDeleted = 0;
    status.sproxydKeysDeleted = 0;

    const orphanMap = await buildOrphanMap(
        BUCKETD_HOSTPORT,
        shadowBucket,
        { pageSize: LISTING_PAGE_SIZE, retry: RETRY_PARAMS },
    );
    const orphanCount = Object.keys(orphanMap).length;
    status.orphanedUploadIds = orphanCount;
    logProgress('discovery phase complete');
    if (orphanCount === 0) {
        return;
    }

    for (const [uploadId, info] of Object.entries(orphanMap)) {
        log.info('orphaned MPU found', {
            bucket,
            uploadId,
            partCount: info.partKeys.length,
            sproxydKeyCount: info.sproxydKeys.size,
        });
    }

    log.info('starting cleanup phase: scanning bucket', { bucket });

    status.phase = 'cleanup';
    await cleanupOrphans(
        BUCKETD_HOSTPORT,
        SPROXYD_HOSTPORT,
        bucket,
        shadowBucket,
        orphanMap,
        { pageSize: LISTING_PAGE_SIZE, retry: RETRY_PARAMS },
    );
    logProgress('cleanup phase complete');
}

/**
 * Script entry point: validates the environment, then processes every
 * target bucket in sequence. Always terminates the process (exit code 0
 * on success, 1 on invalid configuration or on error).
 *
 * @returns {Promise<void>} never resolves in practice: the process exits
 */
async function main() {
    // validate on the parsed lists so that values such as "," or " "
    // are reported instead of silently processing nothing
    const hasBuckets = remainingBuckets.length > 0;
    const hasRaftSessions = parseList(RAFT_SESSIONS).length > 0;
    if (!hasBuckets && !hasRaftSessions) {
        console.error('ERROR: either BUCKETS or RAFT_SESSIONS environment '
            + 'variable must be defined');
        console.error(USAGE);
        process.exit(1);
    }
    if (hasBuckets && hasRaftSessions) {
        console.error('ERROR: only one of BUCKETS or RAFT_SESSIONS environment '
            + 'variables can be defined');
        console.error(USAGE);
        process.exit(1);
    }
    if (!BUCKETD_HOSTPORT) {
        console.error('ERROR: BUCKETD_HOSTPORT not defined');
        console.error(USAGE);
        process.exit(1);
    }
    if (!SPROXYD_HOSTPORT) {
        console.error('ERROR: SPROXYD_HOSTPORT not defined');
        console.error(USAGE);
        process.exit(1);
    }

    progressInterval = setInterval(
        () => logProgress('progress update'),
        LOG_PROGRESS_INTERVAL * 1000,
    );
    try {
        await getSproxydAlias();
        await raftSessionsToBuckets();
        await async.eachSeries(remainingBuckets, processBucket);
        clearInterval(progressInterval);
        log.info('completed MPU orphan cleanup');
        process.exit(0);
    } catch (err) {
        clearInterval(progressInterval);
        log.error('an error occurred during cleanup', {
            error: { message: err.message },
        });
        process.exit(1);
    }
}

/**
 * Signal handler: stops the progress log and exits.
 *
 * @returns {undefined}
 */
function stop() {
    clearInterval(progressInterval);
    log.info('stopping execution');
    process.exit(0);
}

if (require.main === module) {
    process.on('SIGINT', stop);
    process.on('SIGHUP', stop);
    process.on('SIGTERM', stop);
    process.on('SIGQUIT', stop);
    main();
}

module.exports = { getSproxydAlias, getUploadIdsWithOverview, collectOrphanParts, buildOrphanMap, cleanupOrphanEntry, cleanupOrphans };

// Start of the next file of the patch (tests/unit/cleanupMpuOrphans.js), kept verbatim; it continues on the following line of the patch:
// diff --git a/tests/unit/cleanupMpuOrphans.js b/tests/unit/cleanupMpuOrphans.js new file mode 100644 index 00000000..36db274f --- /dev/null +++ b/tests/unit/cleanupMpuOrphans.js @@ -0,0 +1,648 @@ +jest.mock('../../utils/async/httpRequest'); +jest.mock('../../utils/async/bucketd/listVersions'); + +const httpRequest = require('../../utils/async/httpRequest'); +const listVersions = require('../../utils/async/bucketd/listVersions'); +const { + getSproxydAlias, + getUploadIdsWithOverview, + collectOrphanParts, +
buildOrphanMap, + cleanupOrphanEntry, + cleanupOrphans, +} = require('../../cleanupMpuOrphans'); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const BUCKETD_HOSTPORT = 'localhost:9000'; +const SHADOW_BUCKET = 'mpuShadowBuckettest-bucket'; +const RETRY_PARAMS = { times: 5, interval: 100 }; + +function overviewUrl({ maxKeys = 1000, marker = '' } = {}) { + return `http://${BUCKETD_HOSTPORT}/default/bucket/${SHADOW_BUCKET}` + + `?prefix=${encodeURIComponent('overview..|..')}&maxKeys=${maxKeys}` + + `&marker=${encodeURIComponent(marker)}`; +} + +// Build an overview key in the format: overview..|....|.. +function overviewKey(objectKey, uploadId) { + return `overview..|..${objectKey}..|..${uploadId}`; +} + +function fakeResponse(statusCode, body) { + return { statusCode, body: body !== undefined ? JSON.stringify(body) : '' }; +} + +// Build a listing page body with the given overview key strings +function listingPage(keys, isTruncated = false) { + return { + Contents: keys.map(key => ({ key })), + IsTruncated: isTruncated, + }; +} + +beforeEach(() => { + // Reset only httpRequest so each test starts with no queued responses, + // while preserving async.retry's mock implementation. 
+ httpRequest.mockReset(); +}); + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('getUploadIdsWithOverview', () => { + test('adds nothing when shadow bucket returns 404', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(404)); + + const uploadIds = new Set(); + await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds); + + expect(uploadIds.size).toBe(0); + expect(httpRequest).toHaveBeenCalledTimes(1); + expect(httpRequest).toHaveBeenCalledWith('GET', overviewUrl(), undefined); + }); + + test('populates uploadIds from a single page of overview keys', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([ + overviewKey('object1', 'uploadId1'), + overviewKey('object2', 'uploadId2'), + ]))); + + const uploadIds = new Set(); + await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds); + + expect(uploadIds).toEqual(new Set(['uploadId1', 'uploadId2'])); + expect(httpRequest).toHaveBeenCalledTimes(1); + expect(httpRequest).toHaveBeenCalledWith('GET', overviewUrl(), undefined); + }); + + test('follows pagination and uses last key of each page as the next marker', async () => { + const lastKeyPage1 = overviewKey('object1', 'uploadId1'); + httpRequest + .mockResolvedValueOnce(fakeResponse(200, { + Contents: [{ key: lastKeyPage1 }], + IsTruncated: true, + })) + .mockResolvedValueOnce(fakeResponse(200, listingPage([ + overviewKey('object2', 'uploadId2'), + ]))); + + const uploadIds = new Set(); + await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds); + + expect(uploadIds).toEqual(new Set(['uploadId1', 'uploadId2'])); + expect(httpRequest).toHaveBeenCalledTimes(2); + expect(httpRequest).toHaveBeenNthCalledWith(2, 'GET', overviewUrl({ marker: lastKeyPage1 }), undefined); + }); + + test('pageSize option controls maxKeys in the URL', async () => { + 
httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + + const uploadIds = new Set(); + await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds, { pageSize: 42 }); + + expect(httpRequest).toHaveBeenCalledWith('GET', overviewUrl({ maxKeys: 42 }), undefined); + }); + + test('retry option is forwarded to httpRequest', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([]))); + const retry = { times: 5, interval: 1000 }; + + const uploadIds = new Set(); + await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds, { retry }); + + expect(httpRequest).toHaveBeenCalledWith('GET', overviewUrl(), retry); + }); + + test('overview key with object key containing the separator', async () => { + // overview key format: overview..|....|.. + // split('..|..') takes the LAST part as uploadId, so embedded + // separators in the object key are handled correctly + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage([ + overviewKey('dir..|..subdir/object', 'uploadId1'), + ]))); + + const uploadIds = new Set(); + await getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds); + + expect(uploadIds).toEqual(new Set(['uploadId1'])); + }); + + test('throws when the listing returns a non-200 non-404 status', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(500)); + + const uploadIds = new Set(); + await expect(getUploadIdsWithOverview(BUCKETD_HOSTPORT, SHADOW_BUCKET, uploadIds)) + .rejects.toThrow('returned status 500'); + }); +}); + +// --------------------------------------------------------------------------- +// collectOrphanParts helpers +// --------------------------------------------------------------------------- + +function partsUrl({ maxKeys = 1000, marker = '' } = {}) { + return `http://${BUCKETD_HOSTPORT}/default/bucket/${SHADOW_BUCKET}` + + `?maxKeys=${maxKeys}&marker=${encodeURIComponent(marker)}`; +} + +// Build a part listing entry: key = 
// "..|..<5-digit-index>",
// value = JSON with partLocations containing the given sproxyd keys.
function partEntry(uploadId, partIndex, sproxydKeys = []) {
    const paddedIndex = String(partIndex).padStart(5, '0');
    const partLocations = sproxydKeys.map(sproxydKey => ({ key: sproxydKey }));
    return {
        key: `${uploadId}..|..${paddedIndex}`,
        value: JSON.stringify({ partLocations }),
    };
}

// Wrap { key, value } entries into the body of one listing page.
function partsPage(entries, isTruncated = false) {
    const page = { Contents: entries, IsTruncated: isTruncated };
    return page;
}

// ---------------------------------------------------------------------------
// collectOrphanParts tests
// ---------------------------------------------------------------------------

describe('collectOrphanParts', () => {
    // Call collectOrphanParts on the shadow bucket; remaining arguments are
    // passed through exactly as given.
    const collect = (...args) => collectOrphanParts(BUCKETD_HOSTPORT, SHADOW_BUCKET, ...args);

    // Queue a single 200 response holding one non-truncated page of entries.
    const respondWithParts = (...entries) => {
        httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage(entries)));
    };

    test('returns empty map when shadow bucket returns 404', async () => {
        httpRequest.mockResolvedValueOnce(fakeResponse(404));

        const orphans = await collect(new Set());

        expect(orphans).toEqual({});
        expect(httpRequest).toHaveBeenCalledTimes(1);
        expect(httpRequest).toHaveBeenCalledWith('GET', partsUrl(), undefined);
    });

    test('returns empty map when bucket has no parts', async () => {
        respondWithParts();

        const orphans = await collect(new Set());

        expect(orphans).toEqual({});
    });

    test('collects partKeys and sproxydKeys for orphaned upload IDs', async () => {
        const part = partEntry('uploadId1', 1, ['sproxyd-key-A', 'sproxyd-key-B']);
        respondWithParts(part);

        const orphans = await collect(new Set());

        const expected = {
            uploadId1: {
                partKeys: [part.key],
                sproxydKeys: new Set(['sproxyd-key-A', 'sproxyd-key-B']),
            },
        };
        expect(orphans).toEqual(expected);
    });

    test('accumulates multiple parts under the same upload ID', async () => {
        const firstPart = partEntry('uploadId1', 1, ['key-A']);
        const secondPart = partEntry('uploadId1', 2, ['key-B']);
        respondWithParts(firstPart, secondPart);

        const orphans = await collect(new Set());

        const { uploadId1 } = orphans;
        expect(uploadId1.partKeys).toEqual([firstPart.key, secondPart.key]);
        expect(uploadId1.sproxydKeys).toEqual(new Set(['key-A', 'key-B']));
    });

    test('skips overview keys (keys starting with "overview..|..")', async () => {
        const overview = { key: overviewKey('object1', 'uploadId1'), value: '{}' };
        const part = partEntry('uploadId2', 1, ['key-X']);
        respondWithParts(overview, part);

        const orphans = await collect(new Set());

        expect(Object.keys(orphans)).toEqual(['uploadId2']);
    });

    test('skips upload IDs present in uploadIdsWithOverview', async () => {
        const livePart = partEntry('uploadIdLive', 1, ['key-live']);
        const orphanPart = partEntry('uploadIdOrphan', 1, ['key-orphan']);
        respondWithParts(livePart, orphanPart);

        const orphans = await collect(new Set(['uploadIdLive']));

        expect(Object.keys(orphans)).toEqual(['uploadIdOrphan']);
    });

    test('skips parts with malformed key (no ..|.. separator)', async () => {
        respondWithParts({ key: 'no-separator-here', value: '{}' });

        const orphans = await collect(new Set());

        expect(orphans).toEqual({});
    });

    // Part metadata values that yield no sproxyd key: the part key must still
    // be recorded while the sproxyd key set stays empty.
    const valuesWithoutLocations = [
        ['records partKey but skips sproxydKeys when part metadata JSON is malformed',
            'not-valid-json'],
        ['records partKey but skips sproxydKeys when partLocations is missing',
            JSON.stringify({})],
        ['records partKey but skips sproxydKeys when partLocations is empty',
            JSON.stringify({ partLocations: [] })],
    ];
    for (const [title, value] of valuesWithoutLocations) {
        test(title, async () => {
            const part = { key: 'uploadId1..|..00001', value };
            respondWithParts(part);

            const orphans = await collect(new Set());

            expect(orphans.uploadId1.partKeys).toEqual([part.key]);
            expect(orphans.uploadId1.sproxydKeys).toEqual(new Set());
        });
    }

    test('follows pagination using last key of each page as the next marker', async () => {
        const firstPart = partEntry('uploadId1', 1, ['key-A']);
        const secondPart = partEntry('uploadId2', 1, ['key-B']);
        // first page is truncated, second page ends the listing
        httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([firstPart], true)));
        httpRequest.mockResolvedValueOnce(fakeResponse(200, partsPage([secondPart])));

        const orphans = await collect(new Set());

        expect(Object.keys(orphans)).toEqual(expect.arrayContaining(['uploadId1', 'uploadId2']));
        expect(httpRequest).toHaveBeenCalledTimes(2);
        expect(httpRequest).toHaveBeenNthCalledWith(
            2, 'GET', partsUrl({ marker: firstPart.key }), undefined);
    });

    test('pageSize option controls maxKeys in the URL', async () => {
        respondWithParts();

        await collect(new Set(), { pageSize: 42 });

        expect(httpRequest).toHaveBeenCalledWith('GET', partsUrl({ maxKeys: 42 }), undefined);
    });

    test('retry option is forwarded to httpRequest', async () => {
        respondWithParts();
        const retry = { times: 5, interval: 1000 };

        await collect(new Set(), { retry });

        expect(httpRequest).toHaveBeenCalledWith('GET', partsUrl(), retry);
    });

    test('throws when the listing returns a non-200 non-404 status', async () => {
        httpRequest.mockResolvedValueOnce(fakeResponse(500));

        const pending = collect(new Set());

        await expect(pending).rejects.toThrow('returned status 500');
    });
});

// ---------------------------------------------------------------------------
// buildOrphanMap tests
// ---------------------------------------------------------------------------

describe('buildOrphanMap', () => {
    // Queue the three 200 responses consumed by buildOrphanMap, in call order:
    // step 1 overview listing, step 2 parts listing, step 3 overview re-check.
    const mockSteps = (overviewBefore, parts, overviewAfter) => {
        const bodies = [
            listingPage(overviewBefore),
            partsPage(parts),
            listingPage(overviewAfter),
        ];
        bodies.forEach(body => {
            httpRequest.mockResolvedValueOnce(fakeResponse(200, body));
        });
    };

    test('makes HTTP calls in the correct three-step order', async () => {
        mockSteps([], [], []);

        await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET, { retry: RETRY_PARAMS });

        expect(httpRequest).toHaveBeenCalledTimes(3);
        expect(httpRequest).toHaveBeenNthCalledWith(1, 'GET', overviewUrl(), RETRY_PARAMS);
        expect(httpRequest).toHaveBeenNthCalledWith(2, 'GET', partsUrl(), RETRY_PARAMS);
        expect(httpRequest).toHaveBeenNthCalledWith(3, 'GET', overviewUrl(), RETRY_PARAMS);
    });

    test('returns empty map when shadow bucket has no parts', async () => {
        mockSteps([], [], []);

        const orphanMap = await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET);

        expect(orphanMap).toEqual({});
    });

    test('returns orphan map for upload IDs that have parts but no overview key', async () => {
        const part = partEntry('uploadId1', 1, ['sproxyd-key-A']);
        mockSteps([], [part], []);

        const orphanMap = await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET);

        const expected = {
            uploadId1: {
                partKeys: [part.key],
                sproxydKeys: new Set(['sproxyd-key-A']),
            },
        };
        expect(orphanMap).toEqual(expected);
    });

    test('upload IDs present in the step 1 overview are excluded from the result', async () => {
        const livePart = partEntry('uploadIdLive', 1, ['key-live']);
        const orphanPart = partEntry('uploadIdOrphan', 1, ['key-orphan']);
        // uploadIdLive has an overview key in step 1 and still has it in
        // step 3; both upload IDs have parts in step 2.
        mockSteps(
            [overviewKey('some-object', 'uploadIdLive')],
            [livePart, orphanPart],
            [overviewKey('some-object', 'uploadIdLive')],
        );

        const orphanMap = await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET);

        expect(Object.keys(orphanMap)).toEqual(['uploadIdOrphan']);
    });

    test('removes upload IDs that gained an overview key between step 1 and step 3 (race condition)', async () => {
        const part = partEntry('uploadId1', 1, ['sproxyd-key-A']);
        // step 1 sees no overview key and step 2 reports uploadId1 as an
        // orphan candidate, but by step 3 uploadId1 has an overview key: it is
        // a live upload and must not be reported as orphaned.
        mockSteps([], [part], [overviewKey('some-object', 'uploadId1')]);

        const orphanMap = await buildOrphanMap(BUCKETD_HOSTPORT, SHADOW_BUCKET);

        expect(orphanMap).toEqual({});
    });
});

// ---------------------------------------------------------------------------
// cleanupOrphanEntry helpers and tests
// ---------------------------------------------------------------------------

const SPROXYD_HOSTPORT = 'localhost:8181';
const SPROXYD_ALIAS = 'test-alias';

// URL expected for the DELETE of one sproxyd key.
function sproxydDeleteUrl(key) {
    const aliasBase = `http://${SPROXYD_HOSTPORT}/${SPROXYD_ALIAS}`;
    return `${aliasBase}/${key}`;
}

// URL expected for the DELETE of one part metadata key in the shadow bucket.
function partDeleteUrl(partKey) {
    const encodedKey = encodeURIComponent(partKey);
    return `http://${BUCKETD_HOSTPORT}/default/bucket/${SHADOW_BUCKET}/${encodedKey}`;
}

// Minimal request logger double exposing the two methods stubbed here.
function makeReqLogger() {
    const error = jest.fn();
    const debug = jest.fn();
    return { error, debug };
}

describe('cleanupOrphanEntry', () => {
    // Invoke cleanupOrphanEntry with the fixed host/bucket arguments.
    const runCleanup = (reqLogger, orphanEntry, keysToDelete, retry) => cleanupOrphanEntry(
        reqLogger, BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, SHADOW_BUCKET,
        orphanEntry, keysToDelete, retry);

    // Queue one empty-bodied response per given status code, in order.
    const respond = (...statusCodes) => {
        statusCodes.forEach(statusCode => {
            httpRequest.mockResolvedValueOnce({ statusCode, body: '' });
        });
    };

    beforeAll(async () => {
        // Initialise the module-level sproxydAlias variable used to build URLs.
        const aliasConfig = { 'ring_driver:0': { alias: SPROXYD_ALIAS } };
        httpRequest.mockResolvedValueOnce({ statusCode: 200, body: JSON.stringify(aliasConfig) });
        await getSproxydAlias();
    });

    test('does nothing when keysToDelete and partKeys are both empty', async () => {
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: [] }, []);

        expect(httpRequest).not.toHaveBeenCalled();
    });

    test('sends DELETE for each sproxyd key', async () => {
        respond(200, 200);
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: [] }, ['key-A', 'key-B']);

        expect(httpRequest).toHaveBeenCalledTimes(2);
        expect(httpRequest).toHaveBeenNthCalledWith(1, 'DELETE', sproxydDeleteUrl('key-A'), undefined);
        expect(httpRequest).toHaveBeenNthCalledWith(2, 'DELETE', sproxydDeleteUrl('key-B'), undefined);
    });

    test('sends DELETE for each part key', async () => {
        respond(200, 200);
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: ['part-key-1', 'part-key-2'] }, []);

        expect(httpRequest).toHaveBeenCalledTimes(2);
        expect(httpRequest).toHaveBeenNthCalledWith(1, 'DELETE', partDeleteUrl('part-key-1'), undefined);
        expect(httpRequest).toHaveBeenNthCalledWith(2, 'DELETE', partDeleteUrl('part-key-2'), undefined);
    });

    test('deletes sproxyd keys before part keys', async () => {
        respond(200, 200);
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: ['part-key-1'] }, ['sproxyd-key-A']);

        expect(httpRequest).toHaveBeenNthCalledWith(
            1, 'DELETE', sproxydDeleteUrl('sproxyd-key-A'), undefined);
        expect(httpRequest).toHaveBeenNthCalledWith(2, 'DELETE', partDeleteUrl('part-key-1'), undefined);
    });

    test('forwards retry param to httpRequest', async () => {
        const retry = { times: 3, interval: 100 };
        respond(200);
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: [] }, ['key-A'], retry);

        expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('key-A'), retry);
    });

    test('logs error and continues when sproxyd DELETE returns non-200', async () => {
        respond(500, 200);
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: [] }, ['key-A', 'key-B']);

        expect(reqLogger.error).toHaveBeenCalledTimes(1);
        expect(reqLogger.error).toHaveBeenCalledWith(
            'failed to delete orphaned sproxyd key',
            expect.objectContaining({ sproxydKey: 'key-A' }),
        );
        expect(httpRequest).toHaveBeenCalledTimes(2);
    });

    test('logs error and continues when sproxyd DELETE throws', async () => {
        httpRequest.mockRejectedValueOnce(new Error('network error'));
        respond(200);
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: [] }, ['key-A', 'key-B']);

        expect(reqLogger.error).toHaveBeenCalledTimes(1);
        expect(reqLogger.error).toHaveBeenCalledWith(
            'failed to delete orphaned sproxyd key',
            expect.objectContaining({ sproxydKey: 'key-A' }),
        );
        expect(httpRequest).toHaveBeenCalledTimes(2);
    });

    test('accepts 404 on part DELETE without logging an error', async () => {
        respond(404);
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: ['part-key-1'] }, []);

        expect(reqLogger.error).not.toHaveBeenCalled();
    });

    test('logs error and continues when part DELETE returns non-200 non-404', async () => {
        respond(500, 200);
        const reqLogger = makeReqLogger();

        await runCleanup(reqLogger, { partKeys: ['part-key-1', 'part-key-2'] }, []);

        expect(reqLogger.error).toHaveBeenCalledTimes(1);
        expect(reqLogger.error).toHaveBeenCalledWith(
            'failed to delete orphaned part metadata',
            expect.objectContaining({ partKey: 'part-key-1' }),
        );
        expect(httpRequest).toHaveBeenCalledTimes(2);
    });
});

// ---------------------------------------------------------------------------
// cleanupOrphans tests
// ---------------------------------------------------------------------------

describe('cleanupOrphans', () => {
    beforeEach(() => {
        listVersions.mockReset();
    });

    // Build an orphan map entry
    function makeOrphanEntry(partKeys, sproxydKeyList) {
        const sproxydKeys = new Set(sproxydKeyList);
        return { partKeys, sproxydKeys };
    }

    // Build a version listing entry as yielded by listVersions
    function versionWithUploadId(uploadId, locationKeys = []) {
        const location = locationKeys.map(locationKey => ({ key: locationKey }));
        return {
            key: 'some-object',
            versionId: 'some-version-id',
            value: { uploadId, location },
        };
    }

    // Make listVersions an async generator yielding the given versions in order.
    function listVersionsYields(...versions) {
        listVersions.mockImplementation(async function* () {
            for (const version of versions) {
                yield version;
            }
        });
    }

    // Make every httpRequest call resolve with an empty 200 response.
    function everyRequestSucceeds() {
        httpRequest.mockResolvedValue({ statusCode: 200, body: '' });
    }

    // Call cleanupOrphans on the test bucket; extra arguments pass through.
    const runCleanup = (orphanMap, ...rest) => cleanupOrphans(
        BUCKETD_HOSTPORT, SPROXYD_HOSTPORT, 'test-bucket', SHADOW_BUCKET, orphanMap, ...rest);

    test('resolves without any DELETE calls when orphanMap is empty', async () => {
        listVersionsYields();

        await runCleanup({});

        expect(httpRequest).not.toHaveBeenCalled();
    });

    test('passes bucketdHostport, bucket, pageSize and retry to listVersions', async () => {
        listVersionsYields();
        const retry = { times: 3, interval: 100 };

        await runCleanup({}, { pageSize: 42, retry });

        expect(listVersions).toHaveBeenCalledWith(
            BUCKETD_HOSTPORT,
            'test-bucket',
            expect.objectContaining({ pageSize: 42, retry }),
        );
    });

    test('deletes all sproxyd keys and part keys for upload IDs not matched by any version', async () => {
        listVersionsYields();
        everyRequestSucceeds();
        const orphanMap = {
            uploadId1: makeOrphanEntry(['uploadId1..|..00001'], ['sproxyd-key-A', 'sproxyd-key-B']),
        };

        await runCleanup(orphanMap);

        expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-A'), undefined);
        expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-B'), undefined);
        expect(httpRequest).toHaveBeenCalledWith('DELETE', partDeleteUrl('uploadId1..|..00001'), undefined);
        expect(Object.keys(orphanMap)).toHaveLength(0);
    });

    test('skips versions with no uploadId or with uploadId not in orphanMap', async () => {
        listVersionsYields(
            { key: 'obj1', versionId: 'v1', value: {} }, // no uploadId
            { key: 'obj2', versionId: 'v2', value: { uploadId: 'other-upload-id' } },
        );
        everyRequestSucceeds();
        const orphanMap = {
            uploadId1: makeOrphanEntry([], ['sproxyd-key-A']),
        };

        await runCleanup(orphanMap);

        // uploadId1 is not matched by any version, so it's cleaned up in the remaining phase
        expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-A'), undefined);
    });

    test('skips sproxyd keys referenced by a completed version, deletes only orphaned ones', async () => {
        // sproxyd-key-A is referenced by the listed version
        listVersionsYields(versionWithUploadId('uploadId1', ['sproxyd-key-A']));
        everyRequestSucceeds();
        const orphanMap = {
            uploadId1: makeOrphanEntry([], ['sproxyd-key-A', 'sproxyd-key-B']),
        };

        await runCleanup(orphanMap);

        expect(httpRequest).toHaveBeenCalledWith('DELETE', sproxydDeleteUrl('sproxyd-key-B'), undefined);
        expect(httpRequest).not.toHaveBeenCalledWith(
            'DELETE', sproxydDeleteUrl('sproxyd-key-A'), undefined);
    });

    test('upload ID matched by a version is not processed again in the remaining-orphans phase', async () => {
        listVersionsYields(versionWithUploadId('uploadId1', []));
        everyRequestSucceeds();
        const orphanMap = {
            uploadId1: makeOrphanEntry(['uploadId1..|..00001'], ['sproxyd-key-A']),
        };

        await runCleanup(orphanMap);

        // 2 calls: sproxyd-key-A + part-key; not 4 (which would happen if double-processed)
        expect(httpRequest).toHaveBeenCalledTimes(2);
        expect(Object.keys(orphanMap)).toHaveLength(0);
    });
});