diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 76379f5b4c1a..4692da0a68c7 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -39,6 +39,7 @@ use prometheus::Registry; use crate::raw::Accessor; use crate::raw::*; use crate::*; + /// Add [prometheus](https://docs.rs/prometheus) for every operations. /// /// # Prometheus Metrics @@ -107,6 +108,7 @@ pub struct PrometheusLayer { registry: Registry, requests_duration_seconds_buckets: Vec, bytes_total_buckets: Vec, + path_label_level: usize, } impl PrometheusLayer { @@ -116,6 +118,7 @@ impl PrometheusLayer { registry, requests_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), bytes_total_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), + path_label_level: 0, } } @@ -134,6 +137,14 @@ impl PrometheusLayer { } self } + + /// set path label level + /// 0: no path label, the path label will be the "" + /// >0: the path label will be the path split by "/" and get the last n level, like "/abc/def/ghi", if n=1, the path label will be "/abc" + pub fn enable_path_label(mut self, level: usize) -> Self { + self.path_label_level = level; + self + } } impl Layer for PrometheusLayer { @@ -149,11 +160,13 @@ impl Layer for PrometheusLayer { self.registry.clone(), self.requests_duration_seconds_buckets.clone(), self.bytes_total_buckets.clone(), + self.path_label_level, )), - scheme: scheme.to_string(), + scheme, } } } + /// [`PrometheusMetrics`] provide the performance and IO metrics. #[derive(Debug)] pub struct PrometheusMetrics { @@ -163,6 +176,8 @@ pub struct PrometheusMetrics { pub requests_duration_seconds: HistogramVec, /// Size of the specific metrics. pub bytes_total: HistogramVec, + /// The Path Level we will keep in the path label. + pub path_label_level: usize, } impl PrometheusMetrics { @@ -171,11 +186,17 @@ impl PrometheusMetrics { registry: Registry, requests_duration_seconds_buckets: Vec, bytes_total_buckets: Vec, + path_label_level: usize, ) -> Self { + let labels = if path_label_level > 0 { + vec!["scheme", "operation", "path"] + } else { + vec!["scheme", "operation"] + }; let requests_total = register_int_counter_vec_with_registry!( "requests_total", "Total times of create be called", - &["scheme", "operation"], + &labels, registry ) .unwrap(); @@ -186,18 +207,16 @@ impl PrometheusMetrics { ); let requests_duration_seconds = - register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) - .unwrap(); + register_histogram_vec_with_registry!(opts, &labels, registry).unwrap(); let opts = histogram_opts!("bytes_total", "Total size of ", bytes_total_buckets); - let bytes_total = - register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry) - .unwrap(); + let bytes_total = register_histogram_vec_with_registry!(opts, &labels, registry).unwrap(); Self { requests_total, requests_duration_seconds, bytes_total, + path_label_level, } } @@ -211,13 +230,34 @@ impl PrometheusMetrics { kind.into_static() ); } + + /// generate metric label + pub fn generate_metric_label<'a>( + &self, + scheme: &'a str, + operation: &'a str, + path_label: &'a str, + ) -> Vec<&'a str> { + match self.path_label_level { + 0 => { + vec![scheme, operation] + } + n if n > 0 => { + let path_value = get_path_label(path_label, self.path_label_level); + vec![scheme, operation, path_value] + } + _ => { + vec![scheme, operation] + } + } + } } #[derive(Clone)] pub struct PrometheusAccessor { inner: A, stats: Arc, - scheme: String, + scheme: Scheme, } impl Debug for PrometheusAccessor { @@ -243,15 +283,18 @@ impl LayeredAccessor for PrometheusAccessor { } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::CreateDir.into_static(), + path, + ); + + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::CreateDir.into_static()]) + .with_label_values(&labels) .start_timer(); let create_res = self.inner.create_dir(path, args).await; @@ -264,15 +307,17 @@ impl LayeredAccessor for PrometheusAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Read.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .with_label_values(&labels) .start_timer(); let read_res = self @@ -282,7 +327,7 @@ impl LayeredAccessor for PrometheusAccessor { v.map(|(rp, r)| { self.stats .bytes_total - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .with_label_values(&labels) .observe(rp.metadata().content_length() as f64); ( rp, @@ -290,7 +335,8 @@ impl LayeredAccessor for PrometheusAccessor { r, Operation::Read, self.stats.clone(), - &self.scheme, + self.scheme, + &path.to_string(), ), ) }) @@ -304,15 +350,17 @@ impl LayeredAccessor for PrometheusAccessor { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Write.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .with_label_values(&labels) .start_timer(); let write_res = self @@ -326,7 +374,8 @@ impl LayeredAccessor for PrometheusAccessor { r, Operation::Write, self.stats.clone(), - &self.scheme, + self.scheme, + &path.to_string(), ), ) }) @@ -341,14 +390,16 @@ impl LayeredAccessor for PrometheusAccessor { } async fn stat(&self, path: &str, args: OpStat) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Stat.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) + .with_label_values(&labels) .start_timer(); let stat_res = self @@ -366,15 +417,17 @@ impl LayeredAccessor for PrometheusAccessor { } async fn delete(&self, path: &str, args: OpDelete) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Delete.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) + .with_label_values(&labels) .start_timer(); let delete_res = self.inner.delete(path, args).await; @@ -387,15 +440,17 @@ impl LayeredAccessor for PrometheusAccessor { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::List.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::List.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::List.into_static()]) + .with_label_values(&labels) .start_timer(); let list_res = self.inner.list(path, args).await; @@ -408,15 +463,17 @@ impl LayeredAccessor for PrometheusAccessor { } async fn batch(&self, args: OpBatch) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Batch.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Batch.into_static(), + "", + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Batch.into_static()]) + .with_label_values(&labels) .start_timer(); let result = self.inner.batch(args).await; @@ -429,15 +486,17 @@ impl LayeredAccessor for PrometheusAccessor { } async fn presign(&self, path: &str, args: OpPresign) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::Presign.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Presign.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::Presign.into_static()]) + .with_label_values(&labels) .start_timer(); let result = self.inner.presign(path, args).await; timer.observe_duration(); @@ -450,15 +509,17 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingCreateDir.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingCreateDir.into_static()]) + .with_label_values(&labels) .start_timer(); let result = self.inner.blocking_create_dir(path, args); @@ -472,20 +533,22 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingRead.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme]) + .with_label_values(&labels) .start_timer(); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { self.stats .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .with_label_values(&labels) .observe(rp.metadata().content_length() as f64); ( rp, @@ -493,7 +556,8 @@ impl LayeredAccessor for PrometheusAccessor { r, Operation::BlockingRead, self.stats.clone(), - &self.scheme, + self.scheme, + &path.to_string(), ), ) }); @@ -506,15 +570,17 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingWrite.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .with_label_values(&labels) .start_timer(); let result = self.inner.blocking_write(path, args).map(|(rp, r)| { ( @@ -523,7 +589,8 @@ impl LayeredAccessor for PrometheusAccessor { r, Operation::BlockingWrite, self.stats.clone(), - &self.scheme, + self.scheme, + &path.to_string(), ), ) }); @@ -536,15 +603,17 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingStat.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()]) + .with_label_values(&labels) .start_timer(); let result = self.inner.blocking_stat(path, args); timer.observe_duration(); @@ -556,15 +625,17 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingDelete.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()]) + .with_label_values(&labels) .start_timer(); let result = self.inner.blocking_delete(path, args); timer.observe_duration(); @@ -577,15 +648,17 @@ impl LayeredAccessor for PrometheusAccessor { } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { - self.stats - .requests_total - .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) - .inc(); + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingList.into_static(), + path, + ); + self.stats.requests_total.with_label_values(&labels).inc(); let timer = self .stats .requests_duration_seconds - .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) + .with_label_values(&labels) .start_timer(); let result = self.inner.blocking_list(path, args); timer.observe_duration(); @@ -603,27 +676,40 @@ pub struct PrometheusMetricWrapper { op: Operation, stats: Arc, - scheme: String, + scheme: Scheme, + path: String, } impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, stats: Arc, scheme: &String) -> Self { + fn new( + inner: R, + op: Operation, + stats: Arc, + scheme: Scheme, + path: &String, + ) -> Self { Self { inner, op, stats, - scheme: scheme.to_string(), + scheme, + path: path.to_string(), } } } impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Read.into_static(), + &self.path, + ); self.inner.poll_read(cx, buf).map(|res| match res { Ok(bytes) => { self.stats .bytes_total - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .with_label_values(&labels) .observe(bytes as f64); Ok(bytes) } @@ -645,11 +731,16 @@ impl oio::Read for PrometheusMetricWrapper { } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Read.into_static(), + &self.path, + ); self.inner.poll_next(cx).map(|res| match res { Some(Ok(bytes)) => { self.stats .bytes_total - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .with_label_values(&labels) .observe(bytes.len() as f64); Some(Ok(bytes)) } @@ -664,12 +755,17 @@ impl oio::Read for PrometheusMetricWrapper { impl oio::BlockingRead for PrometheusMetricWrapper { fn read(&mut self, buf: &mut [u8]) -> Result { + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingRead.into_static(), + &self.path, + ); self.inner .read(buf) .map(|n| { self.stats .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .with_label_values(&labels) .observe(n as f64); n }) @@ -687,11 +783,16 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } fn next(&mut self) -> Option> { + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingRead.into_static(), + &self.path, + ); self.inner.next().map(|res| match res { Ok(bytes) => { self.stats .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .with_label_values(&labels) .observe(bytes.len() as f64); Ok(bytes) } @@ -706,12 +807,17 @@ impl oio::BlockingRead for PrometheusMetricWrapper { #[async_trait] impl oio::Write for PrometheusMetricWrapper { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::Write.into_static(), + &self.path, + ); self.inner .poll_write(cx, bs) .map_ok(|n| { self.stats .bytes_total - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .with_label_values(&labels) .observe(n as f64); n }) @@ -738,12 +844,17 @@ impl oio::Write for PrometheusMetricWrapper { impl oio::BlockingWrite for PrometheusMetricWrapper { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + Operation::BlockingWrite.into_static(), + &self.path, + ); self.inner .write(bs) .map(|n| { self.stats .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .with_label_values(&labels) .observe(n as f64); n }) @@ -760,3 +871,31 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { }) } } + +fn get_path_label(path: &str, path_level: usize) -> &str { + if path_level > 0 { + return path + .char_indices() + .filter(|&(_, c)| c == '/') + .nth(path_level - 1) + .map_or(path, |(i, _)| &path[..i]); + } + "" +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_path_label() { + let path = "abc/def/ghi"; + assert_eq!(get_path_label(path, 0), ""); + assert_eq!(get_path_label(path, 1), "abc"); + assert_eq!(get_path_label(path, 2), "abc/def"); + assert_eq!(get_path_label(path, 3), "abc/def/ghi"); + assert_eq!(get_path_label(path, usize::MAX), "abc/def/ghi"); + + assert_eq!(get_path_label("", 0), ""); + } +}