From 425e5e3a69f6e23401de6a3a291921a29f7dab1a Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 14:01:39 +0800 Subject: [PATCH 01/18] refactor prometheus layer --- core/src/layers/prometheus.rs | 254 +++++++++++----------------------- 1 file changed, 78 insertions(+), 176 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 2558bb50b618..2a2b849cfe3c 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -21,6 +21,7 @@ use std::io; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; @@ -168,6 +169,27 @@ impl PrometheusMetrics { kind.into_static() ); } + + #[inline] + fn increment_request_total(&self, scheme: &str, op: Operation) { + self.requests_total + .with_label_values(&[scheme, op.into_static()]) + .inc(); + } + + #[inline] + fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { + self.bytes_total + .with_label_values(&[scheme, op.into_static()]) + .observe(bytes as f64); + } + + #[inline] + fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { + self.requests_duration_seconds + .with_label_values(&[scheme, op.into_static()]) + .observe(duration.as_secs_f64()); + } } #[derive(Clone)] @@ -200,19 +222,12 @@ impl LayeredAccessor for PrometheusAccessor { } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::CreateDir); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::CreateDir.into_static()]) - .start_timer(); + let start_time = Instant::now(); let create_res = self.inner.create_dir(path, args).await; - timer.observe_duration(); + self.stats.observe_request_duration(&self.scheme, Operation::CreateDir, start_time.elapsed()); 
create_res.map_err(|e| { self.stats .increment_errors_total(Operation::CreateDir, e.kind()); @@ -221,26 +236,15 @@ impl LayeredAccessor for PrometheusAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .start_timer(); + self.stats.increment_request_total(&self.scheme, Operation::Read); + let start_time = Instant::now(); let read_res = self .inner .read(path, args) .map(|v| { v.map(|(rp, r)| { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(rp.metadata().content_length() as f64); + self.stats.observe_bytes_total(&self.scheme, Operation::Read, rp.metadata().content_length() as usize); ( rp, PrometheusMetricWrapper::new( @@ -253,7 +257,8 @@ impl LayeredAccessor for PrometheusAccessor { }) }) .await; - timer.observe_duration(); + self.stats.observe_request_duration(&self.scheme, Operation::Read, start_time.elapsed()); + read_res.map_err(|e| { self.stats.increment_errors_total(Operation::Read, e.kind()); e @@ -261,16 +266,8 @@ impl LayeredAccessor for PrometheusAccessor { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .start_timer(); + self.stats.increment_request_total(&self.scheme, Operation::Write); + let start_time = Instant::now(); let write_res = self .inner @@ -289,7 +286,8 @@ impl LayeredAccessor for PrometheusAccessor { }) }) .await; - timer.observe_duration(); + + self.stats.observe_request_duration(&self.scheme, Operation::Write, 
start_time.elapsed()); write_res.map_err(|e| { self.stats .increment_errors_total(Operation::Write, e.kind()); @@ -298,15 +296,8 @@ impl LayeredAccessor for PrometheusAccessor { } async fn stat(&self, path: &str, args: OpStat) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) - .inc(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) - .start_timer(); + self.stats.increment_request_total(&self.scheme, Operation::Stat); + let start_time = Instant::now(); let stat_res = self .inner @@ -315,7 +306,8 @@ impl LayeredAccessor for PrometheusAccessor { self.stats.increment_errors_total(Operation::Stat, e.kind()); }) .await; - timer.observe_duration(); + + self.stats.observe_request_duration(&self.scheme, Operation::Stat, start_time.elapsed()); stat_res.map_err(|e| { self.stats.increment_errors_total(Operation::Stat, e.kind()); e @@ -323,19 +315,13 @@ impl LayeredAccessor for PrometheusAccessor { } async fn delete(&self, path: &str, args: OpDelete) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::Delete); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) - .start_timer(); let delete_res = self.inner.delete(path, args).await; - timer.observe_duration(); + + self.stats.observe_request_duration(&self.scheme, Operation::Delete, start_time.elapsed()); delete_res.map_err(|e| { self.stats .increment_errors_total(Operation::Delete, e.kind()); @@ -344,20 +330,12 @@ impl LayeredAccessor for PrometheusAccessor { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::List.into_static()]) - .inc(); - - 
let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::List.into_static()]) - .start_timer(); + self.stats.increment_request_total(&self.scheme, Operation::List); + let start_time = Intant::now(); let list_res = self.inner.list(path, args).await; - timer.observe_duration(); + self.stats.observe_request_duration(&self.scheme, Operation::List, start_time.elapsed()); list_res.map_err(|e| { self.stats.increment_errors_total(Operation::List, e.kind()); e @@ -365,19 +343,12 @@ impl LayeredAccessor for PrometheusAccessor { } async fn batch(&self, args: OpBatch) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Batch.into_static()]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::Batch); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Batch.into_static()]) - .start_timer(); let result = self.inner.batch(args).await; - timer.observe_duration(); + self.stats.observe_request_duration(&self.scheme, Operation::Batch, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::Batch, e.kind()); @@ -386,19 +357,12 @@ impl LayeredAccessor for PrometheusAccessor { } async fn presign(&self, path: &str, args: OpPresign) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Presign.into_static()]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::Presign); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Presign.into_static()]) - .start_timer(); let result = self.inner.presign(path, args).await; - timer.observe_duration(); + self.stats.observe_request_duration(&self.scheme, Operation::Presign, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::Presign, e.kind()); @@ 
-407,20 +371,12 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::BlockingCreateDir); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()]) - .start_timer(); let result = self.inner.blocking_create_dir(path, args); - timer.observe_duration(); - + self.stats.observe_request_duration(&self.scheme, Operation::BlockingCreateDir, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingCreateDir, e.kind()); @@ -429,21 +385,11 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::BlockingRead); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme]) - .start_timer(); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(rp.metadata().content_length() as f64); + self.stats.observe_bytes_total(&self.scheme, Operation::BlockingRead, rp.metadata().content_length() as usize); ( rp, PrometheusMetricWrapper::new( @@ -454,7 +400,8 @@ impl LayeredAccessor for PrometheusAccessor { ), ) }); - timer.observe_duration(); + + self.stats.observe_request_duration(&self.scheme, Operation::BlockingRead, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingRead, 
e.kind()); @@ -463,16 +410,9 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::BlockingWrite); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .start_timer(); let result = self.inner.blocking_write(path, args).map(|(rp, r)| { ( rp, @@ -484,7 +424,8 @@ impl LayeredAccessor for PrometheusAccessor { ), ) }); - timer.observe_duration(); + + self.stats.observe_request_duration(&self.scheme, Operation::BlockingWrite, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingWrite, e.kind()); @@ -493,18 +434,11 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::BlockingStat); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()]) - .start_timer(); let result = self.inner.blocking_stat(path, args); - timer.observe_duration(); + self.stats.observe_request_duration(&self.scheme, Operation::BlockingStat, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingStat, e.kind()); @@ -513,19 +447,12 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()]) - .inc(); + 
self.stats.increment_request_total(&self.scheme, Operation::BlockingDelete); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()]) - .start_timer(); let result = self.inner.blocking_delete(path, args); - timer.observe_duration(); + self.stats.observe_request_duration(&self.scheme, Operation::BlockingDelete, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingDelete, e.kind()); @@ -534,19 +461,12 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) - .inc(); + self.stats.increment_request_total(&self.scheme, Operation::BlockingList); + let start_time = Instant::now(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) - .start_timer(); let result = self.inner.blocking_list(path, args); - timer.observe_duration(); + self.stats.observe_request_duration(&self.scheme, Operation::BlockingList, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingList, e.kind()); @@ -578,10 +498,7 @@ impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { self.inner.poll_read(cx, buf).map(|res| match res { Ok(bytes) => { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(bytes as f64); + self.stats.observe_bytes_total(&self.scheme, self.op, bytes); Ok(bytes) } Err(e) => { @@ -604,10 +521,7 @@ impl oio::Read for PrometheusMetricWrapper { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { self.inner.poll_next(cx).map(|res| match res { Some(Ok(bytes)) => { - self.stats - .bytes_total - 
.with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(bytes.len() as f64); + self.stats.observe_bytes_total(&self.scheme, self.op, bytes.len()); Some(Ok(bytes)) } Some(Err(e)) => { @@ -624,10 +538,7 @@ impl oio::BlockingRead for PrometheusMetricWrapper { self.inner .read(buf) .map(|n| { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(n as f64); + self.stats.observe_bytes_total(&self.scheme, self.op, n); n }) .map_err(|e| { @@ -646,10 +557,7 @@ impl oio::BlockingRead for PrometheusMetricWrapper { fn next(&mut self) -> Option> { self.inner.next().map(|res| match res { Ok(bytes) => { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(bytes.len() as f64); + self.stats.observe_bytes_total(&self.scheme, self.op, n); Ok(bytes) } Err(e) => { @@ -666,10 +574,7 @@ impl oio::Write for PrometheusMetricWrapper { self.inner .poll_write(cx, bs) .map_ok(|n| { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(n as f64); + self.stats.observe_bytes_total(&self.scheme, self.op, n); n }) .map_err(|err| { @@ -698,10 +603,7 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { self.inner .write(bs) .map(|n| { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .observe(n as f64); + self.stats.observe_bytes_total(&self.scheme, self.op, n); n }) .map_err(|err| { From b97f6b371f7f6b95ef47c4c852a9b98770f9975f Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 14:04:36 +0800 Subject: [PATCH 02/18] add prometheus-client to deps --- Cargo.lock | 30 ++++++++++++++++++++++++++++++ core/Cargo.toml | 4 ++++ 2 files changed, 34 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 24bc8a76fc5a..10e8741c7006 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1572,6 +1572,12 @@ version = "1.2.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650" +[[package]] +name = "dtoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" + [[package]] name = "either" version = "1.8.1" @@ -3580,6 +3586,7 @@ dependencies = [ "pin-project", "pretty_assertions", "prometheus", + "prometheus-client", "prost", "quick-xml", "rand 0.8.5", @@ -4493,6 +4500,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prometheus-client" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2" +dependencies = [ + "dtoa", + "itoa", + "parking_lot 0.12.1", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.23", +] + [[package]] name = "prost" version = "0.11.9" diff --git a/core/Cargo.toml b/core/Cargo.toml index 33232299ddf6..fff7e04e36eb 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -100,6 +100,9 @@ layers-await-tree = ["dep:await-tree"] # Enable layers async-backtrace support. layers-async-backtrace = ["dep:async-backtrace"] +# Enable prometheus-client instead of prometheus-rs. 
+use-prometheus-client = ["dep:prometheus-client"] + services-atomicserver = ["dep:atomic_lib"] services-azblob = [ "dep:sha2", @@ -243,6 +246,7 @@ percent-encoding = "2" persy = { version = "1.4.4", optional = true } pin-project = "1" prometheus = { version = "0.13", features = ["process"], optional = true } +prometheus-client = { version = "0.21.2", optional = true } prost = { version = "0.11", optional = true } quick-xml = { version = "0.29", features = ["serialize", "overlapped-lists"] } rand = { version = "0.8", optional = true } From 5bb66928767dcbe916a5f51d548e72b44749315a Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 14:10:28 +0800 Subject: [PATCH 03/18] chore: simplify imports --- core/src/layers/prometheus.rs | 36 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 2a2b849cfe3c..0f76f0dab53a 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -28,14 +28,7 @@ use bytes::Bytes; use futures::FutureExt; use futures::TryFutureExt; use log::debug; -use prometheus::core::AtomicU64; -use prometheus::core::GenericCounterVec; -use prometheus::exponential_buckets; -use prometheus::histogram_opts; -use prometheus::register_histogram_vec_with_registry; -use prometheus::register_int_counter_vec_with_registry; -use prometheus::HistogramVec; -use prometheus::Registry; +use prometheus; use crate::raw::Accessor; use crate::raw::*; @@ -88,12 +81,12 @@ use crate::*; /// ``` #[derive(Default, Debug, Clone)] pub struct PrometheusLayer { - registry: Registry, + registry: prometheus::Registry, } impl PrometheusLayer { /// create PrometheusLayer by incoming registry. 
- pub fn with_registry(registry: Registry) -> Self { + pub fn with_registry(registry: prometheus::Registry) -> Self { Self { registry } } } @@ -112,44 +105,45 @@ impl Layer for PrometheusLayer { } } } + /// [`PrometheusMetrics`] provide the performance and IO metrics. #[derive(Debug)] pub struct PrometheusMetrics { /// Total times of the specific operation be called. - pub requests_total: GenericCounterVec, + pub requests_total: prometheus::core::GenericCounterVec, /// Latency of the specific operation be called. - pub requests_duration_seconds: HistogramVec, + pub requests_duration_seconds: prometheus::HistogramVec, /// Size of the specific metrics. - pub bytes_total: HistogramVec, + pub bytes_total: prometheus::HistogramVec, } impl PrometheusMetrics { /// new with prometheus register. - pub fn new(registry: Registry) -> Self { - let requests_total = register_int_counter_vec_with_registry!( + pub fn new(registry: prometheus::Registry) -> Self { + let requests_total = prometheus::register_int_counter_vec_with_registry!( "requests_total", "Total times of create be called", &["scheme", "operation"], registry ) .unwrap(); - let opts = histogram_opts!( + let opts = prometheus::histogram_opts!( "requests_duration_seconds", "Histogram of the time spent on specific operation", - exponential_buckets(0.01, 2.0, 16).unwrap() + prometheus::exponential_buckets(0.01, 2.0, 16).unwrap() ); let requests_duration_seconds = - register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) + prometheus::register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) .unwrap(); - let opts = histogram_opts!( + let opts = prometheus::histogram_opts!( "bytes_total", "Total size of ", - exponential_buckets(0.01, 2.0, 16).unwrap() + prometheus::exponential_buckets(0.01, 2.0, 16).unwrap() ); let bytes_total = - register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) + prometheus::register_histogram_vec_with_registry!(opts, &["scheme", 
"operation"], registry) .unwrap(); Self { From 9e24e4e82ee4708a452e88e2285a3525c7ed1767 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 14:22:29 +0800 Subject: [PATCH 04/18] refactor the metrics into a trait --- core/src/layers/prometheus.rs | 41 ++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 0f76f0dab53a..51281926d5b4 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -33,6 +33,7 @@ use prometheus; use crate::raw::Accessor; use crate::raw::*; use crate::*; + /// Add [prometheus](https://docs.rs/prometheus) for every operations. /// /// # Examples @@ -91,8 +92,8 @@ impl PrometheusLayer { } } -impl Layer for PrometheusLayer { - type LayeredAccessor = PrometheusAccessor; +impl Layer for PrometheusLayer { + type LayeredAccessor = PrometheusAccessor; fn layer(&self, inner: A) -> Self::LayeredAccessor { let meta = inner.info(); @@ -100,15 +101,23 @@ impl Layer for PrometheusLayer { PrometheusAccessor { inner, - stats: Arc::new(PrometheusMetrics::new(self.registry.clone())), + stats: Arc::new(PrometheusLibMetrics::new(self.registry.clone())), scheme: scheme.to_string(), } } } -/// [`PrometheusMetrics`] provide the performance and IO metrics. +/// [`LayerPrometheusMetrics`] is called on every operation in [`PrometheusAccessor`]. +trait PrometheusLayerMetrics { + fn increment_errors_total(&self, op: Operation, kind: ErrorKind); + fn increment_request_total(&self, scheme: &str, op: Operation); + fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize); + fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration); +} + +/// [`PrometheusLibMetrics`] provide the performance and IO metrics with the `prometheus` crate. #[derive(Debug)] -pub struct PrometheusMetrics { +struct PrometheusLibMetrics { /// Total times of the specific operation be called. 
pub requests_total: prometheus::core::GenericCounterVec, /// Latency of the specific operation be called. @@ -117,7 +126,7 @@ pub struct PrometheusMetrics { pub bytes_total: prometheus::HistogramVec, } -impl PrometheusMetrics { +impl PrometheusLibMetrics { /// new with prometheus register. pub fn new(registry: prometheus::Registry) -> Self { let requests_total = prometheus::register_int_counter_vec_with_registry!( @@ -126,7 +135,7 @@ impl PrometheusMetrics { &["scheme", "operation"], registry ) - .unwrap(); + .unwrap(); let opts = prometheus::histogram_opts!( "requests_duration_seconds", "Histogram of the time spent on specific operation", @@ -152,10 +161,11 @@ impl PrometheusMetrics { bytes_total, } } +} +impl PrometheusLayerMetrics for PrometheusLibMetrics { /// error handling is the cold path, so we will not init error counters /// in advance. - #[inline] fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { debug!( "Prometheus statistics metrics error, operation {} error {}", @@ -164,21 +174,18 @@ impl PrometheusMetrics { ); } - #[inline] fn increment_request_total(&self, scheme: &str, op: Operation) { self.requests_total .with_label_values(&[scheme, op.into_static()]) .inc(); } - #[inline] fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { self.bytes_total .with_label_values(&[scheme, op.into_static()]) .observe(bytes as f64); } - #[inline] fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { self.requests_duration_seconds .with_label_values(&[scheme, op.into_static()]) @@ -187,13 +194,13 @@ impl PrometheusMetrics { } #[derive(Clone)] -pub struct PrometheusAccessor { +pub struct PrometheusAccessor { inner: A, - stats: Arc, + stats: Arc, scheme: String, } -impl Debug for PrometheusAccessor { +impl Debug for PrometheusAccessor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("PrometheusAccessor") .field("inner", &self.inner) @@ -202,7 +209,7 @@ impl Debug 
for PrometheusAccessor { } #[async_trait] -impl LayeredAccessor for PrometheusAccessor { +impl LayeredAccessor for PrometheusAccessor { type Inner = A; type Reader = PrometheusMetricWrapper; type BlockingReader = PrometheusMetricWrapper; @@ -473,12 +480,12 @@ pub struct PrometheusMetricWrapper { inner: R, op: Operation, - stats: Arc, + stats: Arc, scheme: String, } impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { + fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { Self { inner, op, From 670408401123eb0ce0a8867afc797f0d401a7514 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 14:50:31 +0800 Subject: [PATCH 05/18] feat: add implementation with prometheus-client --- core/src/layers/prometheus.rs | 93 ++++++++++++++++++++++++++++++++++- 1 file changed, 91 insertions(+), 2 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 51281926d5b4..8fae5f06a09f 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -28,7 +28,6 @@ use bytes::Bytes; use futures::FutureExt; use futures::TryFutureExt; use log::debug; -use prometheus; use crate::raw::Accessor; use crate::raw::*; @@ -82,14 +81,24 @@ use crate::*; /// ``` #[derive(Default, Debug, Clone)] pub struct PrometheusLayer { + #[cfg(not(feature = "use-prometheus-client"))] registry: prometheus::Registry, + + #[cfg(feature = "use-prometheus-client")] + registry: prometheus_client::registry::Registry, } impl PrometheusLayer { /// create PrometheusLayer by incoming registry. 
+ #[cfg(not(feature = "use-prometheus-client"))] pub fn with_registry(registry: prometheus::Registry) -> Self { Self { registry } } + + #[cfg(feature = "use-prometheus-client")] + pub fn with_registry(registry: prometheus_client::registry::Registry) -> Self { + Self { registry } + } } impl Layer for PrometheusLayer { @@ -99,9 +108,15 @@ impl Layer for PrometheusLayer { let meta = inner.info(); let scheme = meta.scheme(); + #[cfg(not(feature = "use-prometheus-client"))] + let stats = Arc::new(PrometheusLibMetrics::new(self.registry.clone())); + + #[cfg(feature = "use-prometheus-client")] + let stats = Arc::new(PrometheusClientMetrics::new(self.registry.clone())); + PrometheusAccessor { inner, - stats: Arc::new(PrometheusLibMetrics::new(self.registry.clone())), + stats, scheme: scheme.to_string(), } } @@ -115,7 +130,79 @@ trait PrometheusLayerMetrics { fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration); } +/// [`PrometheusClientMetrics`] provide the performance and IO metrics with the `prometheus-client` crate. +#[cfg(feature = "use-prometheus-client")] +#[derive(Debug)] +struct PrometheusClientMetrics { + /// Total counter of the specific operation be called. + requests_total: prometheus_client::metrics::family::Family, prometheus_client::metrics::counter::Counter>, + /// Latency of the specific operation be called. 
+ request_duration_seconds: prometheus_client::metrics::family::Family, prometheus_client::metrics::histogram::Histogram>, + /// The histogram of bytes + bytes_histogram: prometheus_client::metrics::family::Family, prometheus_client::metrics::histogram::Histogram>, +} + +#[cfg(feature = "use-prometheus-client")] +impl PrometheusClientMetrics { + pub fn new(mut registry: prometheus_client::registry::Registry) -> Self { + let requests_total = prometheus_client::metrics::family::Family::default(); + let request_duration_seconds = prometheus_client::metrics::family::Family::new_with_constructor(|| { + let buckets = prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); + prometheus_client::metrics::histogram::Histogram::new(buckets) + }); + let bytes_histogram = prometheus_client::metrics::family::Family::new_with_constructor(|| { + let buckets = prometheus_client::metrics::histogram::exponential_buckets( 1.0, 2.0, 16); + prometheus_client::metrics::histogram::Histogram::new(buckets) + }); + + registry.register("requests_total", "", requests_total.clone()); + registry.register("request_duration_seconds", "", request_duration_seconds.clone()); + registry.register("bytes_histogram", "", bytes_histogram.clone()); + Self { + requests_total, + request_duration_seconds, + bytes_histogram, + } + } +} + +#[cfg(feature = "use-prometheus-client")] +impl PrometheusLayerMetrics for PrometheusClientMetrics { + fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { + let labels = vec![ + ("operation", op.as_str()), + ("kind", kind.as_str()), + ]; + self.requests_total.get_or_create(&labels).inc(); + } + + fn increment_request_total(&self, scheme: &str, op: Operation) { + let labels = vec![ + ("scheme", scheme), + ("operation", op.as_str()), + ]; + self.requests_total.get_or_create(&labels).inc(); + } + + fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { + let labels = vec![ + ("scheme", scheme), + ("operation", op.as_str()), + 
]; + self.bytes_histogram.get_or_create(&labels).observe(bytes as f64); + } + + fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { + let labels = vec![ + ("scheme", scheme), + ("operation", op.as_str()), + ]; + self.request_duration_seconds.get_or_create(&labels).observe(duration.as_secs_f64()); + } +} + /// [`PrometheusLibMetrics`] provide the performance and IO metrics with the `prometheus` crate. +#[cfg(not(feature = "use-prometheus-client"))] #[derive(Debug)] struct PrometheusLibMetrics { /// Total times of the specific operation be called. @@ -126,6 +213,7 @@ struct PrometheusLibMetrics { pub bytes_total: prometheus::HistogramVec, } +#[cfg(not(feature = "use-prometheus-client"))] impl PrometheusLibMetrics { /// new with prometheus register. pub fn new(registry: prometheus::Registry) -> Self { @@ -163,6 +251,7 @@ impl PrometheusLibMetrics { } } +#[cfg(not(feature = "use-prometheus-client"))] impl PrometheusLayerMetrics for PrometheusLibMetrics { /// error handling is the cold path, so we will not init error counters /// in advance. 
From bc787733647d763512cfd37421e356d9d559793d Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 15:12:00 +0800 Subject: [PATCH 06/18] fix: allow using different trait --- core/src/layers/prometheus.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 8fae5f06a09f..a6e8ada5a275 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -565,16 +565,16 @@ impl LayeredAccessor for PrometheusAcces } } -pub struct PrometheusMetricWrapper { +pub struct PrometheusMetricWrapper { inner: R, op: Operation, - stats: Arc, + stats: Arc, scheme: String, } -impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { +impl PrometheusMetricWrapper { + fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { Self { inner, op, @@ -584,7 +584,7 @@ impl PrometheusMetricWrapper { } } -impl oio::Read for PrometheusMetricWrapper { +impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { self.inner.poll_read(cx, buf).map(|res| match res { Ok(bytes) => { @@ -623,7 +623,7 @@ impl oio::Read for PrometheusMetricWrapper { } } -impl oio::BlockingRead for PrometheusMetricWrapper { +impl oio::BlockingRead for PrometheusMetricWrapper { fn read(&mut self, buf: &mut [u8]) -> Result { self.inner .read(buf) @@ -659,7 +659,7 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } #[async_trait] -impl oio::Write for PrometheusMetricWrapper { +impl oio::Write for PrometheusMetricWrapper { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { self.inner .poll_write(cx, bs) @@ -688,7 +688,7 @@ impl oio::Write for PrometheusMetricWrapper { } } -impl oio::BlockingWrite for PrometheusMetricWrapper { +impl oio::BlockingWrite for PrometheusMetricWrapper { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { self.inner 
.write(bs) From 21384f2edbfb37e6a0576117462c43a77a931c5a Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 15:14:11 +0800 Subject: [PATCH 07/18] cargo fmt --- core/src/layers/prometheus.rs | 210 +++++++++++++++++++++++----------- 1 file changed, 141 insertions(+), 69 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index a6e8ada5a275..e4b3ac9747fc 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -135,28 +135,45 @@ trait PrometheusLayerMetrics { #[derive(Debug)] struct PrometheusClientMetrics { /// Total counter of the specific operation be called. - requests_total: prometheus_client::metrics::family::Family, prometheus_client::metrics::counter::Counter>, + requests_total: prometheus_client::metrics::family::Family< + Vec<(&'static str, &'static str)>, + prometheus_client::metrics::counter::Counter, + >, /// Latency of the specific operation be called. - request_duration_seconds: prometheus_client::metrics::family::Family, prometheus_client::metrics::histogram::Histogram>, + request_duration_seconds: prometheus_client::metrics::family::Family< + Vec<(&'static str, &'static str)>, + prometheus_client::metrics::histogram::Histogram, + >, /// The histogram of bytes - bytes_histogram: prometheus_client::metrics::family::Family, prometheus_client::metrics::histogram::Histogram>, + bytes_histogram: prometheus_client::metrics::family::Family< + Vec<(&'static str, &'static str)>, + prometheus_client::metrics::histogram::Histogram, + >, } #[cfg(feature = "use-prometheus-client")] impl PrometheusClientMetrics { pub fn new(mut registry: prometheus_client::registry::Registry) -> Self { let requests_total = prometheus_client::metrics::family::Family::default(); - let request_duration_seconds = prometheus_client::metrics::family::Family::new_with_constructor(|| { - let buckets = prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); - 
prometheus_client::metrics::histogram::Histogram::new(buckets) - }); - let bytes_histogram = prometheus_client::metrics::family::Family::new_with_constructor(|| { - let buckets = prometheus_client::metrics::histogram::exponential_buckets( 1.0, 2.0, 16); - prometheus_client::metrics::histogram::Histogram::new(buckets) - }); + let request_duration_seconds = + prometheus_client::metrics::family::Family::new_with_constructor(|| { + let buckets = + prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); + prometheus_client::metrics::histogram::Histogram::new(buckets) + }); + let bytes_histogram = + prometheus_client::metrics::family::Family::new_with_constructor(|| { + let buckets = + prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16); + prometheus_client::metrics::histogram::Histogram::new(buckets) + }); registry.register("requests_total", "", requests_total.clone()); - registry.register("request_duration_seconds", "", request_duration_seconds.clone()); + registry.register( + "request_duration_seconds", + "", + request_duration_seconds.clone(), + ); registry.register("bytes_histogram", "", bytes_histogram.clone()); Self { requests_total, @@ -169,35 +186,27 @@ impl PrometheusClientMetrics { #[cfg(feature = "use-prometheus-client")] impl PrometheusLayerMetrics for PrometheusClientMetrics { fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { - let labels = vec![ - ("operation", op.as_str()), - ("kind", kind.as_str()), - ]; + let labels = vec![("operation", op.as_str()), ("kind", kind.as_str())]; self.requests_total.get_or_create(&labels).inc(); } fn increment_request_total(&self, scheme: &str, op: Operation) { - let labels = vec![ - ("scheme", scheme), - ("operation", op.as_str()), - ]; + let labels = vec![("scheme", scheme), ("operation", op.as_str())]; self.requests_total.get_or_create(&labels).inc(); } fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { - let labels = vec![ - ("scheme", 
scheme), - ("operation", op.as_str()), - ]; - self.bytes_histogram.get_or_create(&labels).observe(bytes as f64); + let labels = vec![("scheme", scheme), ("operation", op.as_str())]; + self.bytes_histogram + .get_or_create(&labels) + .observe(bytes as f64); } fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { - let labels = vec![ - ("scheme", scheme), - ("operation", op.as_str()), - ]; - self.request_duration_seconds.get_or_create(&labels).observe(duration.as_secs_f64()); + let labels = vec![("scheme", scheme), ("operation", op.as_str())]; + self.request_duration_seconds + .get_or_create(&labels) + .observe(duration.as_secs_f64()); } } @@ -223,25 +232,31 @@ impl PrometheusLibMetrics { &["scheme", "operation"], registry ) - .unwrap(); + .unwrap(); let opts = prometheus::histogram_opts!( "requests_duration_seconds", "Histogram of the time spent on specific operation", prometheus::exponential_buckets(0.01, 2.0, 16).unwrap() ); - let requests_duration_seconds = - prometheus::register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) - .unwrap(); + let requests_duration_seconds = prometheus::register_histogram_vec_with_registry!( + opts, + &["scheme", "operation"], + registry + ) + .unwrap(); let opts = prometheus::histogram_opts!( "bytes_total", "Total size of ", prometheus::exponential_buckets(0.01, 2.0, 16).unwrap() ); - let bytes_total = - prometheus::register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) - .unwrap(); + let bytes_total = prometheus::register_histogram_vec_with_registry!( + opts, + &["scheme", "operation"], + registry + ) + .unwrap(); Self { requests_total, @@ -312,12 +327,17 @@ impl LayeredAccessor for PrometheusAcces } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.stats.increment_request_total(&self.scheme, Operation::CreateDir); + self.stats + .increment_request_total(&self.scheme, Operation::CreateDir); let start_time = 
Instant::now(); let create_res = self.inner.create_dir(path, args).await; - self.stats.observe_request_duration(&self.scheme, Operation::CreateDir, start_time.elapsed()); + self.stats.observe_request_duration( + &self.scheme, + Operation::CreateDir, + start_time.elapsed(), + ); create_res.map_err(|e| { self.stats .increment_errors_total(Operation::CreateDir, e.kind()); @@ -326,7 +346,8 @@ impl LayeredAccessor for PrometheusAcces } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.stats.increment_request_total(&self.scheme, Operation::Read); + self.stats + .increment_request_total(&self.scheme, Operation::Read); let start_time = Instant::now(); let read_res = self @@ -334,7 +355,11 @@ impl LayeredAccessor for PrometheusAcces .read(path, args) .map(|v| { v.map(|(rp, r)| { - self.stats.observe_bytes_total(&self.scheme, Operation::Read, rp.metadata().content_length() as usize); + self.stats.observe_bytes_total( + &self.scheme, + Operation::Read, + rp.metadata().content_length() as usize, + ); ( rp, PrometheusMetricWrapper::new( @@ -347,7 +372,8 @@ impl LayeredAccessor for PrometheusAcces }) }) .await; - self.stats.observe_request_duration(&self.scheme, Operation::Read, start_time.elapsed()); + self.stats + .observe_request_duration(&self.scheme, Operation::Read, start_time.elapsed()); read_res.map_err(|e| { self.stats.increment_errors_total(Operation::Read, e.kind()); @@ -356,7 +382,8 @@ impl LayeredAccessor for PrometheusAcces } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.stats.increment_request_total(&self.scheme, Operation::Write); + self.stats + .increment_request_total(&self.scheme, Operation::Write); let start_time = Instant::now(); let write_res = self @@ -377,7 +404,8 @@ impl LayeredAccessor for PrometheusAcces }) .await; - self.stats.observe_request_duration(&self.scheme, Operation::Write, start_time.elapsed()); + self.stats + .observe_request_duration(&self.scheme, 
Operation::Write, start_time.elapsed()); write_res.map_err(|e| { self.stats .increment_errors_total(Operation::Write, e.kind()); @@ -386,7 +414,8 @@ impl LayeredAccessor for PrometheusAcces } async fn stat(&self, path: &str, args: OpStat) -> Result { - self.stats.increment_request_total(&self.scheme, Operation::Stat); + self.stats + .increment_request_total(&self.scheme, Operation::Stat); let start_time = Instant::now(); let stat_res = self @@ -397,7 +426,8 @@ impl LayeredAccessor for PrometheusAcces }) .await; - self.stats.observe_request_duration(&self.scheme, Operation::Stat, start_time.elapsed()); + self.stats + .observe_request_duration(&self.scheme, Operation::Stat, start_time.elapsed()); stat_res.map_err(|e| { self.stats.increment_errors_total(Operation::Stat, e.kind()); e @@ -405,13 +435,14 @@ impl LayeredAccessor for PrometheusAcces } async fn delete(&self, path: &str, args: OpDelete) -> Result { - self.stats.increment_request_total(&self.scheme, Operation::Delete); + self.stats + .increment_request_total(&self.scheme, Operation::Delete); let start_time = Instant::now(); - let delete_res = self.inner.delete(path, args).await; - self.stats.observe_request_duration(&self.scheme, Operation::Delete, start_time.elapsed()); + self.stats + .observe_request_duration(&self.scheme, Operation::Delete, start_time.elapsed()); delete_res.map_err(|e| { self.stats .increment_errors_total(Operation::Delete, e.kind()); @@ -420,12 +451,14 @@ impl LayeredAccessor for PrometheusAcces } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - self.stats.increment_request_total(&self.scheme, Operation::List); + self.stats + .increment_request_total(&self.scheme, Operation::List); let start_time = Intant::now(); let list_res = self.inner.list(path, args).await; - self.stats.observe_request_duration(&self.scheme, Operation::List, start_time.elapsed()); + self.stats + .observe_request_duration(&self.scheme, Operation::List, start_time.elapsed()); 
list_res.map_err(|e| { self.stats.increment_errors_total(Operation::List, e.kind()); e @@ -433,12 +466,14 @@ impl LayeredAccessor for PrometheusAcces } async fn batch(&self, args: OpBatch) -> Result { - self.stats.increment_request_total(&self.scheme, Operation::Batch); + self.stats + .increment_request_total(&self.scheme, Operation::Batch); let start_time = Instant::now(); let result = self.inner.batch(args).await; - self.stats.observe_request_duration(&self.scheme, Operation::Batch, start_time.elapsed()); + self.stats + .observe_request_duration(&self.scheme, Operation::Batch, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::Batch, e.kind()); @@ -447,12 +482,14 @@ impl LayeredAccessor for PrometheusAcces } async fn presign(&self, path: &str, args: OpPresign) -> Result { - self.stats.increment_request_total(&self.scheme, Operation::Presign); + self.stats + .increment_request_total(&self.scheme, Operation::Presign); let start_time = Instant::now(); let result = self.inner.presign(path, args).await; - self.stats.observe_request_duration(&self.scheme, Operation::Presign, start_time.elapsed()); + self.stats + .observe_request_duration(&self.scheme, Operation::Presign, start_time.elapsed()); result.map_err(|e| { self.stats .increment_errors_total(Operation::Presign, e.kind()); @@ -461,12 +498,17 @@ impl LayeredAccessor for PrometheusAcces } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.stats.increment_request_total(&self.scheme, Operation::BlockingCreateDir); + self.stats + .increment_request_total(&self.scheme, Operation::BlockingCreateDir); let start_time = Instant::now(); let result = self.inner.blocking_create_dir(path, args); - self.stats.observe_request_duration(&self.scheme, Operation::BlockingCreateDir, start_time.elapsed()); + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingCreateDir, + start_time.elapsed(), + ); result.map_err(|e| { self.stats 
.increment_errors_total(Operation::BlockingCreateDir, e.kind()); @@ -475,11 +517,16 @@ impl LayeredAccessor for PrometheusAcces } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.stats.increment_request_total(&self.scheme, Operation::BlockingRead); + self.stats + .increment_request_total(&self.scheme, Operation::BlockingRead); let start_time = Instant::now(); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - self.stats.observe_bytes_total(&self.scheme, Operation::BlockingRead, rp.metadata().content_length() as usize); + self.stats.observe_bytes_total( + &self.scheme, + Operation::BlockingRead, + rp.metadata().content_length() as usize, + ); ( rp, PrometheusMetricWrapper::new( @@ -491,7 +538,11 @@ impl LayeredAccessor for PrometheusAcces ) }); - self.stats.observe_request_duration(&self.scheme, Operation::BlockingRead, start_time.elapsed()); + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingRead, + start_time.elapsed(), + ); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingRead, e.kind()); @@ -500,7 +551,8 @@ impl LayeredAccessor for PrometheusAcces } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.stats.increment_request_total(&self.scheme, Operation::BlockingWrite); + self.stats + .increment_request_total(&self.scheme, Operation::BlockingWrite); let start_time = Instant::now(); let result = self.inner.blocking_write(path, args).map(|(rp, r)| { @@ -515,7 +567,11 @@ impl LayeredAccessor for PrometheusAcces ) }); - self.stats.observe_request_duration(&self.scheme, Operation::BlockingWrite, start_time.elapsed()); + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingWrite, + start_time.elapsed(), + ); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingWrite, e.kind()); @@ -524,11 +580,16 @@ impl LayeredAccessor for PrometheusAcces } fn 
blocking_stat(&self, path: &str, args: OpStat) -> Result { - self.stats.increment_request_total(&self.scheme, Operation::BlockingStat); + self.stats + .increment_request_total(&self.scheme, Operation::BlockingStat); let start_time = Instant::now(); let result = self.inner.blocking_stat(path, args); - self.stats.observe_request_duration(&self.scheme, Operation::BlockingStat, start_time.elapsed()); + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingStat, + start_time.elapsed(), + ); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingStat, e.kind()); @@ -537,12 +598,17 @@ impl LayeredAccessor for PrometheusAcces } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - self.stats.increment_request_total(&self.scheme, Operation::BlockingDelete); + self.stats + .increment_request_total(&self.scheme, Operation::BlockingDelete); let start_time = Instant::now(); let result = self.inner.blocking_delete(path, args); - self.stats.observe_request_duration(&self.scheme, Operation::BlockingDelete, start_time.elapsed()); + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingDelete, + start_time.elapsed(), + ); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingDelete, e.kind()); @@ -551,12 +617,17 @@ impl LayeredAccessor for PrometheusAcces } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { - self.stats.increment_request_total(&self.scheme, Operation::BlockingList); + self.stats + .increment_request_total(&self.scheme, Operation::BlockingList); let start_time = Instant::now(); let result = self.inner.blocking_list(path, args); - self.stats.observe_request_duration(&self.scheme, Operation::BlockingList, start_time.elapsed()); + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingList, + start_time.elapsed(), + ); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingList, e.kind()); @@ 
-611,7 +682,8 @@ impl oio::Read for PrometheusMetricWrapper { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { self.inner.poll_next(cx).map(|res| match res { Some(Ok(bytes)) => { - self.stats.observe_bytes_total(&self.scheme, self.op, bytes.len()); + self.stats + .observe_bytes_total(&self.scheme, self.op, bytes.len()); Some(Ok(bytes)) } Some(Err(e)) => { From 8d20c75206b44212b74fbe6c1cb436c5b00ab2a4 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 15:42:21 +0800 Subject: [PATCH 08/18] refactor: add a seperate layer --- core/src/layers/prometheus.rs | 476 ++++++++----------- core/src/layers/prometheus_client.rs | 670 +++++++++++++++++++++++++++ 2 files changed, 876 insertions(+), 270 deletions(-) create mode 100644 core/src/layers/prometheus_client.rs diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index e4b3ac9747fc..2558bb50b618 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -21,18 +21,24 @@ use std::io; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; use futures::TryFutureExt; use log::debug; +use prometheus::core::AtomicU64; +use prometheus::core::GenericCounterVec; +use prometheus::exponential_buckets; +use prometheus::histogram_opts; +use prometheus::register_histogram_vec_with_registry; +use prometheus::register_int_counter_vec_with_registry; +use prometheus::HistogramVec; +use prometheus::Registry; use crate::raw::Accessor; use crate::raw::*; use crate::*; - /// Add [prometheus](https://docs.rs/prometheus) for every operations. 
/// /// # Examples @@ -81,182 +87,69 @@ use crate::*; /// ``` #[derive(Default, Debug, Clone)] pub struct PrometheusLayer { - #[cfg(not(feature = "use-prometheus-client"))] - registry: prometheus::Registry, - - #[cfg(feature = "use-prometheus-client")] - registry: prometheus_client::registry::Registry, + registry: Registry, } impl PrometheusLayer { /// create PrometheusLayer by incoming registry. - #[cfg(not(feature = "use-prometheus-client"))] - pub fn with_registry(registry: prometheus::Registry) -> Self { - Self { registry } - } - - #[cfg(feature = "use-prometheus-client")] - pub fn with_registry(registry: prometheus_client::registry::Registry) -> Self { + pub fn with_registry(registry: Registry) -> Self { Self { registry } } } -impl Layer for PrometheusLayer { - type LayeredAccessor = PrometheusAccessor; +impl Layer for PrometheusLayer { + type LayeredAccessor = PrometheusAccessor; fn layer(&self, inner: A) -> Self::LayeredAccessor { let meta = inner.info(); let scheme = meta.scheme(); - #[cfg(not(feature = "use-prometheus-client"))] - let stats = Arc::new(PrometheusLibMetrics::new(self.registry.clone())); - - #[cfg(feature = "use-prometheus-client")] - let stats = Arc::new(PrometheusClientMetrics::new(self.registry.clone())); - PrometheusAccessor { inner, - stats, + stats: Arc::new(PrometheusMetrics::new(self.registry.clone())), scheme: scheme.to_string(), } } } - -/// [`LayerPrometheusMetrics`] is called on every operation in [`PrometheusAccessor`]. -trait PrometheusLayerMetrics { - fn increment_errors_total(&self, op: Operation, kind: ErrorKind); - fn increment_request_total(&self, scheme: &str, op: Operation); - fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize); - fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration); -} - -/// [`PrometheusClientMetrics`] provide the performance and IO metrics with the `prometheus-client` crate. 
-#[cfg(feature = "use-prometheus-client")] +/// [`PrometheusMetrics`] provide the performance and IO metrics. #[derive(Debug)] -struct PrometheusClientMetrics { - /// Total counter of the specific operation be called. - requests_total: prometheus_client::metrics::family::Family< - Vec<(&'static str, &'static str)>, - prometheus_client::metrics::counter::Counter, - >, - /// Latency of the specific operation be called. - request_duration_seconds: prometheus_client::metrics::family::Family< - Vec<(&'static str, &'static str)>, - prometheus_client::metrics::histogram::Histogram, - >, - /// The histogram of bytes - bytes_histogram: prometheus_client::metrics::family::Family< - Vec<(&'static str, &'static str)>, - prometheus_client::metrics::histogram::Histogram, - >, -} - -#[cfg(feature = "use-prometheus-client")] -impl PrometheusClientMetrics { - pub fn new(mut registry: prometheus_client::registry::Registry) -> Self { - let requests_total = prometheus_client::metrics::family::Family::default(); - let request_duration_seconds = - prometheus_client::metrics::family::Family::new_with_constructor(|| { - let buckets = - prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); - prometheus_client::metrics::histogram::Histogram::new(buckets) - }); - let bytes_histogram = - prometheus_client::metrics::family::Family::new_with_constructor(|| { - let buckets = - prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16); - prometheus_client::metrics::histogram::Histogram::new(buckets) - }); - - registry.register("requests_total", "", requests_total.clone()); - registry.register( - "request_duration_seconds", - "", - request_duration_seconds.clone(), - ); - registry.register("bytes_histogram", "", bytes_histogram.clone()); - Self { - requests_total, - request_duration_seconds, - bytes_histogram, - } - } -} - -#[cfg(feature = "use-prometheus-client")] -impl PrometheusLayerMetrics for PrometheusClientMetrics { - fn increment_errors_total(&self, op: 
Operation, kind: ErrorKind) { - let labels = vec![("operation", op.as_str()), ("kind", kind.as_str())]; - self.requests_total.get_or_create(&labels).inc(); - } - - fn increment_request_total(&self, scheme: &str, op: Operation) { - let labels = vec![("scheme", scheme), ("operation", op.as_str())]; - self.requests_total.get_or_create(&labels).inc(); - } - - fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { - let labels = vec![("scheme", scheme), ("operation", op.as_str())]; - self.bytes_histogram - .get_or_create(&labels) - .observe(bytes as f64); - } - - fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { - let labels = vec![("scheme", scheme), ("operation", op.as_str())]; - self.request_duration_seconds - .get_or_create(&labels) - .observe(duration.as_secs_f64()); - } -} - -/// [`PrometheusLibMetrics`] provide the performance and IO metrics with the `prometheus` crate. -#[cfg(not(feature = "use-prometheus-client"))] -#[derive(Debug)] -struct PrometheusLibMetrics { +pub struct PrometheusMetrics { /// Total times of the specific operation be called. - pub requests_total: prometheus::core::GenericCounterVec, + pub requests_total: GenericCounterVec, /// Latency of the specific operation be called. - pub requests_duration_seconds: prometheus::HistogramVec, + pub requests_duration_seconds: HistogramVec, /// Size of the specific metrics. - pub bytes_total: prometheus::HistogramVec, + pub bytes_total: HistogramVec, } -#[cfg(not(feature = "use-prometheus-client"))] -impl PrometheusLibMetrics { +impl PrometheusMetrics { /// new with prometheus register. 
- pub fn new(registry: prometheus::Registry) -> Self { - let requests_total = prometheus::register_int_counter_vec_with_registry!( + pub fn new(registry: Registry) -> Self { + let requests_total = register_int_counter_vec_with_registry!( "requests_total", "Total times of create be called", &["scheme", "operation"], registry ) .unwrap(); - let opts = prometheus::histogram_opts!( + let opts = histogram_opts!( "requests_duration_seconds", "Histogram of the time spent on specific operation", - prometheus::exponential_buckets(0.01, 2.0, 16).unwrap() + exponential_buckets(0.01, 2.0, 16).unwrap() ); - let requests_duration_seconds = prometheus::register_histogram_vec_with_registry!( - opts, - &["scheme", "operation"], - registry - ) - .unwrap(); + let requests_duration_seconds = + register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) + .unwrap(); - let opts = prometheus::histogram_opts!( + let opts = histogram_opts!( "bytes_total", "Total size of ", - prometheus::exponential_buckets(0.01, 2.0, 16).unwrap() + exponential_buckets(0.01, 2.0, 16).unwrap() ); - let bytes_total = prometheus::register_histogram_vec_with_registry!( - opts, - &["scheme", "operation"], - registry - ) - .unwrap(); + let bytes_total = + register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) + .unwrap(); Self { requests_total, @@ -264,12 +157,10 @@ impl PrometheusLibMetrics { bytes_total, } } -} -#[cfg(not(feature = "use-prometheus-client"))] -impl PrometheusLayerMetrics for PrometheusLibMetrics { /// error handling is the cold path, so we will not init error counters /// in advance. 
+ #[inline] fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { debug!( "Prometheus statistics metrics error, operation {} error {}", @@ -277,34 +168,16 @@ impl PrometheusLayerMetrics for PrometheusLibMetrics { kind.into_static() ); } - - fn increment_request_total(&self, scheme: &str, op: Operation) { - self.requests_total - .with_label_values(&[scheme, op.into_static()]) - .inc(); - } - - fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { - self.bytes_total - .with_label_values(&[scheme, op.into_static()]) - .observe(bytes as f64); - } - - fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { - self.requests_duration_seconds - .with_label_values(&[scheme, op.into_static()]) - .observe(duration.as_secs_f64()); - } } #[derive(Clone)] -pub struct PrometheusAccessor { +pub struct PrometheusAccessor { inner: A, - stats: Arc, + stats: Arc, scheme: String, } -impl Debug for PrometheusAccessor { +impl Debug for PrometheusAccessor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("PrometheusAccessor") .field("inner", &self.inner) @@ -313,7 +186,7 @@ impl Debug for PrometheusAccessor { } #[async_trait] -impl LayeredAccessor for PrometheusAccessor { +impl LayeredAccessor for PrometheusAccessor { type Inner = A; type Reader = PrometheusMetricWrapper; type BlockingReader = PrometheusMetricWrapper; @@ -328,16 +201,18 @@ impl LayeredAccessor for PrometheusAcces async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::CreateDir); + .requests_total + .with_label_values(&[&self.scheme]) + .inc(); - let start_time = Instant::now(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::CreateDir.into_static()]) + .start_timer(); let create_res = self.inner.create_dir(path, args).await; - self.stats.observe_request_duration( - &self.scheme, - 
Operation::CreateDir, - start_time.elapsed(), - ); + timer.observe_duration(); create_res.map_err(|e| { self.stats .increment_errors_total(Operation::CreateDir, e.kind()); @@ -347,19 +222,25 @@ impl LayeredAccessor for PrometheusAcces async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { self.stats - .increment_request_total(&self.scheme, Operation::Read); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .inc(); + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .start_timer(); let read_res = self .inner .read(path, args) .map(|v| { v.map(|(rp, r)| { - self.stats.observe_bytes_total( - &self.scheme, - Operation::Read, - rp.metadata().content_length() as usize, - ); + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .observe(rp.metadata().content_length() as f64); ( rp, PrometheusMetricWrapper::new( @@ -372,9 +253,7 @@ impl LayeredAccessor for PrometheusAcces }) }) .await; - self.stats - .observe_request_duration(&self.scheme, Operation::Read, start_time.elapsed()); - + timer.observe_duration(); read_res.map_err(|e| { self.stats.increment_errors_total(Operation::Read, e.kind()); e @@ -383,8 +262,15 @@ impl LayeredAccessor for PrometheusAcces async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { self.stats - .increment_request_total(&self.scheme, Operation::Write); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .inc(); + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .start_timer(); let write_res = self .inner @@ -403,9 +289,7 @@ impl LayeredAccessor for PrometheusAcces }) }) .await; - - self.stats - .observe_request_duration(&self.scheme, 
Operation::Write, start_time.elapsed()); + timer.observe_duration(); write_res.map_err(|e| { self.stats .increment_errors_total(Operation::Write, e.kind()); @@ -415,8 +299,14 @@ impl LayeredAccessor for PrometheusAcces async fn stat(&self, path: &str, args: OpStat) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::Stat); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) + .start_timer(); let stat_res = self .inner @@ -425,9 +315,7 @@ impl LayeredAccessor for PrometheusAcces self.stats.increment_errors_total(Operation::Stat, e.kind()); }) .await; - - self.stats - .observe_request_duration(&self.scheme, Operation::Stat, start_time.elapsed()); + timer.observe_duration(); stat_res.map_err(|e| { self.stats.increment_errors_total(Operation::Stat, e.kind()); e @@ -436,13 +324,18 @@ impl LayeredAccessor for PrometheusAcces async fn delete(&self, path: &str, args: OpDelete) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::Delete); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) + .inc(); - let delete_res = self.inner.delete(path, args).await; + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) + .start_timer(); - self.stats - .observe_request_duration(&self.scheme, Operation::Delete, start_time.elapsed()); + let delete_res = self.inner.delete(path, args).await; + timer.observe_duration(); delete_res.map_err(|e| { self.stats .increment_errors_total(Operation::Delete, e.kind()); @@ -452,13 +345,19 @@ impl LayeredAccessor for PrometheusAcces async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.stats - .increment_request_total(&self.scheme, 
Operation::List); - let start_time = Intant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::List.into_static()]) + .inc(); + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::List.into_static()]) + .start_timer(); let list_res = self.inner.list(path, args).await; - self.stats - .observe_request_duration(&self.scheme, Operation::List, start_time.elapsed()); + timer.observe_duration(); list_res.map_err(|e| { self.stats.increment_errors_total(Operation::List, e.kind()); e @@ -467,13 +366,18 @@ impl LayeredAccessor for PrometheusAcces async fn batch(&self, args: OpBatch) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::Batch); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::Batch.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::Batch.into_static()]) + .start_timer(); let result = self.inner.batch(args).await; - self.stats - .observe_request_duration(&self.scheme, Operation::Batch, start_time.elapsed()); + timer.observe_duration(); result.map_err(|e| { self.stats .increment_errors_total(Operation::Batch, e.kind()); @@ -483,13 +387,18 @@ impl LayeredAccessor for PrometheusAcces async fn presign(&self, path: &str, args: OpPresign) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::Presign); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::Presign.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::Presign.into_static()]) + .start_timer(); let result = self.inner.presign(path, args).await; + timer.observe_duration(); - self.stats - .observe_request_duration(&self.scheme, Operation::Presign, start_time.elapsed()); result.map_err(|e| { self.stats 
.increment_errors_total(Operation::Presign, e.kind()); @@ -499,16 +408,19 @@ impl LayeredAccessor for PrometheusAcces fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::BlockingCreateDir); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()]) + .start_timer(); let result = self.inner.blocking_create_dir(path, args); - self.stats.observe_request_duration( - &self.scheme, - Operation::BlockingCreateDir, - start_time.elapsed(), - ); + timer.observe_duration(); + result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingCreateDir, e.kind()); @@ -518,15 +430,20 @@ impl LayeredAccessor for PrometheusAcces fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { self.stats - .increment_request_total(&self.scheme, Operation::BlockingRead); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme]) + .start_timer(); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - self.stats.observe_bytes_total( - &self.scheme, - Operation::BlockingRead, - rp.metadata().content_length() as usize, - ); + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .observe(rp.metadata().content_length() as f64); ( rp, PrometheusMetricWrapper::new( @@ -537,12 +454,7 @@ impl LayeredAccessor for PrometheusAcces ), ) }); - - self.stats.observe_request_duration( - &self.scheme, - Operation::BlockingRead, - start_time.elapsed(), - ); + timer.observe_duration(); result.map_err(|e| { self.stats 
.increment_errors_total(Operation::BlockingRead, e.kind()); @@ -552,9 +464,15 @@ impl LayeredAccessor for PrometheusAcces fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { self.stats - .increment_request_total(&self.scheme, Operation::BlockingWrite); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .start_timer(); let result = self.inner.blocking_write(path, args).map(|(rp, r)| { ( rp, @@ -566,12 +484,7 @@ impl LayeredAccessor for PrometheusAcces ), ) }); - - self.stats.observe_request_duration( - &self.scheme, - Operation::BlockingWrite, - start_time.elapsed(), - ); + timer.observe_duration(); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingWrite, e.kind()); @@ -581,15 +494,17 @@ impl LayeredAccessor for PrometheusAcces fn blocking_stat(&self, path: &str, args: OpStat) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::BlockingStat); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()]) + .start_timer(); let result = self.inner.blocking_stat(path, args); - self.stats.observe_request_duration( - &self.scheme, - Operation::BlockingStat, - start_time.elapsed(), - ); + timer.observe_duration(); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingStat, e.kind()); @@ -599,16 +514,18 @@ impl LayeredAccessor for PrometheusAcces fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::BlockingDelete); - let start_time = Instant::now(); + 
.requests_total + .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()]) + .start_timer(); let result = self.inner.blocking_delete(path, args); + timer.observe_duration(); - self.stats.observe_request_duration( - &self.scheme, - Operation::BlockingDelete, - start_time.elapsed(), - ); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingDelete, e.kind()); @@ -618,16 +535,18 @@ impl LayeredAccessor for PrometheusAcces fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { self.stats - .increment_request_total(&self.scheme, Operation::BlockingList); - let start_time = Instant::now(); + .requests_total + .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) + .inc(); + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) + .start_timer(); let result = self.inner.blocking_list(path, args); + timer.observe_duration(); - self.stats.observe_request_duration( - &self.scheme, - Operation::BlockingList, - start_time.elapsed(), - ); result.map_err(|e| { self.stats .increment_errors_total(Operation::BlockingList, e.kind()); @@ -636,16 +555,16 @@ impl LayeredAccessor for PrometheusAcces } } -pub struct PrometheusMetricWrapper { +pub struct PrometheusMetricWrapper { inner: R, op: Operation, - stats: Arc, + stats: Arc, scheme: String, } -impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { +impl PrometheusMetricWrapper { + fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { Self { inner, op, @@ -655,11 +574,14 @@ impl PrometheusMetricWrapper { } } -impl oio::Read for PrometheusMetricWrapper { +impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: 
&mut [u8]) -> Poll> { self.inner.poll_read(cx, buf).map(|res| match res { Ok(bytes) => { - self.stats.observe_bytes_total(&self.scheme, self.op, bytes); + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .observe(bytes as f64); Ok(bytes) } Err(e) => { @@ -683,7 +605,9 @@ impl oio::Read for PrometheusMetricWrapper { self.inner.poll_next(cx).map(|res| match res { Some(Ok(bytes)) => { self.stats - .observe_bytes_total(&self.scheme, self.op, bytes.len()); + .bytes_total + .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .observe(bytes.len() as f64); Some(Ok(bytes)) } Some(Err(e)) => { @@ -695,12 +619,15 @@ impl oio::Read for PrometheusMetricWrapper { } } -impl oio::BlockingRead for PrometheusMetricWrapper { +impl oio::BlockingRead for PrometheusMetricWrapper { fn read(&mut self, buf: &mut [u8]) -> Result { self.inner .read(buf) .map(|n| { - self.stats.observe_bytes_total(&self.scheme, self.op, n); + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .observe(n as f64); n }) .map_err(|e| { @@ -719,7 +646,10 @@ impl oio::BlockingRead for PrometheusMetricWrapper { fn next(&mut self) -> Option> { self.inner.next().map(|res| match res { Ok(bytes) => { - self.stats.observe_bytes_total(&self.scheme, self.op, n); + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .observe(bytes.len() as f64); Ok(bytes) } Err(e) => { @@ -731,12 +661,15 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } #[async_trait] -impl oio::Write for PrometheusMetricWrapper { +impl oio::Write for PrometheusMetricWrapper { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { self.inner .poll_write(cx, bs) .map_ok(|n| { - self.stats.observe_bytes_total(&self.scheme, self.op, n); + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .observe(n as f64); n 
}) .map_err(|err| { @@ -760,12 +693,15 @@ impl oio::Write for PrometheusMetricWrapper { } } -impl oio::BlockingWrite for PrometheusMetricWrapper { +impl oio::BlockingWrite for PrometheusMetricWrapper { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { self.inner .write(bs) .map(|n| { - self.stats.observe_bytes_total(&self.scheme, self.op, n); + self.stats + .bytes_total + .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .observe(n as f64); n }) .map_err(|err| { diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs new file mode 100644 index 000000000000..877bf0976291 --- /dev/null +++ b/core/src/layers/prometheus_client.rs @@ -0,0 +1,670 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; +use std::io; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use std::time::Instant; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::FutureExt; +use futures::TryFutureExt; +use log::debug; + +use crate::raw::Accessor; +use crate::raw::*; +use crate::*; + +/// Add [prometheus](https://docs.rs/prometheus) for every operations. 
+/// +/// # Examples +/// +/// ``` +/// use log::debug; +/// use log::info; +/// use opendal::layers::PrometheusLayer; +/// use opendal::services; +/// use opendal::Operator; +/// use opendal::Result; +/// use prometheus::Encoder; +/// +/// /// Visit [`opendal::services`] for more service related config. +/// /// Visit [`opendal::Operator`] for more operator level APIs. +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // Pick a builder and configure it. +/// let builder = services::Memory::default(); +/// let registry = prometheus::default_registry(); +/// +/// let op = Operator::new(builder) +/// .expect("must init") +/// .layer(PrometheusLayer::with_registry(registry.clone())) +/// .finish(); +/// debug!("operator: {op:?}"); +/// +/// // Write data into object test. +/// op.write("test", "Hello, World!").await?; +/// // Read data from object. +/// let bs = op.read("test").await?; +/// info!("content: {}", String::from_utf8_lossy(&bs)); +/// +/// // Get object metadata. +/// let meta = op.stat("test").await?; +/// info!("meta: {:?}", meta); +/// +/// // Export prometheus metrics. 
+/// let mut buffer = Vec::::new(); +/// let encoder = prometheus::TextEncoder::new(); +/// encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); +/// println!("## Prometheus Metrics"); +/// println!("{}", String::from_utf8(buffer.clone()).unwrap()); +/// Ok(()) +/// } +/// ``` +#[derive(Default, Debug, Clone)] +pub struct PrometheusLayer { + registry: prometheus_client::registry::Registry, +} + +impl PrometheusClientLayer { + pub fn with_registry(registry: prometheus_client::registry::Registry) -> Self { + Self { registry } + } +} + +impl Layer for PrometheusClientLayer { + type LayeredAccessor = PrometheusAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccessor { + let meta = inner.info(); + let scheme = meta.scheme(); + + let stats = PrometheusLibMetrics::new(self.registry.clone()); + + let stats = Arc::new(PrometheusClientMetrics::new(self.registry.clone())); + + PrometheusAccessor { + inner, + stats, + scheme: scheme.to_string(), + } + } +} + +/// [`PrometheusClientMetrics`] provide the performance and IO metrics with the `prometheus-client` crate. +#[derive(Debug)] +struct PrometheusClientMetrics { + /// Total counter of the specific operation be called. + requests_total: prometheus_client::metrics::family::Family< + Vec<(&'static str, &'static str)>, + prometheus_client::metrics::counter::Counter, + >, + /// Latency of the specific operation be called. 
+ request_duration_seconds: prometheus_client::metrics::family::Family< + Vec<(&'static str, &'static str)>, + prometheus_client::metrics::histogram::Histogram, + >, + /// The histogram of bytes + bytes_histogram: prometheus_client::metrics::family::Family< + Vec<(&'static str, &'static str)>, + prometheus_client::metrics::histogram::Histogram, + >, +} + +impl PrometheusClientMetrics { + pub fn new(mut registry: prometheus_client::registry::Registry) -> Self { + let requests_total = prometheus_client::metrics::family::Family::default(); + let request_duration_seconds = + prometheus_client::metrics::family::Family::new_with_constructor(|| { + let buckets = + prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); + prometheus_client::metrics::histogram::Histogram::new(buckets) + }); + let bytes_histogram = + prometheus_client::metrics::family::Family::new_with_constructor(|| { + let buckets = + prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16); + prometheus_client::metrics::histogram::Histogram::new(buckets) + }); + + registry.register("requests_total", "", requests_total.clone()); + registry.register( + "request_duration_seconds", + "", + request_duration_seconds.clone(), + ); + registry.register("bytes_histogram", "", bytes_histogram.clone()); + Self { + requests_total, + request_duration_seconds, + bytes_histogram, + } + } + + fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { + let labels = vec![("operation", op.as_str()), ("kind", kind.as_str())]; + self.requests_total.get_or_create(&labels).inc(); + } + + fn increment_request_total(&self, scheme: &str, op: Operation) { + let labels = vec![("scheme", scheme), ("operation", op.as_str())]; + self.requests_total.get_or_create(&labels).inc(); + } + + fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { + let labels = vec![("scheme", scheme), ("operation", op.as_str())]; + self.bytes_histogram + .get_or_create(&labels) + .observe(bytes 
as f64); + } + + fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { + let labels = vec![("scheme", scheme), ("operation", op.as_str())]; + self.request_duration_seconds + .get_or_create(&labels) + .observe(duration.as_secs_f64()); + } +} + +#[derive(Clone)] +struct PrometheusAccessor { + inner: A, + stats: PrometheusClientMetrics, + scheme: String, +} + +impl Debug for PrometheusAccessor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PrometheusAccessor") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +#[async_trait] +impl LayeredAccessor for PrometheusAccessor { + type Inner = A; + type Reader = PrometheusMetricWrapper; + type BlockingReader = PrometheusMetricWrapper; + type Writer = PrometheusMetricWrapper; + type BlockingWriter = PrometheusMetricWrapper; + type Pager = A::Pager; + type BlockingPager = A::BlockingPager; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { + self.stats + .increment_request_total(&self.scheme, Operation::CreateDir); + + let start_time = Instant::now(); + let create_res = self.inner.create_dir(path, args).await; + + self.stats.observe_request_duration( + &self.scheme, + Operation::CreateDir, + start_time.elapsed(), + ); + create_res.map_err(|e| { + self.stats + .increment_errors_total(Operation::CreateDir, e.kind()); + e + }) + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + self.stats + .increment_request_total(&self.scheme, Operation::Read); + let start_time = Instant::now(); + + let read_res = self + .inner + .read(path, args) + .map(|v| { + v.map(|(rp, r)| { + self.stats.observe_bytes_total( + &self.scheme, + Operation::Read, + rp.metadata().content_length() as usize, + ); + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Read, + self.stats.clone(), + &self.scheme, + ), + ) + }) + }) + .await; + 
self.stats + .observe_request_duration(&self.scheme, Operation::Read, start_time.elapsed()); + + read_res.map_err(|e| { + self.stats.increment_errors_total(Operation::Read, e.kind()); + e + }) + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + self.stats + .increment_request_total(&self.scheme, Operation::Write); + let start_time = Instant::now(); + + let write_res = self + .inner + .write(path, args) + .map(|v| { + v.map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Write, + self.stats.clone(), + &self.scheme, + ), + ) + }) + }) + .await; + + self.stats + .observe_request_duration(&self.scheme, Operation::Write, start_time.elapsed()); + write_res.map_err(|e| { + self.stats + .increment_errors_total(Operation::Write, e.kind()); + e + }) + } + + async fn stat(&self, path: &str, args: OpStat) -> Result { + self.stats + .increment_request_total(&self.scheme, Operation::Stat); + let start_time = Instant::now(); + + let stat_res = self + .inner + .stat(path, args) + .inspect_err(|e| { + self.stats.increment_errors_total(Operation::Stat, e.kind()); + }) + .await; + + self.stats + .observe_request_duration(&self.scheme, Operation::Stat, start_time.elapsed()); + stat_res.map_err(|e| { + self.stats.increment_errors_total(Operation::Stat, e.kind()); + e + }) + } + + async fn delete(&self, path: &str, args: OpDelete) -> Result { + self.stats + .increment_request_total(&self.scheme, Operation::Delete); + let start_time = Instant::now(); + + let delete_res = self.inner.delete(path, args).await; + + self.stats + .observe_request_duration(&self.scheme, Operation::Delete, start_time.elapsed()); + delete_res.map_err(|e| { + self.stats + .increment_errors_total(Operation::Delete, e.kind()); + e + }) + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + self.stats + .increment_request_total(&self.scheme, Operation::List); + let start_time = Intant::now(); + + let list_res = 
self.inner.list(path, args).await; + + self.stats + .observe_request_duration(&self.scheme, Operation::List, start_time.elapsed()); + list_res.map_err(|e| { + self.stats.increment_errors_total(Operation::List, e.kind()); + e + }) + } + + async fn batch(&self, args: OpBatch) -> Result { + self.stats + .increment_request_total(&self.scheme, Operation::Batch); + let start_time = Instant::now(); + + let result = self.inner.batch(args).await; + + self.stats + .observe_request_duration(&self.scheme, Operation::Batch, start_time.elapsed()); + result.map_err(|e| { + self.stats + .increment_errors_total(Operation::Batch, e.kind()); + e + }) + } + + async fn presign(&self, path: &str, args: OpPresign) -> Result { + self.stats + .increment_request_total(&self.scheme, Operation::Presign); + let start_time = Instant::now(); + + let result = self.inner.presign(path, args).await; + + self.stats + .observe_request_duration(&self.scheme, Operation::Presign, start_time.elapsed()); + result.map_err(|e| { + self.stats + .increment_errors_total(Operation::Presign, e.kind()); + e + }) + } + + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { + self.stats + .increment_request_total(&self.scheme, Operation::BlockingCreateDir); + let start_time = Instant::now(); + + let result = self.inner.blocking_create_dir(path, args); + + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingCreateDir, + start_time.elapsed(), + ); + result.map_err(|e| { + self.stats + .increment_errors_total(Operation::BlockingCreateDir, e.kind()); + e + }) + } + + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + self.stats + .increment_request_total(&self.scheme, Operation::BlockingRead); + let start_time = Instant::now(); + + let result = self.inner.blocking_read(path, args).map(|(rp, r)| { + self.stats.observe_bytes_total( + &self.scheme, + Operation::BlockingRead, + rp.metadata().content_length() as usize, + ); + ( + rp, + 
PrometheusMetricWrapper::new( + r, + Operation::BlockingRead, + self.stats.clone(), + &self.scheme, + ), + ) + }); + + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingRead, + start_time.elapsed(), + ); + result.map_err(|e| { + self.stats + .increment_errors_total(Operation::BlockingRead, e.kind()); + e + }) + } + + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + self.stats + .increment_request_total(&self.scheme, Operation::BlockingWrite); + let start_time = Instant::now(); + + let result = self.inner.blocking_write(path, args).map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::BlockingWrite, + self.stats.clone(), + &self.scheme, + ), + ) + }); + + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingWrite, + start_time.elapsed(), + ); + result.map_err(|e| { + self.stats + .increment_errors_total(Operation::BlockingWrite, e.kind()); + e + }) + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> Result { + self.stats + .increment_request_total(&self.scheme, Operation::BlockingStat); + let start_time = Instant::now(); + + let result = self.inner.blocking_stat(path, args); + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingStat, + start_time.elapsed(), + ); + result.map_err(|e| { + self.stats + .increment_errors_total(Operation::BlockingStat, e.kind()); + e + }) + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { + self.stats + .increment_request_total(&self.scheme, Operation::BlockingDelete); + let start_time = Instant::now(); + + let result = self.inner.blocking_delete(path, args); + + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingDelete, + start_time.elapsed(), + ); + result.map_err(|e| { + self.stats + .increment_errors_total(Operation::BlockingDelete, e.kind()); + e + }) + } + + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingPager)> { + self.stats + .increment_request_total(&self.scheme, Operation::BlockingList); + let start_time = Instant::now(); + + let result = self.inner.blocking_list(path, args); + + self.stats.observe_request_duration( + &self.scheme, + Operation::BlockingList, + start_time.elapsed(), + ); + result.map_err(|e| { + self.stats + .increment_errors_total(Operation::BlockingList, e.kind()); + e + }) + } +} + +pub struct PrometheusMetricWrapper { + inner: R, + + op: Operation, + stats: Arc>, + scheme: String, +} + +impl PrometheusMetricWrapper { + fn new(inner: R, op: Operation, stats: Arc>, scheme: &String) -> Self { + Self { + inner, + op, + stats, + scheme: scheme.to_string(), + } + } +} + +impl oio::Read for PrometheusMetricWrapper { + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + self.inner.poll_read(cx, buf).map(|res| match res { + Ok(bytes) => { + self.stats.observe_bytes_total(&self.scheme, self.op, bytes); + Ok(bytes) + } + Err(e) => { + self.stats.increment_errors_total(self.op, e.kind()); + Err(e) + } + }) + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll> { + self.inner.poll_seek(cx, pos).map(|res| match res { + Ok(n) => Ok(n), + Err(e) => { + self.stats.increment_errors_total(self.op, e.kind()); + Err(e) + } + }) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + self.inner.poll_next(cx).map(|res| match res { + Some(Ok(bytes)) => { + self.stats + .observe_bytes_total(&self.scheme, self.op, bytes.len()); + Some(Ok(bytes)) + } + Some(Err(e)) => { + self.stats.increment_errors_total(self.op, e.kind()); + Some(Err(e)) + } + None => None, + }) + } +} + +impl oio::BlockingRead for PrometheusMetricWrapper { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.inner + .read(buf) + .map(|n| { + self.stats.observe_bytes_total(&self.scheme, self.op, n); + n + }) + .map_err(|e| { + self.stats.increment_errors_total(self.op, e.kind()); + e + }) + } + + fn seek(&mut self, pos: 
io::SeekFrom) -> Result { + self.inner.seek(pos).map_err(|err| { + self.stats.increment_errors_total(self.op, err.kind()); + err + }) + } + + fn next(&mut self) -> Option> { + self.inner.next().map(|res| match res { + Ok(bytes) => { + self.stats.observe_bytes_total(&self.scheme, self.op, n); + Ok(bytes) + } + Err(e) => { + self.stats.increment_errors_total(self.op, e.kind()); + Err(e) + } + }) + } +} + +#[async_trait] +impl oio::Write for PrometheusMetricWrapper { + fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { + self.inner + .poll_write(cx, bs) + .map_ok(|n| { + self.stats.observe_bytes_total(&self.scheme, self.op, n); + n + }) + .map_err(|err| { + self.stats.increment_errors_total(self.op, err.kind()); + err + }) + } + + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_abort(cx).map_err(|err| { + self.stats.increment_errors_total(self.op, err.kind()); + err + }) + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx).map_err(|err| { + self.stats.increment_errors_total(self.op, err.kind()); + err + }) + } +} + +impl oio::BlockingWrite for PrometheusMetricWrapper { + fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { + self.inner + .write(bs) + .map(|n| { + self.stats.observe_bytes_total(&self.scheme, self.op, n); + n + }) + .map_err(|err| { + self.stats.increment_errors_total(self.op, err.kind()); + err + }) + } + + fn close(&mut self) -> Result<()> { + self.inner.close().map_err(|err| { + self.stats.increment_errors_total(self.op, err.kind()); + err + }) + } +} From cfc95364e66dc2c01fdf834c7a0ece090217e02a Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 16:13:03 +0800 Subject: [PATCH 09/18] fix: docs --- core/Cargo.toml | 7 +- core/src/layers/mod.rs | 6 ++ core/src/layers/prometheus_client.rs | 104 +++++++++++++-------------- 3 files changed, 58 insertions(+), 59 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 
fff7e04e36eb..e09d94c5116f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -83,8 +83,10 @@ layers-all = [ layers-chaos = ["dep:rand"] # Enable layers metrics support layers-metrics = ["dep:metrics"] -# Enable layers prometheus support +# Enable layers prometheus support, with tikv/prometheus-rs crate layers-prometheus = ["dep:prometheus"] +# Enable layers prometheus support, with prometheus-client crate +layers-prometheus-client = ["dep:prometheus-client"] # Enable layers madsim support layers-madsim = ["dep:madsim"] # Enable layers minitrace support. @@ -100,9 +102,6 @@ layers-await-tree = ["dep:await-tree"] # Enable layers async-backtrace support. layers-async-backtrace = ["dep:async-backtrace"] -# Enable prometheus-client instead of prometheus-rs. -use-prometheus-client = ["dep:prometheus-client"] - services-atomicserver = ["dep:atomic_lib"] services-azblob = [ "dep:sha2", diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index cd9414ba64ed..73b0b0375ffa 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -56,6 +56,11 @@ mod prometheus; #[cfg(feature = "layers-prometheus")] pub use self::prometheus::PrometheusLayer; +#[cfg(feature = "layers-prometheus-client")] +mod prometheus_client; +#[cfg(feature = "layers-prometheus-client")] +pub use self::prometheus_client::PrometheusClientLayer; + mod retry; pub use self::retry::RetryInterceptor; pub use self::retry::RetryLayer; @@ -94,5 +99,6 @@ pub use self::await_tree::AwaitTreeLayer; #[cfg(feature = "layers-async-backtrace")] mod async_backtrace; + #[cfg(feature = "layers-async-backtrace")] pub use self::async_backtrace::AsyncBacktraceLayer; diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 877bf0976291..2c033da5f629 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -27,7 +27,10 @@ use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; use futures::TryFutureExt; -use log::debug; 
+use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::family::Family; +use prometheus_client::metrics::histogram::Histogram; +use prometheus_client::registry::Registry; use crate::raw::Accessor; use crate::raw::*; @@ -40,11 +43,10 @@ use crate::*; /// ``` /// use log::debug; /// use log::info; -/// use opendal::layers::PrometheusLayer; +/// use opendal::layers::PrometheusClientLayer; /// use opendal::services; /// use opendal::Operator; /// use opendal::Result; -/// use prometheus::Encoder; /// /// /// Visit [`opendal::services`] for more service related config. /// /// Visit [`opendal::Operator`] for more operator level APIs. @@ -52,11 +54,11 @@ use crate::*; /// async fn main() -> Result<()> { /// // Pick a builder and configure it. /// let builder = services::Memory::default(); -/// let registry = prometheus::default_registry(); +/// let mut registry = prometheus_client::registry::Registry::default(); /// /// let op = Operator::new(builder) /// .expect("must init") -/// .layer(PrometheusLayer::with_registry(registry.clone())) +/// .layer(PrometheusClientLayer::with_registry(&mut registry)) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -71,22 +73,23 @@ use crate::*; /// info!("meta: {:?}", meta); /// /// // Export prometheus metrics. 
-/// let mut buffer = Vec::::new(); -/// let encoder = prometheus::TextEncoder::new(); -/// encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); +/// let mut buf = String::new(); +/// prometheus_client::encoding::text::encode(&mut buf, &registry).unwrap(); /// println!("## Prometheus Metrics"); -/// println!("{}", String::from_utf8(buffer.clone()).unwrap()); +/// println!("{}", buf); /// Ok(()) /// } /// ``` -#[derive(Default, Debug, Clone)] -pub struct PrometheusLayer { - registry: prometheus_client::registry::Registry, +#[derive(Debug)] +pub struct PrometheusClientLayer { + metrics: PrometheusClientMetrics, } impl PrometheusClientLayer { - pub fn with_registry(registry: prometheus_client::registry::Registry) -> Self { - Self { registry } + /// create PrometheusClientLayer while registering itself to this registry. + pub fn with_registry(registry: &mut Registry) -> Self { + let metrics = PrometheusClientMetrics::register(registry); + Self { metrics } } } @@ -97,10 +100,7 @@ impl Layer for PrometheusClientLayer { let meta = inner.info(); let scheme = meta.scheme(); - let stats = PrometheusLibMetrics::new(self.registry.clone()); - - let stats = Arc::new(PrometheusClientMetrics::new(self.registry.clone())); - + let stats = Arc::new(self.metrics.clone()); PrometheusAccessor { inner, stats, @@ -109,41 +109,30 @@ impl Layer for PrometheusClientLayer { } } +type VecLabels = Vec<(&'static str, String)>; + /// [`PrometheusClientMetrics`] provide the performance and IO metrics with the `prometheus-client` crate. -#[derive(Debug)] +#[derive(Debug, Clone)] struct PrometheusClientMetrics { /// Total counter of the specific operation be called. - requests_total: prometheus_client::metrics::family::Family< - Vec<(&'static str, &'static str)>, - prometheus_client::metrics::counter::Counter, - >, + requests_total: Family, /// Latency of the specific operation be called.
- request_duration_seconds: prometheus_client::metrics::family::Family< - Vec<(&'static str, &'static str)>, - prometheus_client::metrics::histogram::Histogram, - >, + request_duration_seconds: Family, /// The histogram of bytes - bytes_histogram: prometheus_client::metrics::family::Family< - Vec<(&'static str, &'static str)>, - prometheus_client::metrics::histogram::Histogram, - >, + bytes_histogram: Family, } impl PrometheusClientMetrics { - pub fn new(mut registry: prometheus_client::registry::Registry) -> Self { - let requests_total = prometheus_client::metrics::family::Family::default(); - let request_duration_seconds = - prometheus_client::metrics::family::Family::new_with_constructor(|| { - let buckets = - prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); - prometheus_client::metrics::histogram::Histogram::new(buckets) - }); - let bytes_histogram = - prometheus_client::metrics::family::Family::new_with_constructor(|| { - let buckets = - prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16); - prometheus_client::metrics::histogram::Histogram::new(buckets) - }); + pub fn register(registry: &mut Registry) -> Self { + let requests_total = Family::default(); + let request_duration_seconds = Family::::new_with_constructor(|| { + let buckets = prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); + Histogram::new(buckets) + }); + let bytes_histogram = Family::::new_with_constructor(|| { + let buckets = prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16); + Histogram::new(buckets) + }); registry.register("requests_total", "", requests_total.clone()); registry.register( @@ -160,24 +149,24 @@ impl PrometheusClientMetrics { } fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { - let labels = vec![("operation", op.as_str()), ("kind", kind.as_str())]; + let labels = vec![("operation", op.to_string()), ("kind", kind.to_string())]; 
self.requests_total.get_or_create(&labels).inc(); } fn increment_request_total(&self, scheme: &str, op: Operation) { - let labels = vec![("scheme", scheme), ("operation", op.as_str())]; + let labels = vec![("scheme", scheme.to_string()), ("operation", op.to_string())]; self.requests_total.get_or_create(&labels).inc(); } fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { - let labels = vec![("scheme", scheme), ("operation", op.as_str())]; + let labels = vec![("scheme", scheme.to_string()), ("operation", op.to_string())]; self.bytes_histogram .get_or_create(&labels) .observe(bytes as f64); } fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { - let labels = vec![("scheme", scheme), ("operation", op.as_str())]; + let labels = vec![("scheme", scheme.to_string()), ("operation", op.to_string())]; self.request_duration_seconds .get_or_create(&labels) .observe(duration.as_secs_f64()); @@ -185,9 +174,9 @@ impl PrometheusClientMetrics { } #[derive(Clone)] -struct PrometheusAccessor { +pub struct PrometheusAccessor { inner: A, - stats: PrometheusClientMetrics, + stats: Arc, scheme: String, } @@ -340,7 +329,7 @@ impl LayeredAccessor for PrometheusAccessor { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.stats .increment_request_total(&self.scheme, Operation::List); - let start_time = Intant::now(); + let start_time = Instant::now(); let list_res = self.inner.list(path, args).await; @@ -527,12 +516,17 @@ pub struct PrometheusMetricWrapper { inner: R, op: Operation, - stats: Arc>, + stats: Arc, scheme: String, } impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, stats: Arc>, scheme: &String) -> Self { + fn new( + inner: R, + op: Operation, + stats: Arc, + scheme: &String, + ) -> Self { Self { inner, op, @@ -606,7 +600,7 @@ impl oio::BlockingRead for PrometheusMetricWrapper { fn next(&mut self) -> Option> { self.inner.next().map(|res| match res { Ok(bytes) => 
{ - self.stats.observe_bytes_total(&self.scheme, self.op, n); + self.stats.observe_bytes_total(&self.scheme, self.op, bytes.len()); Ok(bytes) } Err(e) => { From 9b4737ee61a4c7c602e6cf8dbe07e5d6d35f653d Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 16:15:19 +0800 Subject: [PATCH 10/18] fix typo --- core/src/layers/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 73b0b0375ffa..7c064cdcf967 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -99,6 +99,5 @@ pub use self::await_tree::AwaitTreeLayer; #[cfg(feature = "layers-async-backtrace")] mod async_backtrace; - #[cfg(feature = "layers-async-backtrace")] pub use self::async_backtrace::AsyncBacktraceLayer; From b214cb0a6d82bc5191b962c88947cd5a3c5807bf Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 16:22:26 +0800 Subject: [PATCH 11/18] fix: cargo fmt --- core/src/layers/prometheus_client.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 2c033da5f629..48bb828dc221 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -129,7 +129,7 @@ impl PrometheusClientMetrics { let buckets = prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); Histogram::new(buckets) }); - let bytes_histogram = Family::::new_with_constructor(|| { + let bytes_histogram = Family::::new_with_constructor(|| { let buckets = prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16); Histogram::new(buckets) }); @@ -154,19 +154,28 @@ impl PrometheusClientMetrics { } fn increment_request_total(&self, scheme: &str, op: Operation) { - let labels = vec![("scheme", scheme.to_string()), ("operation", op.to_string())]; + let labels = vec![ + ("scheme", scheme.to_string()), + ("operation", op.to_string()), + ]; 
self.requests_total.get_or_create(&labels).inc(); } fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { - let labels = vec![("scheme", scheme.to_string()), ("operation", op.to_string())]; + let labels = vec![ + ("scheme", scheme.to_string()), + ("operation", op.to_string()), + ]; self.bytes_histogram .get_or_create(&labels) .observe(bytes as f64); } fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { - let labels = vec![("scheme", scheme.to_string()), ("operation", op.to_string())]; + let labels = vec![ + ("scheme", scheme.to_string()), + ("operation", op.to_string()), + ]; self.request_duration_seconds .get_or_create(&labels) .observe(duration.as_secs_f64()); @@ -521,12 +530,7 @@ pub struct PrometheusMetricWrapper { } impl PrometheusMetricWrapper { - fn new( - inner: R, - op: Operation, - stats: Arc, - scheme: &String, - ) -> Self { + fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { Self { inner, op, @@ -600,7 +604,8 @@ impl oio::BlockingRead for PrometheusMetricWrapper { fn next(&mut self) -> Option> { self.inner.next().map(|res| match res { Ok(bytes) => { - self.stats.observe_bytes_total(&self.scheme, self.op, bytes.len()); + self.stats + .observe_bytes_total(&self.scheme, self.op, bytes.len()); Ok(bytes) } Err(e) => { From d6200379407c12a01f17983c6130c817450d3636 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 17:43:46 +0800 Subject: [PATCH 12/18] add a prefix --- core/src/layers/prometheus_client.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 48bb828dc221..5a186eede6ac 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -31,6 +31,7 @@ use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prometheus_client::metrics::histogram::Histogram; use 
prometheus_client::registry::Registry; +use prometheus_client::metrics::histogram; use crate::raw::Accessor; use crate::raw::*; @@ -87,7 +88,7 @@ pub struct PrometheusClientLayer { impl PrometheusClientLayer { /// create PrometheusClientLayer while registering itself to this registry. - pub fn with_registry(registry: &mut Registry) -> Self { + pub fn new(registry: &mut Registry) -> Self { let metrics = PrometheusClientMetrics::register(registry); Self { metrics } } @@ -126,21 +127,21 @@ impl PrometheusClientMetrics { pub fn register(registry: &mut Registry) -> Self { let requests_total = Family::default(); let request_duration_seconds = Family::::new_with_constructor(|| { - let buckets = prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16); + let buckets = histogram::exponential_buckets(0.01, 2.0, 16); Histogram::new(buckets) }); let bytes_histogram = Family::::new_with_constructor(|| { - let buckets = prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16); + let buckets = histogram::exponential_buckets(1.0, 2.0, 16); Histogram::new(buckets) }); - registry.register("requests_total", "", requests_total.clone()); + registry.register("opendal_requests_total", "", requests_total.clone()); registry.register( - "request_duration_seconds", + "opendal_request_duration_seconds", "", request_duration_seconds.clone(), ); - registry.register("bytes_histogram", "", bytes_histogram.clone()); + registry.register("opendal_bytes_histogram", "", bytes_histogram.clone()); Self { requests_total, request_duration_seconds, From 49fc79d467ac91eba0d45b3ba68b5981f7839a40 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 18:07:27 +0800 Subject: [PATCH 13/18] use structed labels --- core/src/layers/prometheus_client.rs | 209 +++++++++++++++------------ 1 file changed, 117 insertions(+), 92 deletions(-) diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 5a186eede6ac..797dc0dabf10 100644 --- 
a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -21,17 +21,18 @@ use std::io; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use std::time::Instant; +use std::time::{Duration, Instant}; use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; use futures::TryFutureExt; +use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; +use prometheus_client::metrics::histogram; use prometheus_client::metrics::histogram::Histogram; use prometheus_client::registry::Registry; -use prometheus_client::metrics::histogram; use crate::raw::Accessor; use crate::raw::*; @@ -112,30 +113,47 @@ impl Layer for PrometheusClientLayer { type VecLabels = Vec<(&'static str, String)>; +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct OperationLabels { + scheme: Scheme, + op: Operation, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct ErrorLabels { + scheme: Scheme, + op: Operation, + err: ErrorKind, +} + /// [`PrometheusClientMetrics`] provide the performance and IO metrics with the `prometheus-client` crate. #[derive(Debug, Clone)] struct PrometheusClientMetrics { /// Total counter of the specific operation be called. - requests_total: Family, + requests_total: Family, + /// Total counter of the errors. + errors_total: Family, /// Latency of the specific operation be called. 
- request_duration_seconds: Family, + request_duration_seconds: Family, /// The histogram of bytes - bytes_histogram: Family, + bytes_histogram: Family, } impl PrometheusClientMetrics { pub fn register(registry: &mut Registry) -> Self { let requests_total = Family::default(); - let request_duration_seconds = Family::::new_with_constructor(|| { + let errors_total = Family::default(); + let request_duration_seconds = Family::::new_with_constructor(|| { let buckets = histogram::exponential_buckets(0.01, 2.0, 16); Histogram::new(buckets) }); - let bytes_histogram = Family::::new_with_constructor(|| { + let bytes_histogram = Family::::new_with_constructor(|| { let buckets = histogram::exponential_buckets(1.0, 2.0, 16); Histogram::new(buckets) }); registry.register("opendal_requests_total", "", requests_total.clone()); + registry.register("opendal_errors_total", "", errors_total.clone()); registry.register( "opendal_request_duration_seconds", "", @@ -144,39 +162,31 @@ impl PrometheusClientMetrics { registry.register("opendal_bytes_histogram", "", bytes_histogram.clone()); Self { requests_total, + errors_total, request_duration_seconds, bytes_histogram, } } - fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { - let labels = vec![("operation", op.to_string()), ("kind", kind.to_string())]; - self.requests_total.get_or_create(&labels).inc(); + fn increment_errors_total(&self, scheme: Scheme, op: Operation, err: ErrorKind) { + let labels = ErrorLabels { op, scheme, err }; + self.errors_total.get_or_create(&labels).inc(); } - fn increment_request_total(&self, scheme: &str, op: Operation) { - let labels = vec![ - ("scheme", scheme.to_string()), - ("operation", op.to_string()), - ]; + fn increment_request_total(&self, scheme: Scheme, op: Operation) { + let labels = OperationLabels { scheme, op }; self.requests_total.get_or_create(&labels).inc(); } - fn observe_bytes_total(&self, scheme: &str, op: Operation, bytes: usize) { - let labels = vec![ - ("scheme", 
scheme.to_string()), - ("operation", op.to_string()), - ]; + fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize) { + let labels = OperationLabels { scheme, op }; self.bytes_histogram .get_or_create(&labels) .observe(bytes as f64); } - fn observe_request_duration(&self, scheme: &str, op: Operation, duration: std::time::Duration) { - let labels = vec![ - ("scheme", scheme.to_string()), - ("operation", op.to_string()), - ]; + fn observe_request_duration(&self, scheme: Scheme, op: Operation, duration: Duration) { + let labels = OperationLabels { scheme, op }; self.request_duration_seconds .get_or_create(&labels) .observe(duration.as_secs_f64()); @@ -187,7 +197,7 @@ impl PrometheusClientMetrics { pub struct PrometheusAccessor { inner: A, stats: Arc, - scheme: String, + scheme: Scheme, } impl Debug for PrometheusAccessor { @@ -214,26 +224,26 @@ impl LayeredAccessor for PrometheusAccessor { async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::CreateDir); + .increment_request_total(self.scheme, Operation::CreateDir); let start_time = Instant::now(); let create_res = self.inner.create_dir(path, args).await; self.stats.observe_request_duration( - &self.scheme, + self.scheme, Operation::CreateDir, start_time.elapsed(), ); create_res.map_err(|e| { self.stats - .increment_errors_total(Operation::CreateDir, e.kind()); + .increment_errors_total(self.scheme, Operation::CreateDir, e.kind()); e }) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { self.stats - .increment_request_total(&self.scheme, Operation::Read); + .increment_request_total(self.scheme, Operation::Read); let start_time = Instant::now(); let read_res = self @@ -242,7 +252,7 @@ impl LayeredAccessor for PrometheusAccessor { .map(|v| { v.map(|(rp, r)| { self.stats.observe_bytes_total( - &self.scheme, + self.scheme, Operation::Read, rp.metadata().content_length() as usize, ); @@ 
-252,24 +262,25 @@ impl LayeredAccessor for PrometheusAccessor { r, Operation::Read, self.stats.clone(), - &self.scheme, + self.scheme, ), ) }) }) .await; self.stats - .observe_request_duration(&self.scheme, Operation::Read, start_time.elapsed()); + .observe_request_duration(self.scheme, Operation::Read, start_time.elapsed()); read_res.map_err(|e| { - self.stats.increment_errors_total(Operation::Read, e.kind()); + self.stats + .increment_errors_total(self.scheme, Operation::Read, e.kind()); e }) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { self.stats - .increment_request_total(&self.scheme, Operation::Write); + .increment_request_total(self.scheme, Operation::Write); let start_time = Instant::now(); let write_res = self @@ -283,7 +294,7 @@ impl LayeredAccessor for PrometheusAccessor { r, Operation::Write, self.stats.clone(), - &self.scheme, + self.scheme, ), ) }) @@ -291,125 +302,128 @@ impl LayeredAccessor for PrometheusAccessor { .await; self.stats - .observe_request_duration(&self.scheme, Operation::Write, start_time.elapsed()); + .observe_request_duration(self.scheme, Operation::Write, start_time.elapsed()); write_res.map_err(|e| { self.stats - .increment_errors_total(Operation::Write, e.kind()); + .increment_errors_total(self.scheme, Operation::Write, e.kind()); e }) } async fn stat(&self, path: &str, args: OpStat) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::Stat); + .increment_request_total(self.scheme, Operation::Stat); let start_time = Instant::now(); let stat_res = self .inner .stat(path, args) .inspect_err(|e| { - self.stats.increment_errors_total(Operation::Stat, e.kind()); + self.stats + .increment_errors_total(self.scheme, Operation::Stat, e.kind()); }) .await; self.stats - .observe_request_duration(&self.scheme, Operation::Stat, start_time.elapsed()); + .observe_request_duration(self.scheme, Operation::Stat, start_time.elapsed()); stat_res.map_err(|e| { - 
self.stats.increment_errors_total(Operation::Stat, e.kind()); + self.stats + .increment_errors_total(self.scheme, Operation::Stat, e.kind()); e }) } async fn delete(&self, path: &str, args: OpDelete) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::Delete); + .increment_request_total(self.scheme, Operation::Delete); let start_time = Instant::now(); let delete_res = self.inner.delete(path, args).await; self.stats - .observe_request_duration(&self.scheme, Operation::Delete, start_time.elapsed()); + .observe_request_duration(self.scheme, Operation::Delete, start_time.elapsed()); delete_res.map_err(|e| { self.stats - .increment_errors_total(Operation::Delete, e.kind()); + .increment_errors_total(self.scheme, Operation::Delete, e.kind()); e }) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.stats - .increment_request_total(&self.scheme, Operation::List); + .increment_request_total(self.scheme, Operation::List); let start_time = Instant::now(); let list_res = self.inner.list(path, args).await; self.stats - .observe_request_duration(&self.scheme, Operation::List, start_time.elapsed()); + .observe_request_duration(self.scheme, Operation::List, start_time.elapsed()); list_res.map_err(|e| { - self.stats.increment_errors_total(Operation::List, e.kind()); + self.stats + .increment_errors_total(self.scheme, Operation::List, e.kind()); e }) } async fn batch(&self, args: OpBatch) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::Batch); + .increment_request_total(self.scheme, Operation::Batch); let start_time = Instant::now(); let result = self.inner.batch(args).await; self.stats - .observe_request_duration(&self.scheme, Operation::Batch, start_time.elapsed()); + .observe_request_duration(self.scheme, Operation::Batch, start_time.elapsed()); result.map_err(|e| { self.stats - .increment_errors_total(Operation::Batch, e.kind()); + .increment_errors_total(self.scheme, Operation::Batch, 
e.kind()); e }) } async fn presign(&self, path: &str, args: OpPresign) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::Presign); + .increment_request_total(self.scheme, Operation::Presign); let start_time = Instant::now(); let result = self.inner.presign(path, args).await; self.stats - .observe_request_duration(&self.scheme, Operation::Presign, start_time.elapsed()); + .observe_request_duration(self.scheme, Operation::Presign, start_time.elapsed()); result.map_err(|e| { self.stats - .increment_errors_total(Operation::Presign, e.kind()); + .increment_errors_total(self.scheme, Operation::Presign, e.kind()); e }) } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::BlockingCreateDir); + .increment_request_total(self.scheme, Operation::BlockingCreateDir); let start_time = Instant::now(); let result = self.inner.blocking_create_dir(path, args); self.stats.observe_request_duration( - &self.scheme, + self.scheme, Operation::BlockingCreateDir, start_time.elapsed(), ); result.map_err(|e| { self.stats - .increment_errors_total(Operation::BlockingCreateDir, e.kind()); + .increment_errors_total(self.scheme, Operation::BlockingCreateDir, e.kind()); e }) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { self.stats - .increment_request_total(&self.scheme, Operation::BlockingRead); + .increment_request_total(self.scheme, Operation::BlockingRead); let start_time = Instant::now(); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { self.stats.observe_bytes_total( - &self.scheme, + self.scheme, Operation::BlockingRead, rp.metadata().content_length() as usize, ); @@ -419,26 +433,26 @@ impl LayeredAccessor for PrometheusAccessor { r, Operation::BlockingRead, self.stats.clone(), - &self.scheme, + self.scheme, ), ) }); self.stats.observe_request_duration( - &self.scheme, + self.scheme, Operation::BlockingRead, 
start_time.elapsed(), ); result.map_err(|e| { self.stats - .increment_errors_total(Operation::BlockingRead, e.kind()); + .increment_errors_total(self.scheme, Operation::BlockingRead, e.kind()); e }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { self.stats - .increment_request_total(&self.scheme, Operation::BlockingWrite); + .increment_request_total(self.scheme, Operation::BlockingWrite); let start_time = Instant::now(); let result = self.inner.blocking_write(path, args).map(|(rp, r)| { @@ -448,75 +462,75 @@ impl LayeredAccessor for PrometheusAccessor { r, Operation::BlockingWrite, self.stats.clone(), - &self.scheme, + self.scheme, ), ) }); self.stats.observe_request_duration( - &self.scheme, + self.scheme, Operation::BlockingWrite, start_time.elapsed(), ); result.map_err(|e| { self.stats - .increment_errors_total(Operation::BlockingWrite, e.kind()); + .increment_errors_total(self.scheme, Operation::BlockingWrite, e.kind()); e }) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::BlockingStat); + .increment_request_total(self.scheme, Operation::BlockingStat); let start_time = Instant::now(); let result = self.inner.blocking_stat(path, args); self.stats.observe_request_duration( - &self.scheme, + self.scheme, Operation::BlockingStat, start_time.elapsed(), ); result.map_err(|e| { self.stats - .increment_errors_total(Operation::BlockingStat, e.kind()); + .increment_errors_total(self.scheme, Operation::BlockingStat, e.kind()); e }) } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { self.stats - .increment_request_total(&self.scheme, Operation::BlockingDelete); + .increment_request_total(self.scheme, Operation::BlockingDelete); let start_time = Instant::now(); let result = self.inner.blocking_delete(path, args); self.stats.observe_request_duration( - &self.scheme, + self.scheme, Operation::BlockingDelete, start_time.elapsed(), 
); result.map_err(|e| { self.stats - .increment_errors_total(Operation::BlockingDelete, e.kind()); + .increment_errors_total(self.scheme, Operation::BlockingDelete, e.kind()); e }) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { self.stats - .increment_request_total(&self.scheme, Operation::BlockingList); + .increment_request_total(self.scheme, Operation::BlockingList); let start_time = Instant::now(); let result = self.inner.blocking_list(path, args); self.stats.observe_request_duration( - &self.scheme, + self.scheme, Operation::BlockingList, start_time.elapsed(), ); result.map_err(|e| { self.stats - .increment_errors_total(Operation::BlockingList, e.kind()); + .increment_errors_total(self.scheme, Operation::BlockingList, e.kind()); e }) } @@ -527,16 +541,16 @@ pub struct PrometheusMetricWrapper { op: Operation, stats: Arc, - scheme: String, + scheme: Scheme, } impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { + fn new(inner: R, op: Operation, stats: Arc, scheme: Scheme) -> Self { Self { inner, op, stats, - scheme: scheme.to_string(), + scheme, } } } @@ -549,7 +563,8 @@ impl oio::Read for PrometheusMetricWrapper { Ok(bytes) } Err(e) => { - self.stats.increment_errors_total(self.op, e.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, e.kind()); Err(e) } }) @@ -559,7 +574,8 @@ impl oio::Read for PrometheusMetricWrapper { self.inner.poll_seek(cx, pos).map(|res| match res { Ok(n) => Ok(n), Err(e) => { - self.stats.increment_errors_total(self.op, e.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, e.kind()); Err(e) } }) @@ -569,11 +585,12 @@ impl oio::Read for PrometheusMetricWrapper { self.inner.poll_next(cx).map(|res| match res { Some(Ok(bytes)) => { self.stats - .observe_bytes_total(&self.scheme, self.op, bytes.len()); + .observe_bytes_total(self.scheme, self.op, bytes.len()); Some(Ok(bytes)) } Some(Err(e)) => { - 
self.stats.increment_errors_total(self.op, e.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, e.kind()); Some(Err(e)) } None => None, @@ -586,18 +603,20 @@ impl oio::BlockingRead for PrometheusMetricWrapper { self.inner .read(buf) .map(|n| { - self.stats.observe_bytes_total(&self.scheme, self.op, n); + self.stats.observe_bytes_total(self.scheme, self.op, n); n }) .map_err(|e| { - self.stats.increment_errors_total(self.op, e.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, e.kind()); e }) } fn seek(&mut self, pos: io::SeekFrom) -> Result { self.inner.seek(pos).map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, err.kind()); err }) } @@ -610,7 +629,8 @@ impl oio::BlockingRead for PrometheusMetricWrapper { Ok(bytes) } Err(e) => { - self.stats.increment_errors_total(self.op, e.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, e.kind()); Err(e) } }) @@ -627,21 +647,24 @@ impl oio::Write for PrometheusMetricWrapper { n }) .map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, err.kind()); err }) } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_abort(cx).map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, err.kind()); err }) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_close(cx).map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, err.kind()); err }) } @@ -652,18 +675,20 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { self.inner .write(bs) .map(|n| { - self.stats.observe_bytes_total(&self.scheme, self.op, n); + self.stats.observe_bytes_total(self.scheme, self.op, n); n }) .map_err(|err| { - 
self.stats.increment_errors_total(self.op, err.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, err.kind()); err }) } fn close(&mut self) -> Result<()> { self.inner.close().map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); + self.stats + .increment_errors_total(self.scheme, self.op, err.kind()); err }) } From 5c100c58be7c54f007235b3c854c478ed84d0e06 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 18:42:43 +0800 Subject: [PATCH 14/18] use labels in array --- core/src/layers/prometheus_client.rs | 39 +++++++++++----------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 797dc0dabf10..a446db66e126 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; use std::fmt::Formatter; +use std::fmt::Debug; use std::io; use std::sync::Arc; use std::task::Context; @@ -27,7 +27,6 @@ use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; use futures::TryFutureExt; -use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prometheus_client::metrics::histogram; @@ -106,25 +105,13 @@ impl Layer for PrometheusClientLayer { PrometheusAccessor { inner, stats, - scheme: scheme.to_string(), + scheme, } } } -type VecLabels = Vec<(&'static str, String)>; - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -struct OperationLabels { - scheme: Scheme, - op: Operation, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -struct ErrorLabels { - scheme: Scheme, - op: Operation, - err: ErrorKind, -} +type OperationLabels = [(&'static str, &'static str); 2]; +type ErrorLabels = [(&'static str, &'static str); 3]; /// [`PrometheusClientMetrics`] 
provide the performance and IO metrics with the `prometheus-client` crate. #[derive(Debug, Clone)] @@ -169,24 +156,28 @@ impl PrometheusClientMetrics { } fn increment_errors_total(&self, scheme: Scheme, op: Operation, err: ErrorKind) { - let labels = ErrorLabels { op, scheme, err }; + let labels = [ + ("scheme", scheme.into_static()), + ("op", op.into_static()), + ("err", err.into_static()), + ]; self.errors_total.get_or_create(&labels).inc(); } fn increment_request_total(&self, scheme: Scheme, op: Operation) { - let labels = OperationLabels { scheme, op }; + let labels = [("scheme", scheme.into_static()), ("op", op.into_static())]; self.requests_total.get_or_create(&labels).inc(); } fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize) { - let labels = OperationLabels { scheme, op }; + let labels = [("scheme", scheme.into_static()), ("op", op.into_static())]; self.bytes_histogram .get_or_create(&labels) .observe(bytes as f64); } fn observe_request_duration(&self, scheme: Scheme, op: Operation, duration: Duration) { - let labels = OperationLabels { scheme, op }; + let labels = [("scheme", scheme.into_static()), ("op", op.into_static())]; self.request_duration_seconds .get_or_create(&labels) .observe(duration.as_secs_f64()); @@ -559,7 +550,7 @@ impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { self.inner.poll_read(cx, buf).map(|res| match res { Ok(bytes) => { - self.stats.observe_bytes_total(&self.scheme, self.op, bytes); + self.stats.observe_bytes_total(self.scheme, self.op, bytes); Ok(bytes) } Err(e) => { @@ -625,7 +616,7 @@ impl oio::BlockingRead for PrometheusMetricWrapper { self.inner.next().map(|res| match res { Ok(bytes) => { self.stats - .observe_bytes_total(&self.scheme, self.op, bytes.len()); + .observe_bytes_total(self.scheme, self.op, bytes.len()); Ok(bytes) } Err(e) => { @@ -643,7 +634,7 @@ impl oio::Write for PrometheusMetricWrapper { self.inner .poll_write(cx, bs) 
.map_ok(|n| { - self.stats.observe_bytes_total(&self.scheme, self.op, n); + self.stats.observe_bytes_total(self.scheme, self.op, n); n }) .map_err(|err| { From 90ee9618ea9ac13eadf30094efabd50b981103c7 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 18:52:21 +0800 Subject: [PATCH 15/18] remove the unused metrics --- core/src/layers/prometheus_client.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index a446db66e126..54d82b9ea148 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -242,11 +242,6 @@ impl LayeredAccessor for PrometheusAccessor { .read(path, args) .map(|v| { v.map(|(rp, r)| { - self.stats.observe_bytes_total( - self.scheme, - Operation::Read, - rp.metadata().content_length() as usize, - ); ( rp, PrometheusMetricWrapper::new( @@ -413,11 +408,6 @@ impl LayeredAccessor for PrometheusAccessor { let start_time = Instant::now(); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - self.stats.observe_bytes_total( - self.scheme, - Operation::BlockingRead, - rp.metadata().content_length() as usize, - ); ( rp, PrometheusMetricWrapper::new( From dfc5e8f4433be9fc51658fb2d995db82001434ee Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 19:06:47 +0800 Subject: [PATCH 16/18] rename stats to metrics --- core/src/layers/prometheus_client.rs | 169 ++++++++++++++------------- 1 file changed, 90 insertions(+), 79 deletions(-) diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 54d82b9ea148..858ba679a230 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. 
-use std::fmt::Formatter; use std::fmt::Debug; +use std::fmt::Formatter; use std::io; use std::sync::Arc; use std::task::Context; @@ -101,10 +101,10 @@ impl Layer for PrometheusClientLayer { let meta = inner.info(); let scheme = meta.scheme(); - let stats = Arc::new(self.metrics.clone()); + let metrics = Arc::new(self.metrics.clone()); PrometheusAccessor { inner, - stats, + metrics, scheme, } } @@ -187,7 +187,7 @@ impl PrometheusClientMetrics { #[derive(Clone)] pub struct PrometheusAccessor { inner: A, - stats: Arc, + metrics: Arc, scheme: Scheme, } @@ -214,26 +214,26 @@ impl LayeredAccessor for PrometheusAccessor { } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::CreateDir); let start_time = Instant::now(); let create_res = self.inner.create_dir(path, args).await; - self.stats.observe_request_duration( + self.metrics.observe_request_duration( self.scheme, Operation::CreateDir, start_time.elapsed(), ); create_res.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::CreateDir, e.kind()); e }) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::Read); let start_time = Instant::now(); @@ -247,25 +247,27 @@ impl LayeredAccessor for PrometheusAccessor { PrometheusMetricWrapper::new( r, Operation::Read, - self.stats.clone(), + self.metrics.clone(), self.scheme, ), ) }) }) .await; - self.stats - .observe_request_duration(self.scheme, Operation::Read, start_time.elapsed()); + self.metrics.observe_request_duration( + self.scheme, + Operation::Read, + start_time.elapsed()); read_res.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::Read, e.kind()); e }) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.stats + self.metrics 
.increment_request_total(self.scheme, Operation::Write); let start_time = Instant::now(); @@ -279,7 +281,7 @@ impl LayeredAccessor for PrometheusAccessor { PrometheusMetricWrapper::new( r, Operation::Write, - self.stats.clone(), + self.metrics.clone(), self.scheme, ), ) @@ -287,17 +289,20 @@ impl LayeredAccessor for PrometheusAccessor { }) .await; - self.stats - .observe_request_duration(self.scheme, Operation::Write, start_time.elapsed()); + self.metrics.observe_request_duration( + self.scheme, + Operation::Write, + start_time.elapsed(), + ); write_res.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::Write, e.kind()); e }) } async fn stat(&self, path: &str, args: OpStat) -> Result { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::Stat); let start_time = Instant::now(); @@ -305,105 +310,105 @@ impl LayeredAccessor for PrometheusAccessor { .inner .stat(path, args) .inspect_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::Stat, e.kind()); }) .await; - self.stats + self.metrics .observe_request_duration(self.scheme, Operation::Stat, start_time.elapsed()); stat_res.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::Stat, e.kind()); e }) } async fn delete(&self, path: &str, args: OpDelete) -> Result { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::Delete); let start_time = Instant::now(); let delete_res = self.inner.delete(path, args).await; - self.stats + self.metrics .observe_request_duration(self.scheme, Operation::Delete, start_time.elapsed()); delete_res.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::Delete, e.kind()); e }) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::List); let start_time = Instant::now(); let list_res = 
self.inner.list(path, args).await; - self.stats + self.metrics .observe_request_duration(self.scheme, Operation::List, start_time.elapsed()); list_res.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::List, e.kind()); e }) } async fn batch(&self, args: OpBatch) -> Result { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::Batch); let start_time = Instant::now(); let result = self.inner.batch(args).await; - self.stats + self.metrics .observe_request_duration(self.scheme, Operation::Batch, start_time.elapsed()); result.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::Batch, e.kind()); e }) } async fn presign(&self, path: &str, args: OpPresign) -> Result { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::Presign); let start_time = Instant::now(); let result = self.inner.presign(path, args).await; - self.stats + self.metrics .observe_request_duration(self.scheme, Operation::Presign, start_time.elapsed()); result.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::Presign, e.kind()); e }) } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::BlockingCreateDir); let start_time = Instant::now(); let result = self.inner.blocking_create_dir(path, args); - self.stats.observe_request_duration( + self.metrics.observe_request_duration( self.scheme, Operation::BlockingCreateDir, start_time.elapsed(), ); result.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::BlockingCreateDir, e.kind()); e }) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::BlockingRead); let start_time = Instant::now(); @@ -413,26 +418,25 @@ impl LayeredAccessor for 
PrometheusAccessor { PrometheusMetricWrapper::new( r, Operation::BlockingRead, - self.stats.clone(), + self.metrics.clone(), self.scheme, ), ) }); - self.stats.observe_request_duration( + self.metrics.observe_request_duration( self.scheme, Operation::BlockingRead, - start_time.elapsed(), - ); + start_time.elapsed()); result.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::BlockingRead, e.kind()); e }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::BlockingWrite); let start_time = Instant::now(); @@ -442,75 +446,75 @@ impl LayeredAccessor for PrometheusAccessor { PrometheusMetricWrapper::new( r, Operation::BlockingWrite, - self.stats.clone(), + self.metrics.clone(), self.scheme, ), ) }); - self.stats.observe_request_duration( + self.metrics.observe_request_duration( self.scheme, Operation::BlockingWrite, - start_time.elapsed(), - ); + start_time.elapsed()); result.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::BlockingWrite, e.kind()); e }) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::BlockingStat); let start_time = Instant::now(); let result = self.inner.blocking_stat(path, args); - self.stats.observe_request_duration( + self.metrics.observe_request_duration( self.scheme, Operation::BlockingStat, start_time.elapsed(), ); + result.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::BlockingStat, e.kind()); e }) } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::BlockingDelete); let start_time = Instant::now(); let result = self.inner.blocking_delete(path, args); - self.stats.observe_request_duration( + 
self.metrics.observe_request_duration( self.scheme, Operation::BlockingDelete, start_time.elapsed(), ); result.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::BlockingDelete, e.kind()); e }) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { - self.stats + self.metrics .increment_request_total(self.scheme, Operation::BlockingList); let start_time = Instant::now(); let result = self.inner.blocking_list(path, args); - self.stats.observe_request_duration( + self.metrics.observe_request_duration( self.scheme, Operation::BlockingList, start_time.elapsed(), ); result.map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, Operation::BlockingList, e.kind()); e }) @@ -521,17 +525,19 @@ pub struct PrometheusMetricWrapper { inner: R, op: Operation, - stats: Arc, + metrics: Arc, scheme: Scheme, + bytes_total: usize, } impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, stats: Arc, scheme: Scheme) -> Self { + fn new(inner: R, op: Operation, metrics: Arc, scheme: Scheme) -> Self { Self { inner, op, - stats, + metrics, scheme, + bytes_total: 0, } } } @@ -540,11 +546,11 @@ impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { self.inner.poll_read(cx, buf).map(|res| match res { Ok(bytes) => { - self.stats.observe_bytes_total(self.scheme, self.op, bytes); + self.bytes_total += bytes; Ok(bytes) } Err(e) => { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, e.kind()); Err(e) } @@ -555,7 +561,7 @@ impl oio::Read for PrometheusMetricWrapper { self.inner.poll_seek(cx, pos).map(|res| match res { Ok(n) => Ok(n), Err(e) => { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, e.kind()); Err(e) } @@ -565,12 +571,11 @@ impl oio::Read for PrometheusMetricWrapper { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { self.inner.poll_next(cx).map(|res| match 
res { Some(Ok(bytes)) => { - self.stats - .observe_bytes_total(self.scheme, self.op, bytes.len()); + self.bytes_total += bytes.len(); Some(Ok(bytes)) } Some(Err(e)) => { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, e.kind()); Some(Err(e)) } @@ -584,11 +589,11 @@ impl oio::BlockingRead for PrometheusMetricWrapper { self.inner .read(buf) .map(|n| { - self.stats.observe_bytes_total(self.scheme, self.op, n); + self.bytes_total += n; n }) .map_err(|e| { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, e.kind()); e }) @@ -596,7 +601,7 @@ impl oio::BlockingRead for PrometheusMetricWrapper { fn seek(&mut self, pos: io::SeekFrom) -> Result { self.inner.seek(pos).map_err(|err| { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, err.kind()); err }) @@ -605,12 +610,11 @@ impl oio::BlockingRead for PrometheusMetricWrapper { fn next(&mut self) -> Option> { self.inner.next().map(|res| match res { Ok(bytes) => { - self.stats - .observe_bytes_total(self.scheme, self.op, bytes.len()); + self.bytes_total += bytes.len(); Ok(bytes) } Err(e) => { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, e.kind()); Err(e) } @@ -624,11 +628,11 @@ impl oio::Write for PrometheusMetricWrapper { self.inner .poll_write(cx, bs) .map_ok(|n| { - self.stats.observe_bytes_total(self.scheme, self.op, n); + self.bytes_total += n; n }) .map_err(|err| { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, err.kind()); err }) @@ -636,7 +640,7 @@ impl oio::Write for PrometheusMetricWrapper { fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_abort(cx).map_err(|err| { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, err.kind()); err }) @@ -644,7 +648,7 @@ impl oio::Write for PrometheusMetricWrapper { fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_close(cx).map_err(|err| { - self.stats + self.metrics 
.increment_errors_total(self.scheme, self.op, err.kind()); err }) @@ -656,11 +660,11 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { self.inner .write(bs) .map(|n| { - self.stats.observe_bytes_total(self.scheme, self.op, n); + self.bytes_total += n; n }) .map_err(|err| { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, err.kind()); err }) @@ -668,9 +672,16 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { fn close(&mut self) -> Result<()> { self.inner.close().map_err(|err| { - self.stats + self.metrics .increment_errors_total(self.scheme, self.op, err.kind()); err }) } } + +impl Drop for PrometheusMetricWrapper { + fn drop(&mut self) { + self.metrics + .observe_bytes_total(self.scheme, self.op, self.bytes_total); + } +} From 3afeadec7250e5f17f66aa9eaf2de981982b50f6 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 19:11:10 +0800 Subject: [PATCH 17/18] record request duration in wrapper --- core/src/layers/prometheus_client.rs | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 858ba679a230..3065ae4a2b3b 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -235,7 +235,6 @@ impl LayeredAccessor for PrometheusAccessor { async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { self.metrics .increment_request_total(self.scheme, Operation::Read); - let start_time = Instant::now(); let read_res = self .inner @@ -254,11 +253,6 @@ impl LayeredAccessor for PrometheusAccessor { }) }) .await; - - self.metrics.observe_request_duration( - self.scheme, - Operation::Read, - start_time.elapsed()); read_res.map_err(|e| { self.metrics .increment_errors_total(self.scheme, Operation::Read, e.kind()); @@ -269,7 +263,6 @@ impl LayeredAccessor for PrometheusAccessor { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> { self.metrics .increment_request_total(self.scheme, Operation::Write); - let start_time = Instant::now(); let write_res = self .inner @@ -289,11 +282,6 @@ impl LayeredAccessor for PrometheusAccessor { }) .await; - self.metrics.observe_request_duration( - self.scheme, - Operation::Write, - start_time.elapsed(), - ); write_res.map_err(|e| { self.metrics .increment_errors_total(self.scheme, Operation::Write, e.kind()); @@ -410,7 +398,6 @@ impl LayeredAccessor for PrometheusAccessor { fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { self.metrics .increment_request_total(self.scheme, Operation::BlockingRead); - let start_time = Instant::now(); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { ( @@ -424,10 +411,6 @@ impl LayeredAccessor for PrometheusAccessor { ) }); - self.metrics.observe_request_duration( - self.scheme, - Operation::BlockingRead, - start_time.elapsed()); result.map_err(|e| { self.metrics .increment_errors_total(self.scheme, Operation::BlockingRead, e.kind()); @@ -438,7 +421,6 @@ impl LayeredAccessor for PrometheusAccessor { fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { self.metrics .increment_request_total(self.scheme, Operation::BlockingWrite); - let start_time = Instant::now(); let result = self.inner.blocking_write(path, args).map(|(rp, r)| { ( @@ -452,10 +434,6 @@ impl LayeredAccessor for PrometheusAccessor { ) }); - self.metrics.observe_request_duration( - self.scheme, - Operation::BlockingWrite, - start_time.elapsed()); result.map_err(|e| { self.metrics .increment_errors_total(self.scheme, Operation::BlockingWrite, e.kind()); @@ -528,6 +506,7 @@ pub struct PrometheusMetricWrapper { metrics: Arc, scheme: Scheme, bytes_total: usize, + start_time: Instant, } impl PrometheusMetricWrapper { @@ -538,6 +517,7 @@ impl PrometheusMetricWrapper { metrics, scheme, bytes_total: 0, + start_time: Instant::now(), } } } @@ -683,5 +663,7 
@@ impl Drop for PrometheusMetricWrapper { fn drop(&mut self) { self.metrics .observe_bytes_total(self.scheme, self.op, self.bytes_total); + self.metrics + .observe_request_duration(self.scheme, self.op, self.start_time.elapsed()); } } From be4127c39b21474bb4210c331ae6b2eb8538a08a Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Tue, 19 Sep 2023 19:16:02 +0800 Subject: [PATCH 18/18] fix fmt --- core/src/layers/prometheus_client.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 3065ae4a2b3b..b08b50a28af6 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -367,8 +367,11 @@ impl LayeredAccessor for PrometheusAccessor { let result = self.inner.presign(path, args).await; - self.metrics - .observe_request_duration(self.scheme, Operation::Presign, start_time.elapsed()); + self.metrics.observe_request_duration( + self.scheme, + Operation::Presign, + start_time.elapsed(), + ); result.map_err(|e| { self.metrics .increment_errors_total(self.scheme, Operation::Presign, e.kind()); @@ -389,8 +392,11 @@ impl LayeredAccessor for PrometheusAccessor { start_time.elapsed(), ); result.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::BlockingCreateDir, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::BlockingCreateDir, + e.kind(), + ); e }) }