diff --git a/bazel-bin b/bazel-bin new file mode 120000 index 00000000000..4a38b9dd350 --- /dev/null +++ b/bazel-bin @@ -0,0 +1 @@ +/private/var/tmp/_bazel_george.talbot/dfef8e5e4a6ba7ff50ae0029c027343e/execroot/com_datadoghq_pomsky/bazel-out/darwin_arm64-fastbuild/bin \ No newline at end of file diff --git a/bazel-out b/bazel-out new file mode 120000 index 00000000000..d505b35f609 --- /dev/null +++ b/bazel-out @@ -0,0 +1 @@ +/private/var/tmp/_bazel_george.talbot/dfef8e5e4a6ba7ff50ae0029c027343e/execroot/com_datadoghq_pomsky/bazel-out \ No newline at end of file diff --git a/bazel-pomsky b/bazel-pomsky new file mode 120000 index 00000000000..3f2dfdeb4d9 --- /dev/null +++ b/bazel-pomsky @@ -0,0 +1 @@ +/private/var/tmp/_bazel_george.talbot/dfef8e5e4a6ba7ff50ae0029c027343e/execroot/com_datadoghq_pomsky \ No newline at end of file diff --git a/bazel-testlogs b/bazel-testlogs new file mode 120000 index 00000000000..2cb51f0d4c7 --- /dev/null +++ b/bazel-testlogs @@ -0,0 +1 @@ +/private/var/tmp/_bazel_george.talbot/dfef8e5e4a6ba7ff50ae0029c027343e/execroot/com_datadoghq_pomsky/bazel-out/darwin_arm64-fastbuild/testlogs \ No newline at end of file diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index e523fdfa371..7ba9309dd89 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7630,6 +7630,7 @@ version = "0.8.0" dependencies = [ "anyhow", "arrow", + "base64 0.22.1", "chrono", "parquet", "proptest", diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index cfb2ade9361..a102d0b4f0c 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -638,9 +638,11 @@ impl IndexingPipeline { // ParquetPackager let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default(); + let table_config = quickwit_parquet_engine::table_config::TableConfig::default(); let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new( writer_config, self.params.indexing_directory.path(), + &table_config, ); let parquet_packager = ParquetPackager::new(split_writer, parquet_uploader_mailbox); let (parquet_packager_mailbox, parquet_packager_handle) = ctx diff --git a/quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs b/quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs index eb51621a30f..894b0251412 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs @@ -549,7 +549,8 @@ mod tests { // Create ParquetPackager let writer_config = ParquetWriterConfig::default(); - let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path()); + let table_config = quickwit_parquet_engine::table_config::TableConfig::default(); + let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path(), &table_config); let packager = ParquetPackager::new(split_writer, uploader_mailbox); let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager); diff --git a/quickwit/quickwit-indexing/src/actors/parquet_e2e_test.rs b/quickwit/quickwit-indexing/src/actors/parquet_e2e_test.rs index d613fc96003..fd83e0e3674 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_e2e_test.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_e2e_test.rs @@ -172,7 +172,8 @@ async fn test_metrics_pipeline_e2e() { // ParquetPackager between indexer and uploader let writer_config = ParquetWriterConfig::default(); - let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path()); + let table_config = quickwit_parquet_engine::table_config::TableConfig::default(); + let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path(), &table_config); let packager = ParquetPackager::new(split_writer, uploader_mailbox); let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager); diff --git a/quickwit/quickwit-indexing/src/actors/parquet_indexer.rs b/quickwit/quickwit-indexing/src/actors/parquet_indexer.rs index 3d632e7cc76..13594ba2e99 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_indexer.rs @@ -607,7 +607,8 @@ mod tests { uploader_mailbox: Mailbox, ) -> (Mailbox, ActorHandle) { let writer_config = ParquetWriterConfig::default(); - let split_writer = ParquetSplitWriter::new(writer_config, temp_dir); + let table_config = quickwit_parquet_engine::table_config::TableConfig::default(); + let split_writer = ParquetSplitWriter::new(writer_config, temp_dir, &table_config); let packager = ParquetPackager::new(split_writer, uploader_mailbox); universe.spawn_builder().spawn(packager) diff --git a/quickwit/quickwit-indexing/src/actors/parquet_packager.rs b/quickwit/quickwit-indexing/src/actors/parquet_packager.rs index b0950141b10..7dd70e645a9 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_packager.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_packager.rs @@ -274,7 +274,8 @@ mod tests { uploader_mailbox: Mailbox, ) -> (Mailbox, ActorHandle) { let writer_config = ParquetWriterConfig::default(); - let split_writer = ParquetSplitWriter::new(writer_config, temp_dir); + let table_config = quickwit_parquet_engine::table_config::TableConfig::default(); + let split_writer = ParquetSplitWriter::new(writer_config, temp_dir, &table_config); let packager = ParquetPackager::new(split_writer, uploader_mailbox); universe.spawn_builder().spawn(packager) diff --git a/quickwit/quickwit-parquet-engine/Cargo.toml b/quickwit/quickwit-parquet-engine/Cargo.toml index 9744dbcc2bc..39918c0948c 100644 --- a/quickwit/quickwit-parquet-engine/Cargo.toml +++ b/quickwit/quickwit-parquet-engine/Cargo.toml @@ -13,8 +13,10 @@ license.workspace = true [dependencies] anyhow = { workspace = true } arrow = { workspace = true } +base64 = { workspace = true } chrono = { workspace = true } parquet = { workspace = true } +prost = { workspace = true } quickwit-common = { workspace = true } quickwit-proto = { workspace = true } sea-query = { workspace = true, optional = true } diff --git a/quickwit/quickwit-parquet-engine/src/index/accumulator.rs b/quickwit/quickwit-parquet-engine/src/index/accumulator.rs index f0bf22e68b8..0267f635b26 100644 --- a/quickwit/quickwit-parquet-engine/src/index/accumulator.rs +++ b/quickwit/quickwit-parquet-engine/src/index/accumulator.rs @@ -151,7 +151,13 @@ impl ParquetBatchAccumulator { /// Internal flush implementation. fn flush_internal(&mut self) -> Result, IndexingError> { - if self.pending_batches.is_empty() { + if self.pending_batches.is_empty() || self.union_fields.is_empty() { + // Nothing to flush: either no batches at all, or only empty + // (zero-column) batches were accumulated. + self.pending_batches.clear(); + self.union_fields.clear(); + self.pending_rows = 0; + self.pending_bytes = 0; return Ok(None); } diff --git a/quickwit/quickwit-parquet-engine/src/storage/config.rs b/quickwit/quickwit-parquet-engine/src/storage/config.rs index 2eb63d73510..a8d675d3c12 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/config.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/config.rs @@ -16,7 +16,7 @@ use arrow::datatypes::{DataType, Schema as ArrowSchema}; use parquet::basic::Compression as ParquetCompression; -use parquet::file::metadata::SortingColumn; +use parquet::file::metadata::{KeyValue, SortingColumn}; use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterPropertiesBuilder}; use parquet::schema::types::ColumnPath; @@ -119,20 +119,42 @@ impl ParquetWriterConfig { } /// Convert to Parquet WriterProperties using the given Arrow schema to configure - /// per-column settings like dictionary encoding and bloom filters. + /// per-column settings like dictionary encoding and bloom filters, with an empty + /// sort order and no metadata. + /// + /// Prefer `to_writer_properties_with_metadata()` in production — this method + /// is mainly for tests that don't care about sort order. pub fn to_writer_properties(&self, schema: &ArrowSchema) -> WriterProperties { + self.to_writer_properties_with_metadata(schema, Vec::new(), None) + } + + /// Convert to Parquet WriterProperties with sorting columns and optional key_value_metadata. + /// + /// `sorting_cols` is produced by `ParquetWriter::sorting_columns()` from the + /// resolved table_config sort fields. + /// + /// When `kv_metadata` is provided, the entries are embedded in the Parquet file's + /// key_value_metadata, making files self-describing (META-07). + pub fn to_writer_properties_with_metadata( + &self, + schema: &ArrowSchema, + sorting_cols: Vec, + kv_metadata: Option>, + ) -> WriterProperties { let mut builder = WriterProperties::builder() .set_max_row_group_size(self.row_group_size) .set_data_page_size_limit(self.data_page_size) .set_write_batch_size(self.write_batch_size) - // Enable column index for efficient pruning on sorted data (64 bytes default) .set_column_index_truncate_length(Some(64)) - // Set sorting columns metadata for readers to use during pruning - .set_sorting_columns(Some(Self::sorting_columns(schema))) - // Enable row group level statistics (min/max/null_count) for query pruning - // This allows DataFusion to skip row groups based on timestamp ranges + .set_sorting_columns(Some(sorting_cols)) .set_statistics_enabled(EnabledStatistics::Chunk); + if let Some(kvs) = kv_metadata + && !kvs.is_empty() + { + builder = builder.set_key_value_metadata(Some(kvs)); + } + builder = match self.compression { Compression::Zstd => { let level = self.compression_level.unwrap_or(DEFAULT_ZSTD_LEVEL); @@ -184,20 +206,6 @@ impl ParquetWriterConfig { } builder } - - /// Get the sorting columns for parquet metadata, computed from the schema - /// and SORT_ORDER. Only columns present in the schema are included. - fn sorting_columns(schema: &ArrowSchema) -> Vec { - SORT_ORDER - .iter() - .filter_map(|name| schema.index_of(name).ok()) - .map(|idx| SortingColumn { - column_idx: idx as i32, - descending: false, - nulls_first: false, - }) - .collect() - } } #[cfg(test)] @@ -370,34 +378,4 @@ mod tests { ); } } - - #[test] - fn test_sorting_columns_order() { - let schema = create_test_schema(); - let sorting_cols = ParquetWriterConfig::sorting_columns(&schema); - - // The test schema has metric_name (idx 0), timestamp_secs (idx 2), - // service (idx 4), env (idx 5), host (idx 6). - // SORT_ORDER is: metric_name, service, env, datacenter, region, host, timestamp_secs - // Only present columns are included, so: metric_name, service, env, host, timestamp_secs - assert_eq!( - sorting_cols.len(), - 5, - "should have 5 sorting columns from the test schema" - ); - - // Verify all are ascending with nulls first - for col in &sorting_cols { - assert!(!col.descending, "sorting should be ascending"); - assert!(!col.nulls_first, "nulls should be last"); - } - - // Verify order matches SORT_ORDER filtered by schema presence: - // metric_name (idx 0), service (idx 4), env (idx 5), host (idx 6), timestamp_secs (idx 2) - assert_eq!(sorting_cols[0].column_idx, 0); // metric_name - assert_eq!(sorting_cols[1].column_idx, 4); // service - assert_eq!(sorting_cols[2].column_idx, 5); // env - assert_eq!(sorting_cols[3].column_idx, 6); // host - assert_eq!(sorting_cols[4].column_idx, 2); // timestamp_secs - } } diff --git a/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs b/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs index 466e2c9cdcb..feda0701bed 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs @@ -25,12 +25,16 @@ use tracing::{debug, info, instrument}; use super::config::ParquetWriterConfig; use super::writer::{ParquetWriteError, ParquetWriter}; +use crate::sort_fields::window_start; use crate::split::{MetricsSplitMetadata, ParquetSplit, SplitId, TAG_SERVICE, TimeRange}; +use crate::table_config::TableConfig; /// Writer that produces complete ParquetSplit with metadata from RecordBatch data. pub struct ParquetSplitWriter { /// The underlying Parquet writer. writer: ParquetWriter, + /// Table configuration (sort fields, window duration, product type). + table_config: TableConfig, /// Base directory for split files. base_path: PathBuf, } @@ -41,9 +45,15 @@ impl ParquetSplitWriter { /// # Arguments /// * `config` - Parquet writer configuration /// * `base_path` - Directory where split files will be written - pub fn new(config: ParquetWriterConfig, base_path: impl Into) -> Self { + /// * `table_config` - Table-level config (sort fields, window duration) + pub fn new( + config: ParquetWriterConfig, + base_path: impl Into, + table_config: &TableConfig, + ) -> Self { Self { - writer: ParquetWriter::new(config), + writer: ParquetWriter::new(config, table_config), + table_config: table_config.clone(), base_path: base_path.into(), } } @@ -55,6 +65,10 @@ impl ParquetSplitWriter { /// Write a RecordBatch to a Parquet file and return a ParquetSplit with metadata. /// + /// Builds metadata (including sort fields, window_start, window_duration from + /// table_config) before writing, then embeds compaction KV metadata into the + /// Parquet file. size_bytes is updated after the write completes. + /// /// # Arguments /// * `batch` - The RecordBatch to write /// * `index_uid` - The index unique identifier for the split metadata @@ -67,18 +81,13 @@ impl ParquetSplitWriter { batch: &RecordBatch, index_uid: &str, ) -> Result { - // Generate unique split ID let split_id = SplitId::generate(); + let filename = format!("{}.parquet", split_id); + let file_path = self.base_path.join(&filename); - let file_path = self.base_path.join(format!("{}.parquet", split_id)); - - // Ensure the base directory exists std::fs::create_dir_all(&self.base_path)?; - // Write batch to file - let size_bytes = self.writer.write_to_file(batch, &file_path)?; - - // Extract time range from batch + // Extract batch-level metadata before writing. let time_range = extract_time_range(batch)?; debug!( start_secs = time_range.start_secs, @@ -86,36 +95,60 @@ impl ParquetSplitWriter { "extracted time range from batch" ); - // Extract distinct metric names from batch let metric_names = extract_metric_names(batch)?; - - // Extract distinct service names from batch let service_names = extract_service_names(batch)?; - // Build metadata - let metadata = MetricsSplitMetadata::builder() + // Compute window_start from the earliest timestamp in the batch. + let window_duration = self.table_config.window_duration_secs; + let window_start_secs = if window_duration > 0 { + match window_start(time_range.start_secs as i64, window_duration as i64) { + Ok(dt) => Some(dt.timestamp()), + Err(e) => { + tracing::warn!(error = %e, "failed to compute window_start, omitting"); + None + } + } + } else { + None + }; + + // Build metadata with sort fields and window from table_config. + // size_bytes is set to 0 here and updated after write. + let mut builder = MetricsSplitMetadata::builder() .split_id(split_id.clone()) .index_uid(index_uid) .time_range(time_range) .num_rows(batch.num_rows() as u64) - .size_bytes(size_bytes); + .size_bytes(0) + .sort_fields(self.writer.sort_fields_string()) + .window_duration_secs(window_duration) + .parquet_file(filename); + + if let Some(ws) = window_start_secs { + builder = builder.window_start_secs(ws); + } + + for name in metric_names { + builder = builder.add_metric_name(name); + } + for name in service_names { + builder = builder.add_low_cardinality_tag(TAG_SERVICE, name); + } - // Add metric names - let metadata = metric_names - .into_iter() - .fold(metadata, |m, name| m.add_metric_name(name)); + let mut metadata = builder.build(); - // Add service names as low-cardinality tags - let metadata = service_names.into_iter().fold(metadata, |m, name| { - m.add_low_cardinality_tag(TAG_SERVICE, name) - }); + // Write with compaction metadata embedded in Parquet KV metadata. + let size_bytes = + self.writer + .write_to_file_with_metadata(batch, &file_path, Some(&metadata))?; - let metadata = metadata.build(); + metadata.size_bytes = size_bytes; info!( split_id = %split_id, file_path = %file_path.display(), size_bytes, + sort_fields = %self.writer.sort_fields_string(), "split file written successfully" ); @@ -296,7 +329,7 @@ mod tests { let config = ParquetWriterConfig::default(); let temp_dir = tempfile::tempdir().unwrap(); - let writer = ParquetSplitWriter::new(config, temp_dir.path()); + let writer = ParquetSplitWriter::new(config, temp_dir.path(), &TableConfig::default()); let batch = create_test_batch(10); let split = writer.write_split(&batch, "test-index").unwrap(); @@ -318,7 +351,7 @@ mod tests { let config = ParquetWriterConfig::default(); let temp_dir = tempfile::tempdir().unwrap(); - let writer = ParquetSplitWriter::new(config, temp_dir.path()); + let writer = ParquetSplitWriter::new(config, temp_dir.path(), &TableConfig::default()); // Create batch with timestamps [100, 150, 200] let batch = create_test_batch_with_options( @@ -340,7 +373,7 @@ mod tests { let config = ParquetWriterConfig::default(); let temp_dir = tempfile::tempdir().unwrap(); - let writer = ParquetSplitWriter::new(config, temp_dir.path()); + let writer = ParquetSplitWriter::new(config, temp_dir.path(), &TableConfig::default()); // Create batch with specific metric names let batch = create_test_batch_with_options( diff --git a/quickwit/quickwit-parquet-engine/src/storage/writer.rs b/quickwit/quickwit-parquet-engine/src/storage/writer.rs index 6f29c0be4cc..d45c0a0c9cb 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/writer.rs @@ -21,13 +21,116 @@ use std::sync::Arc; use arrow::array::RecordBatch; use arrow::compute::{SortColumn, SortOptions, lexsort_to_indices, take_record_batch}; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD as BASE64; use parquet::arrow::ArrowWriter; use parquet::errors::ParquetError; +use parquet::file::metadata::{KeyValue, SortingColumn}; +use parquet::file::properties::WriterProperties; use thiserror::Error; use tracing::{debug, instrument}; use super::config::ParquetWriterConfig; -use crate::schema::{SORT_ORDER, validate_required_fields}; +use crate::schema::validate_required_fields; +use crate::sort_fields::parse_sort_fields; +use crate::split::MetricsSplitMetadata; +use crate::table_config::TableConfig; + +/// Parquet key_value_metadata keys for compaction metadata. +/// Prefixed with "qh." to avoid collision with standard Parquet/Arrow keys. +pub(crate) const PARQUET_META_SORT_FIELDS: &str = "qh.sort_fields"; +pub(crate) const PARQUET_META_WINDOW_START: &str = "qh.window_start"; +pub(crate) const PARQUET_META_WINDOW_DURATION: &str = "qh.window_duration_secs"; +pub(crate) const PARQUET_META_NUM_MERGE_OPS: &str = "qh.num_merge_ops"; +pub(crate) const PARQUET_META_ROW_KEYS: &str = "qh.row_keys"; +pub(crate) const PARQUET_META_ROW_KEYS_JSON: &str = "qh.row_keys_json"; + +/// Build Parquet key_value_metadata entries for compaction metadata. +/// Returns Vec that can be added to WriterProperties. +/// +/// Only populated fields are included -- pre-Phase-31 splits produce an empty vec. +pub(crate) fn build_compaction_key_value_metadata( + metadata: &MetricsSplitMetadata, +) -> Vec { + // TW-2: window_duration must divide 3600. + debug_assert!( + metadata.window_duration_secs() == 0 || 3600 % metadata.window_duration_secs() == 0, + "TW-2 violated at Parquet write: window_duration_secs={} does not divide 3600", + metadata.window_duration_secs() + ); + + let mut kvs = Vec::new(); + + if !metadata.sort_fields.is_empty() { + kvs.push(KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + metadata.sort_fields.clone(), + )); + } + + if let Some(ws) = metadata.window_start() { + kvs.push(KeyValue::new( + PARQUET_META_WINDOW_START.to_string(), + ws.to_string(), + )); + } + + if metadata.window_duration_secs() > 0 { + kvs.push(KeyValue::new( + PARQUET_META_WINDOW_DURATION.to_string(), + metadata.window_duration_secs().to_string(), + )); + } + + if metadata.num_merge_ops > 0 { + kvs.push(KeyValue::new( + PARQUET_META_NUM_MERGE_OPS.to_string(), + metadata.num_merge_ops.to_string(), + )); + } + + if let Some(ref row_keys_bytes) = metadata.row_keys_proto { + kvs.push(KeyValue::new( + PARQUET_META_ROW_KEYS.to_string(), + BASE64.encode(row_keys_bytes), + )); + + // Debug: human-readable JSON (best-effort). + if let Ok(row_keys) = ::decode( + row_keys_bytes.as_slice(), + ) && let Ok(json) = serde_json::to_string(&row_keys) + { + kvs.push(KeyValue::new(PARQUET_META_ROW_KEYS_JSON.to_string(), json)); + } + } + + kvs +} + +/// SS-5: Verify that the kv_metadata entries match the source MetricsSplitMetadata. +fn verify_ss5_kv_consistency(metadata: &MetricsSplitMetadata, kvs: &[KeyValue]) { + let find_kv = |key: &str| -> Option<&str> { + kvs.iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.as_deref()) + }; + + if !metadata.sort_fields.is_empty() { + debug_assert_eq!( + find_kv(PARQUET_META_SORT_FIELDS), + Some(metadata.sort_fields.as_str()), + "SS-5 violated: sort_fields in kv_metadata does not match MetricsSplitMetadata" + ); + } + + if let Some(ws) = metadata.window_start() { + debug_assert_eq!( + find_kv(PARQUET_META_WINDOW_START), + Some(ws.to_string()).as_deref(), + "SS-5 violated: window_start in kv_metadata does not match MetricsSplitMetadata" + ); + } +} /// Errors that can occur during parquet writing. #[derive(Debug, Error)] @@ -49,15 +152,37 @@ pub enum ParquetWriteError { SchemaValidation(String), } +/// A resolved sort field: a column name with its sort direction. +struct ResolvedSortField { + name: String, + descending: bool, +} + /// Writer for metrics data to Parquet format. pub struct ParquetWriter { config: ParquetWriterConfig, + /// Physical sort columns resolved from TableConfig, in sort priority order. + resolved_sort_fields: Vec, + /// The original sort fields string from TableConfig, stored verbatim in metadata. + sort_fields_string: String, } impl ParquetWriter { - /// Create a new ParquetWriter. - pub fn new(config: ParquetWriterConfig) -> Self { - Self { config } + /// Create a new ParquetWriter with sort order driven by `table_config`. + /// + /// Parses `table_config.effective_sort_fields()`, resolves each column name + /// to a `ParquetField`. Columns not in the physical schema (e.g., `timeseries_id`) + /// are skipped for sorting but recorded in the metadata string. + /// + /// The writer validates and sorts dynamically from the batch at write time. + pub fn new(config: ParquetWriterConfig, table_config: &TableConfig) -> Self { + let sort_fields_string = table_config.effective_sort_fields().to_string(); + let resolved_sort_fields = resolve_sort_fields(&sort_fields_string); + Self { + config, + resolved_sort_fields, + sort_fields_string, + } } /// Get the writer configuration. @@ -65,20 +190,50 @@ impl ParquetWriter { &self.config } - /// Sort a RecordBatch by the metrics sort order. - /// Columns from SORT_ORDER that are present in the batch schema are used; - /// missing columns are skipped. + /// Get the sort fields string (for metadata). + pub fn sort_fields_string(&self) -> &str { + &self.sort_fields_string + } + + /// Build `SortingColumn` entries for Parquet file metadata. + /// Columns from resolved sort fields that are present in the batch schema are included. + fn sorting_columns(&self, batch: &RecordBatch) -> Vec { + let schema = batch.schema(); + self.resolved_sort_fields + .iter() + .filter_map(|sf| { + schema + .index_of(sf.name.as_str()) + .ok() + .map(|idx| SortingColumn { + column_idx: idx as i32, + descending: sf.descending, + nulls_first: true, + }) + }) + .collect() + } + + /// Sort a RecordBatch according to the table_config sort fields. + /// Columns from the resolved sort fields that are present in the batch schema + /// are used; missing columns are skipped. This sorting enables efficient pruning + /// during query execution. fn sort_batch(&self, batch: &RecordBatch) -> Result { let schema = batch.schema(); - let mut sort_columns: Vec = SORT_ORDER + let mut sort_columns: Vec = self + .resolved_sort_fields .iter() - .filter_map(|name| schema.index_of(name).ok()) - .map(|idx| SortColumn { - values: Arc::clone(batch.column(idx)), - options: Some(SortOptions { - descending: false, - nulls_first: false, - }), + .filter_map(|sf| { + schema + .index_of(sf.name.as_str()) + .ok() + .map(|idx| SortColumn { + values: Arc::clone(batch.column(idx)), + options: Some(SortOptions { + descending: sf.descending, + nulls_first: true, + }), + }) }) .collect(); @@ -103,60 +258,136 @@ impl ParquetWriter { }); let indices = lexsort_to_indices(&sort_columns, None)?; - Ok(take_record_batch(batch, &indices)?) + let sorted_batch = take_record_batch(batch, &indices)?; + + // SS-1: verify the output is actually sorted. + #[cfg(debug_assertions)] + { + if sorted_batch.num_rows() > 1 { + let verify_columns: Vec = self + .resolved_sort_fields + .iter() + .filter_map(|sf| { + schema + .index_of(sf.name.as_str()) + .ok() + .map(|idx| SortColumn { + values: Arc::clone(sorted_batch.column(idx)), + options: Some(SortOptions { + descending: sf.descending, + nulls_first: true, + }), + }) + }) + .collect(); + let verify_indices = lexsort_to_indices(&verify_columns, None) + .expect("SS-1 verification sort failed"); + for i in 0..verify_indices.len() { + debug_assert_eq!( + verify_indices.value(i) as usize, + i, + "SS-1 violated: row {} is out of sort order after sort_batch()", + i + ); + } + } + } + + Ok(sorted_batch) } - /// Write a RecordBatch to Parquet bytes in memory. - /// The batch is sorted before writing by: metric_name, common tags, timestamp. - #[instrument(skip(self, batch), fields(batch_rows = batch.num_rows()))] - pub fn write_to_bytes(&self, batch: &RecordBatch) -> Result, ParquetWriteError> { + /// Validate, sort, and build WriterProperties for a batch. + fn prepare_write( + &self, + batch: &RecordBatch, + split_metadata: Option<&MetricsSplitMetadata>, + ) -> Result<(RecordBatch, WriterProperties), ParquetWriteError> { validate_required_fields(&batch.schema()) .map_err(|e| ParquetWriteError::SchemaValidation(e.to_string()))?; - - // Sort the batch before writing for efficient pruning let sorted_batch = self.sort_batch(batch)?; - let props = self.config.to_writer_properties(&sorted_batch.schema()); - let buffer = Cursor::new(Vec::new()); + let kv_metadata = split_metadata.map(build_compaction_key_value_metadata); + + // SS-5: verify kv_metadata sort_fields matches source. + if let (Some(meta), Some(kvs)) = (split_metadata, &kv_metadata) { + verify_ss5_kv_consistency(meta, kvs); + } + + let props = self.config.to_writer_properties_with_metadata( + &sorted_batch.schema(), + self.sorting_columns(&sorted_batch), + kv_metadata, + ); + Ok((sorted_batch, props)) + } + + /// Write a RecordBatch to Parquet bytes in memory. + #[instrument(skip(self, batch), fields(batch_rows = batch.num_rows()))] + pub fn write_to_bytes( + &self, + batch: &RecordBatch, + split_metadata: Option<&MetricsSplitMetadata>, + ) -> Result, ParquetWriteError> { + let (sorted_batch, props) = self.prepare_write(batch, split_metadata)?; + let buffer = Cursor::new(Vec::new()); let mut writer = ArrowWriter::try_new(buffer, sorted_batch.schema(), Some(props))?; writer.write(&sorted_batch)?; - let buffer = writer.into_inner()?; + let bytes = writer.into_inner()?.into_inner(); - let bytes = buffer.into_inner(); debug!(bytes_written = bytes.len(), "completed write to bytes"); Ok(bytes) } - /// Write a RecordBatch to a Parquet file. - /// The batch is sorted before writing by: metric_name, common tags, timestamp. + /// Write a RecordBatch to a Parquet file with optional compaction metadata. /// /// Returns the number of bytes written. - #[instrument(skip(self, batch), fields(batch_rows = batch.num_rows(), path = %path.display()))] - pub fn write_to_file( + #[instrument(skip(self, batch, split_metadata), fields(batch_rows = batch.num_rows(), path = %path.display()))] + pub fn write_to_file_with_metadata( &self, batch: &RecordBatch, path: &Path, + split_metadata: Option<&MetricsSplitMetadata>, ) -> Result { - validate_required_fields(&batch.schema()) - .map_err(|e| ParquetWriteError::SchemaValidation(e.to_string()))?; - - // Sort the batch before writing for efficient pruning - let sorted_batch = self.sort_batch(batch)?; + let (sorted_batch, props) = self.prepare_write(batch, split_metadata)?; - let props = self.config.to_writer_properties(&sorted_batch.schema()); let file = File::create(path)?; - let mut writer = ArrowWriter::try_new(file, sorted_batch.schema(), Some(props))?; writer.write(&sorted_batch)?; - let file = writer.into_inner()?; - let bytes_written = file.metadata()?.len(); + let bytes_written = writer.into_inner()?.metadata()?.len(); debug!(bytes_written, "completed write to file"); Ok(bytes_written) } } +/// Parse a sort fields string and resolve column names to physical `ParquetField`s. +/// +/// Columns not present in the current schema (e.g., `timeseries_id`) are silently +/// skipped — they are recorded in the metadata string but do not affect physical sort. +fn resolve_sort_fields(sort_fields_str: &str) -> Vec { + let schema = match parse_sort_fields(sort_fields_str) { + Ok(s) => s, + Err(e) => { + tracing::warn!(sort_fields = sort_fields_str, error = %e, "failed to parse sort fields, using empty sort order"); + return Vec::new(); + } + }; + + schema + .column + .iter() + .map(|col| { + let descending = col.sort_direction + == quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32; + ResolvedSortField { + name: col.name.clone(), + descending, + } + }) + .collect() +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -176,18 +407,17 @@ mod tests { #[test] fn test_writer_creation() { let config = ParquetWriterConfig::default(); - let _writer = ParquetWriter::new(config); + let _writer = ParquetWriter::new(config, &TableConfig::default()); } #[test] fn test_write_to_bytes() { let config = ParquetWriterConfig::default(); - let writer = ParquetWriter::new(config); + let writer = ParquetWriter::new(config, &TableConfig::default()); let batch = create_test_batch(); - let bytes = writer.write_to_bytes(&batch).unwrap(); + let bytes = writer.write_to_bytes(&batch, None).unwrap(); - // Parquet files start with PAR1 magic bytes assert!(bytes.len() > 4); assert_eq!(&bytes[0..4], b"PAR1"); } @@ -195,23 +425,24 @@ mod tests { #[test] fn test_write_to_file() { let config = ParquetWriterConfig::default(); - let writer = ParquetWriter::new(config); + let writer = ParquetWriter::new(config, &TableConfig::default()); let batch = create_test_batch(); let temp_dir = std::env::temp_dir(); let path = temp_dir.join("test_metrics.parquet"); - let bytes_written = writer.write_to_file(&batch, &path).unwrap(); + let bytes_written = writer + .write_to_file_with_metadata(&batch, &path, None) + .unwrap(); assert!(bytes_written > 0); - // Clean up std::fs::remove_file(&path).ok(); } #[test] fn test_schema_validation_missing_field() { let config = ParquetWriterConfig::default(); - let writer = ParquetWriter::new(config); + let writer = ParquetWriter::new(config, &TableConfig::default()); // Create a batch missing required fields let wrong_schema = Arc::new(Schema::new(vec![Field::new( @@ -225,37 +456,7 @@ mod tests { ) .unwrap(); - let result = writer.write_to_bytes(&wrong_batch); - assert!(matches!( - result, - Err(ParquetWriteError::SchemaValidation(_)) - )); - } - - #[test] - fn test_schema_validation_wrong_type() { - let config = ParquetWriterConfig::default(); - let writer = ParquetWriter::new(config); - - // Create a batch where metric_name has wrong type (Utf8 instead of Dictionary) - let wrong_schema = Arc::new(Schema::new(vec![ - Field::new("metric_name", DataType::Utf8, false), - Field::new("metric_type", DataType::UInt8, false), - Field::new("timestamp_secs", DataType::UInt64, false), - Field::new("value", DataType::Float64, false), - ])); - let wrong_batch = RecordBatch::try_new( - wrong_schema, - vec![ - Arc::new(StringArray::from(vec!["test"])) as ArrayRef, - Arc::new(UInt8Array::from(vec![0u8])) as ArrayRef, - Arc::new(UInt64Array::from(vec![100u64])) as ArrayRef, - Arc::new(Float64Array::from(vec![1.0])) as ArrayRef, - ], - ) - .unwrap(); - - let result = writer.write_to_bytes(&wrong_batch); + let result = writer.write_to_bytes(&wrong_batch, None); assert!(matches!( result, Err(ParquetWriteError::SchemaValidation(_)) @@ -267,10 +468,10 @@ mod tests { use super::super::config::Compression; let config = ParquetWriterConfig::new().with_compression(Compression::Snappy); - let writer = ParquetWriter::new(config); + let writer = ParquetWriter::new(config, &TableConfig::default()); let batch = create_test_batch(); - let bytes = writer.write_to_bytes(&batch).unwrap(); + let bytes = writer.write_to_bytes(&batch, None).unwrap(); assert!(bytes.len() > 4); assert_eq!(&bytes[0..4], b"PAR1"); @@ -283,7 +484,7 @@ mod tests { use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; let config = ParquetWriterConfig::default(); - let writer = ParquetWriter::new(config); + let writer = ParquetWriter::new(config, &TableConfig::default()); // Create a schema with required fields + service tag for sort verification let schema = Arc::new(Schema::new(vec![ @@ -331,12 +532,12 @@ mod tests { ) .unwrap(); - // Write to file (will be sorted) let temp_dir = std::env::temp_dir(); let path = temp_dir.join("test_sorting.parquet"); - writer.write_to_file(&batch, &path).unwrap(); + writer + .write_to_file_with_metadata(&batch, &path, None) + .unwrap(); - // Read back and verify sort order let file = File::open(&path).unwrap(); let reader = ParquetRecordBatchReaderBuilder::try_new(file) .unwrap() @@ -407,7 +608,218 @@ mod tests { assert_eq!(get_service(2), "service_a"); assert_eq!(ts_col.value(2), 300); - // Clean up std::fs::remove_file(&path).ok(); } + + #[test] + fn test_write_to_file_with_compaction_metadata() { + use std::fs::File; + + use parquet::file::reader::{FileReader, SerializedFileReader}; + + use crate::split::{SplitId, TimeRange}; + + let config = ParquetWriterConfig::default(); + let writer = ParquetWriter::new(config, &TableConfig::default()); + + let batch = create_test_batch(); + + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("e2e-test")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .window_start_secs(1700000000) + .window_duration_secs(900) + .sort_fields("metric_name|host|timestamp/V2") + .num_merge_ops(3) + .build(); + + let temp_dir = std::env::temp_dir(); + let path = temp_dir.join("test_compaction_metadata.parquet"); + + writer + .write_to_file_with_metadata(&batch, &path, Some(&metadata)) + .unwrap(); + + let file = File::open(&path).unwrap(); + let reader = SerializedFileReader::new(file).unwrap(); + let file_metadata = reader.metadata().file_metadata(); + let kv_metadata = file_metadata + .key_value_metadata() + .expect("should have kv metadata"); + + let find_kv = |key: &str| -> Option { + kv_metadata + .iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.clone()) + }; + + assert_eq!( + find_kv(PARQUET_META_SORT_FIELDS).unwrap(), + "metric_name|host|timestamp/V2" + ); + assert_eq!(find_kv(PARQUET_META_WINDOW_START).unwrap(), "1700000000"); + assert_eq!(find_kv(PARQUET_META_WINDOW_DURATION).unwrap(), "900"); + assert_eq!(find_kv(PARQUET_META_NUM_MERGE_OPS).unwrap(), "3"); + + std::fs::remove_file(&path).ok(); + } + + #[test] + fn test_write_to_file_without_metadata_has_no_qh_keys() { + use std::fs::File; + + use parquet::file::reader::{FileReader, SerializedFileReader}; + + let config = ParquetWriterConfig::default(); + let writer = ParquetWriter::new(config, &TableConfig::default()); + + let batch = create_test_batch(); + let temp_dir = std::env::temp_dir(); + let path = temp_dir.join("test_no_compaction_metadata.parquet"); + + writer + .write_to_file_with_metadata(&batch, &path, None) + .unwrap(); + + let file = File::open(&path).unwrap(); + let reader = SerializedFileReader::new(file).unwrap(); + let file_metadata = reader.metadata().file_metadata(); + + if let Some(kv_metadata) = file_metadata.key_value_metadata() { + let qh_keys: Vec<_> = kv_metadata + .iter() + .filter(|kv| kv.key.starts_with("qh.")) + .collect(); + assert!( + qh_keys.is_empty(), + "should have no qh.* keys without metadata, got: {:?}", + qh_keys + ); + } + + std::fs::remove_file(&path).ok(); + } + + #[test] + fn test_build_compaction_kv_metadata_fully_populated() { + use crate::split::{SplitId, TimeRange}; + + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("kv-test")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .window_start_secs(1700000000) + .window_duration_secs(3600) + .sort_fields("metric_name|host|timestamp/V2") + .num_merge_ops(5) + .row_keys_proto(vec![0x08, 0x01, 0x10, 0x02]) + .build(); + + let kvs = build_compaction_key_value_metadata(&metadata); + + assert!( + kvs.len() >= 5, + "expected at least 5 kv entries, got {}", + kvs.len() + ); + + let find_kv = |key: &str| -> Option { + kvs.iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.clone()) + }; + + assert_eq!( + find_kv(PARQUET_META_SORT_FIELDS).unwrap(), + "metric_name|host|timestamp/V2" + ); + assert_eq!(find_kv(PARQUET_META_WINDOW_START).unwrap(), "1700000000"); + assert_eq!(find_kv(PARQUET_META_WINDOW_DURATION).unwrap(), "3600"); + assert_eq!(find_kv(PARQUET_META_NUM_MERGE_OPS).unwrap(), "5"); + + let row_keys_b64 = find_kv(PARQUET_META_ROW_KEYS).unwrap(); + let decoded = BASE64.decode(&row_keys_b64).unwrap(); + assert_eq!(decoded, vec![0x08, 0x01, 0x10, 0x02]); + } + + #[test] + fn test_build_compaction_kv_metadata_default_pre_phase31() { + use crate::split::{SplitId, TimeRange}; + + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("old-split")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .build(); + + let kvs = build_compaction_key_value_metadata(&metadata); + + assert!( + kvs.is_empty(), + "pre-Phase-31 metadata should produce empty kv vec, got {} entries", + kvs.len() + ); + } + + #[test] + fn test_row_keys_base64_roundtrip() { + use crate::split::{SplitId, TimeRange}; + + let row_keys = quickwit_proto::sortschema::RowKeys { + min_row_values: Some(quickwit_proto::sortschema::ColumnValues { + column: vec![quickwit_proto::sortschema::ColumnValue { + value: Some(quickwit_proto::sortschema::column_value::Value::TypeString( + b"cpu.usage".to_vec(), + )), + }], + }), + max_row_values: Some(quickwit_proto::sortschema::ColumnValues { + column: vec![quickwit_proto::sortschema::ColumnValue { + value: Some(quickwit_proto::sortschema::column_value::Value::TypeString( + b"memory.used".to_vec(), + )), + }], + }), + all_inclusive_max_row_values: None, + expired: false, + }; + + let proto_bytes = prost::Message::encode_to_vec(&row_keys); + + let metadata = MetricsSplitMetadata::builder() + .split_id(SplitId::new("roundtrip-test")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .sort_fields("metric_name|timestamp/V2") + .row_keys_proto(proto_bytes.clone()) + .build(); + + let kvs = build_compaction_key_value_metadata(&metadata); + + let b64_entry = kvs + .iter() + .find(|kv| kv.key == PARQUET_META_ROW_KEYS) + .expect("should have row_keys entry"); + let decoded = BASE64 + .decode(b64_entry.value.as_ref().unwrap()) + .expect("should decode base64"); + assert_eq!(decoded, proto_bytes); + + let recovered: quickwit_proto::sortschema::RowKeys = + prost::Message::decode(decoded.as_slice()).expect("should decode proto"); + assert_eq!(recovered, row_keys); + + let json_entry = kvs + .iter() + .find(|kv| kv.key == PARQUET_META_ROW_KEYS_JSON) + .expect("should have row_keys_json entry"); + let json_str = json_entry.value.as_ref().unwrap(); + assert!( + json_str.contains("min_row_values") && json_str.contains("TypeString"), + "JSON should contain RowKeys structure, got: {}", + json_str + ); + } }