diff --git a/rust/lance/src/dataset/mem_wal/index.rs b/rust/lance/src/dataset/mem_wal/index.rs index e7eb7394c45..a267a9d2e14 100644 --- a/rust/lance/src/dataset/mem_wal/index.rs +++ b/rust/lance/src/dataset/mem_wal/index.rs @@ -31,6 +31,7 @@ use lance_index::vector::pq::ProductQuantizer; use lance_linalg::distance::DistanceType; use lance_table::format::IndexMetadata; use prost::Message as _; +use tracing::instrument; /// Row position in MemTable. /// @@ -331,6 +332,7 @@ impl IndexStore { } /// Insert a batch into all indexes with batch position tracking. + #[instrument(name = "idx_insert_batch", level = "debug", skip_all, fields(num_rows = batch.num_rows(), row_offset, batch_position))] pub fn insert_with_batch_position( &self, batch: &RecordBatch, @@ -378,6 +380,7 @@ impl IndexStore { /// For IVF-PQ indexes, this enables vectorized partition assignment and /// PQ encoding across all batches, improving performance through better /// SIMD utilization. + #[instrument(name = "idx_insert_batches", level = "debug", skip_all, fields(batch_count = batches.len()))] pub fn insert_batches(&self, batches: &[StoredBatch]) -> Result<()> { if batches.is_empty() { return Ok(()); @@ -419,6 +422,7 @@ impl IndexStore { /// /// Returns a map of index names to their update durations for performance tracking. 
#[allow(clippy::print_stderr)] + #[instrument(name = "idx_insert_batches_parallel", level = "debug", skip_all, fields(batch_count = batches.len()))] pub fn insert_batches_parallel( &self, batches: &[StoredBatch], diff --git a/rust/lance/src/dataset/mem_wal/manifest.rs b/rust/lance/src/dataset/mem_wal/manifest.rs index 1aceac60346..744dcb2def2 100644 --- a/rust/lance/src/dataset/mem_wal/manifest.rs +++ b/rust/lance/src/dataset/mem_wal/manifest.rs @@ -42,6 +42,7 @@ use object_store::PutOptions; use object_store::path::Path; use prost::Message; use serde::{Deserialize, Serialize}; +use tracing::instrument; use uuid::Uuid; use super::util::{manifest_filename, parse_bit_reversed_filename, shard_manifest_path}; @@ -91,6 +92,7 @@ impl ShardManifestStore { /// Read the latest manifest version. /// /// Returns `None` if no manifest exists (new shard). + #[instrument(name = "manifest_read_latest", level = "debug", skip_all, fields(shard_id = %self.shard_id))] pub async fn read_latest(&self) -> Result<Option<ShardManifest>> { let version = self.find_latest_version().await?; if version == 0 { @@ -134,6 +136,7 @@ /// # Errors /// /// Returns `Error::AlreadyExists` if another writer already wrote this version. + #[instrument(name = "manifest_write", level = "debug", skip_all, fields(shard_id = %self.shard_id, version = manifest.version, epoch = manifest.writer_epoch))] pub async fn write(&self, manifest: &ShardManifest) -> Result { let version = manifest.version; let filename = manifest_filename(version); @@ -369,6 +372,7 @@ /// # Errors /// /// Returns an error if another writer already claimed the shard.
+ #[instrument(name = "manifest_claim_epoch", level = "info", skip_all, fields(shard_id = %self.shard_id, shard_spec_id))] pub async fn claim_epoch(&self, shard_spec_id: u32) -> Result<(u64, ShardManifest)> { let current = self.read_latest().await?; @@ -415,6 +419,7 @@ /// /// Loads the current manifest and compares epochs. If the stored epoch /// is higher than the local epoch, the writer has been fenced. + #[instrument(name = "manifest_check_fenced", level = "debug", skip_all, fields(shard_id = %self.shard_id, local_epoch))] pub async fn check_fenced(&self, local_epoch: u64) -> Result<()> { let current = self.read_latest().await?; Self::check_fenced_against(&current, local_epoch, self.shard_id) @@ -452,6 +457,7 @@ /// # Returns /// /// The successfully written manifest. + #[instrument(name = "manifest_commit_update", level = "debug", skip_all, fields(shard_id = %self.shard_id, local_epoch))] pub async fn commit_update<F>(&self, local_epoch: u64, prepare_fn: F) -> Result<ShardManifest> where F: Fn(&ShardManifest) -> ShardManifest, diff --git a/rust/lance/src/dataset/mem_wal/memtable.rs b/rust/lance/src/dataset/mem_wal/memtable.rs index a93b5627f2d..7f40ff35df4 100644 --- a/rust/lance/src/dataset/mem_wal/memtable.rs +++ b/rust/lance/src/dataset/mem_wal/memtable.rs @@ -17,6 +17,7 @@ use lance_core::datatypes::Schema; use lance_core::{Error, Result}; use lance_index::scalar::bloomfilter::sbbf::Sbbf; use tokio::sync::RwLock; +use tracing::instrument; use uuid::Uuid; use super::index::IndexStore; @@ -341,6 +342,7 @@ impl MemTable { /// # Single Writer Requirement /// /// This method MUST only be called from the single writer task.
+ #[instrument(name = "mt_insert", level = "debug", skip_all, fields(num_rows = batch.num_rows(), generation = self.generation))] pub async fn insert(&mut self, batch: RecordBatch) -> Result { // Validate schema compatibility if batch.schema() != self.schema { @@ -423,6 +425,7 @@ impl MemTable { /// # Single Writer Requirement /// /// This method MUST only be called from the single writer task. + #[instrument(name = "mt_insert_batches", level = "debug", skip_all, fields(batch_count = batches.len(), generation = self.generation))] pub async fn insert_batches_only( &mut self, batches: Vec, diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs index 4a46ed062c0..15c9da44f7a 100644 --- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs +++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs @@ -15,6 +15,7 @@ use lance_io::object_store::ObjectStore; use lance_table::format::IndexMetadata; use log::info; use object_store::path::Path; +use tracing::instrument; use uuid::Uuid; use super::super::index::MemIndexConfig; @@ -77,6 +78,7 @@ impl MemTableFlusher { } /// Flush the MemTable to storage (data files, indexes, bloom filter). + #[instrument(name = "mt_flush_storage", level = "info", skip_all, fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count()))] pub async fn flush(&self, memtable: &MemTable, epoch: u64) -> Result { self.manifest_store.check_fenced(epoch).await?; @@ -134,6 +136,7 @@ impl MemTableFlusher { /// /// Returns the total number of rows written, which is needed for /// reversing row positions in indexes. + #[instrument(name = "mt_write_data_file", level = "debug", skip_all, fields(path = %path))] async fn write_data_file(&self, path: &Path, memtable: &MemTable) -> Result { use arrow_array::RecordBatchIterator; @@ -180,6 +183,7 @@ impl MemTableFlusher { } /// Flush the MemTable to storage with indexes. 
+ #[instrument(name = "mt_flush_with_indexes", level = "info", skip_all, fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count(), index_count = index_configs.len()))] pub async fn flush_with_indexes( &self, memtable: &MemTable, diff --git a/rust/lance/src/dataset/mem_wal/scanner/planner.rs b/rust/lance/src/dataset/mem_wal/scanner/planner.rs index d75a304480e..c70c127f747 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/planner.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/planner.rs @@ -14,6 +14,7 @@ use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::{ExecutionPlan, limit::GlobalLimitExec}; use datafusion::prelude::Expr; use lance_core::Result; +use tracing::instrument; use super::collector::LsmDataSourceCollector; use super::data_source::LsmDataSource; @@ -75,6 +76,7 @@ impl LsmScanPlanner { /// - SortPreservingMergeExec is O(N log K) where K is the number of sources /// - Memory usage is bounded by the sum of K sort buffers rather than all data /// - No extra column for _memtable_gen in the common case + #[instrument(name = "lsm_plan_scan", level = "debug", skip_all, fields(has_filter = filter.is_some(), limit, offset))] pub async fn plan_scan( &self, projection: Option<&[String]>, diff --git a/rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs b/rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs index fecdb251166..a11ba083139 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs @@ -14,6 +14,7 @@ use datafusion::physical_plan::limit::GlobalLimitExec; use datafusion::prelude::Expr; use lance_core::Result; use lance_index::scalar::bloomfilter::sbbf::Sbbf; +use tracing::instrument; use super::collector::LsmDataSourceCollector; use super::data_source::LsmDataSource; @@ -117,6 +118,7 @@ impl LsmPointLookupPlanner { /// /// An execution plan that returns at most one row - the newest version /// of 
the row with the given primary key. + #[instrument(name = "lsm_point_lookup", level = "debug", skip_all, fields(pk_column_count = self.pk_columns.len()))] pub async fn plan_lookup( &self, pk_values: &[ScalarValue], diff --git a/rust/lance/src/dataset/mem_wal/scanner/vector_search.rs b/rust/lance/src/dataset/mem_wal/scanner/vector_search.rs index 0433929501f..3f9880250c7 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/vector_search.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/vector_search.rs @@ -18,6 +18,7 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::union::UnionExec; use lance_core::Result; use lance_index::scalar::bloomfilter::sbbf::Sbbf; +use tracing::instrument; use super::collector::LsmDataSourceCollector; use super::data_source::LsmDataSource; @@ -144,6 +145,7 @@ impl LsmVectorSearchPlanner { /// /// An execution plan that returns the top-K nearest neighbors across all /// LSM levels, with stale results filtered out. + #[instrument(name = "lsm_vector_search", level = "info", skip_all, fields(k, nprobes, vector_column = %self.vector_column, distance_type = ?self.distance_type))] pub async fn plan_search( &self, query_vector: &FixedSizeListArray, diff --git a/rust/lance/src/dataset/mem_wal/wal.rs b/rust/lance/src/dataset/mem_wal/wal.rs index 4e92355402c..8dd1d9d6e32 100644 --- a/rust/lance/src/dataset/mem_wal/wal.rs +++ b/rust/lance/src/dataset/mem_wal/wal.rs @@ -21,6 +21,7 @@ use lance_io::object_store::ObjectStore; use object_store::path::Path; use tokio::sync::{mpsc, watch}; +use tracing::instrument; use uuid::Uuid; use super::util::{WatchableOnceCell, shard_wal_path, wal_entry_filename}; @@ -297,6 +298,7 @@ impl WalFlusher { /// /// A `WalFlushResult` with timing metrics and the WAL entry. /// Returns empty result if nothing to flush (already flushed past end_batch_position). 
+ #[instrument(name = "wal_flush", level = "info", skip_all, fields(shard_id = %self.shard_id, end_batch_position, has_indexes = indexes.is_some()))] pub async fn flush_to_with_index_update( &self, batch_store: &BatchStore, @@ -498,6 +500,7 @@ impl WalEntryData { /// # Returns /// /// The parsed WAL entry data, or an error if reading/parsing fails. + #[instrument(name = "wal_entry_read", level = "debug", skip_all, fields(path = %path))] pub async fn read(object_store: &ObjectStore, path: &Path) -> Result<Self> { // Read the file let data = object_store diff --git a/rust/lance/src/dataset/mem_wal/write.rs b/rust/lance/src/dataset/mem_wal/write.rs index 4c956fd3ad2..406e521e10d 100644 --- a/rust/lance/src/dataset/mem_wal/write.rs +++ b/rust/lance/src/dataset/mem_wal/write.rs @@ -31,6 +31,7 @@ use tokio::sync::{RwLock, mpsc}; use tokio::task::JoinHandle; use tokio::time::{Interval, interval_at}; use tokio_util::sync::CancellationToken; +use tracing::instrument; use uuid::Uuid; pub use super::index::{ @@ -942,6 +943,7 @@ impl ShardWriter { /// /// The `base_path` should come from `ObjectStore::from_uri()` to ensure /// WAL files are written inside the dataset directory. + #[instrument(name = "sw_open", level = "info", skip_all, fields(shard_id = %config.shard_id, index_count = index_configs.len()))] pub async fn open( object_store: Arc<ObjectStore>, base_path: Path, @@ -1102,6 +1104,7 @@ /// Fencing is detected lazily during WAL flush via atomic writes. /// If another writer has taken over, the WAL flush will fail with /// `AlreadyExists`, indicating this writer has been fenced. + #[instrument(name = "sw_put", level = "info", skip_all, fields(batch_count = batches.len(), shard_id = %self.config.shard_id))] pub async fn put(&self, batches: Vec<RecordBatch>) -> Result { if batches.is_empty() { return Err(Error::invalid_input("Cannot write empty batch list")); } @@ -1257,6 +1260,7 @@ /// Close the writer gracefully.
/// /// Flushes pending data and shuts down background tasks. + #[instrument(name = "sw_close", level = "info", skip_all, fields(shard_id = %self.config.shard_id, epoch = self.epoch))] pub async fn close(self) -> Result<()> { info!("Closing ShardWriter for shard {}", self.config.shard_id); @@ -1373,6 +1377,12 @@ impl WalFlushHandler { /// * `batch_store` - The batch store to flush from /// * `indexes` - Optional indexes to update in parallel with WAL I/O /// * `end_batch_position` - End batch ID (exclusive). Flush batches in (max_flushed, end_batch_position). + #[instrument( + name = "wal_do_flush", + level = "debug", + skip_all, + fields(end_batch_position) + )] async fn do_flush( &self, batch_store: Arc<BatchStore>, @@ -1486,6 +1496,7 @@ impl MemTableFlushHandler { /// This method waits for the WAL flush to complete (sent at freeze time), /// then flushes to Lance storage. The WAL flush is already queued by /// freeze_memtable to ensure strict ordering of WAL entries. + #[instrument(name = "mt_flush", level = "info", skip_all, fields(generation = memtable.generation(), row_count = memtable.row_count()))] async fn flush_memtable( &mut self, memtable: Arc<MemTable>,