From 0a64cc849a7dfac0407519517bc06274b71a3b2a Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Fri, 28 Jul 2023 06:27:04 -0700 Subject: [PATCH 1/2] Fix bug in implementation of wait, where stop isn't called after initial update --- index.js | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/index.js b/index.js index b1803a11..7e3a43f2 100644 --- a/index.js +++ b/index.js @@ -252,8 +252,6 @@ class Replicate { return prediction; } - let updatedPrediction = await this.predictions.get(id); - // eslint-disable-next-line no-promise-executor-return const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); @@ -261,11 +259,18 @@ class Replicate { const interval = options.interval || 250; const max_attempts = options.max_attempts || null; + let updatedPrediction = await this.predictions.get(id); + while ( updatedPrediction.status !== 'succeeded' && updatedPrediction.status !== 'failed' && updatedPrediction.status !== 'canceled' ) { + /* eslint-disable no-await-in-loop */ + if (stop && await stop(updatedPrediction) === true) { + break; + } + attempts += 1; if (max_attempts && attempts > max_attempts) { throw new Error( @@ -273,12 +278,8 @@ class Replicate { ); } - /* eslint-disable no-await-in-loop */ await sleep(interval); updatedPrediction = await this.predictions.get(prediction.id); - if (stop && await stop(updatedPrediction) === true) { - break; - } /* eslint-enable no-await-in-loop */ } From cefc5ae7a6ce39e372360a09e198d93dc0897511 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Fri, 28 Jul 2023 06:27:17 -0700 Subject: [PATCH 2/2] Add progress parameter to run method --- index.d.ts | 3 ++- index.js | 22 +++++++++++++++++++--- index.test.ts | 44 ++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/index.d.ts b/index.d.ts index 6b56bebc..0b59dcb4 100644 --- a/index.d.ts +++ b/index.d.ts @@ -89,7 +89,8 @@ declare module 'replicate' { webhook?: string; webhook_events_filter?: WebhookEventType[]; signal?: AbortSignal; - } + }, + progress?: (Prediction) => void ): Promise; request(route: string | URL, options: { diff --git a/index.js b/index.js index 7e3a43f2..b147ea5e 100644 --- a/index.js +++ b/index.js @@ -86,10 +86,11 @@ class Replicate { * @param {string} [options.webhook] - An HTTPS URL for receiving a webhook when the prediction has new output * @param {string[]} [options.webhook_events_filter] - You can change which events trigger webhook requests by specifying webhook events (`start`|`output`|`logs`|`completed`) * @param {AbortSignal} [options.signal] - AbortSignal to cancel the prediction + * @param {Function} [progress] - Callback function that receives the prediction object as it's updated. The function is called when the prediction is created, each time its updated while polling for completion, and when it's completed. * @throws {Error} If the prediction failed * @returns {Promise} - Resolves with the output of running the model */ - async run(identifier, options) { + async run(identifier, options, progress) { const { wait, ...data } = options; // Define a pattern for owner and model names that allows @@ -117,17 +118,32 @@ class Replicate { version, }); + // Call progress callback with the initial prediction object + if (progress) { + progress(prediction); + } + const { signal } = options; - prediction = await this.wait(prediction, wait || {}, async ({ id }) => { + prediction = await this.wait(prediction, wait || {}, async (updatedPrediction) => { + // Call progress callback with the updated prediction object + if (progress) { + progress(updatedPrediction); + } + if (signal && signal.aborted) { - await this.predictions.cancel(id); + await this.predictions.cancel(updatedPrediction.id); return true; // stop polling } return false; // continue polling }); + // Call progress callback with the completed prediction object + if (progress) { + progress(prediction); + } + if (prediction.status === 'failed') { throw new Error(`Prediction failed: ${prediction.error}`); } diff --git a/index.test.ts b/index.test.ts index 9147b816..52e7c252 100644 --- a/index.test.ts +++ b/index.test.ts @@ -517,26 +517,62 @@ describe('Replicate client', () => { describe('run', () => { test('Calls the correct API routes', async () => { + let firstPollingRequest = true; + nock(BASE_URL) .post('/predictions') + .reply(201, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'starting', + }) + .get('/predictions/ufawqhfynnddngldkgtslldrkq') + .twice() .reply(200, { id: 'ufawqhfynnddngldkgtslldrkq', status: 'processing', }) .get('/predictions/ufawqhfynnddngldkgtslldrkq') - .reply(201, { + .reply(200, { id: 'ufawqhfynnddngldkgtslldrkq', status: 'succeeded', - output: 'foobar', + output: 'Goodbye!', }); + const progress = jest.fn(); + const output = await client.run( 'owner/model:5c7d5dc6dd8bf75c1acaa8565735e7986bc5b66206b55cca93cb72c9bf15ccaa', { input: { text: 'Hello, world!' }, - } + wait: { interval: 1 } + }, + progress ); - expect(output).toBe('foobar'); + + expect(output).toBe('Goodbye!'); + + expect(progress).toHaveBeenNthCalledWith(1, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'starting', + }); + + expect(progress).toHaveBeenNthCalledWith(2, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'processing', + }); + + expect(progress).toHaveBeenNthCalledWith(3, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'processing', + }); + + expect(progress).toHaveBeenNthCalledWith(4, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'succeeded', + output: 'Goodbye!', + }); + + expect(progress).toHaveBeenCalledTimes(4); }); test('Does not throw an error for identifier containing hyphen and full stop', async () => {