Skip to content

Add Prometheus metrics exporter#59

Merged
gmr merged 7 commits into
mainfrom
prometheus-exporter
Apr 2, 2026
Merged

Add Prometheus metrics exporter#59
gmr merged 7 commits into
mainfrom
prometheus-exporter

Conversation

@gmr
Copy link
Copy Markdown
Owner

@gmr gmr commented Apr 1, 2026

Summary

Adds a Prometheus-compatible HTTP metrics endpoint as an alternative to statsd, available via rejected[prometheus].

Problem

The only metrics export option was statsd, which requires a statsd daemon. Prometheus is the industry standard for cloud-native monitoring and scrapes metrics directly — no intermediary needed.

Solution

rejected/prometheus.py — thin module wrapping prometheus_client:

  • Creates labeled Gauge metrics on first start() call
  • update(stats) sets metric values from the MCP's aggregated stats dict

Metrics exposed (all labeled by consumer name):

Metric Type Description
rejected_messages_processed_total Gauge Cumulative messages processed
rejected_messages_failed_total Gauge Cumulative errors
rejected_messages_redelivered_total Gauge Cumulative redeliveries
rejected_consumer_processes Gauge Active process count

The HTTP server runs in the MCP parent process. Metrics are updated after each poll cycle from the existing stats aggregation — no changes to the child process data flow.

Config:

Application:
  stats:
    prometheus:
      enabled: true
      port: 9090

Fully inert when prometheus_client is not installed.

Impact

No changes to existing behavior. New optional extra rejected[prometheus] (prometheus-client>=0.20).

All 246 tests pass.

Summary by CodeRabbit

  • New Features
    • Optional Prometheus metrics support (disabled by default) with configurable port (default 9090); exports per-consumer stats, per-process counters, processing durations, message ages, and custom counters/gauges.
    • Stats payloads now include duration, message-age, and custom metric buckets for richer observability.
  • Chores
    • Added an optional Prometheus client dependency to enable the metrics integration.

Exposes per-consumer metrics via an HTTP endpoint for Prometheus to
scrape, as an alternative to statsd. Requires rejected[prometheus].

Metrics exposed (labeled by consumer name):
- rejected_messages_processed_total — cumulative messages processed
- rejected_messages_failed_total — cumulative errors
- rejected_messages_redelivered_total — cumulative redeliveries
- rejected_consumer_processes — active process count

The HTTP server runs in the MCP parent process. Metrics are updated
from the existing poll-based stats aggregation — no changes to the
child process data flow.

Config:
  stats:
    prometheus:
      enabled: true
      port: 9090

Fully inert when prometheus_client is not installed.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 1, 2026

Warning

Rate limit exceeded

@gmr has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 3 minutes and 12 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 3 minutes and 12 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5b39aa4e-d3bb-48d8-84fe-c64ca3cfc288

📥 Commits

Reviewing files that changed from the base of the PR and between a910c45 and a13b1cf.

📒 Files selected for processing (2)
  • docs/configuration.md
  • docs/index.md
📝 Walkthrough

Walkthrough

Adds optional Prometheus metrics: configuration and optional dependency, a new rejected.prometheus exporter, per-message observation collection in Process, and MCP wiring to start the exporter, forward observations, and push periodic metric updates.

Changes

Cohort / File(s) Summary
Configuration & metadata
example.yaml, pyproject.toml
Added Application.stats.prometheus block (enabled: False, port: 9090) and an optional prometheus dependency → prometheus-client>=0.20.
Stats configuration model
rejected/config.py
Added PrometheusConfig (enabled: bool = False, port: int = 9090) and added prometheus: PrometheusConfig to StatsConfig with a default_factory.
MCP integration
rejected/mcp.py
Imported rejected.prometheus; forwards per-message observations from collect_results() to prometheus.observe(...), calls prometheus.update(self.stats), conditionally starts exporter in run() when enabled, and expanded consumer counters initialization.
Prometheus exporter module
rejected/prometheus.py
New module implementing start(port), observe(...), and update(stats); optional prometheus_client import, server start guard, per-consumer Counters/Gauges/Histograms, custom metric helpers, and delta-based counter updates.
Per-process observations
rejected/process.py
Added internal buffers for durations, message ages, and custom metrics; new _collect_custom_measurements() call on processed messages; report_stats() now returns and clears durations, message_ages, custom_durations, custom_counters, custom_gauges.

Sequence Diagram(s)

