From 7bc11353b9cebf3796ce3dc26c527ca9b56c72cf Mon Sep 17 00:00:00 2001 From: Vadim Ogievetsky Date: Wed, 3 Dec 2025 11:07:41 -0800 Subject: [PATCH 1/2] inactive if zero across all counters, not just input --- web-console/src/druid-models/stages/stages.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/web-console/src/druid-models/stages/stages.ts b/web-console/src/druid-models/stages/stages.ts index 65818c804679..13131dc344ce 100644 --- a/web-console/src/druid-models/stages/stages.ts +++ b/web-console/src/druid-models/stages/stages.ts @@ -619,17 +619,17 @@ export class Stages { getInactiveWorkerCount(stage: StageDefinition): number | undefined { const { counters } = this; - const { stageNumber, definition } = stage; + const { stageNumber } = stage; const forStageCounters = counters?.[stageNumber]; if (!forStageCounters) return; - const inputChannelCounters = definition.input.map((_, i) => `input${i}` as ChannelCounterName); + const channelCounters = this.getChannelCounterNamesForStage(stage); // Calculate and return the number of workers that have zero count across all inputChannelCounters return sum( Object.values(forStageCounters).map(stageCounters => Number( - inputChannelCounters.every(channel => { + channelCounters.every(channel => { const c = stageCounters[channel]; if (!c) return true; const totalRows = sum(c.rows || []); From 9dcd2e9c7ad33dfc88d16e59fd4aece48886e4e3 Mon Sep 17 00:00:00 2001 From: Vadim Ogievetsky Date: Wed, 3 Dec 2025 11:46:37 -0800 Subject: [PATCH 2/2] add test --- .../src/druid-models/stages/stages.spec.ts | 357 +++++++++++++++++- 1 file changed, 356 insertions(+), 1 deletion(-) diff --git a/web-console/src/druid-models/stages/stages.spec.ts b/web-console/src/druid-models/stages/stages.spec.ts index 0cc49486ba62..9b931215930d 100644 --- a/web-console/src/druid-models/stages/stages.spec.ts +++ b/web-console/src/druid-models/stages/stages.spec.ts @@ -16,7 +16,7 @@ * limitations under the License. */ -import { aggregateSortProgressCounters } from './stages'; +import { aggregateSortProgressCounters, Stages } from './stages'; import { STAGES } from './stages.mock'; describe('aggregateSortProgressCounters', () => { @@ -69,6 +69,361 @@ describe('Stages', () => { }); }); + describe('#getInactiveWorkerCount', () => { + it('returns undefined when no counters exist for stage', () => { + // Create a custom Stages instance where stage has no counters + const customStages = new Stages( + [ + { + stageNumber: 5, + definition: { + id: 'test-stage-no-counters', + input: [ + { + type: 'external', + inputSource: { type: 'http', uris: [] }, + inputFormat: { type: 'json' }, + signature: [], + }, + ], + processor: { type: 'scan' }, + signature: [], + maxWorkerCount: 1, + }, + phase: 'NEW', + workerCount: 1, + partitionCount: 1, + }, + ], + {}, + ); + + expect(customStages.getInactiveWorkerCount(customStages.stages[0])).toBeUndefined(); + }); + + it('counts workers with zero rows across all channels', () => { + // Stage 2 has counters data in the mock + const inactiveCount = STAGES.getInactiveWorkerCount(STAGES.stages[2]); + expect(inactiveCount).toBe(0); + }); + + it('identifies inactive workers correctly', () => { + // Create a custom Stages instance with workers that have zero rows + const customStages = new Stages( + [ + { + stageNumber: 0, + definition: { + id: 'test-stage', + input: [ + { + type: 'external', + inputSource: { type: 'http', uris: [] }, + inputFormat: { type: 'json' }, + signature: [], + }, + ], + processor: { type: 'scan' }, + signature: [], + maxWorkerCount: 3, + }, + phase: 'READING_INPUT', + workerCount: 3, + partitionCount: 1, + }, + ], + { + '0': { + '0': { + input0: { + type: 'channel', + rows: [100], + bytes: [1000], + }, + output: { + type: 'channel', + rows: [100], + bytes: [1000], + }, + }, + '1': { + input0: { + type: 'channel', + rows: [0], + bytes: [0], + }, + output: { + type: 'channel', + rows: [0], + bytes: [0], + }, + }, + '2': { + input0: { + type: 'channel', + rows: [0], + bytes: [0], + }, + output: { + type: 'channel', + rows: [0], + bytes: [0], + }, + }, + }, + }, + ); + + const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]); + expect(inactiveCount).toBe(2); + }); + + it('handles missing channel data correctly', () => { + // Create a custom Stages instance where some workers have missing channels + const customStages = new Stages( + [ + { + stageNumber: 0, + definition: { + id: 'test-stage', + input: [ + { + type: 'external', + inputSource: { type: 'http', uris: [] }, + inputFormat: { type: 'json' }, + signature: [], + }, + ], + processor: { type: 'scan' }, + signature: [], + maxWorkerCount: 2, + }, + phase: 'READING_INPUT', + workerCount: 2, + partitionCount: 1, + }, + ], + { + '0': { + '0': { + input0: { + type: 'channel', + rows: [100], + bytes: [1000], + }, + }, + '1': { + // Missing input0 channel - should be counted as inactive + }, + }, + }, + ); + + const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]); + expect(inactiveCount).toBe(1); + }); + + it('counts all workers as inactive when all have zero rows', () => { + const customStages = new Stages( + [ + { + stageNumber: 0, + definition: { + id: 'test-stage', + input: [ + { + type: 'external', + inputSource: { type: 'http', uris: [] }, + inputFormat: { type: 'json' }, + signature: [], + }, + ], + processor: { type: 'scan' }, + signature: [], + maxWorkerCount: 2, + }, + phase: 'READING_INPUT', + workerCount: 2, + partitionCount: 1, + }, + ], + { + '0': { + '0': { + input0: { + type: 'channel', + rows: [], + bytes: [], + }, + }, + '1': { + input0: { + type: 'channel', + rows: [0], + bytes: [0], + }, + }, + }, + }, + ); + + const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]); + expect(inactiveCount).toBe(2); + }); + + it('counts no inactive workers when all have non-zero rows', () => { + const customStages = new Stages( + [ + { + stageNumber: 0, + definition: { + id: 'test-stage', + input: [ + { + type: 'external', + inputSource: { type: 'http', uris: [] }, + inputFormat: { type: 'json' }, + signature: [], + }, + ], + processor: { type: 'scan' }, + signature: [], + maxWorkerCount: 3, + }, + phase: 'READING_INPUT', + workerCount: 3, + partitionCount: 1, + }, + ], + { + '0': { + '0': { + input0: { + type: 'channel', + rows: [100], + bytes: [1000], + }, + }, + '1': { + input0: { + type: 'channel', + rows: [50], + bytes: [500], + }, + }, + '2': { + input0: { + type: 'channel', + rows: [75], + bytes: [750], + }, + }, + }, + }, + ); + + const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]); + expect(inactiveCount).toBe(0); + }); + + it('counts worker as active if it has output but no input yet', () => { + // Tests the fix: input is reported in batches, so a worker might have output + // before input counters are updated. Such workers should be considered active. + const customStages = new Stages( + [ + { + stageNumber: 0, + definition: { + id: 'test-stage', + input: [ + { + type: 'external', + inputSource: { type: 'http', uris: [] }, + inputFormat: { type: 'json' }, + signature: [], + }, + ], + processor: { type: 'scan' }, + signature: [], + shuffleSpec: { + type: 'targetSize', + clusterBy: { columns: [] }, + targetSize: 3000000, + }, + maxWorkerCount: 3, + }, + phase: 'READING_INPUT', + workerCount: 3, + partitionCount: 1, + }, + ], + { + '0': { + '0': { + input0: { + type: 'channel', + rows: [100], + bytes: [1000], + }, + output: { + type: 'channel', + rows: [100], + bytes: [1000], + }, + shuffle: { + type: 'channel', + rows: [100], + bytes: [1000], + }, + }, + '1': { + // Worker 1 has output and shuffle but input is not reported yet (still zero) + // This can happen because input is reported in batches + input0: { + type: 'channel', + rows: [0], + bytes: [0], + }, + output: { + type: 'channel', + rows: [50], + bytes: [500], + }, + shuffle: { + type: 'channel', + rows: [50], + bytes: [500], + }, + }, + '2': { + // Worker 2 is truly inactive - zero across all channels + input0: { + type: 'channel', + rows: [0], + bytes: [0], + }, + output: { + type: 'channel', + rows: [0], + bytes: [0], + }, + shuffle: { + type: 'channel', + rows: [0], + bytes: [0], + }, + }, + }, + }, + ); + + const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]); + // Only worker 2 should be counted as inactive + // Worker 1 has output/shuffle data, so it's active even though input is zero + expect(inactiveCount).toBe(1); + }); + }); + describe('#getByPartitionCountersForStage', () => { it('works for input', () => { expect(STAGES.getByPartitionCountersForStage(STAGES.stages[2], 'in')).toMatchInlineSnapshot(`