Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ datadog-agent
vendor/
.DS_Store
*.out
profile.cov
44 changes: 5 additions & 39 deletions agentmain/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package ddagentmain

import (
"encoding/json"
"time"

"github.com/DataDog/datadog-agent/aggregator"
"github.com/DataDog/datadog-agent/checks"
"github.com/DataDog/datadog-agent/checks/system"
"github.com/DataDog/datadog-agent/py"
"github.com/DataDog/datadog-go/statsd"
"github.com/op/go-logging"
"github.com/sbinet/go-python"
)
Expand All @@ -33,37 +32,12 @@ type metric struct {

type metrics map[string][]metric

// process results. Temporary solution: send results to DogStatsD
func collectResults(pending <-chan checks.CheckResult, c *statsd.Client) {
for res := range pending {
m := metrics{}
if err := json.Unmarshal([]byte(res.Result), &m); err != nil {
log.Errorf("Error parsing results: %s\n%v", res.Result, err)
}
// gauges
for _, g := range m["gauge"] {
log.Infof("gauge posted %s", g.Name)
if err := c.Gauge(g.Name, g.Value, g.Tags, 1); err != nil {
log.Errorf("Error posting gauge %s: %v", g.Name, err)
}
}
// histogram
for _, g := range m["histogram"] {
log.Infof("histogram posted %s", g.Name)
if err := c.Histogram(g.Name, g.Value, g.Tags, 1); err != nil {
log.Errorf("Error histogram %s: %v", g.Name, err)
}
}
}
}

// Start the main check loop
func Start() {

log.Infof("Starting Datadog Agent v%v", AGENT_VERSION)

pending := make(chan checks.Check, 100)
results := make(chan checks.CheckResult, 100)
pending := make(chan checks.Check, 10)

err := python.Initialize()
if err != nil {
Expand All @@ -76,19 +50,11 @@ func Start() {
// `python.Initialize` acquires the GIL but we don't need it, let's release it
state := python.PyEval_SaveThread()

// DogStatsD client, temporary solution to post metrics
c, err := statsd.New("127.0.0.1:8125")
if err != nil {
panic(err)
}
c.Namespace = "agent6."
// for now, only Python needs it, build and pass it on the fly
aggregator.InitApi(aggregator.NewUnbufferedAggregator())

// Get a single Runner instance, i.e. we process checks sequentially
go checks.Runner(pending, results)
// Get a set of collectors able to process all the results w/o causing check starvation
for i := 0; i < 2; i++ {
go collectResults(results, c)
}
go checks.Runner(pending)

// Get a list of Python checks we want to run
checksNames := []string{"checks.directory", "checks.go_expvar", "checks.process"}
Expand Down
69 changes: 25 additions & 44 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -1,65 +1,46 @@
package aggregator

import (

// stdlib
"bytes"
"sort"

"github.com/DataDog/datadog-go/statsd"
"github.com/op/go-logging"
)

var log = logging.MustGetLogger("datadog-agent")

type Aggregator interface {
Gauge(metric string, value float64, hostname string, tags *[]string)
Rate(metric string, value float64, hostname string, tags *[]string)
Gauge(metric string, value float64, hostname string, tags []string)
Histogram(metric string, value float64, hostname string, tags []string)
Flush() string
}

type Point struct {
Timestamp int64
Value float32
// UnbufferedAggregator is special aggregator that doesn't aggregate anything,
// it just forward metrics to DogStatsD
type UnbufferedAggregator struct {
client *statsd.Client
}

type Serie struct {
MetricName string
Tags *[]string
Points *[]Point
}

type DefaultAggregator struct {
Series *map[string]Serie
}

var log = logging.MustGetLogger("datadog-agent")

func genKey(metric string, hostname string, tags *[]string) string {

sort.Strings(*tags)

var buffer bytes.Buffer
buffer.WriteString(metric)
buffer.WriteString(hostname)
for _, tag := range *tags {
buffer.WriteString(tag)
func NewUnbufferedAggregator() *UnbufferedAggregator {
c, err := statsd.New("127.0.0.1:8125")
if err != nil {
panic(err)
}
c.Namespace = "agent6."

return buffer.String()

return &UnbufferedAggregator{c}
}

func (agg DefaultAggregator) Gauge(metric string, value float64, hostname string, tags *[]string) {
key := genKey(metric, hostname, tags)
log.Infof("Submitted GAUGE: %v = %v", key, value)

func (agg *UnbufferedAggregator) Gauge(metric string, value float64, hostname string, tags []string) {
if err := agg.client.Gauge(metric, value, tags, 1); err != nil {
log.Errorf("Error posting gauge %s: %v", metric, err)
}
}

func (agg DefaultAggregator) Rate(metric string, value float64, hostname string, tags *[]string) {
key := genKey(metric, hostname, tags)
log.Infof("Submitted RATE: %v = %v", key, value)

func (agg *UnbufferedAggregator) Histogram(metric string, value float64, hostname string, tags []string) {
if err := agg.client.Histogram(metric, value, tags, 1); err != nil {
log.Errorf("Error histogram %s: %v", metric, err)
}
}

func (agg DefaultAggregator) Flush() string {
return "flushed!"

func (agg UnbufferedAggregator) Flush() string {
return "" // noop
}
57 changes: 57 additions & 0 deletions aggregator/api.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include "api.h"

PyObject* SubmitData(PyObject*, MetricType, char*, float, PyObject*);

char* MetricTypeNames[] = {
"GAUGE",
"RATE",
"HISTOGRAM"
};

static PyObject *submit_data(PyObject *self, PyObject *args) {
PyObject *check = NULL;
int mt;
char *name;
float value;
PyObject *tags = NULL;

PyGILState_STATE gstate;
gstate = PyGILState_Ensure();

// aggregator.submit_data(self, aggregator.metric_type.GAUGE, name, value, tags)
if (!PyArg_ParseTuple(args, "OisfO", &check, &mt, &name, &value, &tags)) {
PyGILState_Release(gstate);
Py_RETURN_NONE;
}

PyGILState_Release(gstate);
return SubmitData(check, mt, name, value, tags);
}

static PyMethodDef AggMethods[] = {
{"submit_data", (PyCFunction)submit_data, METH_VARARGS, "Submit metrics to the aggregator."},
{NULL, NULL} // guards
};

PyObject* _none() {
Py_RETURN_NONE;
}

void initaggregator()
{
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();

PyObject *m = Py_InitModule("aggregator", AggMethods);

for (int i=MT_FIRST; i<=MT_LAST; i++) {
PyModule_AddIntConstant(m, MetricTypeNames[i], i);
}

PyGILState_Release(gstate);
}

PyObject* PySequence_Fast_Get_Item(PyObject *o, Py_ssize_t i)
{
return PySequence_Fast_GET_ITEM(o, i);
}
52 changes: 52 additions & 0 deletions aggregator/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package aggregator

import "fmt"

// #cgo pkg-config: python2
// #include "api.h"
import "C"

var _aggregator Aggregator

//export SubmitData
func SubmitData(check *C.PyObject, mt C.MetricType, name *C.char, value C.float, tags *C.PyObject) *C.PyObject {

// TODO: cleanup memory, C.stuff is going to stay there!!!

_name := C.GoString(name)
_value := float64(value)
var _tags []string
var seq *C.PyObject

seq = C.PySequence_Fast(tags, C.CString("expected a sequence"))
l := C.PySequence_Size(tags)
var i C.Py_ssize_t
for i = 0; i < l; i++ {
item := C.PySequence_Fast_Get_Item(seq, i)
_tags = append(_tags, C.GoString(C.PyString_AsString(item))) // YOLO! Please remove
}
C.Py_DecRef(seq)

switch mt {
case C.RATE:
fmt.Println("Submitting Rate to the aggregator...", _name, _value, _tags)
fallthrough
case C.GAUGE:
fmt.Println("Submitting Gauge to the aggregator...", _name, _value, _tags)
_aggregator.Gauge(_name, _value, "", _tags)
case C.HISTOGRAM:
fmt.Println("Submitting Histogram to the aggregator...", _name, _value, _tags)
_aggregator.Histogram(_name, _value, "", _tags)
}

return C._none()
}

func Get() Aggregator {
return _aggregator
}

func InitApi(aggregatorInstance Aggregator) {
_aggregator = aggregatorInstance
C.initaggregator()
}
13 changes: 13 additions & 0 deletions aggregator/api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include <Python.h>

typedef enum {
MT_FIRST = 0,
GAUGE = MT_FIRST,
RATE,
HISTOGRAM,
MT_LAST = HISTOGRAM
} MetricType;

void initaggregator();
PyObject* _none();
PyObject* PySequence_Fast_Get_Item(PyObject*, Py_ssize_t);
14 changes: 3 additions & 11 deletions checks/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,20 @@ var log = logging.MustGetLogger("datadog-agent")

// Check is an interface for types capable to run checks
type Check interface {
Run() (CheckResult, error)
Run() error
String() string
}

// CheckResult wraps results from Python check
type CheckResult struct {
// TODO add creation timestamp?
Result string
Error string
}

// Runner waits for checks and run them as long as they arrive on the channel
func Runner(in <-chan Check, out chan<- CheckResult) {
func Runner(in <-chan Check) {
log.Debug("Ready to process checks...")
for check := range in {
// create call arguments
log.Infof("Running check %s", check)
// run the check
result, err := check.Run()
err := check.Run()
if err != nil {
log.Errorf("Error running check %s: %s", check, err)
}
out <- result
}
}
24 changes: 4 additions & 20 deletions checks/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,18 @@ type TestCheck struct {

func (c *TestCheck) String() string { return "TestCheck" }

func (c *TestCheck) Run() (CheckResult, error) {
func (c *TestCheck) Run() error {
if c.doErr {
msg := "A tremendous error occurred."
return CheckResult{Result: "", Error: msg}, errors.New(msg)
return errors.New(msg)
}
return CheckResult{Result: "Foo", Error: ""}, nil
return nil
}

func TestRunner(t *testing.T) {
pending := make(chan Check)
results := make(chan CheckResult)
go Runner(pending, results)
go Runner(pending)

pending <- &TestCheck{doErr: false}
res := <-results
if res.Error != "" {
t.Fatalf("Expected empty error message, found: %s", res.Error)
}
if res.Result != "Foo" {
t.Fatalf("Expected: %s, found: %s", "Foo", res.Result)
}

pending <- &TestCheck{doErr: true}
res = <-results
if res.Error == "" {
t.Fatal("Found empty error message")
}
if res.Result != "" {
t.Fatalf("Expected empty Result")
}
}
19 changes: 5 additions & 14 deletions checks/system/memory.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package system

import
// stdlib
// project

// 3rd party

(
"fmt"

"github.com/DataDog/datadog-agent/checks"
import (
"github.com/DataDog/datadog-agent/aggregator"
"github.com/op/go-logging"
"github.com/shirou/gopsutil/mem"
)
Expand All @@ -22,9 +14,8 @@ func (c *MemoryCheck) String() string {
return "MemoryCheck"
}

func (c *MemoryCheck) Run() (checks.CheckResult, error) {
func (c *MemoryCheck) Run() error {
v, _ := mem.VirtualMemory()
res := fmt.Sprintf(`{"gauge": [{"Name": "system.mem.total", "Value": %f, "Tags": null}]}`, float64(v.Total))
checkRes := checks.CheckResult{Result: res, Error: ""}
return checkRes, nil
aggregator.Get().Gauge("system.mem.total", float64(v.Total), "", []string{})
return nil
}
Loading