Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 41 additions & 45 deletions test/agents/test-otlp-grpc-metrics.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ if (process.argv[2] === 'child') {
None: 0,
ProcMetrics: 1,
ThreadMetrics: 2,
Done: 3,
};

let nsolidId;
Expand Down Expand Up @@ -550,79 +549,76 @@ if (process.argv[2] === 'child') {

if (resourceMetrics[0].scopeMetrics[0].metrics) {
context.metrics = [...resourceMetrics[0].scopeMetrics[0].metrics];
if (context.state === State.None) {
const initialState = determineInitialState(context.metrics);
if (!initialState) {
return;
}
context.state = initialState.state;
context.expected = [...initialState.expectedMetrics];
context.threadId = initialState.threadId;
}

while (context.metrics.length > 0 && context.state !== State.Done) {
// Process metrics until we've handled all of them or reached the Done state
while (context.metrics.length > 0 && !areWeDone(context)) {
if (context.state === State.None) {
updateStateAndExpected(context);
}
checkMetricsData(context);
if (context.expected.length === 0) {
updateStateAndExpected(context);
if (context.state === State.ProcMetrics) {
context.procMetricsDone = true;
} else if (context.state === State.ThreadMetrics) {
context.threadList.shift();
}
context.state = State.None;
}
}
}
}

function determineInitialState(metrics) {
// Check if the first metric contains the thread.id attribute to determine the initial state
const metric = metrics[0];
if (!metric) {
return null;
}
const attributes = metric[metric.data].dataPoints[0].attributes;
const attrIndex = attributes?.findIndex((a) => a.key === 'thread.id');
if (attrIndex > -1) {
const threadId = parseInt(attributes[attrIndex].value.intValue, 10);
// Remove threadId from context.threadList array
const threadList = context.threadList;
const index = threadList.indexOf(threadId);
if (index > -1) {
threadList.splice(index, 1);
function updateStateAndExpected(context) {
// Check what type of metrics we have in the current batch
const hasThreadMetrics = context.metrics.some((metric) => {
if (!metric || !metric[metric.data] || !metric[metric.data].dataPoints ||
!metric[metric.data].dataPoints[0] || !metric[metric.data].dataPoints[0].attributes) {
return false;
}

return { state: State.ThreadMetrics, expectedMetrics: [...expectedThreadMetrics], threadId };
}

return { state: State.ProcMetrics, expectedMetrics: [...expectedProcMetrics] };
}
const attributes = metric[metric.data].dataPoints[0].attributes;
return attributes.some((attr) => attr.key === 'thread.id');
});

function updateStateAndExpected(context) {
if (context.state === State.ProcMetrics) {
context.state = State.Done;
} else if (context.state === State.ThreadMetrics) {
if (context.threadList.length === 0) {
context.state = State.ProcMetrics;
context.expected = [...expectedProcMetrics];
} else {
context.threadId = context.threadList.shift();
context.expected = [...expectedThreadMetrics];
}
const hasProcMetrics = context.metrics.some((metric) =>
expectedProcMetrics.some((m) => m[0] === metric.name));

assert.ok(hasThreadMetrics || hasProcMetrics, 'No thread or process metrics found');
if (hasThreadMetrics) {
assert.ok(context.threadList.length > 0, 'No more threads available');
context.state = State.ThreadMetrics;
context.expected = [...expectedThreadMetrics];
} else {
context.state = State.ProcMetrics;
context.expected = [...expectedProcMetrics];
}
}

const context = {
state: State.None,
prevState: State.None,
metrics: [],
expected: [],
threadId: null,
threadList: [ threadId ],
procMetricsDone: false,
};

function areWeDone(context) {
return context.state === State.None &&
context.threadList.length === 0 &&
context.procMetricsDone;
}

async function runTest(getEnv) {
return new Promise((resolve, reject) => {
let childExited = false;
const otlpServer = new OTLPGRPCServer();
otlpServer.start(mustSucceed(async (port) => {
otlpServer.on('metrics', mustCallAtLeast((metrics) => {
checkMetrics(metrics, context);
if (context.state === State.Done) {
if (areWeDone(context) && !childExited) {
child.send('exit');
childExited = true;
}
}, 1));

Expand Down
Loading