Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ Environment Variable | Description | Default
| `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `CRITICAL`, `FATAL`, `ERROR`, `WARN`(`WARNING`), `INFO`, `DEBUG` | `INFO` |
| `SW_IGNORE_SUFFIX` | The suffices of endpoints that will be ignored (not traced), comma separated | `.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg` |
| `SW_TRACE_IGNORE_PATH` | The paths of endpoints that will be ignored (not traced), comma separated | `` |
| `SW_SQL_TRACE_PARAMETERS` | If set to 'true' then SQL query parameters will be included | `false` |
| `SW_SQL_PARAMETERS_MAX_LENGTH` | The maximum string length of SQL parameters to log | `512` |
| `SW_MONGO_TRACE_PARAMETERS` | If set to 'true' then mongodb query parameters will be included | `false` |
| `SW_MONGO_PARAMETERS_MAX_LENGTH` | The maximum string length of mongodb parameters to log | `512` |
| `SW_AGENT_MAX_BUFFER_SIZE` | The maximum buffer size before sending the segment data to backend | `'1000'` |

## Supported Libraries
Expand All @@ -71,6 +74,8 @@ Library | Plugin Name
| [`axios`](https://github.com/axios/axios) | `axios` |
| [`mysql`](https://github.com/mysqljs/mysql) | `mysql` |
| [`pg`](https://github.com/brianc/node-postgres) | `pg` |
| [`pg-cursor`](https://github.com/brianc/node-postgres) | `pg-cursor` |
| [`mongodb`](https://github.com/mongodb/node-mongodb-native) | `mongodb` |

### Compatible Libraries

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@
"@types/uuid": "^8.0.0",
"axios": "^0.21.0",
"express": "^4.17.1",
"grpc_tools_node_protoc_ts": "^4.0.0",
"grpc-tools": "^1.10.0",
"grpc_tools_node_protoc_ts": "^4.0.0",
"jest": "^26.6.3",
"mongodb": "^3.6.4",
"mysql": "^2.18.1",
"pg": "^8.5.1",
"prettier": "^2.0.5",
Expand Down
8 changes: 8 additions & 0 deletions src/Tag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export default {
dbInstanceKey: 'db.instance',
dbStatementKey: 'db.statement',
dbSqlParametersKey: 'db.sql.parameters',
dbMongoParametersKey: 'db.mongo.parameters',

httpStatusCode(val: string | number | undefined): Tag {
return {
Expand Down Expand Up @@ -89,4 +90,11 @@ export default {
val: `${val}`,
} as Tag;
},
dbMongoParameters(val: string | undefined): Tag {
return {
key: this.dbMongoParametersKey,
overridable: false,
val: `${val}`,
} as Tag;
},
};
6 changes: 6 additions & 0 deletions src/config/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ export type AgentConfig = {
maxBufferSize?: number;
ignoreSuffix?: string;
traceIgnorePath?: string;
sql_trace_parameters?: boolean;
sql_parameters_max_length?: number;
mongo_trace_parameters?: boolean;
mongo_parameters_max_length?: number;
// the following is internal state computed from config values
reIgnoreOperation?: RegExp;
};
Expand Down Expand Up @@ -60,6 +63,9 @@ export default {
Number.parseInt(process.env.SW_AGENT_MAX_BUFFER_SIZE as string, 10) : 1000,
ignoreSuffix: process.env.SW_IGNORE_SUFFIX ?? '.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg',
traceIgnorePath: process.env.SW_TRACE_IGNORE_PATH || '',
sql_trace_parameters: (process.env.SW_SQL_TRACE_PARAMETERS || '').toLowerCase() === 'true',
sql_parameters_max_length: Math.trunc(Math.max(0, Number(process.env.SW_SQL_PARAMETERS_MAX_LENGTH))) || 512,
mongo_trace_parameters: (process.env.SW_MONGO_TRACE_PARAMETERS || '').toLowerCase() === 'true',
mongo_parameters_max_length: Math.trunc(Math.max(0, Number(process.env.SW_MONGO_PARAMETERS_MAX_LENGTH))) || 512,
reIgnoreOperation: RegExp(''), // temporary placeholder so Typescript doesn't throw a fit
};
304 changes: 304 additions & 0 deletions src/plugins/MongoDBPlugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
/*!
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import SwPlugin from '../core/SwPlugin';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
import ExitSpan from '../trace/span/ExitSpan';
import Tag from '../Tag';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import PluginInstaller from '../core/PluginInstaller';
import agentConfig from '../config/AgentConfig';

class MongoDBPlugin implements SwPlugin {
readonly module = 'mongodb';
readonly versions = '*';

Cursor: any;

// Experimental method to determine proper end time of cursor DB operation, we stop the span when the cursor is closed.
// Problematic because other exit spans may be created during processing, for this reason we do not .resync() this
// span to the span list until it is closed. If the cursor is never closed then the span will not be sent.

maybeHookCursor(span: any, cursor: any): boolean {
if (!(cursor instanceof this.Cursor))
return false;

cursor.on('error', (err: any) => {
span.resync(); // this may precede 'close' .resync() but its fine
span.error(err);
span.stop();
});

cursor.on('close', () => {
span.resync(); // cursor does not .resync() until it is closed because maybe other exit spans will be opened during processing
span.stop();
});

return true;
}

install(installer: PluginInstaller): void {
const plugin = this;
const Collection = installer.require('mongodb/lib/collection');
this.Cursor = installer.require('mongodb/lib/cursor');

const wrapCallback = (span: any, args: any[], idx: number): boolean => {
const callback = args.length > idx && typeof args[idx = args.length - 1] === 'function' ? args[idx] : null;

if (!callback)
return false;

args[idx] = function(this: any, error: any, result: any) {
if (error || !plugin.maybeHookCursor(span, result)) {
span.resync();

if (error)
span.error(error);

span.stop();
}

return callback.call(this, error, result);
}

return true;
};

const stringify = (params: any) => {
if (params === undefined)
return '';

let str = JSON.stringify(params);

if (str.length > agentConfig.mongo_parameters_max_length)
str = str.slice(0, agentConfig.mongo_parameters_max_length) + ' ...';

return str;
}

const insertFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [doc(s), options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}()`));

