diff --git a/package.json b/package.json index 262c530d6..67e5e93ac 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@vtex/api", - "version": "7.0.1", + "version": "7.1.0", "description": "VTEX I/O API client", "main": "lib/index.js", "typings": "lib/index.d.ts", @@ -47,6 +47,10 @@ }, "license": "MIT", "dependencies": { + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/host-metrics": "0.35.5", + "@opentelemetry/instrumentation": "0.57.2", + "@opentelemetry/instrumentation-koa": "0.47.1", "@types/koa": "^2.11.0", "@types/koa-compose": "^3.2.3", "@vtex/diagnostics-nodejs": "0.1.0-io-beta.19", diff --git a/src/service/index.ts b/src/service/index.ts index 190150df7..fd5a8373d 100644 --- a/src/service/index.ts +++ b/src/service/index.ts @@ -1,20 +1,23 @@ +import { initializeTelemetry } from './telemetry' import cluster from 'cluster' import { HTTP_SERVER_PORT } from '../constants' import { getServiceJSON } from './loaders' import { LogLevel, logOnceToDevConsole } from './logger' -import { startMaster } from './master' -import { startWorker } from './worker' -export const startApp = () => { +export const startApp = async () => { + await initializeTelemetry() const serviceJSON = getServiceJSON() try { // if it is a master process then call setting up worker process if(cluster.isMaster) { + const { startMaster } = await import('./master') startMaster(serviceJSON) } else { // to setup server configurations and share port address for incoming requests - startWorker(serviceJSON).listen(HTTP_SERVER_PORT) + const { startWorker } = await import('./worker') + const app = await startWorker(serviceJSON) + app.listen(HTTP_SERVER_PORT) } } catch (err: any) { logOnceToDevConsole(err.stack || err.message, LogLevel.Error) diff --git a/src/service/metrics/client.ts b/src/service/metrics/client.ts new file mode 100644 index 000000000..eaabd27dc --- /dev/null +++ b/src/service/metrics/client.ts @@ -0,0 +1,46 @@ +import { Types } from "@vtex/diagnostics-nodejs"; +import { initializeTelemetry } from '../telemetry'; + +class MetricClientSingleton { + private static instance: MetricClientSingleton | undefined; + private client: Types.MetricClient | undefined; + private initPromise: Promise | undefined; + + private constructor() {} + + public static getInstance(): MetricClientSingleton { + if (!MetricClientSingleton.instance) { + MetricClientSingleton.instance = new MetricClientSingleton(); + } + return MetricClientSingleton.instance; + } + + public async getClient(): Promise { + if (this.client) { + return this.client; + } + + if (this.initPromise) { + return this.initPromise; + } + + this.initPromise = this.initializeClient(); + + return this.initPromise; + } + + private async initializeClient(): Promise { + try { + const { metricsClient } = await initializeTelemetry(); + this.client = metricsClient; + this.initPromise = undefined; + return metricsClient; + } catch (error) { + console.error('Failed to initialize metrics client:', error); + this.initPromise = undefined; + throw error; + } + } +} + +export const getMetricClient = () => MetricClientSingleton.getInstance().getClient(); diff --git a/src/service/metrics/instruments/hostMetrics.ts b/src/service/metrics/instruments/hostMetrics.ts new file mode 100644 index 000000000..0851c1096 --- /dev/null +++ b/src/service/metrics/instruments/hostMetrics.ts @@ -0,0 +1,41 @@ +import { InstrumentationBase, InstrumentationConfig } from "@opentelemetry/instrumentation"; +import { MeterProvider } from '@opentelemetry/api'; +import { HostMetrics } from "@opentelemetry/host-metrics"; + +interface HostMetricsInstrumentationConfig extends InstrumentationConfig { + name?: string; + meterProvider?: MeterProvider; +} + +export class HostMetricsInstrumentation extends InstrumentationBase { + private hostMetrics?: HostMetrics; + + constructor(config: HostMetricsInstrumentationConfig = {}) { + const instrumentation_name = config.name || 'host-metrics-instrumentation'; + const instrumentation_version = '1.0.0'; + super(instrumentation_name, instrumentation_version, config); + } + + init(): void {} + + enable(): void { + if (!this._config.meterProvider) { + throw new Error('MeterProvider is required for HostMetricsInstrumentation'); + } + + this.hostMetrics = new HostMetrics({ + meterProvider: this._config.meterProvider, + name: this._config.name || 'host-metrics', + }); + + this.hostMetrics.start(); + console.debug('HostMetricsInstrumentation enabled'); + } + + disable(): void { + if (this.hostMetrics) { + this.hostMetrics = undefined; + console.debug('HostMetricsInstrumentation disabled'); + } + } +} diff --git a/src/service/metrics/metrics.ts b/src/service/metrics/metrics.ts new file mode 100644 index 000000000..b5b5fbc7d --- /dev/null +++ b/src/service/metrics/metrics.ts @@ -0,0 +1,119 @@ +import { Types } from '@vtex/diagnostics-nodejs' +import { getMetricClient } from './client' + +export const enum RequestsMetricLabels { + STATUS_CODE = 'status_code', + REQUEST_HANDLER = 'handler', +} + +export interface OtelRequestInstruments { + concurrentRequests: Types.Gauge + requestTimings: Types.Histogram + totalRequests: Types.Counter + responseSizes: Types.Histogram + abortedRequests: Types.Counter +} + +const createOtelConcurrentRequestsInstrument = async (): Promise => { + const metricsClient = await getMetricClient() + return metricsClient.createGauge('io_http_requests_current', { + description: 'The current number of requests in course.', + unit: '1' + }) +} + +const createOtelRequestsTimingsInstrument = async (): Promise => { + const metricsClient = await getMetricClient() + return metricsClient.createHistogram('runtime_http_requests_duration_milliseconds', { + description: 'The incoming http requests total duration.', + unit: 'ms' + }) +} + +const createOtelTotalRequestsInstrument = async (): Promise => { + const metricsClient = await getMetricClient() + return metricsClient.createCounter('runtime_http_requests_total', { + description: 'The total number of HTTP requests.', + unit: '1' + }) +} + +const createOtelRequestsResponseSizesInstrument = async (): Promise => { + const metricsClient = await getMetricClient() + return metricsClient.createHistogram('runtime_http_response_size_bytes', { + description: 'The outgoing response sizes (only applicable when the response isn\'t a stream).', + unit: 'bytes' + }) +} + +const createOtelTotalAbortedRequestsInstrument = async (): Promise => { + const metricsClient = await getMetricClient() + return metricsClient.createCounter('runtime_http_aborted_requests_total', { + description: 'The total number of HTTP requests aborted.', + unit: '1' + }) +} + +class OtelInstrumentsSingleton { + private static instance: OtelInstrumentsSingleton | undefined; + private instruments: OtelRequestInstruments | undefined; + private initializingPromise: Promise | undefined; + + private constructor() {} + + public static getInstance(): OtelInstrumentsSingleton { + if (!OtelInstrumentsSingleton.instance) { + OtelInstrumentsSingleton.instance = new OtelInstrumentsSingleton(); + } + return OtelInstrumentsSingleton.instance; + } + + public async getInstruments(): Promise { + if (this.instruments) { + return this.instruments; + } + + if (this.initializingPromise) { + return this.initializingPromise; + } + + this.initializingPromise = this.initializeInstruments(); + + try { + this.instruments = await this.initializingPromise; + return this.instruments; + } catch (error) { + console.error('Failed to initialize OTel instruments:', error); + this.initializingPromise = undefined; + throw error; + } finally { + this.initializingPromise = undefined; + } + } + + private async initializeInstruments(): Promise { + const [ + concurrentRequests, + requestTimings, + totalRequests, + responseSizes, + abortedRequests + ] = await Promise.all([ + createOtelConcurrentRequestsInstrument(), + createOtelRequestsTimingsInstrument(), + createOtelTotalRequestsInstrument(), + createOtelRequestsResponseSizesInstrument(), + createOtelTotalAbortedRequestsInstrument() + ]) + + return { + concurrentRequests, + requestTimings, + totalRequests, + responseSizes, + abortedRequests + } + } +} + +export const getOtelInstruments = () => OtelInstrumentsSingleton.getInstance().getInstruments(); diff --git a/src/service/metrics/otelRequestMetricsMiddleware.ts b/src/service/metrics/otelRequestMetricsMiddleware.ts new file mode 100644 index 000000000..6bfee65bd --- /dev/null +++ b/src/service/metrics/otelRequestMetricsMiddleware.ts @@ -0,0 +1,89 @@ +import { finished as onStreamFinished } from 'stream' +import { hrToMillisFloat } from '../../utils' +import { getOtelInstruments, RequestsMetricLabels, OtelRequestInstruments } from './metrics' +import { ServiceContext } from '../worker/runtime/typings' + +const INSTRUMENTS_INITIALIZATION_TIMEOUT = 500 + +export const addOtelRequestMetricsMiddleware = () => { + let instruments: OtelRequestInstruments | undefined + + const tryGetInstruments = async (ctx: ServiceContext): Promise => { + try { + return await Promise.race([ + getOtelInstruments(), + new Promise((_, reject) => + setTimeout( + () => reject(new Error('Timeout waiting for OpenTelemetry instruments initialization')), + INSTRUMENTS_INITIALIZATION_TIMEOUT + ) + ) + ]) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + console.warn(`OpenTelemetry instruments not ready for request ${ctx.requestHandlerName}: ${errorMessage}`) + return undefined + } + } + + return async function addOtelRequestMetrics(ctx: ServiceContext, next: () => Promise) { + instruments = instruments ? instruments : await tryGetInstruments(ctx) + if (!instruments) { + await next() + return + } + + const start = process.hrtime() + instruments.concurrentRequests.add(1) + + ctx.req.once('aborted', () => { + if (instruments) { + instruments.abortedRequests.add(1, { [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName }) + } + }) + + let responseClosed = false + ctx.res.once('close', () => (responseClosed = true)) + + try { + await next() + } finally { + const responseLength = ctx.response.length + if (responseLength && instruments) { + instruments.responseSizes.record( + responseLength, + { [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName } + ) + } + + if (instruments) { + instruments.totalRequests.add( + 1, + { + [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName, + [RequestsMetricLabels.STATUS_CODE]: ctx.response.status, + } + ) + } + + const onResFinished = () => { + if (instruments) { + instruments.requestTimings.record( + hrToMillisFloat(process.hrtime(start)), + { + [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName, + } + ) + + instruments.concurrentRequests.subtract(1) + } + } + + if (responseClosed) { + onResFinished() + } else { + onStreamFinished(ctx.res, onResFinished) + } + } + } +} diff --git a/src/service/telemetry/client.ts b/src/service/telemetry/client.ts index 96a16fa48..db42e276d 100644 --- a/src/service/telemetry/client.ts +++ b/src/service/telemetry/client.ts @@ -8,6 +8,8 @@ import { } from '@vtex/diagnostics-nodejs'; import { APP } from '../../constants'; import { TelemetryClient } from '@vtex/diagnostics-nodejs/dist/telemetry'; +import { KoaInstrumentation } from '@opentelemetry/instrumentation-koa'; +import { HostMetricsInstrumentation } from '../metrics/instruments/hostMetrics'; const CLIENT_NAME = APP.NAME || 'node-vtex-api'; const APPLICATION_ID = APP.ID || 'vtex-io-app'; @@ -79,6 +81,11 @@ class TelemetryClientSingleton { const instrumentations = [ ...Instrumentation.CommonInstrumentations.minimal(), + new KoaInstrumentation(), + new HostMetricsInstrumentation({ + name: 'host-metrics-instrumentation', + meterProvider: metricsClient.provider(), + }), ]; telemetryClient.registerInstrumentations(instrumentations); diff --git a/src/service/worker/index.ts b/src/service/worker/index.ts index 420e10ff1..0e64fcf29 100644 --- a/src/service/worker/index.ts +++ b/src/service/worker/index.ts @@ -1,3 +1,4 @@ +import { Instrumentation } from '@vtex/diagnostics-nodejs'; import { request } from 'http' import Koa from 'koa' import compress from 'koa-compress' @@ -13,6 +14,7 @@ import { getService } from '../loaders' import { logOnceToDevConsole } from '../logger/console' import { LogLevel } from '../logger/loggerTypes' import { addRequestMetricsMiddleware } from '../metrics/requestMetricsMiddleware' +import { addOtelRequestMetricsMiddleware } from '../metrics/otelRequestMetricsMiddleware' import { TracerSingleton } from '../tracing/TracerSingleton' import { addTracingMiddleware } from '../tracing/tracingMiddlewares' import { addProcessListeners, logger } from './listeners' @@ -220,9 +222,11 @@ export const startWorker = (serviceJSON: ServiceJSON) => { app.proxy = true app .use(error) + .use(Instrumentation.Middlewares.ContextMiddlewares.Koa.ContextPropagationMiddleware()) .use(prometheusLoggerMiddleware()) .use(addTracingMiddleware(tracer)) .use(addRequestMetricsMiddleware()) + .use(addOtelRequestMetricsMiddleware()) .use(addMetricsLoggerMiddleware()) .use(concurrentRateLimiter(serviceJSON?.rateLimitPerReplica?.concurrent)) .use(compress()) diff --git a/yarn.lock b/yarn.lock index cf8658985..0d548c37f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -616,6 +616,13 @@ "@opentelemetry/sdk-trace-base" "1.30.1" "@opentelemetry/semantic-conventions" "1.28.0" +"@opentelemetry/host-metrics@0.35.5": + version "0.35.5" + resolved "https://registry.yarnpkg.com/@opentelemetry/host-metrics/-/host-metrics-0.35.5.tgz#1bb7453558b2623c8331d0fea5b7766c995a68f1" + integrity sha512-Zf9Cjl7H6JalspnK5KD1+LLKSVecSinouVctNmUxRy+WP+20KwHq+qg4hADllkEmJ99MZByLLmEmzrr7s92V6g== + dependencies: + systeminformation "5.23.8" + "@opentelemetry/instrumentation-express@^0.47.1": version "0.47.1" resolved "https://registry.yarnpkg.com/@opentelemetry/instrumentation-express/-/instrumentation-express-0.47.1.tgz#7cf74f35e43cc3c8186edd1249fdb225849c48b2" @@ -636,6 +643,15 @@ forwarded-parse "2.1.2" semver "^7.5.2" +"@opentelemetry/instrumentation-koa@0.47.1": + version "0.47.1" + resolved "https://registry.yarnpkg.com/@opentelemetry/instrumentation-koa/-/instrumentation-koa-0.47.1.tgz#ba57eccd44a75ec59e3129757fda4e8c8dd7ce2c" + integrity sha512-l/c+Z9F86cOiPJUllUCt09v+kICKvT+Vg1vOAJHtHPsJIzurGayucfCMq2acd/A/yxeNWunl9d9eqZ0G+XiI6A== + dependencies: + "@opentelemetry/core" "^1.8.0" + "@opentelemetry/instrumentation" "^0.57.1" + "@opentelemetry/semantic-conventions" "^1.27.0" + "@opentelemetry/instrumentation-net@^0.43.1": version "0.43.1" resolved "https://registry.yarnpkg.com/@opentelemetry/instrumentation-net/-/instrumentation-net-0.43.1.tgz#10a3030fe090ed76204ac025179501f902dcf282" @@ -5252,6 +5268,11 @@ symbol-tree@^3.2.2: resolved "https://registry.yarnpkg.com/symbol-tree/-/symbol-tree-3.2.4.tgz#430637d248ba77e078883951fb9aa0eed7c63fa2" integrity sha512-9QNk5KwDF+Bvz+PyObkmSYjI5ksVUYtjW7AU22r2NKcfLJcXp96hkDWU3+XndOsUb+AQ9QhfzfCT2O+CNWT5Tw== +systeminformation@5.23.8: + version "5.23.8" + resolved "https://registry.yarnpkg.com/systeminformation/-/systeminformation-5.23.8.tgz#b8efa73b36221cbcb432e3fe83dc1878a43f986a" + integrity sha512-Osd24mNKe6jr/YoXLLK3k8TMdzaxDffhpCxgkfgBHcapykIkd50HXThM3TCEuHO2pPuCsSx2ms/SunqhU5MmsQ== + tar-fs@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/tar-fs/-/tar-fs-2.0.0.tgz#677700fc0c8b337a78bee3623fdc235f21d7afad"