diff --git a/tests/unit/utils/async/bucketd/listVersions.js b/tests/unit/utils/async/bucketd/listVersions.js new file mode 100644 index 00000000..e09928a9 --- /dev/null +++ b/tests/unit/utils/async/bucketd/listVersions.js @@ -0,0 +1,705 @@ +jest.mock('../../../../../utils/async/httpRequest'); + +const httpRequest = require('../../../../../utils/async/httpRequest'); +const listVersions = require('../../../../../utils/async/bucketd/listVersions'); + +const BUCKETD_HOSTPORT = 'localhost:9000'; +const BUCKET = 'test-bucket'; + +// Build a fake httpRequest response object +function fakeResponse(statusCode, body) { + return { statusCode, body: JSON.stringify(body) }; +} + +// Build a listing page response body +function listingPage({ + versions = [], isTruncated = false, nextKeyMarker = '', nextVersionIdMarker = '', +} = {}) { + return { + Versions: versions, + IsTruncated: isTruncated, + NextKeyMarker: nextKeyMarker, + NextVersionIdMarker: nextVersionIdMarker, + }; +} + +// Build a listing entry with minimal metadata inline +function versionEntry(key, versionId, mdOverrides = {}) { + const md = { + 'content-length': 1024, + versionId, + 'location': [{ key: 'sproxyd-key' }], + ...mdOverrides, + }; + return { key, versionId, value: JSON.stringify(md) }; +} + +// Collect all items yielded by the async generator into an array +async function collectAll(gen) { + const items = []; + for await (const item of gen) { + items.push(item); + } + return items; +} + +// Declare the exact sequence of HTTP exchanges expected during a test. +// +// Each exchange is { url, status, body?, retryParams? }. The mock validates +// that every httpRequest call is a GET that hits the declared URL in order +// and returns the declared status code and (optionally) JSON-serialised body. +// +// After the code under test runs, call mock.assertAllConsumed() to verify +// that every declared exchange was actually triggered. +// +// Example: +// const mock = mockHttpExchanges([ +// { url: listingUrl(), status: 200, body: listingPage({ versions: [...] }) }, +// { url: objectUrl('key1', 'v1'), status: 404 }, +// ]); +// const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); +// mock.assertAllConsumed(); +function mockHttpExchanges(exchanges) { + let callIndex = 0; + httpRequest.mockImplementation((method, url, retryParams) => { + const exchange = exchanges[callIndex]; + if (!exchange) { + throw new Error( + `Unexpected httpRequest call #${callIndex + 1}: ${method} ${url}`, + ); + } + expect(method).toBe('GET'); + expect(url).toBe(exchange.url); + expect(retryParams).toStrictEqual(exchange.retryParams); + callIndex++; + const body = exchange.body !== undefined ? JSON.stringify(exchange.body) : ''; + return Promise.resolve({ statusCode: exchange.status, body }); + }); + return { + assertAllConsumed() { + expect(httpRequest).toHaveBeenCalledTimes(exchanges.length); + }, + }; +} + +// --------------------------------------------------------------------------- +// URL builders — mirror the construction in listVersions.js so tests stay +// readable without repeating the concatenation logic inline. +// --------------------------------------------------------------------------- + +function listingUrl({ + maxKeys = 1000, prefix = '', keyMarker = '', versionIdMarker = '', +} = {}) { + return `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}` + + `?listingType=DelimiterVersions&maxKeys=${maxKeys}${ + prefix ? `&prefix=${encodeURIComponent(prefix)}` : '' + }&keyMarker=${encodeURIComponent(keyMarker)}` + + `&versionIdMarker=${encodeURIComponent(versionIdMarker)}`; +} + +function objectUrl(key, versionId) { + const base = `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/${encodeURIComponent(key)}`; + if (versionId === undefined) { + return base; + } + return `${base}?versionId=${encodeURIComponent(versionId)}`; +} + +beforeEach(() => { + jest.resetAllMocks(); +}); + +describe('listVersions', () => { + describe('basic listing', () => { + test('empty bucket yields nothing', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage())); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + expect(items).toHaveLength(0); + }); + + test('single page yields all versions', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2'), + ], + }), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(2); + expect(items[0]).toMatchObject({ key: 'key1', versionId: 'vid1' }); + expect(items[1]).toMatchObject({ key: 'key2', versionId: 'vid2' }); + }); + }); + + describe('options', () => { + test('pageSize controls maxKeys in listing URL', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl({ + maxKeys: 42, + }), + status: 200, + body: listingPage(), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + pageSize: 42, + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + + test('prefix is included in listing URL', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl({ + prefix: 'some/prefix/', + }), + status: 200, + body: listingPage(), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + prefix: 'some/prefix/', + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + + test('keyMarker and versionIdMarker are passed in the first request URL', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl({ + keyMarker: 'a/key', + versionIdMarker: 'a version', + }), + status: 200, + body: listingPage(), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + keyMarker: 'a/key', + versionIdMarker: 'a version', + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + + test('maxItems stops yielding after the limit is reached', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl({ + maxKeys: 2, + }), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2'), + ], + isTruncated: true, + nextKeyMarker: 'key2', + nextVersionIdMarker: 'vid2', + }), + }, + { + url: listingUrl({ + maxKeys: 1, + keyMarker: 'key2', + versionIdMarker: 'vid2', + }), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key3', 'vid3'), + ], + }), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + pageSize: 2, + maxItems: 3, + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(3); + expect(items[0]).toMatchObject({ key: 'key1', versionId: 'vid1' }); + expect(items[1]).toMatchObject({ key: 'key2', versionId: 'vid2' }); + expect(items[2]).toMatchObject({ key: 'key3', versionId: 'vid3' }); + }); + + test('retry option is forwarded to httpRequest', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage(), + retryParams: { + times: 5, + interval: 5000, + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + retry: { + times: 5, + interval: 5000, + }, + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + }); + + describe('full metadata fetch (large MPU / pruned location)', () => { + test('fetches full metadata when location is absent and content-length > 0', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2', { location: undefined }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key2` + + '?versionId=vid2', + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'vid2', + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(2); + expect(items[0]).toMatchObject({ key: 'key1', versionId: 'vid1' }); + expect(items[1]).toMatchObject({ + key: 'key2', + versionId: 'vid2', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('skips entry when full metadata returns 404', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2', { location: undefined }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key2` + + '?versionId=vid2', + status: 404, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ key: 'key1', versionId: 'vid1' }); + }); + + test('does NOT fetch full metadata when content-length is 0', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + 'content-length': 0, + 'location': null, + }), + ], + }), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: null, + }, + }); + }); + + describe('non-versioned objects (versionId === "null")', () => { + test('fetches without versionId query param and returns metadata', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'null', { + versionId: undefined, + location: undefined, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 200, + body: { + 'content-length': 1024, + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'null', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('skips when fetched metadata has a versionId field (overwritten by versioned object)', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'null', { + versionId: undefined, + location: undefined, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'newvid', + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + }); + + describe('null-version fallback (isNull in listing metadata)', () => { + test('uses primary versionId URL if it exists', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + versionId: 'vid1', + location: undefined, + isNull: true, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=vid1', + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'vid1', + 'isNull': true, + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('falls back to master-key URL when primary versionId URL returns 404', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + versionId: 'vid1', + location: undefined, + isNull: true, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=vid1', + status: 404, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'vid1', + 'isNull': true, + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('falls back to ?versionId=null URL when master-key versionId does not match', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + versionId: 'vid1', + location: undefined, + isNull: true, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=vid1', + status: 404, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'othervid', + 'location': [{ key: 'sproxyd-key' }], + }, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=null', + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'vid1', + 'isNull': true, + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('skips when all fallback URLs fail to match', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + versionId: 'vid1', + location: undefined, + isNull: true, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=vid1', + status: 404, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 404, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=null', + status: 404, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + }); + }); + + describe('error handling', () => { + test('throws when listing page returns non-200 status', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(500)); + + await expect(collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET))) + .rejects.toThrow('returned status 500'); + }); + + test('skips entries with malformed (non-JSON) metadata in listing response', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + { + key: 'key2', + versionId: 'vid2', + value: '{NOTJSON}', + }, + ], + }))); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: [{ key: 'sproxyd-key' }], + }, + }); + }); + + test('throws when full metadata fetch returns non-200 and non-404 status', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2', { location: undefined }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key2` + + '?versionId=vid2', + status: 503, + }, + ]); + + await expect(collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET))) + .rejects.toThrow('returned status 503'); + }); + }); +}); diff --git a/tests/unit/utils/async/httpRequest.js b/tests/unit/utils/async/httpRequest.js new file mode 100644 index 00000000..a4fbe4c9 --- /dev/null +++ b/tests/unit/utils/async/httpRequest.js @@ -0,0 +1,107 @@ +const http = require('http'); +const httpRequest = require('../../../../utils/async/httpRequest'); + +// Spin up a minimal HTTP server, run the callback, then close it. +async function withServer(handler, fn) { + const server = http.createServer(handler); + await new Promise(resolve => server.listen(0, '127.0.0.1', resolve)); + const { port } = server.address(); + try { + return await fn(port); + } finally { + await new Promise(resolve => server.close(resolve)); + } +} + +describe('httpRequest', () => { + test('GET returns status code and buffered body', () => + withServer((req, res) => { + res.writeHead(200); + res.end(`${req.method} hello world`); + }, async port => { + const res = await httpRequest('GET', `http://127.0.0.1:${port}/`); + expect(res.statusCode).toBe(200); + expect(res.body).toBe('GET hello world'); + })); + + test('GET passes query string and path to the server', () => + withServer((req, res) => { + res.writeHead(200); + res.end(`${req.method} ${req.url}`); + }, async port => { + const res = await httpRequest('GET', `http://127.0.0.1:${port}/foo?bar=baz`); + expect(res.statusCode).toBe(200); + expect(res.body).toBe('GET /foo?bar=baz'); + })); + + test('GET resolves on 404 (non-5xx is not an error)', () => + withServer((req, res) => { + res.writeHead(404); + res.end(`${req.method} not found`); + }, async port => { + const res = await httpRequest('GET', `http://127.0.0.1:${port}/`); + expect(res.statusCode).toBe(404); + expect(res.body).toBe('GET not found'); + })); + + test('DELETE sends the correct method to the server', () => + withServer((req, res) => { + res.writeHead(200); + res.end(`${req.method} ok`); + }, async port => { + const res = await httpRequest('DELETE', `http://127.0.0.1:${port}/resource`); + expect(res.statusCode).toBe(200); + expect(res.body).toBe('DELETE ok'); + })); + + test('GET rejects on 5xx response (no retries)', () => { + let calls = 0; + withServer((req, res) => { + calls += 1; + res.writeHead(500); + res.end(`${req.method} oops`); + }, async port => { + await expect( + httpRequest('GET', `http://127.0.0.1:${port}/`) + ).rejects.toThrow('500'); + expect(calls).toBe(1); + }); + }); + + test('GET retries on 5xx when retryParams provided', () => { + let calls = 0; + return withServer((req, res) => { + calls += 1; + if (calls < 3) { + res.writeHead(500); + res.end(`${req.method} not yet`); + } else { + res.writeHead(200); + res.end(`${req.method} ok`); + } + }, async port => { + const res = await httpRequest( + 'GET', + `http://127.0.0.1:${port}/`, + { times: 5, interval: 0 }, + ); + expect(res.statusCode).toBe(200); + expect(res.body).toBe('GET ok'); + expect(calls).toBe(3); + }); + }); + + test('GET gives up after retries are exhausted', () => { + let calls = 0; + return withServer((req, res) => { + calls += 1; + res.writeHead(500); + res.end(`${req.method} oops`); + }, async port => { + await expect( + httpRequest('GET', `http://127.0.0.1:${port}/`, { times: 3, interval: 0 }) + ).rejects.toThrow('500'); + expect(calls).toBe(3); + }); + }); +}); diff --git a/utils/async/bucketd/listVersions.js b/utils/async/bucketd/listVersions.js new file mode 100644 index 00000000..4a04f78d --- /dev/null +++ b/utils/async/bucketd/listVersions.js @@ -0,0 +1,190 @@ + + +const werelogs = require('werelogs'); + +const httpRequest = require('../httpRequest'); + +const log = new werelogs.Logger('s3utils:listVersions'); + + +/** + * Fetch the complete metadata of an object version from bucketd. + * + * Correctly handles non-versioned objects and null versions: + * + * - Non-versioned (versionId === 'null'): fetch without a versionId query + * param; reject the result if it now has a versionId field (the object was + * overwritten by a versioned one since the listing was taken). + * + * - Versioned: try the primary ?versionId= URL first. If that fails and + * the listing entry carries isNull, try the master-key URL and + * ?versionId=null as fallbacks, accepting the result only when its versionId + * matches the expected one. + * + * @param {string} bucketdHostport - host:port of the bucketd endpoint + * @param {string} bucket - bucket name + * @param {string} key - object key + * @param {string} versionId - version ID from the listing entry + * @param {object} listingParsedMd - parsed metadata from the listing entry + * @param {object} [retryParams] - retry parameters forwarded to httpRequest + * @returns {Promise} full metadata object, or null if not found/skipped + */ +async function fetchFullObjectMetadata(bucketdHostport, bucket, key, versionId, listingParsedMd, retryParams) { + const baseUrl = `http://${bucketdHostport}/default/bucket/${bucket}/${ + encodeURIComponent(key)}`; + + function parseResponse(url, res) { + if (res.statusCode === 404) { + return null; + } + if (res.statusCode !== 200) { + throw new Error(`GET ${url} returned status ${res.statusCode}`); + } + try { + return JSON.parse(res.body); + } catch (e) { + throw new Error(`failed to parse metadata from ${url}: ${e.message}`); + } + } + + if (versionId === 'null') { + // Non-versioned object: fetch without versionId param + const res = await httpRequest('GET', baseUrl, retryParams); + const fullMd = parseResponse(baseUrl, res); + if (fullMd === null) { + return null; // 404: object is gone + } + if ('versionId' in fullMd) { + // Object has since been overwritten by a versioned one; skip + return null; + } + return fullMd; + } + + // Versioned object: try the primary URL first + const primaryUrl = `${baseUrl}?versionId=${encodeURIComponent(versionId)}`; + const res = await httpRequest('GET', primaryUrl, retryParams); + if (res.statusCode === 200) { + return parseResponse(primaryUrl, res); + } + if (res.statusCode !== 404) { + throw new Error(`GET ${primaryUrl} returned status ${res.statusCode}`); + } + // Primary returned 404; if the listing entry is a null version, + // try alternative URLs + if (!('isNull' in listingParsedMd)) { + return null; + } + for (const altUrl of [baseUrl, `${baseUrl}?versionId=null`]) { + const altRes = await httpRequest('GET', altUrl, retryParams); + const altMd = parseResponse(altUrl, altRes); + if (altMd !== null && altMd.versionId === versionId) { + return altMd; + } + } + return null; +} + +/** + * Async generator that iterates over all versions in a bucket using + * DelimiterVersions listing. Yields { key, versionId, value } for each + * version, where value is the fully resolved metadata. When the listing + * result has a pruned location array (large MPUs), the full metadata is + * fetched individually. + * + * Page fetches and individual metadata fetches are retried on transient + * errors (network failures and 5xx responses) using RETRY_PARAMS. + * + * @param {string} bucketdHostport - host:port of the bucketd endpoint + * @param {string} bucket - name of the bucket to list + * @param {object} [options] - listing options + * @param {number} [options.pageSize=1000] - number of entries requested per + * listing page (passed as maxKeys to bucketd) + * @param {number} [options.maxItems] - maximum total number of entries to + * yield; if omitted, all entries are yielded + * @param {string} [options.prefix] - only yield entries whose key starts with + * this prefix; an empty string (the default) applies no prefix filter + * @param {string} [options.keyMarker] - resume listing from this key marker + * (exclusive); defaults to the beginning of the bucket + * @param {string} [options.versionIdMarker] - resume listing from this + * version ID marker, used together with keyMarker + * @param {object} [options.retry] - if provided, passed as retryParams to + * httpRequest to retry on network errors and 5xx responses + * (e.g. { times: 100, interval: 5000 }); by default requests are not retried + * @returns {AsyncGenerator<{key: string, versionId: string, value: object}>} + * async generator yielding one entry per version + */ +async function* listVersions(bucketdHostport, bucket, { + pageSize = 1000, + maxItems, + prefix = '', + keyMarker: startKeyMarker = '', + versionIdMarker: startVersionIdMarker = '', + retry, +} = {}) { + let keyMarker = startKeyMarker; + let versionIdMarker = startVersionIdMarker; + let isTruncated = true; + let remaining = maxItems ?? Infinity; + + while (isTruncated && remaining > 0) { + const maxKeys = Math.min(pageSize, remaining); + const url = `http://${bucketdHostport}/default/bucket/${bucket}` + + `?listingType=DelimiterVersions&maxKeys=${maxKeys}${ + prefix ? `&prefix=${encodeURIComponent(prefix)}` : '' + }&keyMarker=${encodeURIComponent(keyMarker)}` + + `&versionIdMarker=${encodeURIComponent(versionIdMarker)}`; + + const res = await httpRequest('GET', url, retry); + if (res.statusCode !== 200) { + throw new Error(`GET ${url} returned status ${res.statusCode}`); + } + const { + Versions, + IsTruncated, + NextKeyMarker, + NextVersionIdMarker, + } = JSON.parse(res.body); + + for (const entry of (Versions || [])) { + const { key, versionId } = entry; + let parsedMd; + try { + parsedMd = JSON.parse(entry.value); + } catch (e) { + log.warn('failed to parse object metadata', { + bucket, + key, + error: e.message, + }); + continue; + } + // Only fetch full metadata when the listing result has a pruned + // location array: typically the field is absent for large MPUs + const needMdFetch = ( + 'content-length' in parsedMd + && parsedMd['content-length'] !== 0 + && (parsedMd.location === undefined || parsedMd.location === null) + ); + if (needMdFetch) { + parsedMd = await fetchFullObjectMetadata(bucketdHostport, bucket, key, versionId, parsedMd, retry); + if (parsedMd === null) { + log.debug('full object metadata not found or skipped', { + bucket, key, versionId, + }); + continue; + } + } + yield { key, versionId, value: parsedMd }; + --remaining; + } + + isTruncated = IsTruncated; + if (isTruncated) { + keyMarker = NextKeyMarker || ''; + versionIdMarker = NextVersionIdMarker || ''; + } + } +} + +module.exports = listVersions; diff --git a/utils/async/httpRequest.js b/utils/async/httpRequest.js new file mode 100644 index 00000000..6fda7608 --- /dev/null +++ b/utils/async/httpRequest.js @@ -0,0 +1,67 @@ + + +const http = require('http'); +const async = require('async'); +const { http: httpArsn } = require('httpagent'); +const werelogs = require('werelogs'); + +const log = new werelogs.Logger('s3utils:httpRequest'); + +const httpAgent = new httpArsn.Agent({ + keepAlive: true, +}); + +/** + * Makes an HTTP request and returns the response with the full body + * buffered in res.body. + * + * @param {string} method - HTTP method (e.g. 'GET', 'DELETE') + * @param {string} url - full URL to request + * @param {object} [retryParams] - if provided, failed requests are retried + * via async.retry with these params (e.g. { times: 100, interval: 5000 }); + * both network errors and 5xx responses are treated as retryable failures + * @returns {Promise} response object with body in res.body + */ +function httpRequest(method, url, retryParams) { + async function attempt() { + const res = await new Promise((resolve, reject) => { + const urlObj = new URL(url); + const req = http.request({ + hostname: urlObj.hostname, + port: urlObj.port, + path: `${urlObj.pathname}${urlObj.search}`, + method, + agent: httpAgent, + }, r => { + const chunks = []; + r.on('data', chunk => chunks.push(chunk)); + r.once('end', () => { + // eslint-disable-next-line no-param-reassign + r.body = chunks.join(''); + log.trace('received HTTP response', { method, url, statusCode: r.statusCode }); + resolve(r); + }); + r.once('error', err => reject(new Error( + 'error reading response from HTTP request ' + + `to ${url}: ${err.message}`, + ))); + }); + req.once('error', err => reject(new Error( + `error sending HTTP request to ${url}: ${err.message}`, + ))); + log.trace('sending HTTP request', { method, url }); + req.end(); + }); + if (res.statusCode >= 500) { + throw new Error(`${method} ${url} returned status ${res.statusCode}`); + } + return res; + } + + if (retryParams) { + return async.retry(retryParams, attempt); + } + return attempt(); +} + +module.exports = httpRequest;