if (agentConfig.mongo_trace_parameters)
span.tag(Tag.dbMongoParameters(stringify(args[0])));

return wrapCallback(span, args, 1);
};

const deleteFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [filter, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${stringify(args[0])})`));

return wrapCallback(span, args, 1);
};

const updateFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [filter, update, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${stringify(args[0])})`));

if (agentConfig.mongo_trace_parameters)
span.tag(Tag.dbMongoParameters(stringify(args[1])));

return wrapCallback(span, args, 2);
};

const findOneFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${typeof args[0] !== 'function' ? stringify(args[0]) : ''})`));

return wrapCallback(span, args, 0);
};

const findAndRemoveFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, sort, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${stringify(args[0])}${typeof args[1] !== 'function' && args[1] !== undefined ? ', ' + stringify(args[1]) : ''})`));

return wrapCallback(span, args, 1);
};

const findAndModifyFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, sort, doc, options, callback]
let params = stringify(args[0]);

if (typeof args[1] !== 'function' && args[1] !== undefined) {
params += ', ' + stringify(args[1]);

if (typeof args[2] !== 'function' && args[2] !== undefined) {
if (agentConfig.mongo_trace_parameters)
span.tag(Tag.dbMongoParameters(stringify(args[2])));
}
}

span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${params})`));

return wrapCallback(span, args, 1);
};

const mapReduceFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [map, reduce, options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${args[0]}, ${args[1]})`));

return wrapCallback(span, args, 2);
};

const dropFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [options, callback]
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}()`));

return wrapCallback(span, args, 0);
};

this.interceptOperation(Collection, 'insert', insertFunc);
this.interceptOperation(Collection, 'insertOne', insertFunc);
this.interceptOperation(Collection, 'insertMany', insertFunc);
this.interceptOperation(Collection, 'save', insertFunc);
this.interceptOperation(Collection, 'deleteOne', deleteFunc);
this.interceptOperation(Collection, 'deleteMany', deleteFunc);
this.interceptOperation(Collection, 'remove', deleteFunc);
this.interceptOperation(Collection, 'removeOne', deleteFunc);
this.interceptOperation(Collection, 'removeMany', deleteFunc);
this.interceptOperation(Collection, 'update', updateFunc);
this.interceptOperation(Collection, 'updateOne', updateFunc);
this.interceptOperation(Collection, 'updateMany', updateFunc);
this.interceptOperation(Collection, 'replaceOne', updateFunc);
this.interceptOperation(Collection, 'find', findOneFunc); // cursor
this.interceptOperation(Collection, 'findOne', findOneFunc);
this.interceptOperation(Collection, 'findOneAndDelete', deleteFunc);
this.interceptOperation(Collection, 'findOneAndReplace', updateFunc);
this.interceptOperation(Collection, 'findOneAndUpdate', updateFunc);
this.interceptOperation(Collection, 'findAndRemove', findAndRemoveFunc);
this.interceptOperation(Collection, 'findAndModify', findAndModifyFunc);

this.interceptOperation(Collection, 'bulkWrite', insertFunc);
this.interceptOperation(Collection, 'mapReduce', mapReduceFunc);
this.interceptOperation(Collection, 'aggregate', deleteFunc); // cursor
this.interceptOperation(Collection, 'distinct', findAndRemoveFunc);
this.interceptOperation(Collection, 'count', findOneFunc);
this.interceptOperation(Collection, 'estimatedDocumentCount', dropFunc);
this.interceptOperation(Collection, 'countDocuments', findOneFunc);

this.interceptOperation(Collection, 'rename', deleteFunc);
this.interceptOperation(Collection, 'drop', dropFunc);


// TODO?

// createIndex
// createIndexes
// dropIndex
// dropIndexes
// dropAllIndexes
// ensureIndex
// indexExists
// indexInformation
// indexes
// listIndexes
// reIndex

// stats
// options
// isCapped
// initializeUnorderedBulkOp
// initializeOrderedBulkOp
// watch


// NODO:

// group
// parallelCollectionScan
// geoHaystackSearch
}

interceptOperation(Collection: any, operation: string, operationFunc: any): void {
const plugin = this;
const _original = Collection.prototype[operation];

if (!_original)
return;

Collection.prototype[operation] = function(...args: any[]) {
const spans = ContextManager.spans;
let span = spans[spans.length - 1];

if (span && span.component === Component.MONGODB && span instanceof ExitSpan) // mongodb has called into itself internally
return _original.apply(this, args);

let ret: any;
let host: string;

try {
host = this.s.db.serverConfig.s.options.servers.map((s: any) => `${s.host}:${s.port}`).join(','); // will this work for non-NativeTopology?
} catch {
host = '???';
}

span = ContextManager.current.newExitSpan('/' + this.s.namespace.db, host).start(); // or this.s.db.databaseName

try {
span.component = Component.MONGODB;
span.layer = SpanLayer.DATABASE;
span.peer = host;

span.tag(Tag.dbType('MongoDB'));
span.tag(Tag.dbInstance(`${this.s.namespace.db}`));

const hasCB = operationFunc.call(this, operation, span, args);

ret = _original.apply(this, args);

if (!hasCB) {
if (plugin.maybeHookCursor(span, ret)) {
// NOOP

} else if (!ret || typeof ret.then !== 'function') { // generic Promise check
span.stop(); // no callback passed in and no Promise or Cursor returned, play it safe

return ret;

} else {
ret = ret.then(
(res: any) => {
span.resync();
span.stop();

return res;
},

(err: any) => {
span.resync();
span.error(err);
span.stop();

return Promise.reject(err);
}
);
}
}

} catch (e) {
span.error(e);
span.stop();

throw e;
}

span.async();

return ret;
};
}
}

// noinspection JSUnusedGlobalSymbols
export default new MongoDBPlugin();
Loading