sequenceDiagram
  participant Config as Config (StatsConfig)
  participant MCP as MCP
  participant Proc as Process
  participant PromMod as rejected.prometheus
  participant PromClient as prometheus_client

  MCP->>Config: read stats.prometheus
  alt prometheus enabled
    MCP->>PromMod: start(port)
    PromMod->>PromClient: start_http_server(port)
  end

  loop per-message
    Proc->>Proc: collect durations, ages, custom metrics
    Proc->>MCP: report_stats() (includes observations)
    MCP->>PromMod: observe(consumer, durations, ages, custom_*)
    PromMod->>PromClient: observe / inc / set
  end

  loop periodic polling
    MCP->>MCP: collect_results() -> recompute self.stats
    MCP->>PromMod: update(self.stats)
    PromMod->>PromClient: inc Counters / set Gauges
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐰 I hopped through logs and measured beats,

Timings tucked in tiny treats,
On port nine-oh-nine I hum and cheer,
Counting hops both far and near,
Metrics sprout where carrots meet.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Add Prometheus metrics exporter' directly and accurately summarizes the main change: introducing a new Prometheus metrics exporter module as a new optional feature to the codebase.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch prometheus-exporter

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@gmr
Copy link
Copy Markdown
Owner Author

gmr commented Apr 1, 2026

@CodeRabbit review now

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 1, 2026

@gmr Sure! I'll start reviewing the PR now.

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
rejected/prometheus.py (1)

44-63: Consider using Counters instead of Gauges for cumulative metrics, or remove the _total suffix from Gauge names.

The metrics rejected_messages_processed_total, rejected_messages_failed_total, and rejected_messages_redelivered_total have the _total suffix, which per Prometheus naming conventions is reserved for Counters. However, they're implemented as Gauges.

Since the code uses .set() to apply absolute values from MCP's aggregated stats (not incremental deltas), using Gauge is pragmatically simpler. In this case, consider removing the _total suffix to avoid confusion (e.g., rejected_messages_processed). Alternatively, if you want to keep the _total semantics, switch to Counter with .inc() pattern—though this would require summing deltas from MCP data rather than setting absolute values.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/prometheus.py` around lines 44 - 63, The three cumulative metrics
are defined as Gauges but use the "_total" suffix reserved for Counters; update
the metric names to remove the "_total" suffix since the code uses .set() with
absolute values—change the Gauge instantiations for _messages_processed,
_messages_failed, and _messages_redelivered to use names like
"rejected_messages_processed", "rejected_messages_failed", and
"rejected_messages_redelivered" (leave _consumer_processes as-is), or
alternatively convert these three variables to prometheus_client.Counter and
switch the code that updates them from .set() to .inc() with computed deltas;
adjust any downstream references to the old metric names accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@rejected/prometheus.py`:
- Around line 44-63: The three cumulative metrics are defined as Gauges but use
the "_total" suffix reserved for Counters; update the metric names to remove the
"_total" suffix since the code uses .set() with absolute values—change the Gauge
instantiations for _messages_processed, _messages_failed, and
_messages_redelivered to use names like "rejected_messages_processed",
"rejected_messages_failed", and "rejected_messages_redelivered" (leave
_consumer_processes as-is), or alternatively convert these three variables to
prometheus_client.Counter and switch the code that updates them from .set() to
.inc() with computed deltas; adjust any downstream references to the old metric
names accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 92182a93-af27-4feb-bf7b-a7ced1a289c4

📥 Commits

Reviewing files that changed from the base of the PR and between 133907e and 01c3acc.

📒 Files selected for processing (5)
  • example.yaml
  • pyproject.toml
  • rejected/config.py
  • rejected/mcp.py
  • rejected/prometheus.py

gmr and others added 3 commits April 1, 2026 19:48
processed, failed, redelivered are monotonically increasing — they
should be Prometheus Counters (with the _total suffix convention),
not Gauges. Tracks previous poll values and increments by the delta
each cycle. consumer_processes remains a Gauge.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Expanded consumer_stats_counter() in MCP to aggregate all counter keys
from child processes, not just processed/failed/redelivered. All data
was already flowing through the stats_queue — it just wasn't being
aggregated.

Prometheus metrics now exposed:

Counters (labeled by consumer):
  rejected_messages_acked_total
  rejected_messages_dropped_total
  rejected_messages_failed_total
  rejected_messages_nacked_total
  rejected_messages_processed_total
  rejected_messages_redelivered_total
  rejected_messages_requeued_total
  rejected_processing_seconds_total

  rejected_exceptions_total (labeled by consumer + type)
    type: consumer_exception, message_exception,
          processing_exception, unhandled_exception

Gauges (labeled by consumer):
  rejected_consumer_processes

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…etrics

Per-message timing observations now flow from child processes to the MCP
and into Prometheus Histograms:

  rejected_processing_duration_seconds — per-message processing time
  rejected_message_age_seconds — age of messages at processing time

Child processes accumulate individual observations between poll cycles
(in _duration_observations and _message_age_observations lists), include
them in report_stats(), and drain them after each poll.

Consumer ad-hoc stats (stats_add_duration, stats_incr, stats_set_value,
stats_track_duration) are also forwarded:

  rejected_custom_{key}_seconds — Histogram for custom durations
  rejected_custom_{key}_total — Counter for custom increments
  rejected_custom_{key} — Gauge for custom values

These are created dynamically as new keys appear, with metric names
sanitized to valid Prometheus identifiers.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rejected/prometheus.py`:
- Around line 238-244: The loop in prometheus reporting treats custom_counters
values as cumulative and computes a delta against _custom_counter_prev; instead,
custom_counters already contains per-interval sums, so remove the delta logic
and previous-value bookkeeping: for each (key, value) in custom_counters, if
value > 0 call
_get_custom_counter(key).labels(consumer=consumer_name).inc(value) directly and
delete any writes/reads to _custom_counter_prev (and remove the
_custom_counter_prev dict if unused elsewhere) to fix incorrect negative deltas;
keep using consumer_name and _get_custom_counter as-is.
- Around line 125-158: The current code registers Prometheus metric objects into
the default registry before calling prometheus_client.start_http_server(port),
which leaves those metrics permanently registered if start_http_server raises
(e.g., port in use) and causes duplicated timeseries on retry; fix by attempting
to start the HTTP server first (call prometheus_client.start_http_server(port)
inside a try/except), only create and assign entries into the _metrics dict and
set _started = True after the start succeeds (or alternatively use a dedicated
CollectorRegistry and pass it to metric constructors so you can discard the
registry on failure); reference _metrics, start_http_server, _started and the
prometheus_client.Counter/Gauge/Histogram constructors when making the change.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 42fc36f8-caa7-4db1-9818-f014c3eeacdd

