Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/config/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ export function finalizeConfig(config: AgentConfig): void {
const ignorePath = '^(?:' + config.traceIgnorePath!.split(',').map(
(s1) => s1.trim().split('**').map(
(s2) => s2.split('*').map(
(s3) => s3.split('?').map(escapeRegExp).join('[^/]') // replaces "**"
(s3) => s3.split('?').map(escapeRegExp).join('[^/]') // replaces "?"
).join('[^/]*') // replaces "*"
).join('(?:(?:[^/]+\.)*[^/]+)?') // replaces "?"
).join('(?:(?:[^/]+\.)*[^/]+)?') // replaces "**"
).join('|') + ')$'; // replaces ","

config.reIgnoreOperation = RegExp(`${ignoreSuffix}|${ignorePath}`);
Expand Down
96 changes: 41 additions & 55 deletions src/plugins/AxiosPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import SwPlugin from '../core/SwPlugin';
import { URL } from 'url';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
import Span from '../trace/span/Span';
import Tag from '../Tag';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import { createLogger } from '../logging';
Expand All @@ -37,80 +36,67 @@ class AxiosPlugin implements SwPlugin {
if (logger.isDebugEnabled()) {
logger.debug('installing axios plugin');
}
const axios = installer.require('axios').default;
this.interceptClientRequest(axios);

this.interceptClientRequest(installer);
}

