Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d96c930
Make delta_lake argument of TMM
skejserjensen Apr 23, 2025
ae9be08
Remove TableMetadataManager
skejserjensen Apr 24, 2025
10b3314
Move connection_info to DeltaLake
skejserjensen Apr 24, 2025
34cad67
Replace MetadataManager with ManagerMetadata trait
skejserjensen Apr 24, 2025
d30a136
Fix all compile errors from merging main
skejserjensen Aug 18, 2025
e12ca03
Fix compile errors for all tests
skejserjensen Aug 18, 2025
117f3f4
Fix failing tests after rebasing on main
skejserjensen Aug 18, 2025
9280400
Fix compile errors, warnings, tests after rebasing
skejserjensen Sep 3, 2025
146ae67
Fix compile errors and missing create tables
skejserjensen Sep 10, 2025
9c477ed
Normalize naming for creating IO types
skejserjensen Sep 10, 2025
eac89fb
Move compression of multivariate to compression
skejserjensen Oct 15, 2025
2f96f85
Fix unit tests after rebasing branch
skejserjensen Oct 15, 2025
10cc29b
Remove SessionContext from DataFolder
skejserjensen Oct 19, 2025
de0e838
Remove metadata and schema methods from DataFolder
skejserjensen Oct 20, 2025
866a0ef
Move writer to DeltaLake table_writer
skejserjensen Oct 20, 2025
a09f1f1
Move register to DeltaLake and remove DataFolder
skejserjensen Oct 20, 2025
3f3c8b0
Rename DeltaLake to DataFolder
skejserjensen Oct 20, 2025
804f69a
Remove TableMetadataManager
skejserjensen Oct 28, 2025
e590530
Remove duplicate SessionContexts
skejserjensen Oct 28, 2025
8ebf233
Fix Clippy warnings
skejserjensen Oct 28, 2025
361cb74
Move metadata tables to the metadata schema
skejserjensen Oct 28, 2025
583a6e9
Remove TableMetadataManager from documentation
skejserjensen Oct 28, 2025
0468f06
Fix issues found by copilot code review
skejserjensen Oct 28, 2025
1fd29ac
Fix compile error after rebasing on main
skejserjensen Oct 28, 2025
dbc129a
Format all packages using cargo fmt
skejserjensen Oct 28, 2025
5e4bb33
Update based on comments from @CGodiksen
skejserjensen Oct 30, 2025
bff7eaf
Fix compile error due to renaming
skejserjensen Oct 30, 2025
5c33dc4
Update based on comments from @CGodiksen
skejserjensen Oct 30, 2025
1e99357
Update based on comments from @chrthomsen
skejserjensen Oct 31, 2025
8ca91ee
Fix cargo doc warnings
skejserjensen Oct 31, 2025
ef9c74d
Import modelardb_storage::data_folder::DataFolder
skejserjensen Oct 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/modelardb_bulkloader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ arrow = { workspace = true, features = ["ffi"] }
datafusion.workspace = true
deltalake.workspace = true
futures.workspace = true
modelardb_compression = { path = "../modelardb_compression" }
modelardb_embedded = { path = "../modelardb_embedded" }
modelardb_storage = { path = "../modelardb_storage" }
modelardb_types = { path = "../modelardb_types" }
Expand Down
39 changes: 21 additions & 18 deletions crates/modelardb_bulkloader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ use deltalake::{ObjectStore, Path};
use futures::stream::StreamExt;
use modelardb_embedded::error::{ModelarDbEmbeddedError, Result};
use modelardb_embedded::operations::Operations;
use modelardb_embedded::operations::data_folder::DataFolder;
use modelardb_storage::delta_lake::DeltaTableWriter;
use modelardb_storage::data_folder::{DataFolder, DeltaTableWriter};
use modelardb_types::types::TimeSeriesTableMetadata;
use sysinfo::System;

Expand Down Expand Up @@ -168,8 +167,9 @@ async fn import(
data_folder.read(sql).await?;
}

