Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 191 additions & 14 deletions python/python/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@
import json
import tempfile
from pathlib import Path
from typing import Union

import lance
import pyarrow as pa


def check_json_type(ds: Union[lance.LanceDataset, pa.Table], col_name: str):
    """Assert that column *col_name* of *ds* carries the Arrow JSON type.

    Works on either a ``lance.LanceDataset`` or a ``pyarrow.Table`` since
    both expose a ``.schema`` attribute.
    """
    # TODO: eventually verify the column's logical type as well.
    assert ds.schema.field(col_name).type == pa.json_()


def test_json_basic_write_read():
"""Test basic JSON type write and read functionality."""

Expand Down Expand Up @@ -44,23 +54,13 @@ def test_json_basic_write_read():
logical_schema = dataset.schema
assert len(logical_schema) == 2
assert logical_schema.field("id").type == pa.int32()
logical_field = logical_schema.field("data")
assert (
str(logical_field.type) == "extension<arrow.json>"
or logical_field.type == pa.utf8()
)
check_json_type(dataset, "data")

# Read data back
result_table = dataset.to_table()

# Check that data is returned as Arrow JSON for Python
result_field = result_table.schema.field("data")
# PyArrow extension types print as extension<arrow.json> but
# the storage type is utf8
assert (
str(result_field.type) == "extension<arrow.json>"
or result_field.type == pa.utf8()
)
check_json_type(result_table, "data")

# Verify data
assert result_table.num_rows == 5
Expand Down Expand Up @@ -467,8 +467,7 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path):
lance.write_dataset(initial_table, dataset_path)
dataset = lance.dataset(dataset_path)
schema = dataset.schema
field = schema.field("article_metadata")
assert str(field.type) == "extension<arrow.json>" or field.type == pa.utf8()
check_json_type(dataset, "article_metadata")

append_table = pa.table(
{
Expand Down Expand Up @@ -513,6 +512,68 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path):
]


def test_json_with_compaction(tmp_path: Path):
    """JSON data and its logical type must survive file compaction."""

    dataset_path = tmp_path / "json_compaction.lance"

    def build_table(ids, records):
        # Serialize each record dict into a JSON-typed string column.
        return pa.table(
            {
                "id": pa.array(ids, type=pa.int32()),
                "data": pa.array(
                    [json.dumps(rec) for rec in records],
                    type=pa.json_(),
                ),
            }
        )

    # First fragment.
    lance.write_dataset(
        build_table(
            [1, 2, 3],
            [
                {"name": "Alice", "score": 10},
                {"name": "Bob", "score": 20},
                {"name": "Charlie", "score": 30},
            ],
        ),
        dataset_path,
    )
    # Second fragment appended on top.
    lance.write_dataset(
        build_table(
            [4, 5],
            [
                {"name": "David", "score": 40},
                {"name": "Eve", "score": 50},
            ],
        ),
        dataset_path,
        mode="append",
    )

    dataset = lance.dataset(dataset_path)
    assert len(dataset.get_fragments()) == 2

    # Compaction should fold both fragments into a single one.
    dataset.optimize.compact_files()
    dataset = lance.dataset(dataset_path)
    assert len(dataset.get_fragments()) == 1

    # All rows are still present, in order.
    full = dataset.to_table()
    assert full.num_rows == 5
    assert full.column("id").to_pylist() == [1, 2, 3, 4, 5]

    # The column still carries the JSON extension type.
    check_json_type(dataset, "data")

    # JSON path functions keep working on the compacted data.
    by_name = dataset.to_table(filter="json_get_string(data, 'name') = 'Alice'")
    assert by_name.num_rows == 1
    assert by_name["id"][0].as_py() == 1

    by_score = dataset.to_table(filter="json_get_int(data, 'score') > 25")
    assert by_score.num_rows == 3
    assert by_score["id"].to_pylist() == [3, 4, 5]


