diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py
index d68e0ca539a..44dd22be5a9 100644
--- a/python/python/tests/test_json.py
+++ b/python/python/tests/test_json.py
@@ -4,11 +4,21 @@
 import json
 import tempfile
 from pathlib import Path
+from typing import Union
 
 import lance
 import pyarrow as pa
 
 
+def check_json_type(ds: Union[lance.LanceDataset, pa.Table], col_name: str):
+    # TODO: In the future it should be possible to verify
+    # the logical type of a column.
+
+    schema = ds.schema
+    field = schema.field(col_name)
+    assert field.type == pa.json_()
+
+
 def test_json_basic_write_read():
     """Test basic JSON type write and read functionality."""
 
@@ -44,23 +54,13 @@ def test_json_basic_write_read():
     logical_schema = dataset.schema
     assert len(logical_schema) == 2
     assert logical_schema.field("id").type == pa.int32()
-    logical_field = logical_schema.field("data")
-    assert (
-        str(logical_field.type) == "extension"
-        or logical_field.type == pa.utf8()
-    )
+    check_json_type(dataset, "data")
 
     # Read data back
     result_table = dataset.to_table()
 
     # Check that data is returned as Arrow JSON for Python
-    result_field = result_table.schema.field("data")
-    # PyArrow extension types print as extension but
-    # the storage type is utf8
-    assert (
-        str(result_field.type) == "extension"
-        or result_field.type == pa.utf8()
-    )
+    check_json_type(result_table, "data")
 
     # Verify data
     assert result_table.num_rows == 5
@@ -467,8 +467,7 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path):
     lance.write_dataset(initial_table, dataset_path)
     dataset = lance.dataset(dataset_path)
     schema = dataset.schema
-    field = schema.field("article_metadata")
-    assert str(field.type) == "extension" or field.type == pa.utf8()
+    check_json_type(dataset, "article_metadata")
 
     append_table = pa.table(
         {
@@ -513,6 +512,68 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path):
     ]
 
 
