Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1e3000f
refactor: extract zone utilities to lance-core
HaochengLIU Jan 6, 2026
15e173b
feat: add per-fragment column statistics to FileWriter
HaochengLIU Jan 6, 2026
3bc7c1a
feat: add column statistics reader to FileReader
HaochengLIU Jan 6, 2026
a307642
feat: add dataset-level column statistics policy
HaochengLIU Jan 6, 2026
4a014d2
feat: add column statistics consolidation and testing
HaochengLIU Jan 6, 2026
4f08d44
feat: add comprehensive compaction tests and formatting fixes
HaochengLIU Jan 7, 2026
e17dabf
fix: comprehensive compaction tests (WIP - tests need debugging)
HaochengLIU Jan 7, 2026
6ac9734
fix: all column statistics tests now passing
HaochengLIU Jan 7, 2026
80be464
cleanup wrong files
HaochengLIU Jan 8, 2026
2df39fd
docs: update FINAL_SUMMARY.md with comprehensive test coverage
HaochengLIU Jan 8, 2026
fc77739
docs: add comprehensive file-by-file review guide
HaochengLIU Jan 8, 2026
ac37515
First review cleanup
HaochengLIU Jan 8, 2026
a938524
improve the default behavior of enable_column_stats flag
HaochengLIU Jan 9, 2026
0097657
improve the column stats writer by flattening the stats
HaochengLIU Jan 9, 2026
21439ad
Address round 1 comments
HaochengLIU Jan 20, 2026
34b064a
rename enable_column_stats to be disable and make it on by default
HaochengLIU Jan 20, 2026
a90f06b
second round of review
HaochengLIU Jan 27, 2026
4db376d
create protobuf entry for col stats
HaochengLIU Jan 27, 2026
d69c779
round 3: Make schema columnar
HaochengLIU Jan 27, 2026
b62a6c0
review reader and writer.rs
HaochengLIU Jan 29, 2026
0a23ab8
handle non-accumulator type at runtime
HaochengLIU Jan 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ fn convert_to_java_operation_inner<'local>(
table_metadata_updates,
schema_metadata_updates,
field_metadata_updates,
column_stats: _,
} => {
let config_updates_obj = export_update_map(env, &config_updates)?;
let table_metadata_updates_obj = export_update_map(env, &table_metadata_updates)?;
Expand Down Expand Up @@ -812,6 +813,7 @@ fn convert_to_rust_operation(
table_metadata_updates,
schema_metadata_updates,
field_metadata_updates,
column_stats: None,
}
}
"Append" => {
Expand Down
125 changes: 67 additions & 58 deletions java/src/test/java/org/lance/FileReaderWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@
*/
package org.lance;

import org.lance.file.LanceFileReader;
import org.lance.file.LanceFileWriter;
import org.lance.util.Range;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
Expand All @@ -30,28 +38,49 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import org.lance.file.LanceFileReader;
import org.lance.file.LanceFileWriter;
import org.lance.util.Range;

public class FileReaderWriterTest {
/**
* Schema metadata keys written by the file format when column stats are present (must match
* Rust).
*/
private static final String COLUMN_STATS_BUFFER_INDEX_KEY = "lance:column_stats:buffer_index";

private static final String COLUMN_STATS_VERSION_KEY = "lance:column_stats:version";

/**
 * Builds the expected schema for a simple file with columns x (Int64) and y (Utf8), including
 * the schema-level metadata entries the writer attaches when column statistics are present.
 */
private static Schema expectedSchemaWithColumnStats() {
  Field xField = Field.nullable("x", new ArrowType.Int(64, true));
  Field yField = Field.nullable("y", new ArrowType.Utf8());
  Map<String, String> statsMetadata = new HashMap<>();
  statsMetadata.put(COLUMN_STATS_BUFFER_INDEX_KEY, "1");
  statsMetadata.put(COLUMN_STATS_VERSION_KEY, "1");
  return new Schema(Arrays.asList(xField, yField), statsMetadata);
}

/**
 * Asserts that {@code actual} matches {@code expected} on its field list and on the two
 * column-stats metadata keys, rather than relying on {@code Schema.equals} for metadata
 * comparison (which has quirks).
 */
private static void assertSchemaWithColumnStats(Schema expected, Schema actual) {
  assertEquals(expected.getFields(), actual.getFields());
  Map<String, String> actualMeta = actual.getMetadata();
  assertNotNull(actualMeta, "Schema metadata should be present when column stats are written");
  Map<String, String> expectedMeta = expected.getMetadata();
  assertEquals(
      expectedMeta.get(COLUMN_STATS_BUFFER_INDEX_KEY),
      actualMeta.get(COLUMN_STATS_BUFFER_INDEX_KEY));
  assertEquals(
      expectedMeta.get(COLUMN_STATS_VERSION_KEY), actualMeta.get(COLUMN_STATS_VERSION_KEY));
}

private VectorSchemaRoot createBatch(BufferAllocator allocator) throws IOException {
Schema schema =
new Schema(
Arrays.asList(
Field.nullable("x", new ArrowType.Int(64, true)),
Field.nullable("y", new ArrowType.Utf8())),
null);
Schema schema = new Schema(Arrays.asList(Field.nullable("x", new ArrowType.Int(64, true)),
Field.nullable("y", new ArrowType.Utf8())),
null);
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
root.allocateNew();
BigIntVector iVector = (BigIntVector) root.getVector("x");
Expand Down Expand Up @@ -82,15 +111,10 @@ void testBasicRead(@TempDir Path tempDir) throws Exception {
createSimpleFile(filePath);
LanceFileReader reader = LanceFileReader.open(filePath, allocator);

Schema expectedSchema =
new Schema(
Arrays.asList(
Field.nullable("x", new ArrowType.Int(64, true)),
Field.nullable("y", new ArrowType.Utf8())),
null);
Schema expectedSchema = expectedSchemaWithColumnStats();

assertEquals(100, reader.numRows());
assertEquals(expectedSchema, reader.schema());
assertSchemaWithColumnStats(expectedSchema, reader.schema());

try (ArrowReader batches = reader.readAll(null, null, 100)) {
assertTrue(batches.loadNextBatch());
Expand Down Expand Up @@ -120,7 +144,7 @@ void testBasicRead(@TempDir Path tempDir) throws Exception {
}

// Ok to call schema after close
assertEquals(expectedSchema, reader.schema());
assertSchemaWithColumnStats(expectedSchema, reader.schema());

// close should be idempotent
reader.close();
Expand All @@ -133,15 +157,10 @@ void testReadWithProjection(@TempDir Path tempDir) throws Exception {
createSimpleFile(filePath);
LanceFileReader reader = LanceFileReader.open(filePath, allocator);

Schema expectedSchema =
new Schema(
Arrays.asList(
Field.nullable("x", new ArrowType.Int(64, true)),
Field.nullable("y", new ArrowType.Utf8())),
null);
Schema expectedSchema = expectedSchemaWithColumnStats();

assertEquals(100, reader.numRows());
assertEquals(expectedSchema, reader.schema());
assertSchemaWithColumnStats(expectedSchema, reader.schema());

try (ArrowReader batches = reader.readAll(Collections.singletonList("x"), null, 100)) {
assertTrue(batches.loadNextBatch());
Expand All @@ -161,33 +180,28 @@ void testReadWithProjection(@TempDir Path tempDir) throws Exception {
assertFalse(batches.loadNextBatch());
}

try (ArrowReader batches =
reader.readAll(
null, Arrays.asList(Range.of(1, 11), Range.of(14, 19), Range.of(20, 21)), 100)) {
try (ArrowReader batches = reader.readAll(
null, Arrays.asList(Range.of(1, 11), Range.of(14, 19), Range.of(20, 21)), 100)) {
assertTrue(batches.loadNextBatch());
VectorSchemaRoot batch = batches.getVectorSchemaRoot();
assertEquals(16, batch.getRowCount());
assertEquals(2, batch.getSchema().getFields().size());
assertFalse(batches.loadNextBatch());
}

try (ArrowReader batches =
reader.readAll(
Collections.singletonList("x"),
Arrays.asList(Range.of(23, 25), Range.of(27, 29)),
100)) {
try (ArrowReader batches = reader.readAll(Collections.singletonList("x"),
Arrays.asList(Range.of(23, 25), Range.of(27, 29)),
100)) {
assertTrue(batches.loadNextBatch());
VectorSchemaRoot batch = batches.getVectorSchemaRoot();
assertEquals(4, batch.getRowCount());
assertEquals(1, batch.getSchema().getFields().size());
assertFalse(batches.loadNextBatch());
}

try (ArrowReader batches =
reader.readAll(
Collections.singletonList("y"),
Arrays.asList(Range.of(23, 25), Range.of(27, 29)),
100)) {
try (ArrowReader batches = reader.readAll(Collections.singletonList("y"),
Arrays.asList(Range.of(23, 25), Range.of(27, 29)),
100)) {
assertTrue(batches.loadNextBatch());
VectorSchemaRoot batch = batches.getVectorSchemaRoot();
assertEquals(4, batch.getRowCount());
Expand Down Expand Up @@ -227,11 +241,8 @@ void testWriteWithStorage(@TempDir Path tempDir) throws IOException {
try {
LanceFileWriter.open(filePath, allocator, null, storageOptions);
} catch (IllegalArgumentException e) {
assertTrue(
e.getMessage()
.contains(
"Unable to find object store prefix: no Azure account "
+ "name in URI, and no storage account configured."));
assertTrue(e.getMessage().contains("Unable to find object store prefix: no Azure account "
+ "name in URI, and no storage account configured."));
}

storageOptions.put("account_name", "some_account");
Expand Down Expand Up @@ -295,11 +306,9 @@ void testWriteNullSchemaMetadata(@TempDir Path tempDir) throws Exception {
try (LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null)) {
try (VectorSchemaRoot batch = createBatch(allocator)) {
writer.write(batch);
Assertions.assertThrows(
Exception.class,
Assertions.assertThrows(Exception.class,
() -> writer.addSchemaMetadata(Collections.singletonMap("someKey", null)));
Assertions.assertThrows(
Exception.class,
Assertions.assertThrows(Exception.class,
() -> writer.addSchemaMetadata(Collections.singletonMap(null, "someValue")));
}
}
Expand Down
19 changes: 19 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ message Manifest {
// appropriately.
map<string, string> config = 16;

// Column statistics metadata.
//
// If present, indicates that consolidated column statistics are available
// for this dataset version.
optional ColumnStats column_stats = 22;

// Metadata associated with the table.
//
// This is a key-value map that can be used to store arbitrary metadata
Expand Down Expand Up @@ -228,6 +234,19 @@ message VersionAuxData {
map<string, bytes> metadata = 3;
}

// Column statistics metadata.
//
// Stores information about consolidated column statistics for the dataset.
// Referenced from the manifest; presence indicates that consolidated
// statistics are available for the dataset version.
message ColumnStats {
  // Path to the consolidated column statistics file, relative to the dataset root.
  // For example: "_stats/column_stats.lance"
  string path = 1;
  // Version of the column statistics format.
  // This allows for future evolution of the format (e.g., different directory
  // structure, different schema, etc.)
  uint32 version = 2;
}

// Metadata describing an index.
message IndexMetadata {
// Unique ID of an index. It is unique across all the dataset versions.
Expand Down
9 changes: 6 additions & 3 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ message Transaction {
repeated uint64 deleted_fragment_ids = 2;
// The predicate that was evaluated
//
// This may be used to determine whether the delete would have affected
// This may be used to determine whether the delete would have affected
// files written by a concurrent transaction.
string predicate = 3;
}
Expand Down Expand Up @@ -183,7 +183,7 @@ message Transaction {
// if the target dataset is a branch, this is the branch name of the target dataset
optional string branch_name = 5;
}

// Exact set of key hashes for conflict detection.
// Used when the number of inserted rows is small.
message ExactKeySetFilter {
Expand Down Expand Up @@ -275,14 +275,17 @@ message Transaction {
// If false, the new entries will be merged with the existing map.
bool replace = 2;
}

// An operation that updates the table config, table metadata, schema metadata,
// or field metadata.
message UpdateConfig {
UpdateMap config_updates = 6;
UpdateMap table_metadata_updates = 7;
UpdateMap schema_metadata_updates = 8;
map<int32, UpdateMap> field_metadata_updates = 9;
// Column statistics metadata update.
// If present, updates the column_stats field in the manifest.
optional lance.table.ColumnStats column_stats = 10;

// Deprecated -------------------------------
map<string, string> upsert_values = 1;
Expand Down
2 changes: 2 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5610,6 +5610,7 @@ def write_dataset(
transaction_properties: Optional[Dict[str, str]] = None,
initial_bases: Optional[List[DatasetBasePath]] = None,
target_bases: Optional[List[str]] = None,
disable_column_stats: bool = False,
namespace: Optional[LanceNamespace] = None,
table_id: Optional[List[str]] = None,
) -> LanceDataset:
Expand Down Expand Up @@ -5862,6 +5863,7 @@ def write_dataset(
"transaction_properties": merged_properties,
"initial_bases": initial_bases,
"target_bases": target_bases,
"disable_column_stats": disable_column_stats,
}

# Add storage_options_provider if created from namespace
Expand Down
13 changes: 11 additions & 2 deletions python/python/tests/compat/test_file_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ def __init__(self, path: Path):

def create(self):
    # Defect in the merged diff text: the pre-diff single-line write_dataset call
    # was left next to its multi-line replacement, duplicating the write. Keep
    # only the post-diff call, which pins the legacy layout for compat tests by
    # disabling column stats.
    batch = build_basic_types()
    lance.write_dataset(
        batch,
        self.path,
        data_storage_version="0.1",
        disable_column_stats=True,
    )

def check_read(self):
ds = lance.dataset(self.path)
Expand All @@ -110,5 +115,9 @@ def check_write(self):
ds = lance.dataset(self.path)
ds.delete("true")
lance.write_dataset(
build_basic_types(), self.path, data_storage_version="0.1", mode="append"
build_basic_types(),
self.path,
data_storage_version="0.1",
mode="append",
disable_column_stats=True,
)
6 changes: 4 additions & 2 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1454,16 +1454,18 @@ def test_config_update_auto_cleanup(tmp_path):


def test_access_config(tmp_path):
    # We assert only on the test key's presence/absence, not on len(ds.config()),
    # because the manifest config may contain other keys (e.g. column stats).
    # Defect in the merged diff text: the removed length assertions
    # ("assert 1 == len(ds.config())" / "assert 0 == len(ds.config())") were left
    # next to their replacements and would contradict them; keep only the
    # post-diff presence/absence assertions.
    table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
    base_dir = tmp_path / "test"
    ds = lance.write_dataset(table, base_dir, mode="create")
    ds.update_config({"test_key": "test_value"})
    config_value = ds.config()["test_key"]
    assert config_value == "test_value"
    assert "test_key" in ds.config()

    ds.delete_config_keys(["test_key"])
    assert "test_key" not in ds.config()


def test_auto_cleanup_invalid(tmp_path):
Expand Down
Loading
Loading