Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
df51513
Bump diagnostics-nodejs version
daniyelnnr Jul 25, 2025
aa2e3bb
Refactor telemetry client to support all signals
daniyelnnr Aug 4, 2025
35428ee
Refactor getLogClient to simplify initialization
daniyelnnr Aug 4, 2025
536e16c
Refactor logger client types
daniyelnnr Aug 4, 2025
b5b2425
Add resolution for @grpc/grpc-js dependency
daniyelnnr Aug 5, 2025
ec13074
Update package version to 6.49.8-beta.0
daniyelnnr Aug 5, 2025
00afbcb
Refactor singleton to use dedicated initialization methods for teleme…
daniyelnnr Aug 12, 2025
f3d9c05
Refactor telemetry client initialization logic
daniyelnnr Aug 13, 2025
58d03c9
Update package.json and yarn.lock
daniyelnnr Aug 8, 2025
cfac3a1
Update startApp function to properly initialize telemetry
daniyelnnr Aug 8, 2025
48127f3
Add metrics client
daniyelnnr Aug 8, 2025
e10016d
Add metrics instruments for monitoring HTTP requests
daniyelnnr Aug 8, 2025
c1d29a9
Add middleware for request metrics
daniyelnnr Aug 8, 2025
4549879
Add middleware usage on app
daniyelnnr Aug 8, 2025
cf53947
Add Koa instrumentation to telemetry client
daniyelnnr Aug 8, 2025
7faeac7
Add Koa context propagation middleware to app worker
daniyelnnr Aug 8, 2025
41f7dee
Add host-metrics instrumentation
daniyelnnr Aug 8, 2025
5b5146d
Add host-metrics instrumentation to telemetry client
daniyelnnr Aug 8, 2025
8b715bd
Merge branch 'master' into chore/bump-diagnostics
daniyelnnr Aug 14, 2025
6503f4d
Release v7.0.1
daniyelnnr Aug 14, 2025
c9bde12
Merge branch 'chore/bump-diagnostics' into update/metrics
daniyelnnr Aug 14, 2025
ff3bbb9
Release v7.1.0-beta.0
daniyelnnr Aug 14, 2025
b54e413
Improve code formatting for setTimeout
daniyelnnr Aug 18, 2025
b596cc7
Refactor instrument init logic on middleware
daniyelnnr Aug 18, 2025
08bef4e
Refactor metric client module
daniyelnnr Aug 26, 2025
1477054
Refactor metrics instruments module
daniyelnnr Aug 26, 2025
6733ee2
Add error handling when initializing instruments
daniyelnnr Aug 26, 2025
b0c660b
Merge branch 'master' into update/metrics
daniyelnnr Aug 29, 2025
85c7a57
Release v7.1.0
daniyelnnr Sep 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@vtex/api",
"version": "7.0.1",
"version": "7.1.0",
"description": "VTEX I/O API client",
"main": "lib/index.js",
"typings": "lib/index.d.ts",
Expand Down Expand Up @@ -47,6 +47,10 @@
},
"license": "MIT",
"dependencies": {
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/host-metrics": "0.35.5",
"@opentelemetry/instrumentation": "0.57.2",
"@opentelemetry/instrumentation-koa": "0.47.1",
"@types/koa": "^2.11.0",
"@types/koa-compose": "^3.2.3",
"@vtex/diagnostics-nodejs": "0.1.0-io-beta.19",
Expand Down
11 changes: 7 additions & 4 deletions src/service/index.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { initializeTelemetry } from './telemetry'
import cluster from 'cluster'

import { HTTP_SERVER_PORT } from '../constants'
import { getServiceJSON } from './loaders'
import { LogLevel, logOnceToDevConsole } from './logger'
import { startMaster } from './master'
import { startWorker } from './worker'

export const startApp = () => {
export const startApp = async () => {
await initializeTelemetry()
const serviceJSON = getServiceJSON()
try {
// if it is a master process then call setting up worker process
if(cluster.isMaster) {
const { startMaster } = await import('./master')
startMaster(serviceJSON)
} else {
// to setup server configurations and share port address for incoming requests
startWorker(serviceJSON).listen(HTTP_SERVER_PORT)
const { startWorker } = await import('./worker')
const app = await startWorker(serviceJSON)
app.listen(HTTP_SERVER_PORT)
}
} catch (err: any) {
logOnceToDevConsole(err.stack || err.message, LogLevel.Error)
Expand Down
46 changes: 46 additions & 0 deletions src/service/metrics/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Types } from "@vtex/diagnostics-nodejs";
import { initializeTelemetry } from '../telemetry';

/**
 * Lazily resolves the shared metrics client produced by telemetry
 * initialization. At most one initialization is in flight at any time;
 * concurrent callers share the same pending promise, and a failed attempt
 * clears the pending promise so a later call can retry.
 */
class MetricClientSingleton {
  private static instance: MetricClientSingleton | undefined;
  // Cached client once initialization has succeeded.
  private resolvedClient: Types.MetricClient | undefined;
  // In-flight initialization, shared by all concurrent callers.
  private pendingInit: Promise<Types.MetricClient> | undefined;

  private constructor() {}

  /** Returns the process-wide singleton, creating it on first access. */
  public static getInstance(): MetricClientSingleton {
    if (!MetricClientSingleton.instance) {
      MetricClientSingleton.instance = new MetricClientSingleton();
    }
    return MetricClientSingleton.instance;
  }

  /**
   * Returns the metrics client, triggering telemetry initialization on the
   * first call and reusing the cached client (or in-flight promise) afterwards.
   */
  public async getClient(): Promise<Types.MetricClient> {
    if (this.resolvedClient) {
      return this.resolvedClient;
    }
    if (!this.pendingInit) {
      this.pendingInit = this.bootstrapClient();
    }
    return this.pendingInit;
  }

  /** Runs telemetry initialization and caches the resulting metrics client. */
  private async bootstrapClient(): Promise<Types.MetricClient> {
    try {
      const { metricsClient } = await initializeTelemetry();
      this.resolvedClient = metricsClient;
      return metricsClient;
    } catch (error) {
      console.error('Failed to initialize metrics client:', error);
      throw error;
    } finally {
      // Clear in both outcomes: on success the cached client is used from now
      // on; on failure the next getClient() call retries initialization.
      this.pendingInit = undefined;
    }
  }
}

/** Convenience accessor for the singleton's metrics client. */
export const getMetricClient = () => MetricClientSingleton.getInstance().getClient();
41 changes: 41 additions & 0 deletions src/service/metrics/instruments/hostMetrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { InstrumentationBase, InstrumentationConfig } from "@opentelemetry/instrumentation";
import { MeterProvider } from '@opentelemetry/api';
import { HostMetrics } from "@opentelemetry/host-metrics";

/** Configuration for {@link HostMetricsInstrumentation}. */
interface HostMetricsInstrumentationConfig extends InstrumentationConfig {
  // Optional instrumentation/meter name; falls back to built-in defaults.
  name?: string;
  // Provider the HostMetrics collector publishes its metrics through.
  meterProvider?: MeterProvider;
}

/**
 * Adapts @opentelemetry/host-metrics to the InstrumentationBase lifecycle so
 * host-level metrics (CPU, memory, network) can be registered alongside the
 * other instrumentations.
 */
export class HostMetricsInstrumentation extends InstrumentationBase<HostMetricsInstrumentationConfig> {
  private hostMetrics?: HostMetrics;

  constructor(config: HostMetricsInstrumentationConfig = {}) {
    super(config.name || 'host-metrics-instrumentation', '1.0.0', config);
  }

  // Nothing to patch: host metrics are collected directly, not hooked into
  // user code, so init() is intentionally a no-op.
  init(): void {}

  /** Starts host-metrics collection; requires a meterProvider in the config. */
  enable(): void {
    const { meterProvider, name } = this._config;
    if (!meterProvider) {
      throw new Error('MeterProvider is required for HostMetricsInstrumentation');
    }

    const collector = new HostMetrics({
      meterProvider,
      name: name || 'host-metrics',
    });
    collector.start();
    this.hostMetrics = collector;
    console.debug('HostMetricsInstrumentation enabled');
  }

  /**
   * Drops the collector reference. NOTE(review): this version of
   * @opentelemetry/host-metrics appears to expose no stop(), so collection may
   * continue in the background — confirm against the package's API.
   */
  disable(): void {
    if (this.hostMetrics) {
      this.hostMetrics = undefined;
      console.debug('HostMetricsInstrumentation disabled');
    }
  }
}
119 changes: 119 additions & 0 deletions src/service/metrics/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { Types } from '@vtex/diagnostics-nodejs'
import { getMetricClient } from './client'

// Attribute keys attached to every request-metric data point.
// NOTE(review): exported `const enum` is erased at compile time and is
// incompatible with isolatedModules / some bundlers; consider a plain string
// enum or a literal union — confirm the build settings before changing.
export const enum RequestsMetricLabels {
  STATUS_CODE = 'status_code',
  REQUEST_HANDLER = 'handler',
}

// The full set of OpenTelemetry instruments used by the request-metrics
// middleware; created once (see OtelInstrumentsSingleton) and shared by all
// requests.
export interface OtelRequestInstruments {
  concurrentRequests: Types.Gauge
  requestTimings: Types.Histogram
  totalRequests: Types.Counter
  responseSizes: Types.Histogram
  abortedRequests: Types.Counter
}

const createOtelConcurrentRequestsInstrument = async (): Promise<Types.Gauge> => {
const metricsClient = await getMetricClient()
return metricsClient.createGauge('io_http_requests_current', {
description: 'The current number of requests in course.',
unit: '1'
})
}

const createOtelRequestsTimingsInstrument = async (): Promise<Types.Histogram> => {
const metricsClient = await getMetricClient()
return metricsClient.createHistogram('runtime_http_requests_duration_milliseconds', {
description: 'The incoming http requests total duration.',
unit: 'ms'
})
}

const createOtelTotalRequestsInstrument = async (): Promise<Types.Counter> => {
const metricsClient = await getMetricClient()
return metricsClient.createCounter('runtime_http_requests_total', {
description: 'The total number of HTTP requests.',
unit: '1'
})
}

const createOtelRequestsResponseSizesInstrument = async (): Promise<Types.Histogram> => {
const metricsClient = await getMetricClient()
return metricsClient.createHistogram('runtime_http_response_size_bytes', {
description: 'The outgoing response sizes (only applicable when the response isn\'t a stream).',
unit: 'bytes'
})
}

const createOtelTotalAbortedRequestsInstrument = async (): Promise<Types.Counter> => {
const metricsClient = await getMetricClient()
return metricsClient.createCounter('runtime_http_aborted_requests_total', {
description: 'The total number of HTTP requests aborted.',
unit: '1'
})
}

/**
 * Lazily creates the full set of request-metric instruments exactly once.
 * Concurrent callers share the same in-flight promise; a failed attempt
 * clears it so the next call can retry.
 *
 * Fix: the original reset `initializingPromise` in BOTH the catch block and a
 * `finally` block — the catch-side reset was dead code, since `finally` always
 * runs the same assignment. The redundant reset is removed and the cleanup is
 * consolidated in `finally`, mirroring MetricClientSingleton's structure.
 */
class OtelInstrumentsSingleton {
  private static instance: OtelInstrumentsSingleton | undefined;
  // Cached instruments once initialization has succeeded.
  private instruments: OtelRequestInstruments | undefined;
  // In-flight initialization shared by concurrent callers.
  private initializingPromise: Promise<OtelRequestInstruments> | undefined;

  private constructor() {}

  /** Returns the process-wide singleton, creating it on first access. */
  public static getInstance(): OtelInstrumentsSingleton {
    if (!OtelInstrumentsSingleton.instance) {
      OtelInstrumentsSingleton.instance = new OtelInstrumentsSingleton();
    }
    return OtelInstrumentsSingleton.instance;
  }

  /**
   * Returns the instrument set, creating all instruments on the first call.
   * @throws rethrows any instrument-creation failure after logging it.
   */
  public async getInstruments(): Promise<OtelRequestInstruments> {
    if (this.instruments) {
      return this.instruments;
    }

    if (this.initializingPromise) {
      return this.initializingPromise;
    }

    this.initializingPromise = this.initializeInstruments();

    try {
      this.instruments = await this.initializingPromise;
      return this.instruments;
    } catch (error) {
      console.error('Failed to initialize OTel instruments:', error);
      throw error;
    } finally {
      // Always clear: on success the cached instruments are used from now on;
      // on failure the next call retries.
      this.initializingPromise = undefined;
    }
  }

  /** Creates every instrument in parallel and bundles them. */
  private async initializeInstruments(): Promise<OtelRequestInstruments> {
    const [
      concurrentRequests,
      requestTimings,
      totalRequests,
      responseSizes,
      abortedRequests,
    ] = await Promise.all([
      createOtelConcurrentRequestsInstrument(),
      createOtelRequestsTimingsInstrument(),
      createOtelTotalRequestsInstrument(),
      createOtelRequestsResponseSizesInstrument(),
      createOtelTotalAbortedRequestsInstrument(),
    ])

    return {
      concurrentRequests,
      requestTimings,
      totalRequests,
      responseSizes,
      abortedRequests,
    }
  }
}

/** Convenience accessor for the singleton's instrument set. */
export const getOtelInstruments = () => OtelInstrumentsSingleton.getInstance().getInstruments();
89 changes: 89 additions & 0 deletions src/service/metrics/otelRequestMetricsMiddleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { finished as onStreamFinished } from 'stream'
import { hrToMillisFloat } from '../../utils'
import { getOtelInstruments, RequestsMetricLabels, OtelRequestInstruments } from './metrics'
import { ServiceContext } from '../worker/runtime/typings'

// Max time (ms) a request will wait for instrument initialization before
// proceeding without metrics.
const INSTRUMENTS_INITIALIZATION_TIMEOUT = 500

/**
 * Builds a Koa middleware that records OpenTelemetry request metrics:
 * concurrent requests, total requests, durations, response sizes, and aborts.
 * If the instruments are not ready within the timeout, the request is served
 * without metrics and initialization is retried on the next request.
 *
 * Fix: the original never cleared the race's setTimeout, so after the
 * instruments resolved the stray timer kept running (holding the event loop)
 * for up to 500 ms per request during startup. The timer is now cleared in a
 * `finally` once the race settles.
 */
export const addOtelRequestMetricsMiddleware = () => {
  // Cached across requests; stays undefined until initialization succeeds.
  let instruments: OtelRequestInstruments | undefined

  const tryGetInstruments = async (ctx: ServiceContext): Promise<OtelRequestInstruments | undefined> => {
    let timer: NodeJS.Timeout | undefined
    try {
      return await Promise.race([
        getOtelInstruments(),
        new Promise<never>((_, reject) => {
          timer = setTimeout(
            () => reject(new Error('Timeout waiting for OpenTelemetry instruments initialization')),
            INSTRUMENTS_INITIALIZATION_TIMEOUT
          )
        }),
      ])
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error)
      console.warn(`OpenTelemetry instruments not ready for request ${ctx.requestHandlerName}: ${errorMessage}`)
      return undefined
    } finally {
      // Cancel the pending timeout so it cannot fire after the race settled.
      if (timer) {
        clearTimeout(timer)
      }
    }
  }

  return async function addOtelRequestMetrics(ctx: ServiceContext, next: () => Promise<void>) {
    if (!instruments) {
      instruments = await tryGetInstruments(ctx)
    }
    // Narrowed local: TS cannot guarantee the closure variable is still set
    // after awaits, so capture it once instead of re-checking at every use.
    const inst = instruments
    if (!inst) {
      await next()
      return
    }

    const start = process.hrtime()
    inst.concurrentRequests.add(1)

    // NOTE(review): 'aborted' is deprecated in recent Node versions in favor
    // of 'close' + req.destroyed — confirm the target Node runtime.
    ctx.req.once('aborted', () => {
      inst.abortedRequests.add(1, { [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName })
    })

    // Track whether the response closed before the finally block runs, so the
    // finish handler is invoked exactly once either way.
    let responseClosed = false
    ctx.res.once('close', () => (responseClosed = true))

    try {
      await next()
    } finally {
      // Only known for non-streamed bodies; undefined length is skipped.
      const responseLength = ctx.response.length
      if (responseLength) {
        inst.responseSizes.record(
          responseLength,
          { [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName }
        )
      }

      inst.totalRequests.add(
        1,
        {
          [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName,
          [RequestsMetricLabels.STATUS_CODE]: ctx.response.status,
        }
      )

      const onResFinished = () => {
        inst.requestTimings.record(
          hrToMillisFloat(process.hrtime(start)),
          {
            [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName,
          }
        )
        inst.concurrentRequests.subtract(1)
      }

      if (responseClosed) {
        onResFinished()
      } else {
        onStreamFinished(ctx.res, onResFinished)
      }
    }
  }
}
7 changes: 7 additions & 0 deletions src/service/telemetry/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
} from '@vtex/diagnostics-nodejs';
import { APP } from '../../constants';
import { TelemetryClient } from '@vtex/diagnostics-nodejs/dist/telemetry';
import { KoaInstrumentation } from '@opentelemetry/instrumentation-koa';
import { HostMetricsInstrumentation } from '../metrics/instruments/hostMetrics';

const CLIENT_NAME = APP.NAME || 'node-vtex-api';
const APPLICATION_ID = APP.ID || 'vtex-io-app';
Expand Down Expand Up @@ -79,6 +81,11 @@ class TelemetryClientSingleton {

const instrumentations = [
...Instrumentation.CommonInstrumentations.minimal(),
new KoaInstrumentation(),
new HostMetricsInstrumentation({
name: 'host-metrics-instrumentation',
meterProvider: metricsClient.provider(),
}),
];

telemetryClient.registerInstrumentations(instrumentations);
Expand Down
4 changes: 4 additions & 0 deletions src/service/worker/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Instrumentation } from '@vtex/diagnostics-nodejs';
import { request } from 'http'
import Koa from 'koa'
import compress from 'koa-compress'
Expand All @@ -13,6 +14,7 @@ import { getService } from '../loaders'
import { logOnceToDevConsole } from '../logger/console'
import { LogLevel } from '../logger/loggerTypes'
import { addRequestMetricsMiddleware } from '../metrics/requestMetricsMiddleware'
import { addOtelRequestMetricsMiddleware } from '../metrics/otelRequestMetricsMiddleware'
import { TracerSingleton } from '../tracing/TracerSingleton'
import { addTracingMiddleware } from '../tracing/tracingMiddlewares'
import { addProcessListeners, logger } from './listeners'
Expand Down Expand Up @@ -220,9 +222,11 @@ export const startWorker = (serviceJSON: ServiceJSON) => {
app.proxy = true
app
.use(error)
.use(Instrumentation.Middlewares.ContextMiddlewares.Koa.ContextPropagationMiddleware())
.use(prometheusLoggerMiddleware())
.use(addTracingMiddleware(tracer))
.use(addRequestMetricsMiddleware())
.use(addOtelRequestMetricsMiddleware())
.use(addMetricsLoggerMiddleware())
.use(concurrentRateLimiter(serviceJSON?.rateLimitPerReplica?.concurrent))
.use(compress())
Expand Down
Loading
Loading