+def test_json_with_compaction(tmp_path: Path):
+    """Test that JSON data survives compaction across fragments."""
+
+    dataset_path = tmp_path / "json_compaction.lance"
+
+    # Write first fragment
+    table1 = pa.table(
+        {
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "data": pa.array(
+                [
+                    json.dumps({"name": "Alice", "score": 10}),
+                    json.dumps({"name": "Bob", "score": 20}),
+                    json.dumps({"name": "Charlie", "score": 30}),
+                ],
+                type=pa.json_(),
+            ),
+        }
+    )
+    lance.write_dataset(table1, dataset_path)
+
+    # Write second fragment
+    table2 = pa.table(
+        {
+            "id": pa.array([4, 5], type=pa.int32()),
+            "data": pa.array(
+                [
+                    json.dumps({"name": "David", "score": 40}),
+                    json.dumps({"name": "Eve", "score": 50}),
+                ],
+                type=pa.json_(),
+            ),
+        }
+    )
+    lance.write_dataset(table2, dataset_path, mode="append")
+
+    dataset = lance.dataset(dataset_path)
+    assert len(dataset.get_fragments()) == 2
+
+    # Run compaction
+    dataset.optimize.compact_files()
+    dataset = lance.dataset(dataset_path)
+    assert len(dataset.get_fragments()) == 1
+
+    # Verify data is intact
+    result = dataset.to_table()
+    assert result.num_rows == 5
+    assert result.column("id").to_pylist() == [1, 2, 3, 4, 5]
+
+    # Verify JSON type is preserved
+    check_json_type(dataset, "data")
+
+    # Verify JSON functions still work after compaction
+    result = dataset.to_table(filter="json_get_string(data, 'name') = 'Alice'")
+    assert result.num_rows == 1
+    assert result["id"][0].as_py() == 1
+
+    result = dataset.to_table(filter="json_get_int(data, 'score') > 25")
+    assert result.num_rows == 3
+    assert result["id"].to_pylist() == [3, 4, 5]
+
+
 def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path: Path):
     """Ensure JSON extension metadata survives limit/offset scans.
 
@@ -567,3 +628,119 @@ def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path:
 
     # Ensure JSON functions still recognize the column as JSON.
     assert dest.to_table(filter="json_get(meta, 'i') IS NOT NULL").num_rows == num_rows
+
+
+def test_json_append(tmp_path: Path):
+    """Test appending JSON data to an existing dataset."""
+
+    dataset_path = tmp_path / "json_append.lance"
+
+    # Write initial data
+    table1 = pa.table(
+        {
+            "id": pa.array([1, 2], type=pa.int32()),
+            "data": pa.array(
+                [
+                    json.dumps({"color": "red", "count": 1}),
+                    json.dumps({"color": "blue", "count": 2}),
+                ],
+                type=pa.json_(),
+            ),
+        }
+    )
+    lance.write_dataset(table1, dataset_path)
+
+    # Append more data
+    table2 = pa.table(
+        {
+            "id": pa.array([3, 4, 5], type=pa.int32()),
+            "data": pa.array(
+                [
+                    json.dumps({"color": "green", "count": 3}),
+                    json.dumps({"color": "yellow", "count": 4}),
+                    None,
+                ],
+                type=pa.json_(),
+            ),
+        }
+    )
+    lance.write_dataset(table2, dataset_path, mode="append")
+
+    dataset = lance.dataset(dataset_path)
+    assert dataset.count_rows() == 5
+
+    # Verify JSON type is preserved
+    check_json_type(dataset, "data")
+
+    # Verify all data is readable
+    result = dataset.to_table()
+    assert result.column("id").to_pylist() == [1, 2, 3, 4, 5]
+
+    # Verify null handling
+    data_col = result.column("data")
+    assert data_col.null_count == 1
+    assert data_col.is_null().to_pylist() == [False, False, False, False, True]
+
+    # Verify JSON functions work across both fragments
+    result = dataset.to_table(filter="json_get_string(data, 'color') = 'green'")
+    assert result.num_rows == 1
+    assert result["id"][0].as_py() == 3
+
+    result = dataset.to_table(filter="json_get_int(data, 'count') >= 2")
+    assert result.num_rows == 3
+    assert result["id"].to_pylist() == [2, 3, 4]
+
+
+def test_json_merge_insert(tmp_path: Path):
+    """Test merge_insert with JSON data."""
+
+    dataset_path = tmp_path / "json_merge_insert.lance"
+
+    # Create initial dataset
+    table = pa.table(
+        {
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "data": pa.array(
+                [
+                    json.dumps({"name": "Alice", "score": 10}),
+                    json.dumps({"name": "Bob", "score": 20}),
+                    json.dumps({"name": "Charlie", "score": 30}),
+                ],
+                type=pa.json_(),
+            ),
+        }
+    )
+    lance.write_dataset(table, dataset_path)
+
+    # Merge insert: update id=2, insert id=4
+    new_data = pa.table(
+        {
+            "id": pa.array([2, 4], type=pa.int32()),
+            "data": pa.array(
+                [
+                    json.dumps({"name": "Bob", "score": 99}),
+                    json.dumps({"name": "David", "score": 40}),
+                ],
+                type=pa.json_(),
+            ),
+        }
+    )
+
+    dataset = lance.dataset(dataset_path)
+    dataset.merge_insert(
+        "id"
+    ).when_matched_update_all().when_not_matched_insert_all().execute(new_data)
+    dataset = lance.dataset(dataset_path)
+
+    # Verify row count
+    assert dataset.count_rows() == 4
+
+    # Verify JSON type preserved
+    check_json_type(dataset, "data")
+
+    # Verify data is readable
+    result = dataset.to_table()
+    assert sorted(result.column("id").to_pylist()) == [1, 2, 3, 4]
+
+    result = dataset.to_table(filter="json_get_int(data, 'score') >= 35")
+    assert result.num_rows == 2
diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs
index 87714e6d46d..45b915fd353 100644
--- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs
+++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs
@@ -444,11 +444,7 @@ impl FullSchemaMergeInsertExec {
             .iter()
             .map(|&idx| {
                 let field = input_schema.field(idx);
-                Arc::new(arrow_schema::Field::new(
-                    field.name(),
-                    field.data_type().clone(),
-                    field.is_nullable(),
-                ))
+                Arc::new(field.clone())
             })
             .collect();
         let output_schema = Arc::new(Schema::new(output_fields));