if let Some(time_series_table_metadata) =
data_folder.time_series_table_metadata(table_name).await
if let Some(time_series_table_metadata) = data_folder
.time_series_table_metadata_for_registered_time_series_table(table_name)
.await
{
import_time_series_table(
input_stream,
Expand Down Expand Up @@ -205,7 +205,7 @@ async fn import_time_series_table(
cast_double_to_float: bool,
) -> Result<()> {
let table_name = &time_series_table_metadata.name;
let mut delta_table_writer = data_folder.writer(table_name).await?;
let mut delta_table_writer = data_folder.table_writer(table_name).await?;

let mut system = System::new();
let mut current_batch = vec![];
Expand All @@ -220,7 +220,6 @@ async fn import_time_series_table(
system.refresh_memory();
if current_batch_size > (system.available_memory() as usize / 10 * 8)
&& let Err(write_error) = import_and_clear_time_series_table_batch(
data_folder,
&mut delta_table_writer,
time_series_table_metadata,
&mut current_batch,
Expand All @@ -234,7 +233,6 @@ async fn import_time_series_table(
}

if let Err(write_error) = import_and_clear_time_series_table_batch(
data_folder,
&mut delta_table_writer,
time_series_table_metadata,
&mut current_batch,
Expand All @@ -258,7 +256,7 @@ async fn import_normal_table(
table_name: &str,
data_folder: &mut DataFolder,
) -> Result<()> {
let mut delta_table_writer = data_folder.writer(table_name).await?;
let mut delta_table_writer = data_folder.table_writer(table_name).await?;

while let Some(record_batch) = input_stream.next().await {
let record_batch = record_batch?;
Expand Down Expand Up @@ -386,12 +384,11 @@ fn cast_record_batch(record_batch: RecordBatch, cast_double_to_float: bool) -> R
RecordBatch::try_new(cast_schema, cast_columns).map_err(|error| error.into())
}

/// Import the `current_batch` into the time series table with `time_series_table_metadata` in
/// `data_folder` using `delta_table_writer`. Then clear `current_batch` and zero
/// `current_batch_size`. If a [`RecordBatch`] in `current_batch` has a different schema, the
/// compression fails, or the write fails, a [`ModelarDbEmbeddedError`] is returned.
/// Import the `current_batch` into the time series table with `time_series_table_metadata` using
/// `delta_table_writer`. Then clear `current_batch` and zero `current_batch_size`. If a
/// [`RecordBatch`] in `current_batch` has a different schema, the compression fails, or the write
/// fails, a [`ModelarDbEmbeddedError`] is returned.
async fn import_and_clear_time_series_table_batch(
data_folder: &DataFolder,
delta_table_writer: &mut DeltaTableWriter,
time_series_table_metadata: &TimeSeriesTableMetadata,
current_batch: &mut Vec<RecordBatch>,
Expand All @@ -400,9 +397,10 @@ async fn import_and_clear_time_series_table_batch(
if *current_batch_size != 0 {
let schema = current_batch[0].schema();
let uncompressed_data = compute::concat_batches(&schema, &*current_batch)?;
let compressed_data = data_folder
.compress_all(time_series_table_metadata, &uncompressed_data)
.await?;
let compressed_data = modelardb_compression::try_compress_multivariate_time_series(
time_series_table_metadata,
&uncompressed_data,
)?;
delta_table_writer.write_all(&compressed_data).await?;
current_batch.clear();
*current_batch_size = 0;
Expand Down Expand Up @@ -502,16 +500,21 @@ async fn create_data_folder(data_folder_path: &str) -> Result<DataFolder> {
secret_access_key,
)
.await
.map_err(|error| error.into())
}
Some(("az", container_name)) => {
let account_name = env::var("AZURE_STORAGE_ACCOUNT_NAME")?;
let access_key = env::var("AZURE_STORAGE_ACCESS_KEY")?;

DataFolder::open_azure(account_name, access_key, container_name.to_owned()).await
DataFolder::open_azure(account_name, access_key, container_name.to_owned())
.await
.map_err(|error| error.into())
}
_ => {
let data_folder_path = StdPath::new(data_folder_path);
DataFolder::open_local(data_folder_path).await
DataFolder::open_local(data_folder_path)
.await
.map_err(|error| error.into())
}
}
}
Expand Down
156 changes: 149 additions & 7 deletions crates/modelardb_compression/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

use std::sync::Arc;

use arrow::array::StringArray;
use arrow::compute::{self, SortColumn, SortOptions};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use modelardb_types::types::{ErrorBound, TimestampArray, ValueArray};
use modelardb_types::types::{ErrorBound, TimeSeriesTableMetadata, TimestampArray, ValueArray};

use crate::error::{ModelarDbCompressionError, Result};
use crate::models::macaque_v::MacaqueV;
Expand All @@ -35,6 +37,146 @@ use crate::types::{CompressedSegmentBatchBuilder, CompressedSegmentBuilder, Mode
/// that are marked as residuals are stored as separate segments to allow for efficient pruning.
const RESIDUAL_VALUES_MAX_LENGTH: u8 = 255;

/// Compress the `uncompressed_time_series` from the table with `time_series_table_metadata` and
/// return the resulting segments as one [`RecordBatch`] of compressed segments per univariate
/// time series. Returns a [`ModelarDbCompressionError`] if the data points cannot be sorted or
/// compressed.
pub fn try_compress_multivariate_time_series(
    time_series_table_metadata: &TimeSeriesTableMetadata,
    uncompressed_time_series: &RecordBatch,
) -> Result<Vec<RecordBatch>> {
    // Guard against an empty input as the splitting logic below reads the tags of row zero,
    // which would otherwise panic in StringArray::value(0).
    if uncompressed_time_series.num_rows() == 0 {
        return Ok(vec![]);
    }

    // Sort by all tags and then time to simplify splitting the data into time series.
    let sorted_uncompressed_data =
        sort_time_series_by_tags_and_time(time_series_table_metadata, uncompressed_time_series)?;

    // Split the sorted uncompressed data into time series and compress them separately.
    let mut compressed_data = vec![];

    let tag_column_arrays: Vec<&StringArray> = time_series_table_metadata
        .tag_column_indices
        .iter()
        .map(|index| modelardb_types::array!(sorted_uncompressed_data, *index, StringArray))
        .collect();

    // The tags of the time series currently being collected, initialized from row zero.
    let mut tag_values = Vec::with_capacity(tag_column_arrays.len());
    for tag_column_array in &tag_column_arrays {
        tag_values.push(tag_column_array.value(0).to_owned());
    }

    // The index of the first data point of each time series must be stored so slices
    // containing only data points for each time series can be extracted and compressed.
    let mut row_index_start = 0;
    for row_index in 0..sorted_uncompressed_data.num_rows() {
        // If any of the tags differ, the data point is from a new time series.
        let mut is_new_time_series = false;
        for tag_column_index in 0..tag_column_arrays.len() {
            is_new_time_series |= tag_values[tag_column_index]
                != tag_column_arrays[tag_column_index].value(row_index);
        }

        if is_new_time_series {
            // Compress the finished time series, i.e., rows [row_index_start, row_index).
            let time_series_length = row_index - row_index_start;
            let uncompressed_time_series =
                sorted_uncompressed_data.slice(row_index_start, time_series_length);

            try_split_and_compress_univariate_time_series(
                time_series_table_metadata,
                &uncompressed_time_series,
                &tag_values,
                &mut compressed_data,
            )?;

            // Start collecting the new time series with the tags of the current row.
            for (tag_column_index, tag_column_array) in tag_column_arrays.iter().enumerate() {
                tag_values[tag_column_index] = tag_column_array.value(row_index).to_owned();
            }

            row_index_start = row_index;
        }
    }

    // Compress the last time series as the loop only compresses when a new time series starts.
    let time_series_length = sorted_uncompressed_data.num_rows() - row_index_start;
    let uncompressed_time_series =
        sorted_uncompressed_data.slice(row_index_start, time_series_length);

    try_split_and_compress_univariate_time_series(
        time_series_table_metadata,
        &uncompressed_time_series,
        &tag_values,
        &mut compressed_data,
    )?;

    Ok(compressed_data)
}

/// Sort the `uncompressed_data` from the time series table with `time_series_table_metadata`
/// according to its tags and then timestamps.
fn sort_time_series_by_tags_and_time(
time_series_table_metadata: &TimeSeriesTableMetadata,
uncompressed_time_series: &RecordBatch,
) -> Result<RecordBatch> {
let mut sort_columns = vec![];

let sort_options = Some(SortOptions {
descending: false,
nulls_first: false,
});

for tag_column_index in &time_series_table_metadata.tag_column_indices {
let tag_column = uncompressed_time_series.column(*tag_column_index);
sort_columns.push(SortColumn {
values: (*tag_column).clone(),
options: sort_options,
});
}

let timestamp_column_index = time_series_table_metadata.timestamp_column_index;
let timestamp_column = uncompressed_time_series.column(timestamp_column_index);
sort_columns.push(SortColumn {
values: (*timestamp_column).clone(),
options: sort_options,
});

let indices = compute::lexsort_to_indices(&sort_columns, None)?;
let sorted_columns = compute::take_arrays(uncompressed_time_series.columns(), &indices, None)?;
RecordBatch::try_new(uncompressed_time_series.schema(), sorted_columns).map_err(|error| error.into())
}

/// Compress the field columns in `uncompressed_time_series` from the table with
/// `time_series_table_metadata` using [`try_compress_univariate_time_series()`] and append the
/// result to `compressed_data`. It is assumed that all data points in `uncompressed_time_series`
/// have the same tags as in `tag_values`.
pub fn try_split_and_compress_univariate_time_series(
time_series_table_metadata: &TimeSeriesTableMetadata,
uncompressed_time_series: &RecordBatch,
tag_values: &[String],
compressed_time_series: &mut Vec<RecordBatch>,
) -> Result<()> {
let uncompressed_timestamps = modelardb_types::array!(
uncompressed_time_series,
time_series_table_metadata.timestamp_column_index,
TimestampArray
);

for field_column_index in &time_series_table_metadata.field_column_indices {
Comment thread
CGodiksen marked this conversation as resolved.
let uncompressed_values =
modelardb_types::array!(uncompressed_time_series, *field_column_index, ValueArray);

let error_bound = time_series_table_metadata.error_bounds[*field_column_index];

let compressed_segments = try_compress_univariate_time_series(
uncompressed_timestamps,
uncompressed_values,
error_bound,
time_series_table_metadata.compressed_schema.clone(),
tag_values.to_vec(),
*field_column_index as i16,
)
.expect("uncompressed_timestamps and uncompressed_values should have the same length.");

compressed_time_series.push(compressed_segments);
}

Ok(())
}

/// Compress `uncompressed_timestamps` using a start time, end time, and a sampling interval if
/// regular and delta-of-deltas followed by a variable length binary encoding if irregular.
/// `uncompressed_values` is compressed within `error_bound` using the model types in `models`.
Expand All @@ -45,7 +187,7 @@ const RESIDUAL_VALUES_MAX_LENGTH: u8 = 255;
/// `uncompressed_values` have different lengths or if `compressed_schema` is not a valid schema for
/// compressed segments, otherwise the resulting compressed segments are returned as a
/// [`RecordBatch`] with the `compressed_schema` schema.
pub fn try_compress(
pub fn try_compress_univariate_time_series(
uncompressed_timestamps: &TimestampArray,
uncompressed_values: &ValueArray,
error_bound: ErrorBound,
Expand Down Expand Up @@ -275,10 +417,10 @@ mod tests {
const ADD_NOISE_RANGE: Option<Range<f32>> = Some(1.0..1.05);
const TRY_COMPRESS_TEST_LENGTH: usize = 50;

// Tests for try_compress().
// Tests for try_compress_univariate_time_series().
#[test]
Comment thread
CGodiksen marked this conversation as resolved.
fn test_try_compress_empty_time_series_within_lossless_error_bound() {
let compressed_record_batch = try_compress(
let compressed_record_batch = try_compress_univariate_time_series(
&TimestampBuilder::new().finish(),
&ValueBuilder::new().finish(),
ErrorBound::Lossless,
Expand Down Expand Up @@ -440,7 +582,7 @@ mod tests {
let uncompressed_values =
data_generation::generate_values(uncompressed_timestamps.values(), values_structure);

let compressed_record_batch = try_compress(
let compressed_record_batch = try_compress_univariate_time_series(
&uncompressed_timestamps,
&uncompressed_values,
error_bound,
Expand Down Expand Up @@ -544,7 +686,7 @@ mod tests {
let uncompressed_values = uncompressed_values.finish();
assert_eq!(uncompressed_timestamps.len(), uncompressed_values.len());

let compressed_record_batch = try_compress(
let compressed_record_batch = try_compress_univariate_time_series(
&uncompressed_timestamps,
&uncompressed_values,
error_bound,
Expand Down Expand Up @@ -701,7 +843,7 @@ mod tests {
100.0..200.0,
);

let compressed_record_batch = try_compress(
let compressed_record_batch = try_compress_univariate_time_series(
&uncompressed_timestamps,
&uncompressed_values,
error_bound,
Expand Down
12 changes: 12 additions & 0 deletions crates/modelardb_compression/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@ use std::error::Error;
use std::fmt::{Display, Formatter};
use std::result::Result as StdResult;

use arrow::error::ArrowError;

/// Result type used throughout `modelardb_compression`.
pub type Result<T> = StdResult<T, ModelarDbCompressionError>;

/// Error type used throughout `modelardb_compression`.
#[derive(Debug)]
pub enum ModelarDbCompressionError {
    /// Error returned by Apache Arrow, e.g., when sorting or concatenating record batches.
    Arrow(ArrowError),
    /// Error returned when an invalid argument was passed, with a reason describing why.
    InvalidArgument(String),
}

impl Display for ModelarDbCompressionError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Self::Arrow(reason) => write!(f, "Arrow Error: {reason}"),
Self::InvalidArgument(reason) => write!(f, "Invalid Argument Error: {reason}"),
}
}
Expand All @@ -41,7 +46,14 @@ impl Error for ModelarDbCompressionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
// Return the error that caused self to occur if one exists.
match self {
Self::Arrow(reason) => Some(reason),
Self::InvalidArgument(_reason) => None,
}
}
}

/// Convert an [`ArrowError`] into [`ModelarDbCompressionError::Arrow`] so the `?` operator can
/// be used on operations that return [`ArrowError`].
impl From<ArrowError> for ModelarDbCompressionError {
    fn from(error: ArrowError) -> Self {
        Self::Arrow(error)
    }
}
4 changes: 3 additions & 1 deletion crates/modelardb_compression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ mod models;
mod types;

// Re-export the few functions and types users are meant to use.
pub use compression::try_compress;
pub use compression::try_compress_multivariate_time_series;
pub use compression::try_compress_univariate_time_series;
pub use compression::try_split_and_compress_univariate_time_series;
pub use models::grid;
pub use models::is_value_within_error_bound;
pub use models::len;
Expand Down
2 changes: 1 addition & 1 deletion crates/modelardb_compression/src/models/swing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ mod tests {
compressed_schema_fields.push(Arc::new(Field::new("tag", DataType::Utf8, false)));
let compressed_schema = Arc::new(Schema::new(compressed_schema_fields));

let segments = crate::try_compress(
let segments = crate::try_compress_univariate_time_series(
&timestamps,
&values,
error_bound,
Expand Down
Loading
Loading