Merged

26 commits:
d892f23 feat: replace fixed MetricDataPoint fields with dynamic tag HashMap (mattmkim, Mar 18, 2026)
4e928fe feat: replace ParquetField enum with constants and dynamic validation (mattmkim, Mar 18, 2026)
d8d71ed feat: derive sort order and bloom filters from batch schema (mattmkim, Mar 18, 2026)
ad115bf feat: union schema accumulation and schema-agnostic ingest validation (mattmkim, Mar 18, 2026)
d0a995e feat: dynamic column lookup in split writer (mattmkim, Mar 18, 2026)
35c3942 feat: remove ParquetSchema dependency from indexing actors (mattmkim, Mar 18, 2026)
33c4070 refactor: deduplicate test batch helpers (mattmkim, Mar 18, 2026)
7a5979f lint (mattmkim, Mar 30, 2026)
440631b Merge branch 'main' into matthew.kim/metrics-wide-schema (mattmkim, Mar 31, 2026)
5eae799 Merge branch 'main' into matthew.kim/metrics-wide-schema (mattmkim, Mar 31, 2026)
5b2304c feat(31): sort schema foundation — proto, parser, display, validation… (g-talbot, Mar 31, 2026)
4d42fd9 fix: rustdoc link errors — use backticks for private items (g-talbot, Apr 1, 2026)
b6eb595 feat(31): compaction metadata types — extend split metadata, postgres… (g-talbot, Mar 31, 2026)
76b703a feat(31): wire TableConfig into sort path, add compaction KV metadata (g-talbot, Mar 9, 2026)
759c2ca Update quickwit/quickwit-parquet-engine/src/table_config.rs (g-talbot, Apr 7, 2026)
6454f1d Update quickwit/quickwit-parquet-engine/src/table_config.rs (g-talbot, Apr 7, 2026)
4d8b6b2 Merge quickwit-oss/main into gtt/phase-31-sort-schema (g-talbot, Apr 8, 2026)
1e67900 Merge branch 'gtt/phase-31-sort-schema' into gtt/phase-31-compaction-… (g-talbot, Apr 8, 2026)
85fcb2d Merge branch 'gtt/phase-31-compaction-metadata' into gtt/phase-31-wri… (g-talbot, Apr 8, 2026)
4481bef style: rustfmt long match arm in default_sort_fields (g-talbot, Apr 8, 2026)
64c5d5f Merge branch 'gtt/phase-31-sort-schema' into gtt/phase-31-compaction-… (g-talbot, Apr 8, 2026)
a8bf948 Merge branch 'gtt/phase-31-compaction-metadata' into gtt/phase-31-wri… (g-talbot, Apr 8, 2026)
93e1cc7 fix: make parquet_file field backward-compatible in MetricsSplitMetadata (g-talbot, Apr 8, 2026)
b968085 Merge branch 'gtt/phase-31-compaction-metadata' into gtt/phase-31-wri… (g-talbot, Apr 8, 2026)
f7c89bf fix: handle empty-column batches in accumulator flush (g-talbot, Apr 8, 2026)
112f290 Merge quickwit-oss/main and address review comments (g-talbot, Apr 8, 2026)
1 change: 1 addition & 0 deletions bazel-bin
1 change: 1 addition & 0 deletions bazel-out
1 change: 1 addition & 0 deletions bazel-pomsky
1 change: 1 addition & 0 deletions bazel-testlogs
1 change: 1 addition & 0 deletions quickwit/Cargo.lock


2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -638,9 +638,11 @@ impl IndexingPipeline {

// ParquetPackager
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
writer_config,
self.params.indexing_directory.path(),
&table_config,
);
let parquet_packager = ParquetPackager::new(split_writer, parquet_uploader_mailbox);
let (parquet_packager_mailbox, parquet_packager_handle) = ctx
@@ -549,7 +549,8 @@ mod tests {

// Create ParquetPackager
let writer_config = ParquetWriterConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path());
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path(), &table_config);
let packager = ParquetPackager::new(split_writer, uploader_mailbox);
let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager);

3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/parquet_e2e_test.rs
@@ -172,7 +172,8 @@ async fn test_metrics_pipeline_e2e() {

// ParquetPackager between indexer and uploader
let writer_config = ParquetWriterConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path());
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path(), &table_config);
let packager = ParquetPackager::new(split_writer, uploader_mailbox);
let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager);

3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/parquet_indexer.rs
@@ -607,7 +607,8 @@ mod tests {
uploader_mailbox: Mailbox<ParquetUploader>,
) -> (Mailbox<ParquetPackager>, ActorHandle<ParquetPackager>) {
let writer_config = ParquetWriterConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir);
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir, &table_config);

let packager = ParquetPackager::new(split_writer, uploader_mailbox);
universe.spawn_builder().spawn(packager)
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/parquet_packager.rs
@@ -274,7 +274,8 @@ mod tests {
uploader_mailbox: Mailbox<ParquetUploader>,
) -> (Mailbox<ParquetPackager>, ActorHandle<ParquetPackager>) {
let writer_config = ParquetWriterConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir);
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir, &table_config);

let packager = ParquetPackager::new(split_writer, uploader_mailbox);
universe.spawn_builder().spawn(packager)
2 changes: 2 additions & 0 deletions quickwit/quickwit-parquet-engine/Cargo.toml
@@ -13,8 +13,10 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
arrow = { workspace = true }
base64 = { workspace = true }
chrono = { workspace = true }
parquet = { workspace = true }
prost = { workspace = true }
quickwit-common = { workspace = true }
quickwit-proto = { workspace = true }
sea-query = { workspace = true, optional = true }
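The two new dependencies line up with the compaction KV metadata work later in this PR. A minimal sketch of the likely pairing, under the assumption that compaction metadata is a prost message serialized and base64-encoded so it fits in a string-valued Parquet key/value entry (the helper name is hypothetical):

```rust
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
use prost::Message;

/// Encode a prost message as a base64 string, suitable for a Parquet
/// key_value_metadata entry (which can only hold UTF-8 strings).
fn encode_kv_value<M: Message>(msg: &M) -> String {
    STANDARD.encode(msg.encode_to_vec())
}
```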
8 changes: 7 additions & 1 deletion quickwit/quickwit-parquet-engine/src/index/accumulator.rs
@@ -151,7 +151,13 @@ impl ParquetBatchAccumulator {

/// Internal flush implementation.
fn flush_internal(&mut self) -> Result<Option<RecordBatch>, IndexingError> {
if self.pending_batches.is_empty() {
if self.pending_batches.is_empty() || self.union_fields.is_empty() {
// Nothing to flush: either no batches at all, or only empty
// (zero-column) batches were accumulated.
self.pending_batches.clear();
self.union_fields.clear();
self.pending_rows = 0;
self.pending_bytes = 0;
return Ok(None);
}

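For context, a zero-column batch is easy to produce with Arrow and is exactly the case the new guard covers. A minimal sketch, assuming a recent arrow-rs where `RecordBatchOptions::with_row_count` is available:

```rust
use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

fn main() {
    // A batch with rows but no columns: it contributes nothing to the
    // union schema, so `union_fields` stays empty even though
    // `pending_batches` is non-empty, the case the guard above handles.
    let batch = RecordBatch::try_new_with_options(
        Arc::new(Schema::empty()),
        vec![],
        &RecordBatchOptions::new().with_row_count(Some(5)),
    )
    .unwrap();
    assert_eq!(batch.num_rows(), 5);
    assert_eq!(batch.num_columns(), 0);
}
```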
80 changes: 29 additions & 51 deletions quickwit/quickwit-parquet-engine/src/storage/config.rs
@@ -16,7 +16,7 @@

use arrow::datatypes::{DataType, Schema as ArrowSchema};
use parquet::basic::Compression as ParquetCompression;
use parquet::file::metadata::SortingColumn;
use parquet::file::metadata::{KeyValue, SortingColumn};
use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;

@@ -119,20 +119,42 @@ impl ParquetWriterConfig {
}

/// Convert to Parquet WriterProperties using the given Arrow schema to configure
/// per-column settings like dictionary encoding and bloom filters.
/// per-column settings like dictionary encoding and bloom filters, with an empty
/// sort order and no metadata.
///
/// Prefer `to_writer_properties_with_metadata()` in production — this method
/// is mainly for tests that don't care about sort order.
pub fn to_writer_properties(&self, schema: &ArrowSchema) -> WriterProperties {
self.to_writer_properties_with_metadata(schema, Vec::new(), None)
}

/// Convert to Parquet WriterProperties with sorting columns and optional key_value_metadata.
///
/// `sorting_cols` is produced by `ParquetWriter::sorting_columns()` from the
/// resolved table_config sort fields.
///
/// When `kv_metadata` is provided, the entries are embedded in the Parquet file's
/// key_value_metadata, making files self-describing (META-07).
pub fn to_writer_properties_with_metadata(
&self,
schema: &ArrowSchema,
sorting_cols: Vec<SortingColumn>,
kv_metadata: Option<Vec<KeyValue>>,
) -> WriterProperties {
let mut builder = WriterProperties::builder()
.set_max_row_group_size(self.row_group_size)
.set_data_page_size_limit(self.data_page_size)
.set_write_batch_size(self.write_batch_size)
// Enable column index for efficient pruning on sorted data (64 bytes default)
.set_column_index_truncate_length(Some(64))
// Set sorting columns metadata for readers to use during pruning
.set_sorting_columns(Some(Self::sorting_columns(schema)))
// Enable row group level statistics (min/max/null_count) for query pruning
// This allows DataFusion to skip row groups based on timestamp ranges
.set_sorting_columns(Some(sorting_cols))
.set_statistics_enabled(EnabledStatistics::Chunk);

if let Some(kvs) = kv_metadata
&& !kvs.is_empty()
{
builder = builder.set_key_value_metadata(Some(kvs));
}

builder = match self.compression {
Compression::Zstd => {
let level = self.compression_level.unwrap_or(DEFAULT_ZSTD_LEVEL);
@@ -184,20 +206,6 @@ impl ParquetWriterConfig {
}
builder
}

/// Get the sorting columns for parquet metadata, computed from the schema
/// and SORT_ORDER. Only columns present in the schema are included.
fn sorting_columns(schema: &ArrowSchema) -> Vec<SortingColumn> {
SORT_ORDER
.iter()
.filter_map(|name| schema.index_of(name).ok())
.map(|idx| SortingColumn {
column_idx: idx as i32,
descending: false,
nulls_first: false,
})
.collect()
}
}

#[cfg(test)]
@@ -370,34 +378,4 @@ mod tests {
);
}
}

#[test]
fn test_sorting_columns_order() {
let schema = create_test_schema();
let sorting_cols = ParquetWriterConfig::sorting_columns(&schema);

// The test schema has metric_name (idx 0), timestamp_secs (idx 2),
// service (idx 4), env (idx 5), host (idx 6).
// SORT_ORDER is: metric_name, service, env, datacenter, region, host, timestamp_secs
// Only present columns are included, so: metric_name, service, env, host, timestamp_secs
assert_eq!(
sorting_cols.len(),
5,
"should have 5 sorting columns from the test schema"
);

// Verify all are ascending with nulls first
for col in &sorting_cols {
assert!(!col.descending, "sorting should be ascending");
assert!(!col.nulls_first, "nulls should be last");
}

// Verify order matches SORT_ORDER filtered by schema presence:
// metric_name (idx 0), service (idx 4), env (idx 5), host (idx 6), timestamp_secs (idx 2)
assert_eq!(sorting_cols[0].column_idx, 0); // metric_name
assert_eq!(sorting_cols[1].column_idx, 4); // service
assert_eq!(sorting_cols[2].column_idx, 5); // env
assert_eq!(sorting_cols[3].column_idx, 6); // host
assert_eq!(sorting_cols[4].column_idx, 2); // timestamp_secs
}
}
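To make the new API concrete, here is a hypothetical caller of `to_writer_properties_with_metadata`. The schema, column indexes, and the metadata key/value are invented for illustration; in the pipeline itself, `sorting_cols` comes from `ParquetWriter::sorting_columns()` and the KV entries carry the compaction metadata, as the doc comments above describe:

```rust
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use parquet::file::metadata::{KeyValue, SortingColumn};
use quickwit_parquet_engine::storage::ParquetWriterConfig;

fn main() {
    let schema = ArrowSchema::new(vec![
        Field::new("metric_name", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
        Field::new("timestamp_secs", DataType::Int64, false),
    ]);
    // Ascending, nulls-last sort on metric_name (idx 0), then
    // timestamp_secs (idx 2).
    let sorting_cols = vec![
        SortingColumn { column_idx: 0, descending: false, nulls_first: false },
        SortingColumn { column_idx: 2, descending: false, nulls_first: false },
    ];
    // Hypothetical key and placeholder value; the PR embeds compaction
    // metadata this way to make files self-describing (META-07).
    let kv = vec![KeyValue::new(
        "quickwit.compaction".to_string(),
        "base64-encoded-metadata".to_string(),
    )];
    let config = ParquetWriterConfig::default();
    let props = config.to_writer_properties_with_metadata(&schema, sorting_cols, Some(kv));
    assert!(props.key_value_metadata().is_some());
}
```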
89 changes: 61 additions & 28 deletions quickwit/quickwit-parquet-engine/src/storage/split_writer.rs
@@ -25,12 +25,16 @@ use tracing::{debug, info, instrument};

use super::config::ParquetWriterConfig;
use super::writer::{ParquetWriteError, ParquetWriter};
use crate::sort_fields::window_start;
use crate::split::{MetricsSplitMetadata, ParquetSplit, SplitId, TAG_SERVICE, TimeRange};
use crate::table_config::TableConfig;

/// Writer that produces complete ParquetSplit with metadata from RecordBatch data.
pub struct ParquetSplitWriter {
/// The underlying Parquet writer.
writer: ParquetWriter,
/// Table configuration (sort fields, window duration, product type).
table_config: TableConfig,
/// Base directory for split files.
base_path: PathBuf,
}
@@ -41,9 +45,15 @@ impl ParquetSplitWriter {
/// # Arguments
/// * `config` - Parquet writer configuration
/// * `base_path` - Directory where split files will be written
pub fn new(config: ParquetWriterConfig, base_path: impl Into<PathBuf>) -> Self {
/// * `table_config` - Table-level config (sort fields, window duration)
pub fn new(
config: ParquetWriterConfig,
base_path: impl Into<PathBuf>,
table_config: &TableConfig,
) -> Self {
Self {
writer: ParquetWriter::new(config),
writer: ParquetWriter::new(config, table_config),
table_config: table_config.clone(),
base_path: base_path.into(),
}
}
@@ -55,6 +65,10 @@ impl ParquetSplitWriter {

/// Write a RecordBatch to a Parquet file and return a ParquetSplit with metadata.
///
/// Builds metadata (including sort fields, window_start, window_duration from
/// table_config) before writing, then embeds compaction KV metadata into the
/// Parquet file. size_bytes is updated after the write completes.
///
/// # Arguments
/// * `batch` - The RecordBatch to write
/// * `index_uid` - The index unique identifier for the split metadata
@@ -67,55 +81,74 @@
batch: &RecordBatch,
index_uid: &str,
) -> Result<ParquetSplit, ParquetWriteError> {
// Generate unique split ID
let split_id = SplitId::generate();
let filename = format!("{}.parquet", split_id);
let file_path = self.base_path.join(&filename);

let file_path = self.base_path.join(format!("{}.parquet", split_id));

// Ensure the base directory exists
std::fs::create_dir_all(&self.base_path)?;

// Write batch to file
let size_bytes = self.writer.write_to_file(batch, &file_path)?;

// Extract time range from batch
// Extract batch-level metadata before writing.
let time_range = extract_time_range(batch)?;
debug!(
start_secs = time_range.start_secs,
end_secs = time_range.end_secs,
"extracted time range from batch"
);

// Extract distinct metric names from batch
let metric_names = extract_metric_names(batch)?;

// Extract distinct service names from batch
let service_names = extract_service_names(batch)?;

// Build metadata
let metadata = MetricsSplitMetadata::builder()
// Compute window_start from the earliest timestamp in the batch.
let window_duration = self.table_config.window_duration_secs;
let window_start_secs = if window_duration > 0 {
match window_start(time_range.start_secs as i64, window_duration as i64) {
Ok(dt) => Some(dt.timestamp()),
Err(e) => {
tracing::warn!(error = %e, "failed to compute window_start, omitting");
None
}
}
} else {
None
};

// Build metadata with sort fields and window from table_config.
// size_bytes is set to 0 here and updated after write.
let mut builder = MetricsSplitMetadata::builder()
.split_id(split_id.clone())
.index_uid(index_uid)
.time_range(time_range)
.num_rows(batch.num_rows() as u64)
.size_bytes(size_bytes);
.size_bytes(0)
.sort_fields(self.writer.sort_fields_string())
.window_duration_secs(window_duration)
.parquet_file(filename);

if let Some(ws) = window_start_secs {
builder = builder.window_start_secs(ws);
}

for name in metric_names {
builder = builder.add_metric_name(name);
}
for name in service_names {
builder = builder.add_low_cardinality_tag(TAG_SERVICE, name);
}

// Add metric names
let metadata = metric_names
.into_iter()
.fold(metadata, |m, name| m.add_metric_name(name));
let mut metadata = builder.build();

// Add service names as low-cardinality tags
let metadata = service_names.into_iter().fold(metadata, |m, name| {
m.add_low_cardinality_tag(TAG_SERVICE, name)
});
// Write with compaction metadata embedded in Parquet KV metadata.
let size_bytes =
self.writer
.write_to_file_with_metadata(batch, &file_path, Some(&metadata))?;

let metadata = metadata.build();
metadata.size_bytes = size_bytes;

info!(
split_id = %split_id,
file_path = %file_path.display(),
size_bytes,
sort_fields = %self.writer.sort_fields_string(),
"split file written successfully"
);
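The window alignment above reduces to floor division on epoch seconds. A sketch of the arithmetic only; the real `crate::sort_fields::window_start` returns a `Result` carrying a datetime, and the caller above already guards `window_duration > 0`:

```rust
/// Floor an epoch-seconds timestamp to the start of its window.
/// Arithmetic sketch only; error handling lives in the real helper.
fn window_start_secs(ts_secs: i64, window_secs: i64) -> i64 {
    ts_secs.div_euclid(window_secs) * window_secs
}

fn main() {
    assert_eq!(window_start_secs(3_700, 3_600), 3_600);
    assert_eq!(window_start_secs(7_199, 3_600), 3_600);
    // div_euclid floors toward negative infinity, so pre-epoch
    // timestamps still land on a window boundary at or before them.
    assert_eq!(window_start_secs(-1, 3_600), -3_600);
}
```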

@@ -296,7 +329,7 @@ mod tests {
let config = ParquetWriterConfig::default();
let temp_dir = tempfile::tempdir().unwrap();

let writer = ParquetSplitWriter::new(config, temp_dir.path());
let writer = ParquetSplitWriter::new(config, temp_dir.path(), &TableConfig::default());

let batch = create_test_batch(10);
let split = writer.write_split(&batch, "test-index").unwrap();
@@ -318,7 +351,7 @@
let config = ParquetWriterConfig::default();
let temp_dir = tempfile::tempdir().unwrap();

let writer = ParquetSplitWriter::new(config, temp_dir.path());
let writer = ParquetSplitWriter::new(config, temp_dir.path(), &TableConfig::default());

// Create batch with timestamps [100, 150, 200]
let batch = create_test_batch_with_options(
@@ -340,7 +373,7 @@
let config = ParquetWriterConfig::default();
let temp_dir = tempfile::tempdir().unwrap();

let writer = ParquetSplitWriter::new(config, temp_dir.path());
let writer = ParquetSplitWriter::new(config, temp_dir.path(), &TableConfig::default());

// Create batch with specific metric names
let batch = create_test_batch_with_options(
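One way to sanity-check the self-describing files this PR produces is to read a split's footer back and list the embedded key/value entries. A sketch using the parquet crate's serialized reader (the file path is illustrative):

```rust
use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> anyhow::Result<()> {
    let file = File::open("splits/example.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    // key_value_metadata() returns the entries embedded via
    // set_key_value_metadata() at write time.
    if let Some(kvs) = reader.metadata().file_metadata().key_value_metadata() {
        for kv in kvs {
            println!("{} = {:?}", kv.key, kv.value);
        }
    }
    Ok(())
}
```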