def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path: Path):
"""Ensure JSON extension metadata survives limit/offset scans.

Expand Down Expand Up @@ -567,3 +628,119 @@ def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path:

# Ensure JSON functions still recognize the column as JSON.
assert dest.to_table(filter="json_get(meta, 'i') IS NOT NULL").num_rows == num_rows


def test_json_append(tmp_path: Path):
    """Appended JSON fragments (including nulls) stay readable and typed."""

    dataset_path = tmp_path / "json_append.lance"

    def json_column(records):
        # None stays a null slot; dicts become JSON-encoded strings.
        return pa.array(
            [None if rec is None else json.dumps(rec) for rec in records],
            type=pa.json_(),
        )

    # Initial two-row dataset.
    lance.write_dataset(
        pa.table(
            {
                "id": pa.array([1, 2], type=pa.int32()),
                "data": json_column(
                    [
                        {"color": "red", "count": 1},
                        {"color": "blue", "count": 2},
                    ]
                ),
            }
        ),
        dataset_path,
    )

    # Append three more rows, the last one null.
    lance.write_dataset(
        pa.table(
            {
                "id": pa.array([3, 4, 5], type=pa.int32()),
                "data": json_column(
                    [
                        {"color": "green", "count": 3},
                        {"color": "yellow", "count": 4},
                        None,
                    ]
                ),
            }
        ),
        dataset_path,
        mode="append",
    )

    dataset = lance.dataset(dataset_path)
    assert dataset.count_rows() == 5

    # JSON type metadata survives the append.
    check_json_type(dataset, "data")

    # Every row round-trips.
    combined = dataset.to_table()
    assert combined.column("id").to_pylist() == [1, 2, 3, 4, 5]

    # Exactly the appended null remains null.
    json_col = combined.column("data")
    assert json_col.null_count == 1
    assert json_col.is_null().to_pylist() == [False, False, False, False, True]

    # Predicates must match rows across both fragments.
    green = dataset.to_table(filter="json_get_string(data, 'color') = 'green'")
    assert green.num_rows == 1
    assert green["id"][0].as_py() == 3

    counted = dataset.to_table(filter="json_get_int(data, 'count') >= 2")
    assert counted.num_rows == 3
    assert counted["id"].to_pylist() == [2, 3, 4]


def test_json_merge_insert(tmp_path: Path):
    """merge_insert must update and insert JSON rows without losing the type."""

    dataset_path = tmp_path / "json_merge_insert.lance"

    def make_table(ids, records):
        # Build an (id, data) table with JSON-typed payloads.
        return pa.table(
            {
                "id": pa.array(ids, type=pa.int32()),
                "data": pa.array(
                    [json.dumps(rec) for rec in records],
                    type=pa.json_(),
                ),
            }
        )

    # Seed rows 1-3.
    lance.write_dataset(
        make_table(
            [1, 2, 3],
            [
                {"name": "Alice", "score": 10},
                {"name": "Bob", "score": 20},
                {"name": "Charlie", "score": 30},
            ],
        ),
        dataset_path,
    )

    # Source batch: id=2 is an update, id=4 is brand new.
    source = make_table(
        [2, 4],
        [
            {"name": "Bob", "score": 99},
            {"name": "David", "score": 40},
        ],
    )

    dataset = lance.dataset(dataset_path)
    (
        dataset.merge_insert("id")
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute(source)
    )
    dataset = lance.dataset(dataset_path)

    # One inserted row on top of the three originals.
    assert dataset.count_rows() == 4

    # JSON type metadata survives the merge.
    check_json_type(dataset, "data")

    # Rows 1-4 are all present.
    merged = dataset.to_table()
    assert sorted(merged.column("id").to_pylist()) == [1, 2, 3, 4]

    # Updated Bob (99) and new David (40) both clear the threshold.
    high = dataset.to_table(filter="json_get_int(data, 'score') >= 35")
    assert high.num_rows == 2
6 changes: 1 addition & 5 deletions rust/lance/src/dataset/write/merge_insert/exec/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,7 @@ impl FullSchemaMergeInsertExec {
.iter()
.map(|&idx| {
let field = input_schema.field(idx);
Arc::new(arrow_schema::Field::new(
field.name(),
field.data_type().clone(),
field.is_nullable(),
))
Arc::new(field.clone())
})
.collect();
let output_schema = Arc::new(Schema::new(output_fields));
Expand Down
Loading