diff --git a/Cargo.lock b/Cargo.lock index ed11ebbe949..c7813a70de1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5188,6 +5188,7 @@ dependencies = [ "test-log", "tokio", "tracing", + "tracing-mock", "url", ] @@ -9202,6 +9203,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-mock" +version = "0.1.0-beta.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98a31739d4ff16a8634c5463c75d5bf9e500596958a245d1ee5b6b98ac37658d" +dependencies = [ + "tracing", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" diff --git a/Cargo.toml b/Cargo.toml index 101b19eb093..c8ddd5670fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -195,6 +195,7 @@ tokio-util = { version = "0.7.16" } tower = "0.5" tower-http = "0.5" tracing = "0.1" +tracing-mock = { version = "=0.1.0-beta.3" } url = "2.5.7" uuid = { version = "1.2", features = ["v4", "serde"] } wiremock = "0.6" diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index ca21af66610..71e9dab1e9f 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -55,6 +55,7 @@ test-log.workspace = true mockall.workspace = true rstest.workspace = true mock_instant.workspace = true +tracing-mock = { workspace = true } [target.'cfg(target_os = "linux")'.dev-dependencies] pprof.workspace = true diff --git a/rust/lance-io/src/object_store/tracing.rs b/rust/lance-io/src/object_store/tracing.rs index fc2e1b825ae..3e0c3152889 100644 --- a/rust/lance-io/src/object_store/tracing.rs +++ b/rust/lance-io/src/object_store/tracing.rs @@ -15,25 +15,29 @@ use object_store::{ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart, }; -use tracing::{debug_span, instrument, Instrument, Span}; +use tracing::{instrument, Instrument, Span}; #[derive(Debug)] pub struct TracedMultipartUpload { write_span: Span, target: Box, + write_size: usize, } #[async_trait::async_trait] impl MultipartUpload for TracedMultipartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { let write_span = self.write_span.clone(); + self.write_size += data.content_length(); let fut = self.target.put_part(data); Box::pin(fut.instrument(write_span)) } #[instrument(level = "debug", skip_all)] async fn complete(&mut self) -> OSResult { - self.target.complete().await + let res = self.target.complete().await?; + self.write_span.record("size", self.write_size); + Ok(res) } #[instrument(level = "debug", skip_all)] @@ -56,12 +60,12 @@ impl std::fmt::Display for TracedObjectStore { #[async_trait::async_trait] #[deny(clippy::missing_trait_methods)] impl object_store::ObjectStore for TracedObjectStore { - #[instrument(level = "debug", skip(self, bytes))] + #[instrument(level = "debug", skip(self, bytes, location), fields(path = location.as_ref(), size = bytes.content_length()))] async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult { self.target.put(location, bytes).await } - #[instrument(level = "debug", skip(self, bytes))] + #[instrument(level = "debug", skip(self, bytes, location), fields(path = location.as_ref(), size = bytes.content_length()))] async fn put_opts( &self, location: &Path, @@ -71,6 +75,7 @@ impl object_store::ObjectStore for TracedObjectStore { self.target.put_opts(location, bytes, opts).await } + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = tracing::field::Empty))] async fn put_multipart( &self, location: &Path, @@ -78,10 +83,12 @@ impl object_store::ObjectStore for TracedObjectStore { let upload = self.target.put_multipart(location).await?; Ok(Box::new(TracedMultipartUpload { target: upload, - write_span: debug_span!("put_multipart"), + write_span: tracing::Span::current(), + write_size: 0, })) } + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = tracing::field::Empty))] async fn put_multipart_opts( &self, location: &Path, @@ -90,36 +97,47 @@ impl object_store::ObjectStore for TracedObjectStore { let upload = self.target.put_multipart_opts(location, opts).await?; Ok(Box::new(TracedMultipartUpload { target: upload, - write_span: debug_span!("put_multipart_opts"), + write_span: tracing::Span::current(), + write_size: 0, })) } - #[instrument(level = "debug", skip(self, location))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = tracing::field::Empty))] async fn get(&self, location: &Path) -> OSResult { - self.target.get(location).await + let res = self.target.get(location).await?; + + let span = tracing::Span::current(); + span.record("size", res.meta.size); + + Ok(res) } - #[instrument(level = "debug", skip(self, options))] + #[instrument(level = "debug", skip(self, options, location), fields(path = location.as_ref(), size = tracing::field::Empty))] async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult { - self.target.get_opts(location, options).await + let res = self.target.get_opts(location, options).await?; + + let span = tracing::Span::current(); + span.record("size", res.range.end - res.range.start); + + Ok(res) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = range.end - range.start))] async fn get_range(&self, location: &Path, range: Range) -> OSResult { self.target.get_range(location, range).await } - #[instrument(level = "debug", skip(self, ranges))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = ranges.iter().map(|r| r.end - r.start).sum::()))] async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> OSResult> { self.target.get_ranges(location, ranges).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref()))] async fn head(&self, location: &Path) -> OSResult { self.target.head(location).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref()))] async fn delete(&self, location: &Path) -> OSResult<()> { self.target.delete(location).await } @@ -135,12 +153,12 @@ impl object_store::ObjectStore for TracedObjectStore { .boxed() } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, prefix), fields(prefix = prefix.map(|p| p.as_ref())))] fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult> { self.target.list(prefix).stream_in_current_span().boxed() } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, prefix, offset), fields(prefix = prefix.map(|p| p.as_ref()), offset = offset.as_ref()))] fn list_with_offset( &self, prefix: Option<&Path>, @@ -152,27 +170,27 @@ impl object_store::ObjectStore for TracedObjectStore { .boxed() } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, prefix), fields(prefix = prefix.map(|p| p.as_ref())))] async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { self.target.list_with_delimiter(prefix).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, from, to), fields(from = from.as_ref(), to = to.as_ref()))] async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> { self.target.copy(from, to).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, from, to), fields(from = from.as_ref(), to = to.as_ref()))] async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> { self.target.rename(from, to).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, from, to), fields(from = from.as_ref(), to = to.as_ref()))] async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { self.target.rename_if_not_exists(from, to).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, from, to), fields(from = from.as_ref(), to = to.as_ref()))] async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { self.target.copy_if_not_exists(from, to).await } @@ -193,3 +211,248 @@ impl ObjectStoreTracingExt for Arc { Arc::new(TracedObjectStore { target: self }) } } + +#[cfg(test)] +mod tests { + use super::*; + + use bytes::Bytes; + use object_store::memory::InMemory; + use object_store::path::Path; + use object_store::PutPayload; + use tracing_mock::{expect, subscriber}; + + fn payload(data: &[u8]) -> PutPayload { + PutPayload::from_bytes(Bytes::copy_from_slice(data)) + } + + fn make_store() -> Arc { + Arc::new(InMemory::new()).traced() + } + + #[tokio::test(flavor = "current_thread")] + async fn test_put_records_path_and_size() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let span = expect::span().named("put"); + let (sub, handle) = subscriber::mock() + .new_span( + span.clone().with_fields( + expect::field("path") + .with_value(&"a/b.bin") + .and(expect::field("size").with_value(&data.len())) + .only(), + ), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + make_store().put(&path, payload(data)).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_get_records_path_and_size() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + let size = data.len() as u64; // meta.size is u64 + + // Seed without an active mock subscriber. + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let span = expect::span().named("get"); + let (sub, handle) = subscriber::mock() + .new_span( + // size = Empty at span creation, so only path is visited. + span.clone() + .with_fields(expect::field("path").with_value(&"a/b.bin").only()), + ) + .enter(span.clone()) + .record(span.clone(), expect::field("size").with_value(&size)) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.get(&path).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_get_range_records_path_and_size() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let range = 2u64..7u64; + let size = range.end - range.start; + + let span = expect::span().named("get_range"); + let (sub, handle) = subscriber::mock() + .new_span( + // `range` is also captured automatically as a debug field since it + // is not in the skip list, so we don't use `.only()` here. + span.clone().with_fields( + expect::field("path") + .with_value(&"a/b.bin") + .and(expect::field("size").with_value(&size)), + ), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.get_range(&path, range).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_get_ranges_records_path_and_total_size() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let ranges = [2u64..5u64, 6u64..9u64]; + let size: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + + let span = expect::span().named("get_ranges"); + let (sub, handle) = subscriber::mock() + .new_span( + // `ranges` is also captured automatically as a debug field since + // it is not in the skip list, so we don't use `.only()` here. + span.clone().with_fields( + expect::field("path") + .with_value(&"a/b.bin") + .and(expect::field("size").with_value(&size)), + ), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.get_ranges(&path, &ranges).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_head_records_path() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let span = expect::span().named("head"); + let (sub, handle) = subscriber::mock() + .new_span( + span.clone() + .with_fields(expect::field("path").with_value(&"a/b.bin").only()), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.head(&path).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_delete_records_path() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let span = expect::span().named("delete"); + let (sub, handle) = subscriber::mock() + .new_span( + span.clone() + .with_fields(expect::field("path").with_value(&"a/b.bin").only()), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.delete(&path).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_copy_records_from_and_to() { + let from = Path::from("a/src.bin"); + let to = Path::from("a/dst.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&from, payload(data)).await.unwrap(); + + let span = expect::span().named("copy"); + let (sub, handle) = subscriber::mock() + .new_span( + span.clone().with_fields( + expect::field("from") + .with_value(&"a/src.bin") + .and(expect::field("to").with_value(&"a/dst.bin")) + .only(), + ), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.copy(&from, &to).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_put_multipart_records_path() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let put_mp_span = expect::span().named("put_multipart"); + // Expect only the span creation; any subsequent enter/exit/record + // events are not in the queue so they are silently ignored. + let (sub, handle) = subscriber::mock() + .new_span( + // size = Empty at span creation, so only path is visited. + put_mp_span.with_fields(expect::field("path").with_value(&"a/b.bin").only()), + ) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + let store = make_store(); + let mut upload = store.put_multipart(&path).await.unwrap(); + upload.put_part(payload(data)).await.unwrap(); + upload.complete().await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } +}