From f18e72e3c3b33e9f7ecb344be3d502d6adb10611 Mon Sep 17 00:00:00 2001 From: Santiago Gimeno Date: Mon, 24 Mar 2025 15:39:07 +0100 Subject: [PATCH] test: fix flaky test-otlp-grpc-metrics After having enabled `ENABLE_ASYNC_EXPORT` in `opentelemetry-cpp`. --- test/agents/test-otlp-grpc-metrics.mjs | 86 ++++++++++++-------------- 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/test/agents/test-otlp-grpc-metrics.mjs b/test/agents/test-otlp-grpc-metrics.mjs index 437a3f7a5cc..698226dc8d4 100644 --- a/test/agents/test-otlp-grpc-metrics.mjs +++ b/test/agents/test-otlp-grpc-metrics.mjs @@ -437,7 +437,6 @@ if (process.argv[2] === 'child') { None: 0, ProcMetrics: 1, ThreadMetrics: 2, - Done: 3, }; let nsolidId; @@ -550,79 +549,76 @@ if (process.argv[2] === 'child') { if (resourceMetrics[0].scopeMetrics[0].metrics) { context.metrics = [...resourceMetrics[0].scopeMetrics[0].metrics]; - if (context.state === State.None) { - const initialState = determineInitialState(context.metrics); - if (!initialState) { - return; - } - context.state = initialState.state; - context.expected = [...initialState.expectedMetrics]; - context.threadId = initialState.threadId; - } - while (context.metrics.length > 0 && context.state !== State.Done) { + // Process metrics until we've handled all of them or reached the Done state + while (context.metrics.length > 0 && !areWeDone(context)) { + if (context.state === State.None) { + updateStateAndExpected(context); + } checkMetricsData(context); if (context.expected.length === 0) { - updateStateAndExpected(context); + if (context.state === State.ProcMetrics) { + context.procMetricsDone = true; + } else if (context.state === State.ThreadMetrics) { + context.threadList.shift(); + } + context.state = State.None; } } } } - function determineInitialState(metrics) { - // Check if the first metric contains the thread.id attribute to determine the initial state - const metric = metrics[0]; - if (!metric) { - return null; - } - const attributes = metric[metric.data].dataPoints[0].attributes; - const attrIndex = attributes?.findIndex((a) => a.key === 'thread.id'); - if (attrIndex > -1) { - const threadId = parseInt(attributes[attrIndex].value.intValue, 10); - // Remove threadId from context.threadList array - const threadList = context.threadList; - const index = threadList.indexOf(threadId); - if (index > -1) { - threadList.splice(index, 1); + function updateStateAndExpected(context) { + // Check what type of metrics we have in the current batch + const hasThreadMetrics = context.metrics.some((metric) => { + if (!metric || !metric[metric.data] || !metric[metric.data].dataPoints || + !metric[metric.data].dataPoints[0] || !metric[metric.data].dataPoints[0].attributes) { + return false; } - return { state: State.ThreadMetrics, expectedMetrics: [...expectedThreadMetrics], threadId }; - } - - return { state: State.ProcMetrics, expectedMetrics: [...expectedProcMetrics] }; - } + const attributes = metric[metric.data].dataPoints[0].attributes; + return attributes.some((attr) => attr.key === 'thread.id'); + }); - function updateStateAndExpected(context) { - if (context.state === State.ProcMetrics) { - context.state = State.Done; - } else if (context.state === State.ThreadMetrics) { - if (context.threadList.length === 0) { - context.state = State.ProcMetrics; - context.expected = [...expectedProcMetrics]; - } else { - context.threadId = context.threadList.shift(); - context.expected = [...expectedThreadMetrics]; - } + const hasProcMetrics = context.metrics.some((metric) => + expectedProcMetrics.some((m) => m[0] === metric.name)); + + assert.ok(hasThreadMetrics || hasProcMetrics, 'No thread or process metrics found'); + if (hasThreadMetrics) { + assert.ok(context.threadList.length > 0, 'No more threads available'); + context.state = State.ThreadMetrics; + context.expected = [...expectedThreadMetrics]; + } else { + context.state = State.ProcMetrics; + context.expected = [...expectedProcMetrics]; } } const context = { state: State.None, - prevState: State.None, metrics: [], expected: [], threadId: null, threadList: [ threadId ], + procMetricsDone: false, }; + function areWeDone(context) { + return context.state === State.None && + context.threadList.length === 0 && + context.procMetricsDone; + } + async function runTest(getEnv) { return new Promise((resolve, reject) => { + let childExited = false; const otlpServer = new OTLPGRPCServer(); otlpServer.start(mustSucceed(async (port) => { otlpServer.on('metrics', mustCallAtLeast((metrics) => { checkMetrics(metrics, context); - if (context.state === State.Done) { + if (areWeDone(context) && !childExited) { child.send('exit'); + childExited = true; } }, 1));