private interceptClientRequest(axios: any) {
const copyStatusAndStop = (span: Span, response: any) => {
if (response) {
if (response.status) {
span.tag(Tag.httpStatusCode(response.status));
}
if (response.statusText) {
span.tag(Tag.httpStatusMsg(response.statusText));
}
}
private interceptClientRequest(installer: PluginInstaller): void {
const defaults = installer.require('axios/lib/defaults');
const defaultAdapter = defaults.adapter; // this will be http adapter

span.stop();
};
defaults.adapter = (config: any) => {
const { host, pathname: operation } = new URL(config.url); // TODO: this may throw invalid URL
const span = ContextManager.current.newExitSpan(operation, host).start();

axios.interceptors.request.use(
(config: any) => {
// config.span.resync(); // TODO: fix this https://github.com/apache/skywalking-nodejs/pull/20#issuecomment-753323425
try {
span.component = Component.AXIOS;
span.layer = SpanLayer.HTTP;
span.peer = host;
span.tag(Tag.httpURL(host + operation));

(config.span as Span).inject().items.forEach((item) => {
config.headers.common[item.key] = item.value;
span.inject().items.forEach((item) => {
config.headers[item.key] = item.value;
});

return config;
},

(error: any) => {
error.config.span.error(error);
error.config.span.stop();

return Promise.reject(error);
},
);
const copyStatusAndStop = (response: any) => {
if (response) {
if (response.status) {
span.tag(Tag.httpStatusCode(response.status));
if (response.status >= 400) {
span.errored = true;
}
}

axios.interceptors.response.use(
(response: any) => {
copyStatusAndStop(response.config.span, response);
if (response.statusText) {
span.tag(Tag.httpStatusMsg(response.statusText));
}
}

return response;
},
span.stop();
};

(error: any) => {
error.config.span.error(error);
return defaultAdapter(config).then(
(response: any) => {
copyStatusAndStop(response);

copyStatusAndStop(error.config.span, error.response);
return response;
},

return Promise.reject(error);
},
);
(error: any) => {
span.error(error);
copyStatusAndStop(error.response);

const _request = axios.Axios.prototype.request;
return Promise.reject(error);
}
);

axios.Axios.prototype.request = function(config: any) {
const { host, pathname: operation } = new URL(config.url); // TODO: this may throw invalid URL
const span = ContextManager.current.newExitSpan(operation, host).start();

try {
span.component = Component.AXIOS;
span.layer = SpanLayer.HTTP;
span.peer = host;
span.tag(Tag.httpURL(host + operation));
// span.async(); TODO: fix this https://github.com/apache/skywalking-nodejs/pull/20#issuecomment-753323425

return _request.call(this, { ...config, span });
} catch (e) {
span.error(e);
span.stop();

throw e;
}
};
}
}
}

Expand Down
106 changes: 70 additions & 36 deletions src/plugins/HttpPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import ExitSpan from '../trace/span/ExitSpan';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import { ContextCarrier } from '../trace/context/ContextCarrier';

const NativePromise = (async () => null)().constructor; // may be different from globally overridden Promise

class HttpPlugin implements SwPlugin {
readonly module = 'http';
readonly versions = '*';
Expand Down Expand Up @@ -75,30 +77,33 @@ class HttpPlugin implements SwPlugin {
span.tag(Tag.httpURL(httpURL));
}

const request: ClientRequest = _request.apply(this, arguments);
const req: ClientRequest = _request.apply(this, arguments);

span.inject().items.forEach((item) => {
request.setHeader(item.key, item.value);
req.setHeader(item.key, item.value);
});

request.on('close', stopIfNotStopped);
request.on('abort', () => (span.errored = true, stopIfNotStopped()));
request.on('error', (err) => (span.error(err), stopIfNotStopped()));
req.on('close', stopIfNotStopped);
req.on('abort', () => (span.errored = true, stopIfNotStopped()));
req.on('error', (err) => (span.error(err), stopIfNotStopped()));

request.prependListener('response', (res) => {
req.prependListener('response', (res) => {
span.resync();
span.tag(Tag.httpStatusCode(res.statusCode));

if (res.statusCode && res.statusCode >= 400) {
span.errored = true;
}
if (res.statusMessage) {
span.tag(Tag.httpStatusMsg(res.statusMessage));
}

res.on('end', stopIfNotStopped);
});

span.async();

return request;
return req;

} catch (e) {
if (!stopped) { // don't want to set error if exception occurs after clean close
Expand All @@ -112,45 +117,74 @@ class HttpPlugin implements SwPlugin {
}

private interceptServerRequest(module: any) {
const _emit = module.Server.prototype.emit;
/// TODO? full event protocol support not currently implemented (prependListener(), removeListener(), etc...)
const _addListener = module.Server.prototype.addListener;

module.Server.prototype.emit = function () {
if (arguments[0] !== 'request') {
return _emit.apply(this, arguments);
}
module.Server.prototype.addListener = module.Server.prototype.on = function (event: any, handler: any, ...addArgs: any[]) {
return _addListener.call(this, event, event === 'request' ? _sw_request : handler, ...addArgs);

const [req, res] = [arguments[1] as IncomingMessage, arguments[2] as ServerResponse];
function _sw_request(this: any, req: IncomingMessage, res: ServerResponse, ...reqArgs: any[]) {
const headers = req.rawHeaders || [];
const headersMap: { [key: string]: string } = {};

const headers = req.rawHeaders || [];
const headersMap: { [key: string]: string } = {};
for (let i = 0; i < headers.length / 2; i += 2) {
headersMap[headers[i]] = headers[i + 1];
}

for (let i = 0; i < headers.length / 2; i += 2) {
headersMap[headers[i]] = headers[i + 1];
}
const carrier = ContextCarrier.from(headersMap);
const operation = (req.url || '/').replace(/\?.*/g, '');
const span = ContextManager.current.newEntrySpan(operation, carrier).start();

const copyStatusAndStop = () => {
span.tag(Tag.httpStatusCode(res.statusCode));
if (res.statusCode && res.statusCode >= 400) {
span.errored = true;
}
if (res.statusMessage) {
span.tag(Tag.httpStatusMsg(res.statusMessage));
}

const carrier = ContextCarrier.from(headersMap);
const operation = (req.url || '/').replace(/\?.*/g, '');
const span = ContextManager.current.newEntrySpan(operation, carrier);
span.stop();
};

return ContextManager.withSpan(span, (self, args) => {
span.component = Component.HTTP_SERVER;
span.layer = SpanLayer.HTTP;
span.peer = req.headers.host || '';
span.tag(Tag.httpURL(span.peer + req.url));
try {
span.component = Component.HTTP_SERVER;
span.layer = SpanLayer.HTTP;
span.peer = req.connection.remoteFamily === 'IPv6'
? `[${req.connection.remoteAddress}]:${req.connection.remotePort}`
: `${req.connection.remoteAddress}:${req.connection.remotePort}`;
span.tag(Tag.httpURL((req.headers.host || '') + req.url));

const ret = _emit.apply(self, args);
let ret = handler.call(this, req, res, ...reqArgs);
const type = ret?.constructor;

span.tag(Tag.httpStatusCode(res.statusCode));
if (res.statusCode && res.statusCode >= 400) {
span.errored = true;
}
if (res.statusMessage) {
span.tag(Tag.httpStatusMsg(res.statusMessage));
}
if (type !== Promise && type !== NativePromise) {
copyStatusAndStop();

return ret;
} else {
ret = ret.then((r: any) => {
copyStatusAndStop();

}, this, arguments);
return r;
},

(error: any) => {
span.error(error);
span.stop();

return Promise.reject(error);
})
}

return ret;

} catch (e) {
span.error(e);
span.stop();

throw e;
}
}
};
}
}
Expand Down
4 changes: 2 additions & 2 deletions tests/plugins/axios/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ agent.start({
maxBufferSize: 1000,
});

const server = http.createServer((req, res) => {
axios
const server = http.createServer(async (req, res) => {
await axios
.get(`http://${process.env.SERVER || 'localhost:5000'}${req.url}`)
.then((r) => res.end(JSON.stringify(r.data)))
.catch(err => res.end(JSON.stringify(err)));
Expand Down
Loading