From 07cb65fb13d9d3282036b76ef17395c99496a79a Mon Sep 17 00:00:00 2001 From: Jonathan Gramain Date: Tue, 3 Mar 2026 16:14:22 -0800 Subject: [PATCH] S3UTILS-225 [impr] list bucketd versions with full metadata Create a new helper function that returns an async iterator on a bucket versions listing via bucketd, fetching the full locations array when it is pruned from the listing results. It ensures to fetch the full metadata (including all locations) from the correct version, trying the different possible URLs for the specific case of null versions. Note: unfortunately we don't have a clean high-level API in bucketd to do this, Cloudserver is normally doing this preprocessing to build the correct bucketd URL to fetch. --- .../unit/utils/async/bucketd/listVersions.js | 705 ++++++++++++++++++ tests/unit/utils/async/httpRequest.js | 107 +++ utils/async/bucketd/listVersions.js | 190 +++++ utils/async/httpRequest.js | 67 ++ 4 files changed, 1069 insertions(+) create mode 100644 tests/unit/utils/async/bucketd/listVersions.js create mode 100644 tests/unit/utils/async/httpRequest.js create mode 100644 utils/async/bucketd/listVersions.js create mode 100644 utils/async/httpRequest.js diff --git a/tests/unit/utils/async/bucketd/listVersions.js b/tests/unit/utils/async/bucketd/listVersions.js new file mode 100644 index 00000000..e09928a9 --- /dev/null +++ b/tests/unit/utils/async/bucketd/listVersions.js @@ -0,0 +1,705 @@ +jest.mock('../../../../../utils/async/httpRequest'); + +const httpRequest = require('../../../../../utils/async/httpRequest'); +const listVersions = require('../../../../../utils/async/bucketd/listVersions'); + +const BUCKETD_HOSTPORT = 'localhost:9000'; +const BUCKET = 'test-bucket'; + +// Build a fake httpRequest response object +function fakeResponse(statusCode, body) { + return { statusCode, body: JSON.stringify(body) }; +} + +// Build a listing page response body +function listingPage({ + versions = [], isTruncated = false, nextKeyMarker = '', 
nextVersionIdMarker = '', +} = {}) { + return { + Versions: versions, + IsTruncated: isTruncated, + NextKeyMarker: nextKeyMarker, + NextVersionIdMarker: nextVersionIdMarker, + }; +} + +// Build a listing entry with minimal metadata inline +function versionEntry(key, versionId, mdOverrides = {}) { + const md = { + 'content-length': 1024, + versionId, + 'location': [{ key: 'sproxyd-key' }], + ...mdOverrides, + }; + return { key, versionId, value: JSON.stringify(md) }; +} + +// Collect all items yielded by the async generator into an array +async function collectAll(gen) { + const items = []; + for await (const item of gen) { + items.push(item); + } + return items; +} + +// Declare the exact sequence of HTTP exchanges expected during a test. +// +// Each exchange is { url, status, body?, retryParams? }. The mock validates +// that every httpRequest call is a GET that hits the declared URL in order +// and returns the declared status code and (optionally) JSON-serialised body. +// +// After the code under test runs, call mock.assertAllConsumed() to verify +// that every declared exchange was actually triggered. +// +// Example: +// const mock = mockHttpExchanges([ +// { url: listingUrl(), status: 200, body: listingPage({ versions: [...] }) }, +// { url: objectUrl('key1', 'v1'), status: 404 }, +// ]); +// const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); +// mock.assertAllConsumed(); +function mockHttpExchanges(exchanges) { + let callIndex = 0; + httpRequest.mockImplementation((method, url, retryParams) => { + const exchange = exchanges[callIndex]; + if (!exchange) { + throw new Error( + `Unexpected httpRequest call #${callIndex + 1}: ${method} ${url}`, + ); + } + expect(method).toBe('GET'); + expect(url).toBe(exchange.url); + expect(retryParams).toStrictEqual(exchange.retryParams); + callIndex++; + const body = exchange.body !== undefined ? 
JSON.stringify(exchange.body) : ''; + return Promise.resolve({ statusCode: exchange.status, body }); + }); + return { + assertAllConsumed() { + expect(httpRequest).toHaveBeenCalledTimes(exchanges.length); + }, + }; +} + +// --------------------------------------------------------------------------- +// URL builders — mirror the construction in listVersions.js so tests stay +// readable without repeating the concatenation logic inline. +// --------------------------------------------------------------------------- + +function listingUrl({ + maxKeys = 1000, prefix = '', keyMarker = '', versionIdMarker = '', +} = {}) { + return `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}` + + `?listingType=DelimiterVersions&maxKeys=${maxKeys}${ + prefix ? `&prefix=${encodeURIComponent(prefix)}` : '' + }&keyMarker=${encodeURIComponent(keyMarker)}` + + `&versionIdMarker=${encodeURIComponent(versionIdMarker)}`; +} + +function objectUrl(key, versionId) { + const base = `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/${encodeURIComponent(key)}`; + if (versionId === undefined) { + return base; + } + return `${base}?versionId=${encodeURIComponent(versionId)}`; +} + +beforeEach(() => { + jest.resetAllMocks(); +}); + +describe('listVersions', () => { + describe('basic listing', () => { + test('empty bucket yields nothing', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage())); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + expect(items).toHaveLength(0); + }); + + test('single page yields all versions', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2'), + ], + }), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(2); + expect(items[0]).toMatchObject({ key: 'key1', versionId: 'vid1' }); + 
expect(items[1]).toMatchObject({ key: 'key2', versionId: 'vid2' }); + }); + }); + + describe('options', () => { + test('pageSize controls maxKeys in listing URL', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl({ + maxKeys: 42, + }), + status: 200, + body: listingPage(), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + pageSize: 42, + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + + test('prefix is included in listing URL', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl({ + prefix: 'some/prefix/', + }), + status: 200, + body: listingPage(), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + prefix: 'some/prefix/', + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + + test('keyMarker and versionIdMarker are passed in the first request URL', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl({ + keyMarker: 'a/key', + versionIdMarker: 'a version', + }), + status: 200, + body: listingPage(), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + keyMarker: 'a/key', + versionIdMarker: 'a version', + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + + test('maxItems stops yielding after the limit is reached', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl({ + maxKeys: 2, + }), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2'), + ], + isTruncated: true, + nextKeyMarker: 'key2', + nextVersionIdMarker: 'vid2', + }), + }, + { + url: listingUrl({ + maxKeys: 1, + keyMarker: 'key2', + versionIdMarker: 'vid2', + }), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key3', 'vid3'), + ], + }), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + pageSize: 2, + maxItems: 3, + })); + 
mock.assertAllConsumed(); + + expect(items).toHaveLength(3); + expect(items[0]).toMatchObject({ key: 'key1', versionId: 'vid1' }); + expect(items[1]).toMatchObject({ key: 'key2', versionId: 'vid2' }); + expect(items[2]).toMatchObject({ key: 'key3', versionId: 'vid3' }); + }); + + test('retry option is forwarded to httpRequest', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage(), + retryParams: { + times: 5, + interval: 5000, + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET, { + retry: { + times: 5, + interval: 5000, + }, + })); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + }); + + describe('full metadata fetch (large MPU / pruned location)', () => { + test('fetches full metadata when location is absent and content-length > 0', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2', { location: undefined }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key2` + + '?versionId=vid2', + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'vid2', + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(2); + expect(items[0]).toMatchObject({ key: 'key1', versionId: 'vid1' }); + expect(items[1]).toMatchObject({ + key: 'key2', + versionId: 'vid2', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('skips entry when full metadata returns 404', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2', { location: undefined 
}), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key2` + + '?versionId=vid2', + status: 404, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ key: 'key1', versionId: 'vid1' }); + }); + + test('does NOT fetch full metadata when content-length is 0', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + 'content-length': 0, + 'location': null, + }), + ], + }), + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: null, + }, + }); + }); + + describe('non-versioned objects (versionId === "null")', () => { + test('fetches without versionId query param and returns metadata', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'null', { + versionId: undefined, + location: undefined, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 200, + body: { + 'content-length': 1024, + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'null', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('skips when fetched metadata has a versionId field (overwritten by versioned object)', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: 
listingPage({ + versions: [ + versionEntry('key1', 'null', { + versionId: undefined, + location: undefined, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'newvid', + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + }); + + describe('null-version fallback (isNull in listing metadata)', () => { + test('uses primary versionId URL if it exists', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + versionId: 'vid1', + location: undefined, + isNull: true, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=vid1', + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'vid1', + 'isNull': true, + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('falls back to master-key URL when primary versionId URL returns 404', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + versionId: 'vid1', + location: undefined, + isNull: true, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=vid1', + status: 404, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 200, + 
body: { + 'content-length': 1024, + 'versionId': 'vid1', + 'isNull': true, + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('falls back to ?versionId=null URL when master-key versionId does not match', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + versionId: 'vid1', + location: undefined, + isNull: true, + }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=vid1', + status: 404, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'othervid', + 'location': [{ key: 'sproxyd-key' }], + }, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=null', + status: 200, + body: { + 'content-length': 1024, + 'versionId': 'vid1', + 'isNull': true, + 'location': [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: [ + { key: 'sproxyd-key-1' }, + { key: 'sproxyd-key-2' }, + ], + }, + }); + }); + + test('skips when all fallback URLs fail to match', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1', { + versionId: 'vid1', + location: undefined, + isNull: true, + }), + ], + 
}), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=vid1', + status: 404, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1`, + status: 404, + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key1` + + '?versionId=null', + status: 404, + }, + ]); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + mock.assertAllConsumed(); + + expect(items).toHaveLength(0); + }); + }); + }); + + describe('error handling', () => { + test('throws when listing page returns non-200 status', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(500)); + + await expect(collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET))) + .rejects.toThrow('returned status 500'); + }); + + test('skips entries with malformed (non-JSON) metadata in listing response', async () => { + httpRequest.mockResolvedValueOnce(fakeResponse(200, listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + { + key: 'key2', + versionId: 'vid2', + value: '{NOTJSON}', + }, + ], + }))); + + const items = await collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET)); + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + key: 'key1', + versionId: 'vid1', + value: { + location: [{ key: 'sproxyd-key' }], + }, + }); + }); + + test('throws when full metadata fetch returns non-200 and non-404 status', async () => { + const mock = mockHttpExchanges([ + { + url: listingUrl(), + status: 200, + body: listingPage({ + versions: [ + versionEntry('key1', 'vid1'), + versionEntry('key2', 'vid2', { location: undefined }), + ], + }), + }, + { + url: `http://${BUCKETD_HOSTPORT}/default/bucket/${BUCKET}/key2` + + '?versionId=vid2', + status: 503, + }, + ]); + + await expect(collectAll(listVersions(BUCKETD_HOSTPORT, BUCKET))) + .rejects.toThrow('returned status 503'); + }); + }); +}); diff --git a/tests/unit/utils/async/httpRequest.js b/tests/unit/utils/async/httpRequest.js new file mode 100644 index 
00000000..a4fbe4c9 --- /dev/null +++ b/tests/unit/utils/async/httpRequest.js @@ -0,0 +1,107 @@ +const http = require('http'); +const httpRequest = require('../../../../utils/async/httpRequest'); + +// Spin up a minimal HTTP server, run the callback, then close it. +async function withServer(handler, fn) { + const server = http.createServer(handler); + await new Promise(resolve => server.listen(0, '127.0.0.1', resolve)); + const { port } = server.address(); + try { + return await fn(port); + } finally { + await new Promise(resolve => server.close(resolve)); + } +} + +describe('httpRequest', () => { + test('GET returns status code and buffered body', () => + withServer((req, res) => { + res.writeHead(200); + res.end(`${req.method} hello world`); + }, async port => { + const res = await httpRequest('GET', `http://127.0.0.1:${port}/`); + expect(res.statusCode).toBe(200); + expect(res.body).toBe('GET hello world'); + })); + + test('GET passes query string and path to the server', () => + withServer((req, res) => { + res.writeHead(200); + res.end(`${req.method} ${req.url}`); + }, async port => { + const res = await httpRequest('GET', `http://127.0.0.1:${port}/foo?bar=baz`); + expect(res.statusCode).toBe(200); + expect(res.body).toBe('GET /foo?bar=baz'); + })); + + test('GET resolves on 404 (non-5xx is not an error)', () => + withServer((req, res) => { + res.writeHead(404); + res.end(`${req.method} not found`); + }, async port => { + const res = await httpRequest('GET', `http://127.0.0.1:${port}/`); + expect(res.statusCode).toBe(404); + expect(res.body).toBe('GET not found'); + })); + + test('DELETE sends the correct method to the server', () => + withServer((req, res) => { + res.writeHead(200); + res.end(`${req.method} ok`); + }, async port => { + const res = await httpRequest('DELETE', `http://127.0.0.1:${port}/resource`); + expect(res.statusCode).toBe(200); + expect(res.body).toBe('DELETE ok'); + })); + + test('GET rejects on 5xx response (no retries)', () => { + let 
calls = 0;
+        return withServer((req, res) => { // returned so that jest awaits the assertions below
+            calls += 1;
+            res.writeHead(500);
+            res.end(`${req.method} oops`);
+        }, async port => {
+            await expect(
+                httpRequest('GET', `http://127.0.0.1:${port}/`)
+            ).rejects.toThrow('500');
+            expect(calls).toBe(1); // no retryParams: exactly one attempt
+        });
+    });
+
+    test('GET retries on 5xx when retryParams provided', () => {
+        let calls = 0;
+        return withServer((req, res) => {
+            calls += 1;
+            if (calls < 3) {
+                res.writeHead(500);
+                res.end(`${req.method} not yet`);
+            } else {
+                res.writeHead(200);
+                res.end(`${req.method} ok`);
+            }
+        }, async port => {
+            const res = await httpRequest(
+                'GET',
+                `http://127.0.0.1:${port}/`,
+                { times: 5, interval: 0 },
+            );
+            expect(res.statusCode).toBe(200);
+            expect(res.body).toBe('GET ok');
+            expect(calls).toBe(3); // two 500s, then the 200
+        });
+    });
+
+    test('GET gives up after retries are exhausted', () => {
+        let calls = 0;
+        return withServer((req, res) => {
+            calls += 1;
+            res.writeHead(500);
+            res.end(`${req.method} oops`);
+        }, async port => {
+            await expect(
+                httpRequest('GET', `http://127.0.0.1:${port}/`, { times: 3, interval: 0 })
+            ).rejects.toThrow('500');
+            expect(calls).toBe(3);
+        });
+    });
+});
diff --git a/utils/async/bucketd/listVersions.js b/utils/async/bucketd/listVersions.js
new file mode 100644
index 00000000..4a04f78d
--- /dev/null
+++ b/utils/async/bucketd/listVersions.js
@@ -0,0 +1,190 @@
+
+
+const werelogs = require('werelogs');
+
+const httpRequest = require('../httpRequest');
+
+const log = new werelogs.Logger('s3utils:listVersions');
+
+
+/**
+ * Fetch the complete metadata of an object version from bucketd.
+ *
+ * Correctly handles non-versioned objects and null versions:
+ *
+ * - Non-versioned (versionId === 'null'): fetch without a versionId query
+ *   param; reject the result if it now has a versionId field (the object was
+ *   overwritten by a versioned one since the listing was taken).
+ *
+ * - Versioned: try the primary ?versionId= URL first.
If that fails and
+ *   the listing entry carries isNull, try the master-key URL and
+ *   ?versionId=null as fallbacks, accepting the result only when its versionId
+ *   matches the expected one.
+ *
+ * @param {string} bucketdHostport - host:port of the bucketd endpoint
+ * @param {string} bucket - bucket name
+ * @param {string} key - object key
+ * @param {string} versionId - version ID from the listing entry
+ * @param {object} listingParsedMd - parsed metadata from the listing entry
+ * @param {object} [retryParams] - retry parameters forwarded to httpRequest
+ * @returns {Promise<object|null>} full metadata object, or null if not found/skipped
+ */
+async function fetchFullObjectMetadata(bucketdHostport, bucket, key, versionId, listingParsedMd, retryParams) {
+    const baseUrl = `http://${bucketdHostport}/default/bucket/${bucket}/${
+        encodeURIComponent(key)}`;
+
+    function parseResponse(url, res) {
+        if (res.statusCode === 404) {
+            return null;
+        }
+        if (res.statusCode !== 200) {
+            throw new Error(`GET ${url} returned status ${res.statusCode}`);
+        }
+        try {
+            return JSON.parse(res.body);
+        } catch (e) {
+            throw new Error(`failed to parse metadata from ${url}: ${e.message}`);
+        }
+    }
+
+    if (versionId === 'null') {
+        // Non-versioned object: fetch without versionId param
+        const res = await httpRequest('GET', baseUrl, retryParams);
+        const fullMd = parseResponse(baseUrl, res);
+        if (fullMd === null) {
+            return null; // 404: object is gone
+        }
+        if ('versionId' in fullMd) {
+            // Object has since been overwritten by a versioned one; skip
+            return null;
+        }
+        return fullMd;
+    }
+
+    // Versioned object: try the primary URL first
+    const primaryUrl = `${baseUrl}?versionId=${encodeURIComponent(versionId)}`;
+    const res = await httpRequest('GET', primaryUrl, retryParams);
+    if (res.statusCode === 200) {
+        return parseResponse(primaryUrl, res);
+    }
+    if (res.statusCode !== 404) {
+        throw new Error(`GET ${primaryUrl} returned status ${res.statusCode}`);
+    }
+    // Primary returned 404; if the listing entry is a null version,
+    // try alternative URLs
+    if (!('isNull' in listingParsedMd)) { // NOTE(review): presence check, so isNull: false would also try the fallbacks - confirm
+        return null;
+    }
+    for (const altUrl of [baseUrl, `${baseUrl}?versionId=null`]) {
+        const altRes = await httpRequest('GET', altUrl, retryParams);
+        const altMd = parseResponse(altUrl, altRes);
+        if (altMd !== null && altMd.versionId === versionId) { // accept only the exact version that was listed
+            return altMd;
+        }
+    }
+    return null;
+}
+
+/**
+ * Async generator that iterates over all versions in a bucket using
+ * DelimiterVersions listing. Yields { key, versionId, value } for each
+ * version, where value is the fully resolved metadata. When the listing
+ * result has a pruned location array (large MPUs), the full metadata is
+ * fetched individually.
+ *
+ * Page fetches and individual metadata fetches are retried on transient
+ * errors (network failures and 5xx responses) only when options.retry is set.
+ *
+ * @param {string} bucketdHostport - host:port of the bucketd endpoint
+ * @param {string} bucket - name of the bucket to list
+ * @param {object} [options] - listing options
+ * @param {number} [options.pageSize=1000] - number of entries requested per
+ *   listing page (passed as maxKeys to bucketd)
+ * @param {number} [options.maxItems] - maximum total number of entries to
+ *   yield; if omitted, all entries are yielded
+ * @param {string} [options.prefix] - only yield entries whose key starts with
+ *   this prefix; an empty string (the default) applies no prefix filter
+ * @param {string} [options.keyMarker] - resume listing from this key marker
+ *   (exclusive); defaults to the beginning of the bucket
+ * @param {string} [options.versionIdMarker] - resume listing from this
+ *   version ID marker, used together with keyMarker
+ * @param {object} [options.retry] - if provided, passed as retryParams to
+ *   httpRequest to retry on network errors and 5xx responses
+ *   (e.g.
{ times: 100, interval: 5000 }); by default requests are not retried
+ * @returns {AsyncGenerator<{key: string, versionId: string, value: object}>}
+ *   async generator yielding one entry per version
+ */
+async function* listVersions(bucketdHostport, bucket, {
+    pageSize = 1000,
+    maxItems,
+    prefix = '',
+    keyMarker: startKeyMarker = '',
+    versionIdMarker: startVersionIdMarker = '',
+    retry,
+} = {}) {
+    let keyMarker = startKeyMarker;
+    let versionIdMarker = startVersionIdMarker;
+    let isTruncated = true; // true so that the loop requests the first page
+    let remaining = maxItems ?? Infinity; // number of entries that may still be yielded
+
+    while (isTruncated && remaining > 0) {
+        const maxKeys = Math.min(pageSize, remaining); // never ask for more than may still be yielded
+        const url = `http://${bucketdHostport}/default/bucket/${bucket}`
+            + `?listingType=DelimiterVersions&maxKeys=${maxKeys}${
+                prefix ? `&prefix=${encodeURIComponent(prefix)}` : ''
+            }&keyMarker=${encodeURIComponent(keyMarker)}`
+            + `&versionIdMarker=${encodeURIComponent(versionIdMarker)}`;
+
+        const res = await httpRequest('GET', url, retry);
+        if (res.statusCode !== 200) {
+            throw new Error(`GET ${url} returned status ${res.statusCode}`);
+        }
+        const {
+            Versions,
+            IsTruncated,
+            NextKeyMarker,
+            NextVersionIdMarker,
+        } = JSON.parse(res.body);
+
+        for (const entry of (Versions || [])) {
+            const { key, versionId } = entry;
+            let parsedMd;
+            try {
+                parsedMd = JSON.parse(entry.value);
+            } catch (e) {
+                log.warn('failed to parse object metadata', {
+                    bucket,
+                    key,
+                    error: e.message,
+                });
+                continue; // malformed entries are logged and skipped, not fatal
+            }
+            // Only fetch full metadata when the listing result has a pruned
+            // location array: typically the field is absent for large MPUs
+            const needMdFetch = (
+                'content-length' in parsedMd
+                && parsedMd['content-length'] !== 0
+                && (parsedMd.location === undefined || parsedMd.location === null)
+            );
+            if (needMdFetch) {
+                parsedMd = await fetchFullObjectMetadata(bucketdHostport, bucket, key, versionId, parsedMd, retry);
+                if (parsedMd === null) {
+                    log.debug('full object metadata not found or skipped', {
+                        bucket, key, versionId,
+                    });
+                    continue;
+                }
+            }
+            yield { key, versionId, value: parsedMd };
+            --remaining; // skipped entries (continue above) do not count against maxItems
+        }
+
+        isTruncated = IsTruncated;
+        if (isTruncated) {
+            keyMarker = NextKeyMarker || '';
+            versionIdMarker = NextVersionIdMarker || '';
+        }
+    }
+}
+
+module.exports = listVersions;
diff --git a/utils/async/httpRequest.js b/utils/async/httpRequest.js
new file mode 100644
index 00000000..6fda7608
--- /dev/null
+++ b/utils/async/httpRequest.js
@@ -0,0 +1,67 @@
+
+
+const http = require('http');
+const async = require('async');
+const { http: httpArsn } = require('httpagent');
+const werelogs = require('werelogs');
+
+const log = new werelogs.Logger('s3utils:httpRequest');
+
+const httpAgent = new httpArsn.Agent({
+    keepAlive: true,
+});
+
+/**
+ * Makes an HTTP request and returns the response with the full body
+ * buffered in res.body.
+ *
+ * @param {string} method - HTTP method (e.g. 'GET', 'DELETE')
+ * @param {string} url - full URL to request
+ * @param {object} [retryParams] - if provided, failed requests are retried
+ *   via async.retry with these params (e.g.
{ times: 100, interval: 5000 });
+ *   both network errors and 5xx responses are treated as retryable failures
+ * @returns {Promise<http.IncomingMessage>} response with the decoded body in res.body; rejects on network error or 5xx
+ */
+function httpRequest(method, url, retryParams) {
+    async function attempt() {
+        const res = await new Promise((resolve, reject) => {
+            const urlObj = new URL(url);
+            const req = http.request({
+                hostname: urlObj.hostname,
+                port: urlObj.port,
+                path: `${urlObj.pathname}${urlObj.search}`,
+                method,
+                agent: httpAgent,
+            }, r => {
+                const chunks = [];
+                r.on('data', chunk => chunks.push(chunk));
+                r.once('end', () => {
+                    // eslint-disable-next-line no-param-reassign
+                    r.body = Buffer.concat(chunks).toString(); // decode once: per-chunk decoding breaks split UTF-8
+                    log.trace('received HTTP response', { method, url, statusCode: r.statusCode });
+                    resolve(r);
+                });
+                r.once('error', err => reject(new Error(
+                    'error reading response from HTTP request '
+                    + `to ${url}: ${err.message}`,
+                )));
+            });
+            req.once('error', err => reject(new Error(
+                `error sending HTTP request to ${url}: ${err.message}`,
+            )));
+            log.trace('sending HTTP request', { method, url });
+            req.end();
+        });
+        if (res.statusCode >= 500) { // thrown (not returned) so that async.retry sees 5xx as a failure
+            throw new Error(`${method} ${url} returned status ${res.statusCode}`);
+        }
+        return res;
+    }
+
+    if (retryParams) {
+        return async.retry(retryParams, attempt);
+    }
+    return attempt();
+}
+
+module.exports = httpRequest;