Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ on:
- rust/**
- protos/**
- .github/workflows/rust.yml
- rust-toolchain.toml
- Cargo.toml
- Cargo.lock
- deny.toml
Expand Down
246 changes: 232 additions & 14 deletions python/python/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@
import json
import tempfile
from pathlib import Path
from typing import Union

import lance
import pyarrow as pa


def check_json_type(ds: Union[lance.LanceDataset, pa.Table], col_name: str):
    """Assert that column *col_name* of *ds* carries the Arrow JSON type."""
    # TODO: In the future it should be possible to verify
    # the logical type of a column.
    assert ds.schema.field(col_name).type == pa.json_()


def test_json_basic_write_read():
"""Test basic JSON type write and read functionality."""

Expand Down Expand Up @@ -44,23 +54,13 @@ def test_json_basic_write_read():
logical_schema = dataset.schema
assert len(logical_schema) == 2
assert logical_schema.field("id").type == pa.int32()
logical_field = logical_schema.field("data")
assert (
str(logical_field.type) == "extension<arrow.json>"
or logical_field.type == pa.utf8()
)
check_json_type(dataset, "data")

# Read data back
result_table = dataset.to_table()

# Check that data is returned as Arrow JSON for Python
result_field = result_table.schema.field("data")
# PyArrow extension types print as extension<arrow.json> but
# the storage type is utf8
assert (
str(result_field.type) == "extension<arrow.json>"
or result_field.type == pa.utf8()
)
check_json_type(result_table, "data")

# Verify data
assert result_table.num_rows == 5
Expand Down Expand Up @@ -467,8 +467,7 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path):
lance.write_dataset(initial_table, dataset_path)
dataset = lance.dataset(dataset_path)
schema = dataset.schema
field = schema.field("article_metadata")
assert str(field.type) == "extension<arrow.json>" or field.type == pa.utf8()
check_json_type(dataset, "article_metadata")

append_table = pa.table(
{
Expand Down Expand Up @@ -513,6 +512,68 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path):
]


def test_json_with_compaction(tmp_path: Path):
    """Test that JSON data survives compaction across fragments."""

    dataset_path = tmp_path / "json_compaction.lance"

    def make_fragment(ids, names, scores):
        # Helper: two-column table whose "data" column holds JSON documents.
        docs = [json.dumps({"name": n, "score": s}) for n, s in zip(names, scores)]
        return pa.table(
            {
                "id": pa.array(ids, type=pa.int32()),
                "data": pa.array(docs, type=pa.json_()),
            }
        )

    # Write two separate fragments.
    lance.write_dataset(
        make_fragment([1, 2, 3], ["Alice", "Bob", "Charlie"], [10, 20, 30]),
        dataset_path,
    )
    lance.write_dataset(
        make_fragment([4, 5], ["David", "Eve"], [40, 50]),
        dataset_path,
        mode="append",
    )

    dataset = lance.dataset(dataset_path)
    assert len(dataset.get_fragments()) == 2

    # Compact the two fragments into a single one.
    dataset.optimize.compact_files()
    dataset = lance.dataset(dataset_path)
    assert len(dataset.get_fragments()) == 1

    # All rows intact after compaction.
    result = dataset.to_table()
    assert result.num_rows == 5
    assert result.column("id").to_pylist() == [1, 2, 3, 4, 5]

    # Logical JSON type preserved.
    check_json_type(dataset, "data")

    # JSON scalar functions still operate on the compacted column.
    result = dataset.to_table(filter="json_get_string(data, 'name') = 'Alice'")
    assert result.num_rows == 1
    assert result["id"][0].as_py() == 1

    result = dataset.to_table(filter="json_get_int(data, 'score') > 25")
    assert result.num_rows == 3
    assert result["id"].to_pylist() == [3, 4, 5]


def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path: Path):
"""Ensure JSON extension metadata survives limit/offset scans.

Expand Down Expand Up @@ -567,3 +628,160 @@ def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path:

# Ensure JSON functions still recognize the column as JSON.
assert dest.to_table(filter="json_get(meta, 'i') IS NOT NULL").num_rows == num_rows


def test_json_append(tmp_path: Path):
    """Test appending JSON data to an existing dataset."""

    dataset_path = tmp_path / "json_append.lance"

    # Initial two-row dataset.
    initial_docs = [
        json.dumps({"color": "red", "count": 1}),
        json.dumps({"color": "blue", "count": 2}),
    ]
    lance.write_dataset(
        pa.table(
            {
                "id": pa.array([1, 2], type=pa.int32()),
                "data": pa.array(initial_docs, type=pa.json_()),
            }
        ),
        dataset_path,
    )

    # Append a second batch that includes a null JSON value.
    appended_docs = [
        json.dumps({"color": "green", "count": 3}),
        json.dumps({"color": "yellow", "count": 4}),
        None,
    ]
    lance.write_dataset(
        pa.table(
            {
                "id": pa.array([3, 4, 5], type=pa.int32()),
                "data": pa.array(appended_docs, type=pa.json_()),
            }
        ),
        dataset_path,
        mode="append",
    )

    dataset = lance.dataset(dataset_path)
    assert dataset.count_rows() == 5

    # Logical JSON type preserved across the append.
    check_json_type(dataset, "data")

    # Every row remains readable.
    result = dataset.to_table()
    assert result.column("id").to_pylist() == [1, 2, 3, 4, 5]

    # The single null survives the round trip.
    data_col = result.column("data")
    assert data_col.null_count == 1
    assert data_col.is_null().to_pylist() == [False, False, False, False, True]

    # JSON functions work across both fragments.
    result = dataset.to_table(filter="json_get_string(data, 'color') = 'green'")
    assert result.num_rows == 1
    assert result["id"][0].as_py() == 3

    result = dataset.to_table(filter="json_get_int(data, 'count') >= 2")
    assert result.num_rows == 3
    assert result["id"].to_pylist() == [2, 3, 4]


def test_json_add_columns(tmp_path: Path):
    """Test adding a JSON column to an existing dataset via add_columns."""

    dataset_path = tmp_path / "json_add_col.lance"

    # Start from a dataset that has no JSON column.
    names = ["Alice", "Bob", "Charlie"]
    base = pa.table(
        {
            "id": pa.array([1, 2, 3], type=pa.int32()),
            "name": pa.array(names, type=pa.string()),
        }
    )
    dataset = lance.write_dataset(base, dataset_path)

    # Build the JSON column and feed it through a record batch reader.
    greetings = pa.array(
        [json.dumps({"greeting": f"hello {n}"}) for n in names],
        type=pa.json_(),
    )
    batch = pa.record_batch([greetings], ["metadata"])
    dataset.add_columns(
        iter([batch]),
        reader_schema=pa.schema([pa.field("metadata", pa.json_())]),
    )
    dataset = lance.dataset(dataset_path)

    # The new column exists and carries the JSON logical type.
    assert dataset.schema.names == ["id", "name", "metadata"]
    check_json_type(dataset, "metadata")

    # Values round-trip per row.
    result = dataset.to_table()
    assert result.num_rows == 3
    for name, val in zip(names, result.column("metadata").to_pylist()):
        assert json.loads(val) == {"greeting": f"hello {name}"}

    # JSON functions recognize the freshly added column.
    result = dataset.to_table(
        filter="json_get_string(metadata, 'greeting') = 'hello Alice'"
    )
    assert result.num_rows == 1
    assert result["id"][0].as_py() == 1


def test_json_merge_insert(tmp_path: Path):
    """Test merge_insert with JSON data."""

    dataset_path = tmp_path / "json_merge_insert.lance"

    def json_col(docs):
        # Helper: serialize python dicts into an Arrow JSON column.
        return pa.array([json.dumps(d) for d in docs], type=pa.json_())

    # Seed the target dataset with three rows.
    lance.write_dataset(
        pa.table(
            {
                "id": pa.array([1, 2, 3], type=pa.int32()),
                "data": json_col(
                    [
                        {"name": "Alice", "score": 10},
                        {"name": "Bob", "score": 20},
                        {"name": "Charlie", "score": 30},
                    ]
                ),
            }
        ),
        dataset_path,
    )

    # Source batch: updates id=2, inserts id=4.
    new_data = pa.table(
        {
            "id": pa.array([2, 4], type=pa.int32()),
            "data": json_col(
                [
                    {"name": "Bob", "score": 99},
                    {"name": "David", "score": 40},
                ]
            ),
        }
    )

    dataset = lance.dataset(dataset_path)
    (
        dataset.merge_insert("id")
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute(new_data)
    )
    dataset = lance.dataset(dataset_path)

    # One row updated in place, one inserted.
    assert dataset.count_rows() == 4

    # JSON logical type preserved through merge_insert.
    check_json_type(dataset, "data")

    # All ids present and the column is readable.
    result = dataset.to_table()
    assert sorted(result.column("id").to_pylist()) == [1, 2, 3, 4]

    # Updated score (99) and inserted score (40) both match the filter.
    result = dataset.to_table(filter="json_get_int(data, 'score') >= 35")
    assert result.num_rows == 2
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# We keep this pinned to keep clippy and rustfmt in sync between local and CI.
# Feel free to upgrade to bring in new lints.
[toolchain]
channel = "1.90.0"
channel = "1.91.0"
components = ["rustfmt", "clippy", "rust-analyzer"]
19 changes: 10 additions & 9 deletions rust/lance-namespace-impls/src/dir/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use arrow_ipc::reader::StreamReader;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::StreamExt;
use futures::{stream::StreamExt, FutureExt};
use lance::dataset::optimize::{compact_files, CompactionOptions};
use lance::dataset::{builder::DatasetBuilder, WriteParams};
use lance::session::Session;
Expand Down Expand Up @@ -1081,7 +1081,7 @@ impl LanceNamespace for ManifestNamespace {
}

let object_id = Self::str_object_id(table_id);
let table_info = self.query_manifest_for_table(&object_id).await?;
let table_info = self.query_manifest_for_table(&object_id).boxed().await?;

// Extract table name and namespace from table_id
let table_name = table_id.last().cloned().unwrap_or_default();
Expand Down Expand Up @@ -1298,12 +1298,12 @@ impl LanceNamespace for ManifestNamespace {
let object_id = Self::build_object_id(&namespace, &table_name);

// Query manifest for table location
let table_info = self.query_manifest_for_table(&object_id).await?;
let table_info = self.query_manifest_for_table(&object_id).boxed().await?;

match table_info {
Some(info) => {
// Delete from manifest first
self.delete_from_manifest(&object_id).await?;
self.delete_from_manifest(&object_id).boxed().await?;

// Delete physical data directory using the dir_name from manifest
let table_path = self.base_path.child(info.location.as_str());
Expand All @@ -1312,6 +1312,7 @@ impl LanceNamespace for ManifestNamespace {
// Remove the table directory
self.object_store
.remove_dir_all(table_path)
.boxed()
.await
.map_err(|e| Error::Namespace {
source: format!("Failed to delete table directory: {}", e).into(),
Expand Down Expand Up @@ -1489,7 +1490,7 @@ impl LanceNamespace for ManifestNamespace {
let object_id = namespace_id.join(DELIMITER);

// Check if namespace exists
if !self.manifest_contains_object(&object_id).await? {
if !self.manifest_contains_object(&object_id).boxed().await? {
return Err(Error::Namespace {
source: format!("Namespace '{}' not found", object_id).into(),
location: location!(),
Expand All @@ -1499,7 +1500,7 @@ impl LanceNamespace for ManifestNamespace {
// Check for child namespaces
let prefix = format!("{}{}", object_id, DELIMITER);
let filter = format!("starts_with(object_id, '{}')", prefix);
let mut scanner = self.manifest_scanner().await?;
let mut scanner = self.manifest_scanner().boxed().await?;
scanner.filter(&filter).map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))),
location: location!(),
Expand All @@ -1509,7 +1510,7 @@ impl LanceNamespace for ManifestNamespace {
location: location!(),
})?;
scanner.with_row_id();
let count = scanner.count_rows().await.map_err(|e| Error::IO {
let count = scanner.count_rows().boxed().await.map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!(
"Failed to count rows: {}",
e
Expand All @@ -1528,7 +1529,7 @@ impl LanceNamespace for ManifestNamespace {
});
}

self.delete_from_manifest(&object_id).await?;
self.delete_from_manifest(&object_id).boxed().await?;

Ok(DropNamespaceResponse::default())
}
Expand Down Expand Up @@ -1863,7 +1864,7 @@ impl LanceNamespace for ManifestNamespace {
let table_uri = match table_info {
Some(info) => {
// Delete from manifest only (leave physical data intact)
self.delete_from_manifest(&object_id).await?;
self.delete_from_manifest(&object_id).boxed().await?;
Self::construct_full_uri(&self.root, &info.location)?
}
None => {
Expand Down
Loading