Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
357 changes: 356 additions & 1 deletion web-console/src/druid-models/stages/stages.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

import { aggregateSortProgressCounters } from './stages';
import { aggregateSortProgressCounters, Stages } from './stages';
import { STAGES } from './stages.mock';

describe('aggregateSortProgressCounters', () => {
Expand Down Expand Up @@ -69,6 +69,361 @@ describe('Stages', () => {
});
});

describe('#getInactiveWorkerCount', () => {
  // Fixture types are derived from the Stages constructor so that the helpers
  // below stay type-checked without pulling extra type imports into the spec.
  type StageFixture = ConstructorParameters<typeof Stages>[0][number];
  type ShuffleSpecFixture = StageFixture['definition']['shuffleSpec'];

  /**
   * Builds a scan stage that reads from a single external HTTP input.
   * `maxWorkerCount` always mirrors `workerCount`, and `partitionCount` is fixed at 1.
   * `shuffleSpec` is only added to the definition when one is supplied.
   */
  const scanStage = (
    stageNumber: number,
    id: string,
    phase: StageFixture['phase'],
    workerCount: number,
    shuffleSpec?: ShuffleSpecFixture,
  ): StageFixture => ({
    stageNumber,
    definition: {
      id,
      input: [
        {
          type: 'external',
          inputSource: { type: 'http', uris: [] },
          inputFormat: { type: 'json' },
          signature: [],
        },
      ],
      processor: { type: 'scan' },
      signature: [],
      ...(shuffleSpec ? { shuffleSpec } : {}),
      maxWorkerCount: workerCount,
    },
    phase,
    workerCount,
    partitionCount: 1,
  });

  /** Builds a channel counter with the given per-partition row and byte counts. */
  const channel = (rows: number[], bytes: number[]) => ({
    type: 'channel' as const,
    rows,
    bytes,
  });

  /** A channel counter that has reported a single partition with nothing in it. */
  const idle = () => channel([0], [0]);

  it('returns undefined when no counters exist for stage', () => {
    // The counters map is empty, so there is nothing recorded for stage 5
    const subject = new Stages([scanStage(5, 'test-stage-no-counters', 'NEW', 1)], {});

    expect(subject.getInactiveWorkerCount(subject.stages[0])).toBeUndefined();
  });

  it('counts workers with zero rows across all channels', () => {
    // Uses the shared mock, which carries counters for stage 2
    expect(STAGES.getInactiveWorkerCount(STAGES.stages[2])).toBe(0);
  });

  it('identifies inactive workers correctly', () => {
    // Worker 0 has rows; workers 1 and 2 report zero rows on every channel
    const subject = new Stages([scanStage(0, 'test-stage', 'READING_INPUT', 3)], {
      '0': {
        '0': {
          input0: channel([100], [1000]),
          output: channel([100], [1000]),
        },
        '1': {
          input0: idle(),
          output: idle(),
        },
        '2': {
          input0: idle(),
          output: idle(),
        },
      },
    });

    expect(subject.getInactiveWorkerCount(subject.stages[0])).toBe(2);
  });

  it('handles missing channel data correctly', () => {
    const subject = new Stages([scanStage(0, 'test-stage', 'READING_INPUT', 2)], {
      '0': {
        '0': {
          input0: channel([100], [1000]),
        },
        '1': {
          // No input0 channel at all - this worker should be counted as inactive
        },
      },
    });

    expect(subject.getInactiveWorkerCount(subject.stages[0])).toBe(1);
  });

  it('counts all workers as inactive when all have zero rows', () => {
    // Worker 0 reports empty arrays, worker 1 reports explicit zeros
    const subject = new Stages([scanStage(0, 'test-stage', 'READING_INPUT', 2)], {
      '0': {
        '0': {
          input0: channel([], []),
        },
        '1': {
          input0: idle(),
        },
      },
    });

    expect(subject.getInactiveWorkerCount(subject.stages[0])).toBe(2);
  });

  it('counts no inactive workers when all have non-zero rows', () => {
    const subject = new Stages([scanStage(0, 'test-stage', 'READING_INPUT', 3)], {
      '0': {
        '0': {
          input0: channel([100], [1000]),
        },
        '1': {
          input0: channel([50], [500]),
        },
        '2': {
          input0: channel([75], [750]),
        },
      },
    });

    expect(subject.getInactiveWorkerCount(subject.stages[0])).toBe(0);
  });

  it('counts worker as active if it has output but no input yet', () => {
    // Input is reported in batches, so a worker may already show output before
    // its input counters have been updated. Such a worker must count as active.
    const shufflingStage = scanStage(0, 'test-stage', 'READING_INPUT', 3, {
      type: 'targetSize',
      clusterBy: { columns: [] },
      targetSize: 3000000,
    });

    const subject = new Stages([shufflingStage], {
      '0': {
        '0': {
          input0: channel([100], [1000]),
          output: channel([100], [1000]),
          shuffle: channel([100], [1000]),
        },
        '1': {
          // Output and shuffle are populated while input is still zero
          // (the batched input report has not arrived yet)
          input0: idle(),
          output: channel([50], [500]),
          shuffle: channel([50], [500]),
        },
        '2': {
          // Genuinely inactive: zero on every channel
          input0: idle(),
          output: idle(),
          shuffle: idle(),
        },
      },
    });

    // Worker 1 is active thanks to its output/shuffle rows, leaving only worker 2 inactive
    expect(subject.getInactiveWorkerCount(subject.stages[0])).toBe(1);
  });
});

describe('#getByPartitionCountersForStage', () => {
it('works for input', () => {
expect(STAGES.getByPartitionCountersForStage(STAGES.stages[2], 'in')).toMatchInlineSnapshot(`
Expand Down
6 changes: 3 additions & 3 deletions web-console/src/druid-models/stages/stages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -619,17 +619,17 @@ export class Stages {

getInactiveWorkerCount(stage: StageDefinition): number | undefined {
const { counters } = this;
const { stageNumber, definition } = stage;
const { stageNumber } = stage;
const forStageCounters = counters?.[stageNumber];
if (!forStageCounters) return;

const inputChannelCounters = definition.input.map((_, i) => `input${i}` as ChannelCounterName);
const channelCounters = this.getChannelCounterNamesForStage(stage);

// Calculate and return the number of workers that have zero count across all channelCounters
return sum(
Object.values(forStageCounters).map(stageCounters =>
Number(
inputChannelCounters.every(channel => {
channelCounters.every(channel => {
const c = stageCounters[channel];
if (!c) return true;
const totalRows = sum(c.rows || []);
Expand Down
Loading