diff --git a/extension/registry.go b/extension/registry.go index 197a7036dc..5c045ed343 100644 --- a/extension/registry.go +++ b/extension/registry.go @@ -18,6 +18,7 @@ package extension import ( "github.com/google/trillian/crypto/keys" + "github.com/google/trillian/monitoring" "github.com/google/trillian/quota" "github.com/google/trillian/storage" "github.com/google/trillian/util" @@ -39,4 +40,6 @@ type Registry struct { util.ElectionFactory // QuotaManager provides rate limiting capabilities for Trillian. QuotaManager quota.Manager + // MetricFactory provides metrics for monitoring. + monitoring.MetricFactory } diff --git a/integration/functions.sh b/integration/functions.sh index 55249bd669..90adbbed2d 100644 --- a/integration/functions.sh +++ b/integration/functions.sh @@ -44,9 +44,13 @@ wait_for_server_startup() { # pick_unused_port selects an apparently unused port. pick_unused_port() { + local avoid=${1:-0} local base=6962 local port for (( port = "${base}" ; port <= 61000 ; port++ )); do + if [[ $port == $avoid ]]; then + continue + fi if ! lsof -i :$port > /dev/null; then echo $port break @@ -111,21 +115,24 @@ log_prep_test() { ETCD_OPTS="--etcd_servers=${etcd_server}" ETCD_DB_DIR=default.etcd wait_for_server_startup ${etcd_port} - local signer_election_opts= + local logserver_opts="--etcd_http_service=trillian-logserver-http --etcd_service=trillian-logserver" + local logsigner_opts="--etcd_http_service=trillian-logsigner-http" else if [[ ${log_signer_count} > 1 ]]; then echo "*** Warning: running multiple signers with no etcd instance ***" fi - local signer_election_opts="--force_master" + local logserver_opts= + local logsigner_opts="--force_master" fi # Start a set of Log RPC servers. 
for ((i=0; i < rpc_server_count; i++)); do port=$(pick_unused_port) RPC_SERVERS="${RPC_SERVERS},localhost:${port}" + http=$(pick_unused_port ${port}) - echo "Starting Log RPC server on localhost:${port}" - ./trillian_log_server ${ETCD_OPTS} --rpc_endpoint="localhost:${port}" --http_endpoint='' & + echo "Starting Log RPC server on localhost:${port}, HTTP on localhost:${http}" + ./trillian_log_server ${ETCD_OPTS} ${logserver_opts} --rpc_endpoint="localhost:${port}" --http_endpoint="localhost:${http}" & pid=$! RPC_SERVER_PIDS+=(${pid}) wait_for_server_startup ${port} @@ -137,19 +144,24 @@ log_prep_test() { done RPC_SERVERS="${RPC_SERVERS:1}" - if [[ ! -z "${ETCD_OPTS}" ]]; then - RPC_SERVERS="trillian-log" - echo "Registered log servers @${RPC_SERVERS}/" - ETCDCTL_API=3 etcdctl get ${RPC_SERVERS} --prefix - fi - # Start a set of signers. for ((i=0; i < log_signer_count; i++)); do - echo "Starting Log signer" - ./trillian_log_signer ${ETCD_OPTS} ${signer_election_opts} --sequencer_interval="1s" --batch_size=500 --http_endpoint='' --num_sequencers 2 & + http=$(pick_unused_port) + echo "Starting Log signer, HTTP on localhost:${http}" + ./trillian_log_signer ${ETCD_OPTS} ${logsigner_opts} --sequencer_interval="1s" --batch_size=500 --http_endpoint="localhost:${http}" --num_sequencers 2 & pid=$! LOG_SIGNER_PIDS+=(${pid}) + wait_for_server_startup ${http} done + + if [[ ! -z "${ETCD_OPTS}" ]]; then + RPC_SERVERS="trillian-logserver" + echo "Registered log servers @${RPC_SERVERS}/" + ETCDCTL_API=3 etcdctl get ${RPC_SERVERS}/ --prefix + echo "Registered HTTP endpoints" + ETCDCTL_API=3 etcdctl get trillian-logserver-http/ --prefix + ETCDCTL_API=3 etcdctl get trillian-logsigner-http/ --prefix + fi } # log_stop_tests closes down a set of running processes for a log test. diff --git a/monitoring/inert.go b/monitoring/inert.go new file mode 100644 index 0000000000..e0b023357f --- /dev/null +++ b/monitoring/inert.go @@ -0,0 +1,122 @@ +// Copyright 2017 Google Inc. 
All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitoring + +import ( + "fmt" + "strings" + "sync" +) + +// InertMetricFactory creates inert metrics for testing. +type InertMetricFactory struct{} + +// NewCounter creates a new inert Counter. +func (imf InertMetricFactory) NewCounter(name, help string, labelNames ...string) Counter { + return &InertFloat{ + labelCount: len(labelNames), + vals: make(map[string]float64), + } +} + +// NewGauge creates a new inert Gauge. +func (imf InertMetricFactory) NewGauge(name, help string, labelNames ...string) Gauge { + return &InertFloat{ + labelCount: len(labelNames), + vals: make(map[string]float64), + } +} + +// NewHistogram creates a new inert Histogram. +func (imf InertMetricFactory) NewHistogram(name, help string, labelNames ...string) Histogram { + return &InertDistribution{ + labelCount: len(labelNames), + counts: make(map[string]uint64), + sums: make(map[string]float64), + } +} + +// InertFloat is an internal-only implementation of both the Counter and Gauge interfaces. +type InertFloat struct { + labelCount int + mu sync.Mutex + vals map[string]float64 +} + +// Inc adds 1 to the value. +func (m *InertFloat) Inc(labelVals ...string) { + m.Add(1.0, labelVals...) +} + +// Dec subtracts 1 from the value. +func (m *InertFloat) Dec(labelVals ...string) { + m.Add(-1.0, labelVals...) +} + +// Add adds the given amount to the value. 
+func (m *InertFloat) Add(val float64, labelVals ...string) { + m.mu.Lock() + defer m.mu.Unlock() + key := keyForLabels(labelVals, m.labelCount) + m.vals[key] += val +} + +// Set sets the value. +func (m *InertFloat) Set(val float64, labelVals ...string) { + m.mu.Lock() + defer m.mu.Unlock() + key := keyForLabels(labelVals, m.labelCount) + m.vals[key] = val +} + +// Value returns the current value. +func (m *InertFloat) Value(labelVals ...string) float64 { + m.mu.Lock() + defer m.mu.Unlock() + key := keyForLabels(labelVals, m.labelCount) + return m.vals[key] +} + +// InertDistribution is an internal-only implementation of the Distribution interface. +type InertDistribution struct { + labelCount int + mu sync.Mutex + counts map[string]uint64 + sums map[string]float64 +} + +// Observe adds a single observation to the distribution. +func (m *InertDistribution) Observe(val float64, labelVals ...string) { + m.mu.Lock() + defer m.mu.Unlock() + key := keyForLabels(labelVals, m.labelCount) + m.counts[key]++ + m.sums[key] += val +} + +// Info returns count, sum for the distribution. +func (m *InertDistribution) Info(labelVals ...string) (uint64, float64) { + m.mu.Lock() + defer m.mu.Unlock() + key := keyForLabels(labelVals, m.labelCount) + return m.counts[key], m.sums[key] +} + +func keyForLabels(labelVals []string, count int) string { + if len(labelVals) != count { + panic(fmt.Sprintf("invalid label count %d; want %d", len(labelVals), count)) + } + return strings.Join(labelVals, "|") +} diff --git a/monitoring/metric/counter.go b/monitoring/metric/counter.go deleted file mode 100644 index a60a5c6de3..0000000000 --- a/monitoring/metric/counter.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2017 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric - -import ( - "context" - "sort" - "sync" - "time" - - "github.com/golang/glog" -) - -// A Counter is a metric that can only increase. -type Counter interface { - Add(n int64) -} -type counter struct { - mu sync.Mutex - name string - value int64 - lastDumped time.Time - lastDumpedValue int64 -} - -type safeMetrics struct { - mu sync.Mutex - m map[string]*counter -} - -var ( - metrics = safeMetrics{ - m: make(map[string]*counter), - } -) - -func (m *counter) Add(n int64) { - m.mu.Lock() - defer m.mu.Unlock() - m.value += n -} - -// NewCounter defines a cumulative metric. The name should be unique -// within a binary. 
-func NewCounter(name string) Counter { - c := counter{name: name, lastDumped: time.Now()} - metrics.mu.Lock() - defer metrics.mu.Unlock() - if dup := metrics.m[c.name]; dup != nil { - glog.Fatal("duplicate metric name registered: ", c.name) - } - metrics.m[c.name] = &c - return &c -} - -func dump() { - metrics.mu.Lock() - defer metrics.mu.Unlock() - glog.Info("dumping metrics:") - keys := make([]string, 0, len(metrics.m)) - for k := range metrics.m { - keys = append(keys, k) - } - sort.Strings(keys) - for _, key := range keys { - m := metrics.m[key] - m.mu.Lock() - current := m.value - delta := current - m.lastDumpedValue - now := time.Now() - duration := now.Sub(m.lastDumped) - m.lastDumped = now - m.lastDumpedValue = current - m.mu.Unlock() - - qps := float64(delta) / duration.Seconds() - glog.Infof("%v: %v (%.1f qps)", key, current, qps) - } -} - -// DumpToLog arranges for all metrics to be logged at a regular -// interval. This is not practical for production monitoring, but can -// be useful during development. -func DumpToLog(ctx context.Context, d time.Duration) { - ticker := time.NewTicker(d) - defer ticker.Stop() - for { - select { - case <-ticker.C: - dump() - case <-ctx.Done(): - return - } - } -} diff --git a/monitoring/metrics.go b/monitoring/metrics.go new file mode 100644 index 0000000000..e09767f716 --- /dev/null +++ b/monitoring/metrics.go @@ -0,0 +1,49 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package monitoring + +// MetricFactory allows the creation of different types of metric. +type MetricFactory interface { + NewCounter(name, help string, labelNames ...string) Counter + NewGauge(name, help string, labelNames ...string) Gauge + NewHistogram(name, help string, labelNames ...string) Histogram +} + +// Counter is a metric class for numeric values that increase. +type Counter interface { + Inc(labelVals ...string) + Add(val float64, labelVals ...string) + Value(labelVals ...string) float64 +} + +// Gauge is a metric class for numeric values that can go up and down. +type Gauge interface { + Inc(labelVals ...string) + Dec(labelVals ...string) + Add(val float64, labelVals ...string) + Set(val float64, labelVals ...string) + // Value retrieves the value for a particular set of labels. + // This is only really useful for testing implementations. + Value(labelVals ...string) float64 +} + +// Histogram is a metric class that tracks the distribution of a collection +// of observations. +type Histogram interface { + Observe(val float64, labelVals ...string) + // Info retrieves the count and sum of observations for a particular set of labels. + // This is only really useful for testing implementations. + Info(labelVals ...string) (uint64, float64) +} diff --git a/monitoring/metrics_test.go b/monitoring/metrics_test.go new file mode 100644 index 0000000000..e73f459bd1 --- /dev/null +++ b/monitoring/metrics_test.go @@ -0,0 +1,32 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitoring_test + +import ( + "testing" + + "github.com/google/trillian/monitoring" + "github.com/google/trillian/monitoring/testonly" +) + +func TestCounter(t *testing.T) { + testonly.TestCounter(t, monitoring.InertMetricFactory{}) +} +func TestGauge(t *testing.T) { + testonly.TestCounter(t, monitoring.InertMetricFactory{}) +} +func TestHistogram(t *testing.T) { + testonly.TestCounter(t, monitoring.InertMetricFactory{}) +} diff --git a/monitoring/prometheus/metrics.go b/monitoring/prometheus/metrics.go new file mode 100644 index 0000000000..3aca0f4e4c --- /dev/null +++ b/monitoring/prometheus/metrics.go @@ -0,0 +1,251 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package prometheus provides a Prometheus-based implementation of the +// MetricFactory abstraction. 
+package prometheus + +import ( + "fmt" + + "github.com/golang/glog" + "github.com/google/trillian/monitoring" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +// MetricFactory allows the creation of Prometheus-based metrics. +type MetricFactory struct { + Prefix string +} + +// NewCounter creates a new Counter object backed by Prometheus. +func (pmf MetricFactory) NewCounter(name, help string, labelNames ...string) monitoring.Counter { + if labelNames == nil || len(labelNames) == 0 { + counter := prometheus.NewCounter( + prometheus.CounterOpts{ + Name: pmf.Prefix + name, + Help: help, + }) + prometheus.MustRegister(counter) + return &Counter{single: counter} + } + + vec := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: pmf.Prefix + name, + Help: help, + }, + labelNames) + prometheus.MustRegister(vec) + return &Counter{labelNames: labelNames, vec: vec} +} + +// NewGauge creates a new Gauge object backed by Prometheus. +func (pmf MetricFactory) NewGauge(name, help string, labelNames ...string) monitoring.Gauge { + if labelNames == nil || len(labelNames) == 0 { + gauge := prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: pmf.Prefix + name, + Help: help, + }) + prometheus.MustRegister(gauge) + return &Gauge{single: gauge} + } + vec := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: pmf.Prefix + name, + Help: help, + }, + labelNames) + prometheus.MustRegister(vec) + return &Gauge{labelNames: labelNames, vec: vec} +} + +// NewHistogram creates a new Histogram object backed by Prometheus. 
+func (pmf MetricFactory) NewHistogram(name, help string, labelNames ...string) monitoring.Histogram { + if labelNames == nil || len(labelNames) == 0 { + histogram := prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: pmf.Prefix + name, + Help: help, + }) + prometheus.MustRegister(histogram) + return &Histogram{single: histogram} + } + vec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: pmf.Prefix + name, + Help: help, + }, + labelNames) + prometheus.MustRegister(vec) + return &Histogram{labelNames: labelNames, vec: vec} +} + +// Counter is a wrapper around a Prometheus Counter or CounterVec object. +type Counter struct { + labelNames []string + single prometheus.Counter + vec *prometheus.CounterVec +} + +// Inc adds 1 to a counter. +func (m *Counter) Inc(labelVals ...string) { + if m.vec != nil { + m.vec.With(labelsFor(m.labelNames, labelVals)).Inc() + } else { + m.single.Inc() + } +} + +// Add adds the given amount to a counter. +func (m *Counter) Add(val float64, labelVals ...string) { + if m.vec != nil { + m.vec.With(labelsFor(m.labelNames, labelVals)).Add(val) + } else { + m.single.Add(val) + } +} + +// Value returns the current amount of a counter. +func (m *Counter) Value(labelVals ...string) float64 { + var metric prometheus.Metric + if m.vec != nil { + metric = m.vec.With(labelsFor(m.labelNames, labelVals)) + } else { + metric = m.single + } + var metricpb dto.Metric + if err := metric.Write(&metricpb); err != nil { + glog.Errorf("failed to Write metric: %v", err) + return 0.0 + } + if metricpb.Counter == nil { + glog.Errorf("counter field missing") + return 0.0 + } + return metricpb.Counter.GetValue() +} + +// Gauge is a wrapper around a Prometheus Gauge or GaugeVec object. +type Gauge struct { + labelNames []string + single prometheus.Gauge + vec *prometheus.GaugeVec +} + +// Inc adds 1 to a gauge. 
+func (m *Gauge) Inc(labelVals ...string) { + if m.vec != nil { + m.vec.With(labelsFor(m.labelNames, labelVals)).Inc() + } else { + m.single.Inc() + } +} + +// Dec subtracts 1 from a gauge. +func (m *Gauge) Dec(labelVals ...string) { + if m.vec != nil { + m.vec.With(labelsFor(m.labelNames, labelVals)).Dec() + } else { + m.single.Dec() + } +} + +// Add adds given value to a gauge. +func (m *Gauge) Add(val float64, labelVals ...string) { + if m.vec != nil { + m.vec.With(labelsFor(m.labelNames, labelVals)).Add(val) + } else { + m.single.Add(val) + } +} + +// Set sets the value of a gauge. +func (m *Gauge) Set(val float64, labelVals ...string) { + if m.vec != nil { + m.vec.With(labelsFor(m.labelNames, labelVals)).Set(val) + } else { + m.single.Set(val) + } +} + +// Value returns the current amount of a gauge. +func (m *Gauge) Value(labelVals ...string) float64 { + var metric prometheus.Metric + if m.vec != nil { + metric = m.vec.With(labelsFor(m.labelNames, labelVals)) + } else { + metric = m.single + } + var metricpb dto.Metric + if err := metric.Write(&metricpb); err != nil { + glog.Errorf("failed to Write metric: %v", err) + return 0.0 + } + if metricpb.Gauge == nil { + glog.Errorf("gauge field missing") + return 0.0 + } + return metricpb.Gauge.GetValue() +} + +// Histogram is a wrapper around a Prometheus Histogram or HistogramVec object. +type Histogram struct { + labelNames []string + single prometheus.Histogram + vec *prometheus.HistogramVec +} + +// Observe adds a single observation to the histogram. +func (m *Histogram) Observe(val float64, labelVals ...string) { + if m.vec != nil { + m.vec.With(labelsFor(m.labelNames, labelVals)).Observe(val) + } else { + m.single.Observe(val) + } +} + +// Info returns the count and sum of observations for the histogram. 
+func (m *Histogram) Info(labelVals ...string) (uint64, float64) { + var metric prometheus.Metric + if m.vec != nil { + metric = m.vec.MetricVec.With(labelsFor(m.labelNames, labelVals)).(prometheus.Metric) + } else { + metric = m.single + } + var metricpb dto.Metric + if err := metric.Write(&metricpb); err != nil { + glog.Errorf("failed to Write metric: %v", err) + return 0, 0.0 + } + histVal := metricpb.GetHistogram() + if histVal == nil { + glog.Errorf("histogram field missing") + return 0, 0.0 + } + return histVal.GetSampleCount(), histVal.GetSampleSum() +} + +func labelsFor(names, values []string) prometheus.Labels { + if len(names) != len(values) { + panic(fmt.Sprintf("got %d (%v) values for %d labels (%v)", len(values), values, len(names), names)) + } + labels := make(prometheus.Labels) + for i, name := range names { + labels[name] = values[i] + } + return labels +} diff --git a/monitoring/prometheus/metrics_test.go b/monitoring/prometheus/metrics_test.go new file mode 100644 index 0000000000..86ec2ed243 --- /dev/null +++ b/monitoring/prometheus/metrics_test.go @@ -0,0 +1,31 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package prometheus + +import ( + "testing" + + "github.com/google/trillian/monitoring/testonly" +) + +func TestCounter(t *testing.T) { + testonly.TestCounter(t, MetricFactory{Prefix: "TestCounter"}) +} +func TestGauge(t *testing.T) { + testonly.TestCounter(t, MetricFactory{Prefix: "TestGauge"}) +} +func TestHistogram(t *testing.T) { + testonly.TestCounter(t, MetricFactory{Prefix: "TestHistogram"}) +} diff --git a/monitoring/rpc_stats_interceptor.go b/monitoring/rpc_stats_interceptor.go index 3a622b35c4..0cb8912de8 100644 --- a/monitoring/rpc_stats_interceptor.go +++ b/monitoring/rpc_stats_interceptor.go @@ -16,7 +16,6 @@ package monitoring import ( - "expvar" "fmt" "time" @@ -26,89 +25,81 @@ import ( ) const ( - nanosToMillisDivisor int64 = 1000000 - - requestCountMapName string = "requests-by-handler" - requestSucceededCountMapName string = "success-by-handler" - requestErrorCountMapName string = "errors-by-handler" - requestSucceededLatencyMapName string = "succeeded-request-total-latency-by-handler-ms" - requestFailedLatencyMapName string = "failed-request-total-latency-by-handler-ms" + reqCountName = "rpc_requests_total" + reqSuccessCountName = "rpc_success_total" + reqSuccessLatencyName = "rpc_success_latency_ms" + reqErrorCountName = "rpc_errors_total" + reqErrorLatencyName = "rpc_errors_latency_ms" + methodName = "method" ) // RPCStatsInterceptor provides a gRPC interceptor that records statistics about the RPCs passing through it. 
type RPCStatsInterceptor struct { - baseName string - timeSource util.TimeSource - handlerRequestCountMap *expvar.Map - handlerRequestSucceededCountMap *expvar.Map - handlerRequestErrorCountMap *expvar.Map - handlerRequestSucceededLatencyMap *expvar.Map - handlerRequestFailedLatencyMap *expvar.Map + prefix string + timeSource util.TimeSource + ReqCount Counter + ReqSuccessCount Counter + ReqSuccessLatency Histogram + ReqErrorCount Counter + ReqErrorLatency Histogram } // NewRPCStatsInterceptor creates a new RPCStatsInterceptor for the given application/component, with // a specified time source. -func NewRPCStatsInterceptor(timeSource util.TimeSource, application, component string) *RPCStatsInterceptor { - return &RPCStatsInterceptor{baseName: fmt.Sprintf("%s/%s", application, component), timeSource: timeSource, - handlerRequestCountMap: new(expvar.Map).Init(), - handlerRequestSucceededCountMap: new(expvar.Map).Init(), - handlerRequestErrorCountMap: new(expvar.Map).Init(), - handlerRequestSucceededLatencyMap: new(expvar.Map).Init(), - handlerRequestFailedLatencyMap: new(expvar.Map).Init()} -} - -func (r RPCStatsInterceptor) nameForMap(name string) string { - return fmt.Sprintf("%s/%s", r.baseName, name) +func NewRPCStatsInterceptor(timeSource util.TimeSource, prefix string, mf MetricFactory) *RPCStatsInterceptor { + interceptor := RPCStatsInterceptor{ + prefix: prefix, + timeSource: timeSource, + ReqCount: mf.NewCounter(prefixedName(prefix, reqCountName), "Number of requests", methodName), + ReqSuccessCount: mf.NewCounter(prefixedName(prefix, reqSuccessCountName), "Number of successful requests", methodName), + ReqSuccessLatency: mf.NewHistogram(prefixedName(prefix, reqSuccessLatencyName), "Latency of successful requests", methodName), + ReqErrorCount: mf.NewCounter(prefixedName(prefix, reqErrorCountName), "Number of errored requests", methodName), + ReqErrorLatency: mf.NewHistogram(prefixedName(prefix, reqErrorLatencyName), "Latency of errored requests", 
methodName), + } + return &interceptor } -// Publish must be called for stats to be visible. The expvar framework will prevent -// multiple calls to Publish from succeeding. -func (r RPCStatsInterceptor) Publish() { - expvar.Publish(r.nameForMap(requestCountMapName), r.handlerRequestCountMap) - expvar.Publish(r.nameForMap(requestSucceededCountMapName), r.handlerRequestSucceededCountMap) - expvar.Publish(r.nameForMap(requestErrorCountMapName), r.handlerRequestErrorCountMap) - expvar.Publish(r.nameForMap(requestSucceededLatencyMapName), r.handlerRequestSucceededLatencyMap) - expvar.Publish(r.nameForMap(requestFailedLatencyMapName), r.handlerRequestFailedLatencyMap) +func prefixedName(prefix, name string) string { + return fmt.Sprintf("%s_%s", prefix, name) } -func (r RPCStatsInterceptor) recordFailureLatency(method string, startTime time.Time) { +func (r *RPCStatsInterceptor) recordFailureLatency(labels []string, startTime time.Time) { latency := r.timeSource.Now().Sub(startTime) - r.handlerRequestErrorCountMap.Add(method, 1) - r.handlerRequestFailedLatencyMap.Add(method, latency.Nanoseconds()/nanosToMillisDivisor) + r.ReqErrorCount.Inc(labels...) + r.ReqErrorLatency.Observe(float64(latency/time.Millisecond), labels...) } // Interceptor returns a UnaryServerInterceptor that can be registered with an RPC server and // will record request counts / errors and latencies for that servers handlers -func (r RPCStatsInterceptor) Interceptor() grpc.UnaryServerInterceptor { +func (r *RPCStatsInterceptor) Interceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - method := info.FullMethod + labels := []string{info.FullMethod} // Increase the request count for the method and start the clock - r.handlerRequestCountMap.Add(method, 1) + r.ReqCount.Inc(labels...) 
startTime := r.timeSource.Now() defer func() { if rec := recover(); rec != nil { // If we reach here then the handler exited via panic, count it as a server failure - r.recordFailureLatency(method, startTime) + r.recordFailureLatency(labels, startTime) panic(rec) } }() // Invoke the actual operation - res, err := handler(ctx, req) + rsp, err := handler(ctx, req) // Record success / failure and latency if err != nil { - r.recordFailureLatency(method, startTime) + r.recordFailureLatency(labels, startTime) } else { latency := r.timeSource.Now().Sub(startTime) - - r.handlerRequestSucceededCountMap.Add(method, 1) - r.handlerRequestSucceededLatencyMap.Add(method, latency.Nanoseconds()/nanosToMillisDivisor) + r.ReqSuccessCount.Inc(labels...) + r.ReqSuccessLatency.Observe(float64(latency/time.Millisecond), labels...) } // Pass the result of the handler invocation back - return res, err + return rsp, err } } diff --git a/monitoring/rpc_stats_interceptor_test.go b/monitoring/rpc_stats_interceptor_test.go index bee042be2e..f8ed91b064 100644 --- a/monitoring/rpc_stats_interceptor_test.go +++ b/monitoring/rpc_stats_interceptor_test.go @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package monitoring +package monitoring_test import ( "errors" - "expvar" "fmt" "testing" "time" + "github.com/google/trillian/monitoring" "github.com/google/trillian/util" "golang.org/x/net/context" "google.golang.org/grpc" @@ -30,170 +30,151 @@ import ( var fakeTime = time.Date(2016, 10, 3, 12, 38, 27, 36, time.UTC) type recordingUnaryHandler struct { - called bool - ctx context.Context - req interface{} - resp interface{} - err error + // ctx and req are recorded on invocation + ctx context.Context + req interface{} + // rsp and err are returned on invocation + rsp interface{} + err error } func (r recordingUnaryHandler) handler() grpc.UnaryHandler { return func(ctx context.Context, req interface{}) (interface{}, error) { r.ctx = ctx r.req = req - - return r.resp, r.err + return r.rsp, r.err } } -type singleRequestTestCase struct { - name string - method string - handler recordingUnaryHandler - timeSource util.IncrementingFakeTimeSource - panics bool -} - -// This is an OK request with 500ms latency -var okRequest500 = singleRequestTestCase{name: "ok request", method: "getmethod", handler: recordingUnaryHandler{req: "OK", err: nil}, timeSource: util.IncrementingFakeTimeSource{BaseTime: fakeTime, Increments: []time.Duration{0, time.Millisecond * 500}}} - -// This is an errored request with 3000ms latency -var errorRequest3000 = singleRequestTestCase{name: "error request", method: "setmethod", handler: recordingUnaryHandler{err: errors.New("bang")}, timeSource: util.IncrementingFakeTimeSource{BaseTime: fakeTime, Increments: []time.Duration{0, time.Millisecond * 3000}}} - -// This request panics with 1500ms latency -var panicRequest1500 = singleRequestTestCase{name: "panic request", method: "getmethod", panics: true, handler: recordingUnaryHandler{req: "OK", err: nil}, timeSource: util.IncrementingFakeTimeSource{BaseTime: fakeTime, Increments: []time.Duration{0, time.Millisecond * 1500}}} +func TestSingleRequests(t *testing.T) { + var tests = []struct { + name 
string + method string + handler recordingUnaryHandler + timeSource util.IncrementingFakeTimeSource + }{ + // This is an OK request with 500ms latency + { + name: "ok_request", + method: "getmethod", + handler: recordingUnaryHandler{req: "OK", err: nil}, + timeSource: util.IncrementingFakeTimeSource{ + BaseTime: fakeTime, + Increments: []time.Duration{0, time.Millisecond * 500}, + }, + }, + // This is an errored request with 3000ms latency + { + name: "error_request", + method: "setmethod", + handler: recordingUnaryHandler{err: errors.New("bang")}, + timeSource: util.IncrementingFakeTimeSource{ + BaseTime: fakeTime, + Increments: []time.Duration{0, time.Millisecond * 3000}, + }, + }, + } + + for _, test := range tests { + prefix := fmt.Sprintf("test_%s", test.name) + stats := monitoring.NewRPCStatsInterceptor(&test.timeSource, prefix, monitoring.InertMetricFactory{}) + i := stats.Interceptor() + + // Invoke the test handler wrapped by the interceptor. + got, err := i(context.Background(), "wibble", &grpc.UnaryServerInfo{FullMethod: test.method}, test.handler.handler()) + + // Check the interceptor passed through the results. + if got != test.handler.rsp || (err != nil) != (test.handler.err != nil) { + t.Errorf("interceptor(%s)=%v,%v; want %v,%v", test.name, got, err, test.handler.rsp, test.handler.err) + } -var singleRequestTestCases = []singleRequestTestCase{okRequest500, errorRequest3000, panicRequest1500} + // Now check the resulting state of the metrics. 
+ if got, want := stats.ReqCount.Value(test.method), 1.0; got != want { + t.Errorf("stats.ReqCount=%v; want %v", got, want) + } + wantLatency := float64(test.timeSource.Increments[1].Nanoseconds() / int64(time.Millisecond)) + wantErrors := 0.0 + wantSuccess := 0.0 + if test.handler.err == nil { + wantSuccess = 1.0 + } else { + wantErrors = 1.0 + } + if got := stats.ReqSuccessCount.Value(test.method); got != wantSuccess { + t.Errorf("stats.ReqSuccessCount=%v; want %v", got, wantSuccess) + } + if got := stats.ReqErrorCount.Value(test.method); got != wantErrors { + t.Errorf("stats.ReqErrorCount=%v; want %v", got, wantSuccess) + } -func TestSingleRequests(t *testing.T) { - for _, req := range singleRequestTestCases { - req.execute(t) + if gotCount, gotSum := stats.ReqSuccessLatency.Info(test.method); gotCount != uint64(wantSuccess) { + t.Errorf("stats.ReqSuccessLatency.Count=%v; want %v", gotCount, wantSuccess) + } else if gotSum != wantLatency*wantSuccess { + t.Errorf("stats.ReqSuccessLatency.Sum=%v; want %v", gotSum, wantLatency*wantSuccess) + } + if gotCount, gotSum := stats.ReqErrorLatency.Info(test.method); gotCount != uint64(wantErrors) { + t.Errorf("stats.ReqErrorLatency.Count=%v; want %v", gotCount, wantErrors) + } else if gotSum != wantLatency*wantErrors { + t.Errorf("stats.ReqErrorLatency.Sum=%v; want %v", gotSum, wantLatency*wantErrors) + } } } func TestMultipleOKRequestsTotalLatency(t *testing.T) { // We're going to make 3 requests so set up the time source appropriately - ts := util.IncrementingFakeTimeSource{BaseTime: fakeTime, Increments: []time.Duration{0, time.Millisecond * 500, 0, time.Millisecond * 2000, 0, time.Millisecond * 1337}} - handler := recordingUnaryHandler{resp: "OK", err: nil} - stats := NewRPCStatsInterceptor(&ts, "test", "test") + ts := util.IncrementingFakeTimeSource{ + BaseTime: fakeTime, + Increments: []time.Duration{ + 0, + time.Millisecond * 500, + 0, + time.Millisecond * 2000, + 0, + time.Millisecond * 1337, + }, + } + handler := 
recordingUnaryHandler{rsp: "OK", err: nil} + stats := monitoring.NewRPCStatsInterceptor(&ts, "test_multi_ok", monitoring.InertMetricFactory{}) i := stats.Interceptor() for r := 0; r < 3; r++ { - resp, err := i(context.Background(), "wibble", &grpc.UnaryServerInfo{FullMethod: "testmethod"}, handler.handler()) - if resp != "OK" || err != nil { - t.Fatal("request handler returned an error unexpectedly") + rsp, err := i(context.Background(), "wibble", &grpc.UnaryServerInfo{FullMethod: "testmethod"}, handler.handler()) + if rsp != "OK" || err != nil { + t.Fatalf("interceptor()=%v,%v; want 'OK',nil", rsp, err) } } - - if want, got := "3837", stats.handlerRequestSucceededLatencyMap.Get("testmethod").String(); want != got { - t.Fatalf("wanted total latency: %s but got: %s", want, got) - - } - if !testMapSizeIs(stats.handlerRequestFailedLatencyMap, 0) { - t.Fatal("incorrectly recorded success latency on errors") + count, sum := stats.ReqSuccessLatency.Info("testmethod") + if wantCount, wantSum := uint64(3), 3837.0; count != wantCount || sum != wantSum { + t.Errorf("stats.ReqSuccessLatency.Info=%v,%v; want %v,%v", count, sum, wantCount, wantSum) } } func TestMultipleErrorRequestsTotalLatency(t *testing.T) { // We're going to make 3 requests so set up the time source appropriately - ts := util.IncrementingFakeTimeSource{BaseTime: fakeTime, Increments: []time.Duration{0, time.Millisecond * 427, 0, time.Millisecond * 1066, 0, time.Millisecond * 1123}} - handler := recordingUnaryHandler{resp: "", err: errors.New("bang")} - stats := NewRPCStatsInterceptor(&ts, "test", "test") + ts := util.IncrementingFakeTimeSource{ + BaseTime: fakeTime, + Increments: []time.Duration{ + 0, + time.Millisecond * 427, + 0, + time.Millisecond * 1066, + 0, + time.Millisecond * 1123, + }, + } + handler := recordingUnaryHandler{rsp: "", err: errors.New("bang")} + stats := monitoring.NewRPCStatsInterceptor(&ts, "test_multi_err", monitoring.InertMetricFactory{}) i := stats.Interceptor() for r := 0; r < 3; 
r++ { _, err := i(context.Background(), "wibble", &grpc.UnaryServerInfo{FullMethod: "testmethod"}, handler.handler()) if err == nil { - t.Fatal("request handler did not return an error unexpectedly") + t.Fatalf("interceptor()=_,%v; want _,'bang'", err) } } - if want, got := "2616", stats.handlerRequestFailedLatencyMap.Get("testmethod").String(); want != got { - t.Fatalf("wanted total latency: %s but got: %s", want, got) + count, sum := stats.ReqErrorLatency.Info("testmethod") + if wantCount, wantSum := uint64(3), 2616.0; count != wantCount || sum != wantSum { + t.Errorf("stats.ReqErrorLatency.Info=%v,%v; want %v,%v", count, sum, wantCount, wantSum) } - - if !testMapSizeIs(stats.handlerRequestSucceededLatencyMap, 0) { - t.Fatal("incorrectly recorded success latency on errors") - } -} - -func (s singleRequestTestCase) execute(t *testing.T) { - stats := NewRPCStatsInterceptor(&s.timeSource, "test", "test") - i := stats.Interceptor() - resp, err := i(context.Background(), "wibble", &grpc.UnaryServerInfo{FullMethod: s.method}, s.handler.handler()) - - // These checks are the that the stats interceptor called the handler and correctly forwarded the - // result and error returned by the wrapped request handler - if got, want := s.handler.resp, resp; got != want { - t.Fatalf("%s: Got result: %v but wanted: %v", s.name, got, want) - } - - if (err != nil && s.handler.err == nil) || (err == nil && s.handler.err != nil) { - t.Fatalf("%s: Error status was incorrect: %v got %v", s.name, s.handler.err, err) - } - - // Now check the resulting state of the stats maps - - // Because we only made a single request there should only be one recorded (with either success or - // failure depending on the error status and the other maps should count zero for the method - if stats.handlerRequestCountMap.Get(s.method).String() != "1" { - t.Fatalf("%s: Expected one request for method but got: %v", s.name, stats.handlerRequestCountMap.Get(s.method)) - } - - expectedTotalLatency := 
s.timeSource.Increments[1].Nanoseconds() / nanosToMillisDivisor - - var expectOneMap *expvar.Map - var expectZeroMap *expvar.Map - var expectLatencyMap *expvar.Map - var expectNoLatencyMap *expvar.Map - var logComment string - - if err == nil { - // Request should have been a success - expectOneMap = stats.handlerRequestSucceededCountMap - expectZeroMap = stats.handlerRequestErrorCountMap - expectLatencyMap = stats.handlerRequestSucceededLatencyMap - expectNoLatencyMap = stats.handlerRequestFailedLatencyMap - logComment = "ok" - } else { - // Request should be recorded as failed - expectOneMap = stats.handlerRequestErrorCountMap - expectZeroMap = stats.handlerRequestSucceededCountMap - expectLatencyMap = stats.handlerRequestFailedLatencyMap - expectNoLatencyMap = stats.handlerRequestSucceededLatencyMap - logComment = "error" - } - - // There should only be one key in the expected map and request count map - if !testMapSizeIs(expectOneMap, 1) || !testMapSizeIs(expectZeroMap, 0) || !testMapSizeIs(stats.handlerRequestCountMap, 1) { - t.Fatalf("%s: Map key counts are incorrect", s.name) - } - - if !testMapSizeIs(expectLatencyMap, 1) || !testMapSizeIs(expectNoLatencyMap, 0) { - t.Fatalf("%s: Latency map key counts are incorrect", s.name) - } - - if expectOneMap.Get(s.method).String() != "1" { - t.Fatalf("%s: Expected one %s request for method but got: %v", s.name, logComment, expectOneMap.Get(s.method)) - } - - if expectZeroMap.Get(s.method) != nil { - t.Fatalf("%s: Expected zero %s request for method but got: %v", s.name, logComment, expectZeroMap.Get(s.method)) - } - - if expectLatencyMap.Get(s.method).String() != fmt.Sprintf("%d", expectedTotalLatency) { - t.Fatalf("%s: Expected %s latency: %v but got: %v", s.name, logComment, expectedTotalLatency, expectLatencyMap.Get(s.method)) - } - if expectNoLatencyMap.Get(s.method) != nil { - t.Fatalf("%s: Expected %s latency: nil but got: %v", s.name, logComment, expectNoLatencyMap.Get(s.method)) - } -} - -func 
testMapSizeIs(mapVar *expvar.Map, expectedSize int) bool { - keys := 0 - mapVar.Do(func(expvar.KeyValue) { - keys++ - }) - - return keys == expectedSize } diff --git a/monitoring/testonly/metrics.go b/monitoring/testonly/metrics.go new file mode 100644 index 0000000000..6199ed7c67 --- /dev/null +++ b/monitoring/testonly/metrics.go @@ -0,0 +1,138 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testonly + +import ( + "testing" + + "github.com/google/trillian/monitoring" +) + +// TestCounter runs a test on a Counter produced from the provided MetricFactory. +func TestCounter(t *testing.T, factory monitoring.MetricFactory) { + var tests = []struct { + name string + labelNames []string + labelVals []string + }{ + { + name: "counter0", + labelNames: nil, + labelVals: nil, + }, + { + name: "counter1", + labelNames: []string{"key1"}, + labelVals: []string{"val1"}, + }, + { + name: "counter2", + labelNames: []string{"key1", "key2"}, + labelVals: []string{"val1", "val2"}, + }, + } + for _, test := range tests { + counter := factory.NewCounter("test_"+test.name, "Test only", test.labelNames...) + if got, want := counter.Value(test.labelVals...), 0.0; got != want { + t.Errorf("Counter(test_%s)[%v].Value()=%v; want %v", test.name, test.labelVals, got, want) + } + counter.Inc(test.labelVals...) 
+ if got, want := counter.Value(test.labelVals...), 1.0; got != want { + t.Errorf("Counter(test_%s)[%v].Value()=%v; want %v", test.name, test.labelVals, got, want) + } + counter.Add(2.5, test.labelVals...) + if got, want := counter.Value(test.labelVals...), 3.5; got != want { + t.Errorf("Counter(test_%s)[%v].Value()=%v; want %v", test.name, test.labelVals, got, want) + } + } +} + +// TestGauge runs a test on a Gauge produced from the provided MetricFactory. +func TestGauge(t *testing.T, factory monitoring.MetricFactory) { + var tests = []struct { + name string + labelNames []string + labelVals []string + }{ + { + name: "gauge0", + labelNames: nil, + labelVals: nil, + }, + { + name: "gauge1", + labelNames: []string{"key1"}, + labelVals: []string{"val1"}, + }, + { + name: "gauge2", + labelNames: []string{"key1", "key2"}, + labelVals: []string{"val1", "val2"}, + }, + } + for _, test := range tests { + gauge := factory.NewGauge("test_"+test.name, "Test only", test.labelNames...) + if got, want := gauge.Value(test.labelVals...), 0.0; got != want { + t.Errorf("Gauge(test_%s)[%v].Value()=%v; want %v", test.name, test.labelVals, got, want) + } + gauge.Inc(test.labelVals...) + if got, want := gauge.Value(test.labelVals...), 1.0; got != want { + t.Errorf("Gauge(test_%s)[%v].Value()=%v; want %v", test.name, test.labelVals, got, want) + } + gauge.Add(2.5, test.labelVals...) + if got, want := gauge.Value(test.labelVals...), 3.5; got != want { + t.Errorf("Gauge(test_%s)[%v].Value()=%v; want %v", test.name, test.labelVals, got, want) + } + } +} + +// TestHistogram runs a test on a Histogram produced from the provided MetricFactory. 
+func TestHistogram(t *testing.T, factory monitoring.MetricFactory) { + var tests = []struct { + name string + labelNames []string + labelVals []string + }{ + { + name: "histogram0", + labelNames: nil, + labelVals: nil, + }, + { + name: "histogram1", + labelNames: []string{"key1"}, + labelVals: []string{"val1"}, + }, + { + name: "histogram2", + labelNames: []string{"key1", "key2"}, + labelVals: []string{"val1", "val2"}, + }, + } + for _, test := range tests { + histogram := factory.NewHistogram("test_"+test.name, "Test only", test.labelNames...) + gotCount, gotSum := histogram.Info(test.labelVals...) + if wantCount, wantSum := uint64(0), 0.0; gotCount != wantCount || gotSum != wantSum { + t.Errorf("Histogram(test_%s)[%v].Info()=%v,%v; want %v,%v", test.name, test.labelVals, gotCount, gotSum, wantCount, wantSum) + } + histogram.Observe(1.0, test.labelVals...) + histogram.Observe(2.0, test.labelVals...) + histogram.Observe(3.0, test.labelVals...) + gotCount, gotSum = histogram.Info(test.labelVals...) 
+ if wantCount, wantSum := uint64(3), 6.0; gotCount != wantCount || gotSum != wantSum { + t.Errorf("Histogram(test_%s)[%v].Info()=%v,%v; want %v,%v", test.name, test.labelVals, gotCount, gotSum, wantCount, wantSum) + } + } +} diff --git a/server/log_rpc_server.go b/server/log_rpc_server.go index 0e31efb4eb..625cf92018 100644 --- a/server/log_rpc_server.go +++ b/server/log_rpc_server.go @@ -19,6 +19,7 @@ import ( "github.com/google/trillian" "github.com/google/trillian/extension" "github.com/google/trillian/merkle" + "github.com/google/trillian/monitoring" "github.com/google/trillian/storage" "github.com/google/trillian/trees" "github.com/google/trillian/util" @@ -35,15 +36,25 @@ const proofMaxBitLen = 64 // TrillianLogRPCServer implements the RPC API defined in the proto type TrillianLogRPCServer struct { - registry extension.Registry - timeSource util.TimeSource + registry extension.Registry + timeSource util.TimeSource + leafCounter monitoring.Counter } // NewTrillianLogRPCServer creates a new RPC server backed by a LogStorageProvider. func NewTrillianLogRPCServer(registry extension.Registry, timeSource util.TimeSource) *TrillianLogRPCServer { + mf := registry.MetricFactory + if mf == nil { + mf = monitoring.InertMetricFactory{} + } return &TrillianLogRPCServer{ registry: registry, timeSource: timeSource, + leafCounter: mf.NewCounter( + "queued_leaves", + "Number of leaves requested to be queued", + "status", + ), } } @@ -112,10 +123,12 @@ func (t *TrillianLogRPCServer) QueueLeaves(ctx context.Context, req *trillian.Qu Status: status.Newf(codes.AlreadyExists, "Leaf already exists: %v", existingLeaf.LeafIdentityHash).Proto(), } queuedLeaves = append(queuedLeaves, &queuedLeaf) + t.leafCounter.Inc("existing") } else { // Return the leaf from the request if it is new. 
queuedLeaf := trillian.QueuedLogLeaf{Leaf: req.Leaves[i]} queuedLeaves = append(queuedLeaves, &queuedLeaf) + t.leafCounter.Inc("new") } } return &trillian.QueueLeavesResponse{QueuedLeaves: queuedLeaves}, nil diff --git a/server/main.go b/server/main.go index 15352bf27c..28a20144dd 100644 --- a/server/main.go +++ b/server/main.go @@ -17,19 +17,22 @@ package server import ( "context" "database/sql" - "expvar" "net" "net/http" + "strings" "time" + "github.com/coreos/etcd/clientv3" + etcdnaming "github.com/coreos/etcd/clientv3/naming" "github.com/golang/glog" "github.com/google/trillian" "github.com/google/trillian/extension" - "github.com/google/trillian/monitoring/metric" "github.com/google/trillian/server/admin" "github.com/google/trillian/util" "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" + "google.golang.org/grpc/naming" "google.golang.org/grpc/reflection" ) @@ -44,8 +47,7 @@ type Main struct { // RegisterHandlerFn is called to register REST-proxy handlers. RegisterHandlerFn func(context.Context, *runtime.ServeMux, string, []grpc.DialOption) error // RegisterServerFn is called to register RPC servers. - RegisterServerFn func(*grpc.Server, extension.Registry) error - DumpMetricsInterval time.Duration + RegisterServerFn func(*grpc.Server, extension.Registry) error } // Run starts the configured server. Blocks until the server exits. 
@@ -55,10 +57,6 @@ func (m *Main) Run(ctx context.Context) error { defer m.Server.GracefulStop() defer m.DB.Close() - if m.DumpMetricsInterval > 0 { - go metric.DumpToLog(ctx, m.DumpMetricsInterval) - } - if err := m.RegisterServerFn(m.Server, m.Registry); err != nil { return err } @@ -75,10 +73,11 @@ func (m *Main) Run(ctx context.Context) error { return err } glog.Infof("HTTP server starting on %v", endpoint) + go http.ListenAndServe(endpoint, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { switch { - case req.RequestURI == "/debug/vars": - expvar.Handler().ServeHTTP(w, req) + case req.RequestURI == "/metrics": + promhttp.Handler().ServeHTTP(w, req) default: mux.ServeHTTP(w, req) } @@ -104,3 +103,27 @@ func (m *Main) Run(ctx context.Context) error { return nil } + +// AnnounceSelf announces this binary's presence to etcd. Returns a function that +// should be called on process exit. +func AnnounceSelf(ctx context.Context, etcdServers, etcdService, endpoint string) func() { + if len(etcdServers) == 0 { + return nil + } + cfg := clientv3.Config{Endpoints: strings.Split(etcdServers, ","), DialTimeout: 5 * time.Second} + client, err := clientv3.New(cfg) + if err != nil { + glog.Exitf("Failed to connect to etcd at %v: %v", etcdServers, err) + } + res := etcdnaming.GRPCResolver{Client: client} + + update := naming.Update{Op: naming.Add, Addr: endpoint} + res.Update(ctx, etcdService, update) + glog.Infof("Announcing our presence in %v with %+v", etcdService, update) + + bye := naming.Update{Op: naming.Delete, Addr: endpoint} + return func() { + glog.Infof("Removing our presence in %v with %+v", etcdService, update) + res.Update(ctx, etcdService, bye) + } +} diff --git a/server/trillian_log_server/main.go b/server/trillian_log_server/main.go index 2b4167509d..1fbc077b03 100644 --- a/server/trillian_log_server/main.go +++ b/server/trillian_log_server/main.go @@ -20,19 +20,16 @@ import ( "context" "flag" _ "net/http/pprof" - "strings" - "time" _ 
"github.com/go-sql-driver/mysql" // Load MySQL driver - "github.com/coreos/etcd/clientv3" - etcdnaming "github.com/coreos/etcd/clientv3/naming" "github.com/golang/glog" "github.com/google/trillian" "github.com/google/trillian/cmd" "github.com/google/trillian/crypto/keys" "github.com/google/trillian/extension" "github.com/google/trillian/monitoring" + "github.com/google/trillian/monitoring/prometheus" mysqlq "github.com/google/trillian/quota/mysql" "github.com/google/trillian/server" "github.com/google/trillian/server/interceptor" @@ -40,17 +37,16 @@ import ( "github.com/google/trillian/util" "github.com/grpc-ecosystem/grpc-gateway/runtime" "google.golang.org/grpc" - "google.golang.org/grpc/naming" ) var ( - mySQLURI = flag.String("mysql_uri", "test:zaphod@tcp(127.0.0.1:3306)/test", "Connection URI for MySQL database") - rpcEndpoint = flag.String("rpc_endpoint", "localhost:8090", "Endpoint for RPC requests (host:port)") - httpEndpoint = flag.String("http_endpoint", "localhost:8091", "Endpoint for HTTP metrics and REST requests on (host:port, empty means disabled)") - dumpMetricsInterval = flag.Duration("dump_metrics_interval", 0, "If greater than 0, how often to dump metrics to the logs.") - etcdServers = flag.String("etcd_servers", "", "A comma-separated list of etcd servers; no etcd registration if empty") - etcdService = flag.String("etcd_service", "trillian-log", "Service name to announce ourselves under") - maxUnsequencedRows = flag.Int("max_unsequenced_rows", mysqlq.DefaultMaxUnsequenced, "Max number of unsequenced rows before rate limiting kicks in") + mySQLURI = flag.String("mysql_uri", "test:zaphod@tcp(127.0.0.1:3306)/test", "Connection URI for MySQL database") + rpcEndpoint = flag.String("rpc_endpoint", "localhost:8090", "Endpoint for RPC requests (host:port)") + httpEndpoint = flag.String("http_endpoint", "localhost:8091", "Endpoint for HTTP metrics and REST requests on (host:port, empty means disabled)") + etcdServers = flag.String("etcd_servers", "", "A 
comma-separated list of etcd servers; no etcd registration if empty") + etcdService = flag.String("etcd_service", "trillian-logserver", "Service name to announce ourselves under") + etcdHTTPService = flag.String("etcd_http_service", "trillian-logserver-http", "Service name to announce our HTTP endpoint under") + maxUnsequencedRows = flag.Int("max_unsequenced_rows", mysqlq.DefaultMaxUnsequenced, "Max number of unsequenced rows before rate limiting kicks in") configFile = flag.String("config", "", "Config file containing flags, file contents can be overridden by command line flags") ) @@ -73,20 +69,16 @@ func main() { } // No defer: database ownership is delegated to server.Main - if len(*etcdServers) > 0 { - // Announce ourselves to etcd if so configured. - cfg := clientv3.Config{Endpoints: strings.Split(*etcdServers, ","), DialTimeout: 5 * time.Second} - client, err := clientv3.New(cfg) - if err != nil { - glog.Exitf("Failed to connect to etcd at %v: %v", *etcdServers, err) + // Announce our endpoints to etcd if so configured. 
+ unannounce := server.AnnounceSelf(ctx, *etcdServers, *etcdService, *rpcEndpoint) + if unannounce != nil { + defer unannounce() + } + if *httpEndpoint != "" { + unannounceHTTP := server.AnnounceSelf(ctx, *etcdServers, *etcdHTTPService, *httpEndpoint) + if unannounceHTTP != nil { + defer unannounceHTTP() } - res := etcdnaming.GRPCResolver{Client: client} - - update := naming.Update{Op: naming.Add, Addr: *rpcEndpoint} - res.Update(ctx, *etcdService, update) - glog.Infof("Announcing our presence to %v with %+v", *etcdService, update) - bye := naming.Update{Op: naming.Delete, Addr: *rpcEndpoint} - defer res.Update(ctx, *etcdService, bye) } registry := extension.Registry{ @@ -94,11 +86,11 @@ func main() { SignerFactory: keys.PEMSignerFactory{}, LogStorage: mysql.NewLogStorage(db), QuotaManager: &mysqlq.QuotaManager{DB: db, MaxUnsequencedRows: *maxUnsequencedRows}, + MetricFactory: prometheus.MetricFactory{}, } ts := util.SystemTimeSource{} - stats := monitoring.NewRPCStatsInterceptor(ts, "ct", "example") - stats.Publish() + stats := monitoring.NewRPCStatsInterceptor(ts, "log", registry.MetricFactory) ti := &interceptor.TrillianInterceptor{ Admin: registry.AdminStorage, QuotaManager: registry.QuotaManager, @@ -124,7 +116,6 @@ func main() { trillian.RegisterTrillianLogServer(s, logServer) return err }, - DumpMetricsInterval: *dumpMetricsInterval, } if err := m.Run(ctx); err != nil { diff --git a/server/trillian_log_signer/main.go b/server/trillian_log_signer/main.go index eff5641cb0..8551987002 100644 --- a/server/trillian_log_signer/main.go +++ b/server/trillian_log_signer/main.go @@ -18,21 +18,22 @@ package main import ( "flag" "fmt" + "net/http" "os" "time" _ "github.com/go-sql-driver/mysql" // Load MySQL driver - "github.com/golang/glog" "github.com/google/trillian/cmd" "github.com/google/trillian/crypto/keys" "github.com/google/trillian/extension" - "github.com/google/trillian/monitoring/metric" + "github.com/google/trillian/monitoring/prometheus" 
"github.com/google/trillian/quota" "github.com/google/trillian/server" "github.com/google/trillian/storage/mysql" "github.com/google/trillian/util" "github.com/google/trillian/util/etcd" + "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/net/context" ) @@ -43,9 +44,9 @@ var ( batchSizeFlag = flag.Int("batch_size", 50, "Max number of leaves to process per batch") numSeqFlag = flag.Int("num_sequencers", 10, "Number of sequencer workers to run in parallel") sequencerGuardWindowFlag = flag.Duration("sequencer_guard_window", 0, "If set, the time elapsed before submitted leaves are eligible for sequencing") - dumpMetricsInterval = flag.Duration("dump_metrics_interval", 0, "If greater than 0, how often to dump metrics to the logs.") forceMaster = flag.Bool("force_master", false, "If true, assume master for all logs") - etcdServers = flag.String("etcd_servers", "localhost:2379", "A comma-separated list of etcd servers") + etcdServers = flag.String("etcd_servers", "", "A comma-separated list of etcd servers") + etcdHTTPService = flag.String("etcd_http_service", "trillian-logsigner-http", "Service name to announce our HTTP endpoint under") lockDir = flag.String("lock_file_path", "/test/multimaster", "etcd lock file directory path") preElectionPause = flag.Duration("pre_election_pause", 1*time.Second, "Maximum time to wait before starting elections") @@ -68,12 +69,6 @@ func main() { glog.CopyStandardLogTo("WARNING") glog.Info("**** Log Signer Starting ****") - // Enable dumping of metrics to the log at regular interval, - // if requested. 
- if *dumpMetricsInterval > 0 { - go metric.DumpToLog(context.Background(), *dumpMetricsInterval) - } - // First make sure we can access the database, quit if not db, err := mysql.OpenDB(*mySQLURI) if err != nil { @@ -100,11 +95,19 @@ func main() { LogStorage: mysql.NewLogStorage(db), ElectionFactory: electionFactory, QuotaManager: quota.Noop(), + MetricFactory: prometheus.MetricFactory{}, } // Start HTTP server (optional) if *httpEndpoint != "" { + // Announce our endpoint to etcd if so configured. + unannounceHTTP := server.AnnounceSelf(ctx, *etcdServers, *etcdHTTPService, *httpEndpoint) + if unannounceHTTP != nil { + defer unannounceHTTP() + } + glog.Infof("Creating HTTP server starting on %v", *httpEndpoint) + http.Handle("/metrics", promhttp.Handler()) if err := util.StartHTTPServer(*httpEndpoint); err != nil { glog.Exitf("Failed to start HTTP server on %v: %v", *httpEndpoint, err) } diff --git a/storage/memory/log_storage.go b/storage/memory/log_storage.go index 8c4847b63c..4be91a8047 100644 --- a/storage/memory/log_storage.go +++ b/storage/memory/log_storage.go @@ -21,23 +21,38 @@ import ( "errors" "fmt" "math" + "strconv" + "sync" "time" "github.com/google/btree" "github.com/google/trillian" - "github.com/google/trillian/monitoring/metric" + "github.com/google/trillian/monitoring" + "github.com/google/trillian/monitoring/prometheus" "github.com/google/trillian/storage" "github.com/google/trillian/storage/cache" "github.com/google/trillian/trees" ) +const logIDLabel = "logid" + var ( defaultLogStrata = []int{8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} - queuedCounter = metric.NewCounter("mem_queued_leaves") - dequeuedCounter = metric.NewCounter("mem_dequeued_leaves") + once sync.Once + queuedCounter monitoring.Counter + dequeuedCounter monitoring.Counter ) +func createMetrics(mf monitoring.MetricFactory) { + queuedCounter = mf.NewCounter("mem_queued_leaves", "Number of leaves queued", logIDLabel) + 
dequeuedCounter = mf.NewCounter("mem_dequeued_leaves", "Number of leaves dequeued", logIDLabel) +} + +func labelForTX(t *logTreeTX) string { + return strconv.FormatInt(t.treeID, 10) +} + func unseqKey(treeID int64) btree.Item { return &kv{k: fmt.Sprintf("/%d/unseq", treeID)} } @@ -109,6 +124,10 @@ func (t *readOnlyLogTX) GetActiveLogIDsWithPendingWork(ctx context.Context) ([]i } func (m *memoryLogStorage) beginInternal(ctx context.Context, treeID int64, readonly bool) (storage.LogTreeTX, error) { + once.Do(func() { + // TODO(drysdale): this should come from the registry rather than hard-coding use of Prometheus + createMetrics(prometheus.MetricFactory{}) + }) tree, err := trees.GetTree( ctx, m.admin, @@ -180,6 +199,7 @@ func (t *logTreeTX) DequeueLeaves(ctx context.Context, limit int, cutoffTime tim e = e.Next() } + dequeuedCounter.Add(float64(len(leaves)), labelForTX(t)) return leaves, nil } @@ -190,6 +210,7 @@ func (t *logTreeTX) QueueLeaves(ctx context.Context, leaves []*trillian.LogLeaf, return nil, fmt.Errorf("queued leaf must have a leaf ID hash of length %d", t.hashSizeBytes) } } + queuedCounter.Add(float64(len(leaves)), labelForTX(t)) // No deduping in this storage! 
k := unseqKey(t.treeID) q := t.tx.Get(k).(*kv).v.(*list.List) diff --git a/storage/mysql/log_storage.go b/storage/mysql/log_storage.go index 53ddaae308..0459ed5ed1 100644 --- a/storage/mysql/log_storage.go +++ b/storage/mysql/log_storage.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" "sort" + "strconv" + "sync" "time" "github.com/go-sql-driver/mysql" @@ -28,7 +30,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/google/trillian" spb "github.com/google/trillian/crypto/sigpb" - "github.com/google/trillian/monitoring/metric" + "github.com/google/trillian/monitoring" + "github.com/google/trillian/monitoring/prometheus" "github.com/google/trillian/storage" "github.com/google/trillian/storage/cache" "github.com/google/trillian/trees" @@ -78,15 +81,27 @@ const ( // Error code returned by driver when inserting a duplicate row errNumDuplicate = 1062 + + logIDLabel = "logid" ) var ( defaultLogStrata = []int{8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} - queuedCounter = metric.NewCounter("mysql_queued_leaves") - dequeuedCounter = metric.NewCounter("mysql_dequeued_leaves") + once sync.Once + queuedCounter monitoring.Counter + dequeuedCounter monitoring.Counter ) +func createMetrics(mf monitoring.MetricFactory) { + queuedCounter = mf.NewCounter("mysql_queued_leaves", "Number of leaves queued", logIDLabel) + dequeuedCounter = mf.NewCounter("mysql_dequeued_leaves", "Number of leaves dequeued", logIDLabel) +} + +func labelForTX(t *logTreeTX) string { + return strconv.FormatInt(t.treeID, 10) +} + type mySQLLogStorage struct { *mySQLTreeStorage admin storage.AdminStorage @@ -191,6 +206,10 @@ func (t *readOnlyLogTX) GetActiveLogIDsWithPendingWork(ctx context.Context) ([]i } func (m *mySQLLogStorage) beginInternal(ctx context.Context, treeID int64, readonly bool) (storage.LogTreeTX, error) { + once.Do(func() { + // TODO(drysdale): this should come from the registry rather than hard-coding use of Prometheus + 
createMetrics(prometheus.MetricFactory{}) + }) tree, err := trees.GetTree( ctx, m.admin, @@ -317,7 +336,7 @@ func (t *logTreeTX) DequeueLeaves(ctx context.Context, limit int, cutoffTime tim return nil, err } - dequeuedCounter.Add(int64(len(leaves))) + dequeuedCounter.Add(float64(len(leaves)), labelForTX(t)) return leaves, nil } @@ -368,9 +387,9 @@ func (t *logTreeTX) QueueLeaves(ctx context.Context, leaves []*trillian.LogLeaf, return nil, fmt.Errorf("Unsequenced: %v", err) } } + queuedCounter.Add(float64(len(leaves)), labelForTX(t)) if existingCount == 0 { - queuedCounter.Add(int64(len(leaves))) return existingLeaves, nil } @@ -406,7 +425,6 @@ func (t *logTreeTX) QueueLeaves(ctx context.Context, leaves []*trillian.LogLeaf, } } - queuedCounter.Add(int64(len(leaves))) return existingLeaves, nil }