From de4d68cd3ca7efa97cd82eced4be7953b351fb7b Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 17 Feb 2026 16:24:44 -0800 Subject: [PATCH 1/3] feat: add size to object store tracing --- rust/lance-io/src/object_store/tracing.rs | 60 +++++++++++++++-------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/rust/lance-io/src/object_store/tracing.rs b/rust/lance-io/src/object_store/tracing.rs index fc2e1b825ae..2a617d39caf 100644 --- a/rust/lance-io/src/object_store/tracing.rs +++ b/rust/lance-io/src/object_store/tracing.rs @@ -15,25 +15,29 @@ use object_store::{ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart, }; -use tracing::{debug_span, instrument, Instrument, Span}; +use tracing::{instrument, Instrument, Span}; #[derive(Debug)] pub struct TracedMultipartUpload { write_span: Span, target: Box, + write_size: usize, } #[async_trait::async_trait] impl MultipartUpload for TracedMultipartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { let write_span = self.write_span.clone(); + self.write_size += data.content_length(); let fut = self.target.put_part(data); Box::pin(fut.instrument(write_span)) } #[instrument(level = "debug", skip_all)] async fn complete(&mut self) -> OSResult { - self.target.complete().await + let res = self.target.complete().await?; + self.write_span.record("size", self.write_size); + Ok(res) } #[instrument(level = "debug", skip_all)] @@ -56,12 +60,12 @@ impl std::fmt::Display for TracedObjectStore { #[async_trait::async_trait] #[deny(clippy::missing_trait_methods)] impl object_store::ObjectStore for TracedObjectStore { - #[instrument(level = "debug", skip(self, bytes))] + #[instrument(level = "debug", skip(self, bytes, location), fields(path = location.as_ref(), size = bytes.content_length()))] async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult { self.target.put(location, bytes).await } - #[instrument(level = "debug", skip(self, bytes))] + #[instrument(level = "debug", skip(self, bytes, location), fields(path = location.as_ref(), size = bytes.content_length()))] async fn put_opts( &self, location: &Path, @@ -71,6 +75,7 @@ impl object_store::ObjectStore for TracedObjectStore { self.target.put_opts(location, bytes, opts).await } + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = tracing::field::Empty))] async fn put_multipart( &self, location: &Path, @@ -78,10 +83,12 @@ impl object_store::ObjectStore for TracedObjectStore { let upload = self.target.put_multipart(location).await?; Ok(Box::new(TracedMultipartUpload { target: upload, - write_span: debug_span!("put_multipart"), + write_span: tracing::Span::current(), + write_size: 0, })) } + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = tracing::field::Empty))] async fn put_multipart_opts( &self, location: &Path, @@ -90,36 +97,47 @@ impl object_store::ObjectStore for TracedObjectStore { let upload = self.target.put_multipart_opts(location, opts).await?; Ok(Box::new(TracedMultipartUpload { target: upload, - write_span: debug_span!("put_multipart_opts"), + write_span: tracing::Span::current(), + write_size: 0, })) } - #[instrument(level = "debug", skip(self, location))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = tracing::field::Empty))] async fn get(&self, location: &Path) -> OSResult { - self.target.get(location).await + let res = self.target.get(location).await?; + + let span = tracing::Span::current(); + span.record("size", res.meta.size); + + Ok(res) } - #[instrument(level = "debug", skip(self, options))] + #[instrument(level = "debug", skip(self, options, location), fields(path = location.as_ref(), size = tracing::field::Empty))] async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult { - self.target.get_opts(location, options).await + let res = self.target.get_opts(location, options).await?; + + let span = tracing::Span::current(); + span.record("size", res.range.end - res.range.start); + + Ok(res) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = range.end - range.start))] async fn get_range(&self, location: &Path, range: Range) -> OSResult { self.target.get_range(location, range).await } - #[instrument(level = "debug", skip(self, ranges))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref(), size = ranges.iter().map(|r| r.end - r.start).sum::()))] async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> OSResult> { self.target.get_ranges(location, ranges).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref()))] async fn head(&self, location: &Path) -> OSResult { self.target.head(location).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, location), fields(path = location.as_ref()))] async fn delete(&self, location: &Path) -> OSResult<()> { self.target.delete(location).await } @@ -135,12 +153,12 @@ impl object_store::ObjectStore for TracedObjectStore { .boxed() } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, prefix), fields(prefix = prefix.map(|p| p.as_ref())))] fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult> { self.target.list(prefix).stream_in_current_span().boxed() } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, prefix, offset), fields(prefix = prefix.map(|p| p.as_ref()), offset = offset.as_ref()))] fn list_with_offset( &self, prefix: Option<&Path>, @@ -152,27 +170,27 @@ impl object_store::ObjectStore for TracedObjectStore { .boxed() } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, prefix), fields(prefix = prefix.map(|p| p.as_ref())))] async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { self.target.list_with_delimiter(prefix).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, from, to), fields(from = from.as_ref(), to = to.as_ref()))] async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> { self.target.copy(from, to).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, from, to), fields(from = from.as_ref(), to = to.as_ref()))] async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> { self.target.rename(from, to).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, from, to), fields(from = from.as_ref(), to = to.as_ref()))] async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { self.target.rename_if_not_exists(from, to).await } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, from, to), fields(from = from.as_ref(), to = to.as_ref()))] async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { self.target.copy_if_not_exists(from, to).await } From 61ef81b9f9c63c3e76835f3a0518500bce960bb3 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 18 Feb 2026 09:45:48 -0800 Subject: [PATCH 2/3] test: add tracing-mock unit tests for TracedObjectStore spans Tests verify span names and field values for put, get, get_range, get_ranges, head, delete, copy, and put_multipart using InMemory as the backing store. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 11 + Cargo.toml | 1 + rust/lance-io/Cargo.toml | 1 + rust/lance-io/src/object_store/tracing.rs | 245 ++++++++++++++++++++++ 4 files changed, 258 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index ed11ebbe949..c7813a70de1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5188,6 +5188,7 @@ dependencies = [ "test-log", "tokio", "tracing", + "tracing-mock", "url", ] @@ -9202,6 +9203,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-mock" +version = "0.1.0-beta.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98a31739d4ff16a8634c5463c75d5bf9e500596958a245d1ee5b6b98ac37658d" +dependencies = [ + "tracing", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" diff --git a/Cargo.toml b/Cargo.toml index 101b19eb093..c8ddd5670fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -195,6 +195,7 @@ tokio-util = { version = "0.7.16" } tower = "0.5" tower-http = "0.5" tracing = "0.1" +tracing-mock = { version = "=0.1.0-beta.3" } url = "2.5.7" uuid = { version = "1.2", features = ["v4", "serde"] } wiremock = "0.6" diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index ca21af66610..71e9dab1e9f 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -55,6 +55,7 @@ test-log.workspace = true mockall.workspace = true rstest.workspace = true mock_instant.workspace = true +tracing-mock = { workspace = true } [target.'cfg(target_os = "linux")'.dev-dependencies] pprof.workspace = true diff --git a/rust/lance-io/src/object_store/tracing.rs b/rust/lance-io/src/object_store/tracing.rs index 2a617d39caf..ed739b57159 100644 --- a/rust/lance-io/src/object_store/tracing.rs +++ b/rust/lance-io/src/object_store/tracing.rs @@ -196,6 +196,251 @@ impl object_store::ObjectStore for TracedObjectStore { } } +#[cfg(test)] +mod tests { + use super::*; + + use bytes::Bytes; + use object_store::memory::InMemory; + use object_store::path::Path; + use object_store::PutPayload; + use tracing_mock::{expect, subscriber}; + + fn payload(data: &[u8]) -> PutPayload { + PutPayload::from_bytes(Bytes::copy_from_slice(data)) + } + + fn make_store() -> Arc { + Arc::new(InMemory::new()).traced() + } + + #[tokio::test(flavor = "current_thread")] + async fn test_put_records_path_and_size() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let span = expect::span().named("put"); + let (sub, handle) = subscriber::mock() + .new_span( + span.clone().with_fields( + expect::field("path") + .with_value(&"a/b.bin") + .and(expect::field("size").with_value(&data.len())) + .only(), + ), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + make_store().put(&path, payload(data)).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_get_records_path_and_size() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + let size = data.len() as u64; // meta.size is u64 + + // Seed without an active mock subscriber. + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let span = expect::span().named("get"); + let (sub, handle) = subscriber::mock() + .new_span( + // size = Empty at span creation, so only path is visited. + span.clone() + .with_fields(expect::field("path").with_value(&"a/b.bin").only()), + ) + .enter(span.clone()) + .record(span.clone(), expect::field("size").with_value(&size)) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.get(&path).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_get_range_records_path_and_size() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let range = 2u64..7u64; + let size = range.end - range.start; + + let span = expect::span().named("get_range"); + let (sub, handle) = subscriber::mock() + .new_span( + // `range` is also captured automatically as a debug field since it + // is not in the skip list, so we don't use `.only()` here. + span.clone().with_fields( + expect::field("path") + .with_value(&"a/b.bin") + .and(expect::field("size").with_value(&size)), + ), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.get_range(&path, range).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_get_ranges_records_path_and_total_size() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let ranges = [2u64..5u64, 6u64..9u64]; + let size: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + + let span = expect::span().named("get_ranges"); + let (sub, handle) = subscriber::mock() + .new_span( + // `ranges` is also captured automatically as a debug field since + // it is not in the skip list, so we don't use `.only()` here. + span.clone().with_fields( + expect::field("path") + .with_value(&"a/b.bin") + .and(expect::field("size").with_value(&size)), + ), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.get_ranges(&path, &ranges).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_head_records_path() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let span = expect::span().named("head"); + let (sub, handle) = subscriber::mock() + .new_span( + span.clone() + .with_fields(expect::field("path").with_value(&"a/b.bin").only()), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.head(&path).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_delete_records_path() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&path, payload(data)).await.unwrap(); + + let span = expect::span().named("delete"); + let (sub, handle) = subscriber::mock() + .new_span( + span.clone() + .with_fields(expect::field("path").with_value(&"a/b.bin").only()), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.delete(&path).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_copy_records_from_and_to() { + let from = Path::from("a/src.bin"); + let to = Path::from("a/dst.bin"); + let data = b"hello world"; + + let store = make_store(); + store.put(&from, payload(data)).await.unwrap(); + + let span = expect::span().named("copy"); + let (sub, handle) = subscriber::mock() + .new_span( + span.clone().with_fields( + expect::field("from") + .with_value(&"a/src.bin") + .and(expect::field("to").with_value(&"a/dst.bin")) + .only(), + ), + ) + .enter(span.clone()) + .exit(span.clone()) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + store.copy(&from, &to).await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_put_multipart_records_path() { + let path = Path::from("a/b.bin"); + let data = b"hello world"; + + let put_mp_span = expect::span().named("put_multipart"); + // Expect only the span creation; any subsequent enter/exit/record + // events are not in the queue so they are silently ignored. + let (sub, handle) = subscriber::mock() + .new_span( + // size = Empty at span creation, so only path is visited. + put_mp_span.with_fields(expect::field("path").with_value(&"a/b.bin").only()), + ) + .run_with_handle(); + + let _guard = tracing::subscriber::set_default(sub); + let store = make_store(); + let mut upload = store.put_multipart(&path).await.unwrap(); + upload.put_part(payload(data)).await.unwrap(); + upload.complete().await.unwrap(); + drop(_guard); + + handle.assert_finished(); + } +} + pub trait ObjectStoreTracingExt { fn traced(self) -> Arc; } From ef8e3258ad70ee26fa35f52f3fcb66227ef0e6b3 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 18 Feb 2026 16:35:43 -0800 Subject: [PATCH 3/3] format --- rust/lance-io/src/object_store/tracing.rs | 32 +++++++++++------------ 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/rust/lance-io/src/object_store/tracing.rs b/rust/lance-io/src/object_store/tracing.rs index ed739b57159..3e0c3152889 100644 --- a/rust/lance-io/src/object_store/tracing.rs +++ b/rust/lance-io/src/object_store/tracing.rs @@ -196,6 +196,22 @@ impl object_store::ObjectStore for TracedObjectStore { } } +pub trait ObjectStoreTracingExt { + fn traced(self) -> Arc; +} + +impl ObjectStoreTracingExt for Arc { + fn traced(self) -> Arc { + Arc::new(TracedObjectStore { target: self }) + } +} + +impl ObjectStoreTracingExt for Arc { + fn traced(self) -> Arc { + Arc::new(TracedObjectStore { target: self }) + } +} + #[cfg(test)] mod tests { use super::*; @@ -440,19 +456,3 @@ mod tests { handle.assert_finished(); } } - -pub trait ObjectStoreTracingExt { - fn traced(self) -> Arc; -} - -impl ObjectStoreTracingExt for Arc { - fn traced(self) -> Arc { - Arc::new(TracedObjectStore { target: self }) - } -} - -impl ObjectStoreTracingExt for Arc { - fn traced(self) -> Arc { - Arc::new(TracedObjectStore { target: self }) - } -}