From 79c29a5eb031fc79c51f99f8a4e4ff813c005fc8 Mon Sep 17 00:00:00 2001 From: Massimiliano Pippi Date: Mon, 30 May 2016 18:21:07 +0200 Subject: [PATCH 1/7] expose aggregator to the Python world unpack python list containing tags remove results processor logic --- agentmain/main.go | 44 +++---------------------- aggregator/aggregator.go | 69 +++++++++++++++------------------------- aggregator/api.c | 57 +++++++++++++++++++++++++++++++++ aggregator/api.go | 52 ++++++++++++++++++++++++++++++ aggregator/api.h | 13 ++++++++ checks/common.go | 14 ++------ checks/system/memory.go | 19 +++-------- py/check.go | 14 +++++--- py/checks/__init__.py | 12 ++++--- 9 files changed, 176 insertions(+), 118 deletions(-) create mode 100644 aggregator/api.c create mode 100644 aggregator/api.go create mode 100644 aggregator/api.h diff --git a/agentmain/main.go b/agentmain/main.go index 0f5f74401f88..3c4a64a4b2f4 100644 --- a/agentmain/main.go +++ b/agentmain/main.go @@ -1,13 +1,12 @@ package ddagentmain import ( - "encoding/json" "time" + "github.com/DataDog/datadog-agent/aggregator" "github.com/DataDog/datadog-agent/checks" "github.com/DataDog/datadog-agent/checks/system" "github.com/DataDog/datadog-agent/py" - "github.com/DataDog/datadog-go/statsd" "github.com/op/go-logging" "github.com/sbinet/go-python" ) @@ -33,37 +32,12 @@ type metric struct { type metrics map[string][]metric -// process results. Temporary solution: send results to DogStatsD -func collectResults(pending <-chan checks.CheckResult, c *statsd.Client) { - for res := range pending { - m := metrics{} - if err := json.Unmarshal([]byte(res.Result), &m); err != nil { - log.Errorf("Error parsing results: %s\n%v", res.Result, err) - } - // gauges - for _, g := range m["gauge"] { - log.Infof("gauge posted %s", g.Name) - if err := c.Gauge(g.Name, g.Value, g.Tags, 1); err != nil { - log.Errorf("Error posting gauge %s: %v", g.Name, err) - } - } - // histogram - for _, g := range m["histogram"] { - log.Infof("histogram posted %s", g.Name) - if err := c.Histogram(g.Name, g.Value, g.Tags, 1); err != nil { - log.Errorf("Error histogram %s: %v", g.Name, err) - } - } - } -} - // Start the main check loop func Start() { log.Infof("Starting Datadog Agent v%v", AGENT_VERSION) - pending := make(chan checks.Check, 100) - results := make(chan checks.CheckResult, 100) + pending := make(chan checks.Check, 10) err := python.Initialize() if err != nil { @@ -76,19 +50,11 @@ func Start() { // `python.Initialize` acquires the GIL but we don't need it, let's release it state := python.PyEval_SaveThread() - // DogStatsD client, temporary solution to post metrics - c, err := statsd.New("127.0.0.1:8125") - if err != nil { - panic(err) - } - c.Namespace = "agent6." + // for now, only Python needs it, build and pass it on the fly + aggregator.InitApi(aggregator.NewUnbufferedAggregator()) // Get a single Runner instance, i.e. we process checks sequentially - go checks.Runner(pending, results) - // Get a set of collectors able to process all the results w/o causing check starvation - for i := 0; i < 2; i++ { - go collectResults(results, c) - } + go checks.Runner(pending) // Get a list of Python checks we want to run checksNames := []string{"checks.directory", "checks.go_expvar", "checks.process"} diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 2898c17393a0..9c8cdc7cfc0d 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -1,65 +1,46 @@ package aggregator import ( - - // stdlib - "bytes" - "sort" - + "github.com/DataDog/datadog-go/statsd" "github.com/op/go-logging" ) +var log = logging.MustGetLogger("datadog-agent") + type Aggregator interface { - Gauge(metric string, value float64, hostname string, tags *[]string) - Rate(metric string, value float64, hostname string, tags *[]string) + Gauge(metric string, value float64, hostname string, tags []string) + Histogram(metric string, value float64, hostname string, tags []string) Flush() string } -type Point struct { - Timestamp int64 - Value float32 +// UnbufferedAggregator is special aggregator that doesn't aggregate anything, +// it just forward metrics to DogStatsD +type UnbufferedAggregator struct { + client *statsd.Client } -type Serie struct { - MetricName string - Tags *[]string - Points *[]Point -} - -type DefaultAggregator struct { - Series *map[string]Serie -} - -var log = logging.MustGetLogger("datadog-agent") - -func genKey(metric string, hostname string, tags *[]string) string { - - sort.Strings(*tags) - - var buffer bytes.Buffer - buffer.WriteString(metric) - buffer.WriteString(hostname) - for _, tag := range *tags { - buffer.WriteString(tag) +func NewUnbufferedAggregator() *UnbufferedAggregator { + c, err := statsd.New("127.0.0.1:8125") + if err != nil { + panic(err) } + c.Namespace = "agent6." - return buffer.String() - + return &UnbufferedAggregator{c} } -func (agg DefaultAggregator) Gauge(metric string, value float64, hostname string, tags *[]string) { - key := genKey(metric, hostname, tags) - log.Infof("Submitted GAUGE: %v = %v", key, value) - +func (agg *UnbufferedAggregator) Gauge(metric string, value float64, hostname string, tags []string) { + if err := agg.client.Gauge(metric, value, tags, 1); err != nil { + log.Errorf("Error posting gauge %s: %v", metric, err) + } } -func (agg DefaultAggregator) Rate(metric string, value float64, hostname string, tags *[]string) { - key := genKey(metric, hostname, tags) - log.Infof("Submitted RATE: %v = %v", key, value) - +func (agg *UnbufferedAggregator) Histogram(metric string, value float64, hostname string, tags []string) { + if err := agg.client.Histogram(metric, value, tags, 1); err != nil { + log.Errorf("Error histogram %s: %v", metric, err) + } } -func (agg DefaultAggregator) Flush() string { - return "flushed!" - +func (agg UnbufferedAggregator) Flush() string { + return "" // noop } diff --git a/aggregator/api.c b/aggregator/api.c new file mode 100644 index 000000000000..ffb2e1b7f2d1 --- /dev/null +++ b/aggregator/api.c @@ -0,0 +1,57 @@ +#include "api.h" + +PyObject* SubmitData(PyObject*, MetricType, char*, float, PyObject*); + +char* MetricTypeNames[] = { + "GAUGE", + "RATE", + "HISTOGRAM" +}; + +static PyObject *submit_data(PyObject *self, PyObject *args) { + PyObject *check = NULL; + int mt; + char *name; + float value; + PyObject *tags = NULL; + + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + // aggregator.submit_data(self, aggregator.metric_type.GAUGE, name, value, tags) + if (!PyArg_ParseTuple(args, "OisfO", &check, &mt, &name, &value, &tags)) { + PyGILState_Release(gstate); + Py_RETURN_NONE; + } + + PyGILState_Release(gstate); + return SubmitData(check, mt, name, value, tags); +} + +static PyMethodDef AggMethods[] = { + {"submit_data", (PyCFunction)submit_data, METH_VARARGS, "Submit metrics to the aggregator."}, + {NULL, NULL} // guards +}; + +PyObject* _none() { + Py_RETURN_NONE; +} + +void initaggregator() +{ + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + PyObject *m = Py_InitModule("aggregator", AggMethods); + + for (int i=MT_FIRST; i<=MT_LAST; i++) { + PyModule_AddIntConstant(m, MetricTypeNames[i], i); + } + + PyGILState_Release(gstate); +} + +PyObject* PySequence_Fast_Get_Item(PyObject *o, Py_ssize_t i) +{ + return PySequence_Fast_GET_ITEM(o, i); +} diff --git a/aggregator/api.go b/aggregator/api.go new file mode 100644 index 000000000000..9169357426ce --- /dev/null +++ b/aggregator/api.go @@ -0,0 +1,52 @@ +package aggregator + +import "fmt" + +// #cgo pkg-config: python2 +// #include "api.h" +import "C" + +var _aggregator Aggregator + +//export SubmitData +func SubmitData(check *C.PyObject, mt C.MetricType, name *C.char, value C.float, tags *C.PyObject) *C.PyObject { + + // TODO: cleanup memory, C.stuff is going to stay there!!! + + _name := C.GoString(name) + _value := float64(value) + var _tags []string + var seq *C.PyObject + + seq = C.PySequence_Fast(tags, C.CString("expected a sequence")) + l := C.PySequence_Size(tags) + var i C.Py_ssize_t + for i = 0; i < l; i++ { + item := C.PySequence_Fast_Get_Item(seq, i) + _tags = append(_tags, C.GoString(C.PyString_AsString(item))) // YOLO! Please remove + } + C.Py_DecRef(seq) + + switch mt { + case C.RATE: + fmt.Println("Submitting Rate to the aggregator...", _name, _value, _tags) + fallthrough + case C.GAUGE: + fmt.Println("Submitting Gauge to the aggregator...", _name, _value, _tags) + _aggregator.Gauge(_name, _value, "", _tags) + case C.HISTOGRAM: + fmt.Println("Submitting Histogram to the aggregator...", _name, _value, _tags) + _aggregator.Histogram(_name, _value, "", _tags) + } + + return C._none() +} + +func Get() Aggregator { + return _aggregator +} + +func InitApi(aggregatorInstance Aggregator) { + _aggregator = aggregatorInstance + C.initaggregator() +} diff --git a/aggregator/api.h b/aggregator/api.h new file mode 100644 index 000000000000..bc772f691298 --- /dev/null +++ b/aggregator/api.h @@ -0,0 +1,13 @@ +#include + +typedef enum { + MT_FIRST = 0, + GAUGE = MT_FIRST, + RATE, + HISTOGRAM, + MT_LAST = HISTOGRAM +} MetricType; + +void initaggregator(); +PyObject* _none(); +PyObject* PySequence_Fast_Get_Item(PyObject*, Py_ssize_t); diff --git a/checks/common.go b/checks/common.go index 838b4dbc3248..2dc9b7cd6e87 100644 --- a/checks/common.go +++ b/checks/common.go @@ -6,28 +6,20 @@ var log = logging.MustGetLogger("datadog-agent") // Check is an interface for types capable to run checks type Check interface { - Run() (CheckResult, error) + Run() error String() string } -// CheckResult wraps results from Python check -type CheckResult struct { - // TODO add creation timestamp? - Result string - Error string -} - // Runner waits for checks and run them as long as they arrive on the channel -func Runner(in <-chan Check, out chan<- CheckResult) { +func Runner(in <-chan Check) { log.Debug("Ready to process checks...") for check := range in { // create call arguments log.Infof("Running check %s", check) // run the check - result, err := check.Run() + err := check.Run() if err != nil { log.Errorf("Error running check %s: %s", check, err) } - out <- result } } diff --git a/checks/system/memory.go b/checks/system/memory.go index 7d67812a6df3..16ab24ce45d8 100644 --- a/checks/system/memory.go +++ b/checks/system/memory.go @@ -1,15 +1,7 @@ package system -import -// stdlib -// project - -// 3rd party - -( - "fmt" - - "github.com/DataDog/datadog-agent/checks" +import ( + "github.com/DataDog/datadog-agent/aggregator" "github.com/op/go-logging" "github.com/shirou/gopsutil/mem" ) @@ -22,9 +14,8 @@ func (c *MemoryCheck) String() string { return "MemoryCheck" } -func (c *MemoryCheck) Run() (checks.CheckResult, error) { +func (c *MemoryCheck) Run() error { v, _ := mem.VirtualMemory() - res := fmt.Sprintf(`{"gauge": [{"Name": "system.mem.total", "Value": %f, "Tags": null}]}`, float64(v.Total)) - checkRes := checks.CheckResult{Result: res, Error: ""} - return checkRes, nil + aggregator.Get().Gauge("system.mem.total", float64(v.Total), "", []string{}) + return nil } diff --git a/py/check.go b/py/check.go index 051310c3e71b..c514dce4728c 100644 --- a/py/check.go +++ b/py/check.go @@ -1,6 +1,7 @@ package py import ( + "errors" "runtime" "github.com/DataDog/datadog-agent/checks" @@ -46,7 +47,7 @@ func NewPythonCheck(class *python.PyObject, config CheckConfig) *PythonCheck { } // Run a Python check -func (c *PythonCheck) Run() (checks.CheckResult, error) { +func (c *PythonCheck) Run() error { // Lock the GIL and release it at the end of the run _gstate := python.PyGILState_Ensure() runtime.LockOSThread() @@ -61,12 +62,15 @@ func (c *PythonCheck) Run() (checks.CheckResult, error) { var resultStr string if result == nil { python.PyErr_Print() - } else { - resultStr = python.PyString_AsString(result) + return errors.New("Unable to run Python check.") } - checkRes := checks.CheckResult{Result: resultStr, Error: ""} - return checkRes, nil + resultStr = python.PyString_AsString(result) + if resultStr == "" { + return nil + } + + return errors.New(resultStr) } // String representation (for debug and logging) diff --git a/py/checks/__init__.py b/py/checks/__init__.py index 9ccee257d469..1c07298176bd 100644 --- a/py/checks/__init__.py +++ b/py/checks/__init__.py @@ -4,6 +4,9 @@ import time from collections import defaultdict +import aggregator + + class AgentCheck(object): RATE = "rate" @@ -18,13 +21,13 @@ def __init__(self, *args, **kwargs): self.init_config = kwargs.get('init_config') or {} # could be set to None or be missing def gauge(self, name, value, tags=None): - self.metrics['gauge'].append({'Name': name, 'Value': value, 'Tags': tags}) + aggregator.submit_data(self, aggregator.GAUGE, name, value, tags) def rate(self, name, value, tags=None): - self.gauge(name, value, tags) + aggregator.submit_data(self, aggregator.RATE, name, value, tags) def histogram(self, name, value, tags=None, hostname=None, device_name=None): - self.metrics['histogram'].append({'Name': name, 'Value': value, 'Tags': tags}) + aggregator.submit_data(self, aggregator.HISTOGRAM, name, value, tags) def service_check(self, *args, **kwargs): pass @@ -89,8 +92,7 @@ def run(self): for i in self.instances: self.check(i) - result = json.dumps(self.metrics) - self.metrics = defaultdict(list) + result = '' except Exception, e: result = json.dumps( From 36fd4f38bcc0976d5129576ea6eb7e299bd9bafd Mon Sep 17 00:00:00 2001 From: Massimiliano Pippi Date: Thu, 2 Jun 2016 10:38:15 -0400 Subject: [PATCH 2/7] ignore coverage config file --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index f33cdf492540..b5017a5e1678 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ datadog-agent vendor/ .DS_Store *.out +profile.cov From 69185387656629e17cdfb8f35748a9fcf2f2a6ec Mon Sep 17 00:00:00 2001 From: Massimiliano Pippi Date: Wed, 8 Jun 2016 15:52:32 -0400 Subject: [PATCH 3/7] added test launcher --- runtests.sh | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100755 runtests.sh diff --git a/runtests.sh b/runtests.sh new file mode 100755 index 000000000000..a4b62237da2e --- /dev/null +++ b/runtests.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# Code based on https://github.com/hailiang/gosweep + +set -e + +# Run test coverage on each subdirectories and merge the coverage profile. +echo "mode: count" > profile.cov + +# Standard go tooling behavior is to ignore dirs with leading underscors +for dir in $(find . -maxdepth 10 -not -path './.git*' -not -path '*/_*' -not -path './vendor*' -type d); +do +if ls $dir/*.go &> /dev/null; then + go test -short -covermode=count -coverprofile=$dir/profile.tmp $dir + if [ -f $dir/profile.tmp ] + then + cat $dir/profile.tmp | tail -n +2 >> profile.cov + rm $dir/profile.tmp + fi +fi +done + +go tool cover -func profile.cov From 82e1abfbb30a02f3d2c42a03e0dd676c084d1729 Mon Sep 17 00:00:00 2001 From: Massimiliano Pippi Date: Wed, 8 Jun 2016 15:53:18 -0400 Subject: [PATCH 4/7] fixed checks tests --- checks/common_test.go | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/checks/common_test.go b/checks/common_test.go index fbe9005c6797..5da74dbef39f 100644 --- a/checks/common_test.go +++ b/checks/common_test.go @@ -12,34 +12,18 @@ type TestCheck struct { func (c *TestCheck) String() string { return "TestCheck" } -func (c *TestCheck) Run() (CheckResult, error) { +func (c *TestCheck) Run() error { if c.doErr { msg := "A tremendous error occurred." - return CheckResult{Result: "", Error: msg}, errors.New(msg) + return errors.New(msg) } - return CheckResult{Result: "Foo", Error: ""}, nil + return nil } func TestRunner(t *testing.T) { pending := make(chan Check) - results := make(chan CheckResult) - go Runner(pending, results) + go Runner(pending) pending <- &TestCheck{doErr: false} - res := <-results - if res.Error != "" { - t.Fatalf("Expected empty error message, found: %s", res.Error) - } - if res.Result != "Foo" { - t.Fatalf("Expected: %s, found: %s", "Foo", res.Result) - } - pending <- &TestCheck{doErr: true} - res = <-results - if res.Error == "" { - t.Fatal("Found empty error message") - } - if res.Result != "" { - t.Fatalf("Expected empty Result") - } } From f0ec7cdab09982ebc47995638262291660d1ee37 Mon Sep 17 00:00:00 2001 From: Massimiliano Pippi Date: Wed, 8 Jun 2016 15:55:40 -0400 Subject: [PATCH 5/7] fixed TestRun function --- py/check_test.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/py/check_test.go b/py/check_test.go index 1b6a5962387f..3584fa42589d 100644 --- a/py/check_test.go +++ b/py/check_test.go @@ -39,17 +39,9 @@ func TestNewPythonCheck(t *testing.T) { func TestRun(t *testing.T) { check := getCheckInstance() - result, err := check.Run() - if err != nil { + if err := check.Run(); err != nil { t.Fatalf("Expected error nil, found: %s", err) } - out := `{"gauge": [{"Name": "foo", "Value": 0, "Tags": null}, {"Name": "foo", "Value": 1, "Tags": null}]}` - if result.Result != out { - t.Fatalf("Expected %s, found: %s", out, result.Result) - } - if result.Error != "" { - t.Fatalf("Expected empty error string, found: %s", result.Error) - } } func TestStr(t *testing.T) { From 78d2878bbab48edaa16dac79a1954b6d6db2f951 Mon Sep 17 00:00:00 2001 From: Massimiliano Pippi Date: Wed, 8 Jun 2016 15:56:39 -0400 Subject: [PATCH 6/7] initialize the aggregator in the test runner --- py/utils_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/py/utils_test.go b/py/utils_test.go index d514d28b7ab3..2401ae884a1e 100644 --- a/py/utils_test.go +++ b/py/utils_test.go @@ -4,6 +4,7 @@ import ( "os" "testing" + "github.com/DataDog/datadog-agent/aggregator" "github.com/sbinet/go-python" ) @@ -19,9 +20,13 @@ func TestMain(m *testing.M) { // Initialize acquires the GIL but we don't need it, release it state := python.PyEval_SaveThread() + + // for now, only Python needs it, build and pass it on the fly + aggregator.InitApi(aggregator.NewUnbufferedAggregator()) + ret := m.Run() - python.PyEval_RestoreThread(state) + python.PyEval_RestoreThread(state) python.Finalize() os.Exit(ret) From 71fe5927468a41b78cfb94fbb1f06035796e3e60 Mon Sep 17 00:00:00 2001 From: Massimiliano Pippi Date: Wed, 8 Jun 2016 16:03:05 -0400 Subject: [PATCH 7/7] avoid talking to the aggregator during unit tests --- py/tests/testcheck.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/py/tests/testcheck.py b/py/tests/testcheck.py index 7a00f67d3976..9bf593d8001d 100644 --- a/py/tests/testcheck.py +++ b/py/tests/testcheck.py @@ -2,5 +2,8 @@ class TestCheck(AgentCheck): def check(self, instance): - self.gauge('foo', 0) - self.gauge('foo', 1) + """ + Do not interact with the Aggregator during + unit tests. Doing anything is ok here. + """ + pass