📥 Commits

Reviewing files that changed from the base of the PR and between 26bfd79 and be252c2.

📒 Files selected for processing (3)
  • rejected/mcp.py
  • rejected/process.py
  • rejected/prometheus.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • rejected/mcp.py

Comment thread rejected/prometheus.py Outdated
Comment thread rejected/prometheus.py Outdated
Start the HTTP server before registering metrics so that a failed
bind does not leave orphan metrics in the default registry, which
would cause duplicated-timeseries errors on retry.

Remove incorrect delta bookkeeping for custom counters -- the values
from process.Process.report_stats() are already per-interval sums
(the buffer is cleared after each report), so they should be
incremented directly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@gmr
Copy link
Copy Markdown
Owner Author

gmr commented Apr 2, 2026

🤖 This comment was posted by Claude on behalf of @gmr

PR Monitor Summary

Response to CodeRabbit's Review

  • Metric registration ordering (prometheus.py:158, Minor): Moved start_http_server() before metric registration so a failed bind (e.g., port in use) does not leave orphan metrics in the default registry, avoiding ValueError: Duplicated timeseries on retry. Thread resolved.

  • Custom counter delta logic (prometheus.py:244, Major): Removed incorrect delta bookkeeping for custom counters. The values from process.Process.report_stats() are already per-interval sums (the _custom_counters buffer is cleared after each report call), so they should be incremented directly rather than computing deltas against a previous cumulative value. Also removed the now-unused _custom_counter_prev module-level dict. Thread resolved.

All 246 existing tests pass after these changes.

configuration.md:
- Remove InfluxDB stats section (removed in prior PR)
- Remove Daemon section (always foreground now)
- Add prometheus stats config section with options table
- Add schema_uri_format to consumer config table
- Add full Prometheus Metrics reference: built-in counters,
  histograms, gauges, and custom metric patterns

index.md:
- Replace "statsd and InfluxDB" with "statsd and/or Prometheus"
- List all optional extras: avro, html, msgpack, prometheus, sentry

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@gmr gmr merged commit 8478c39 into main Apr 2, 2026
5 checks passed
@gmr gmr deleted the prometheus-exporter branch April 2, 2026 01:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant