diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py
index 44dd22be5a9..0cbc918cc18 100644
--- a/python/python/tests/test_json.py
+++ b/python/python/tests/test_json.py
@@ -691,6 +691,47 @@ def test_json_append(tmp_path: Path):
     assert result["id"].to_pylist() == [2, 3, 4]
 
 
+def test_json_add_columns(tmp_path: Path):
+    """Test adding a JSON column to an existing dataset via add_columns."""
+
+    dataset_path = tmp_path / "json_add_col.lance"
+
+    # Create a dataset without a JSON column
+    table = pa.table(
+        {
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "name": pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
+        }
+    )
+    dataset = lance.write_dataset(table, dataset_path)
+
+    # Add a JSON column using a record batch reader
+    names = table.column("name").to_pylist()
+    json_values = [json.dumps({"greeting": f"hello {n}"}) for n in names]
+    new_col = pa.record_batch([pa.array(json_values, type=pa.json_())], ["metadata"])
+    reader_schema = pa.schema([pa.field("metadata", pa.json_())])
+
+    dataset.add_columns(iter([new_col]), reader_schema=reader_schema)
+    dataset = lance.dataset(dataset_path)
+
+    # Verify the new column exists and has the right type
+    assert dataset.schema.names == ["id", "name", "metadata"]
+    check_json_type(dataset, "metadata")
+
+    # Verify data round-trips
+    result = dataset.to_table()
+    assert result.num_rows == 3
+    metadata_values = result.column("metadata").to_pylist()
+    for name, val in zip(names, metadata_values):
+        assert json.loads(val) == {"greeting": f"hello {name}"}
+
+    result = dataset.to_table(
+        filter="json_get_string(metadata, 'greeting') = 'hello Alice'"
+    )
+    assert result.num_rows == 1
+    assert result["id"][0].as_py() == 1
+
+
 def test_json_merge_insert(tmp_path: Path):
     """Test merge_insert with JSON data."""
 
diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs
index 7197f877943..bdcbc73cbfe 100644
--- a/rust/lance/src/dataset/updater.rs
+++ b/rust/lance/src/dataset/updater.rs
@@ -14,6 +14,7 @@ use super::fragment::FragmentReader;
 use super::scanner::get_default_batch_size;
 use super::write::{open_writer, GenericWriter};
 use super::Dataset;
+use crate::dataset::utils::SchemaAdapter;
 use crate::dataset::FileFragment;
 
 /// Update or insert a new column.
@@ -43,6 +44,9 @@ pub struct Updater {
     /// The schema the new files will be written in. This only contains new columns.
     write_schema: Option<Schema>,
 
+    /// The adapter to convert the logical data to physical data.
+    schema_adapter: Option<SchemaAdapter>,
+
     finished: bool,
 
     deletion_restorer: DeletionRestorer,
@@ -89,6 +93,9 @@ impl Updater {
             writer: None,
             write_schema,
            final_schema,
+            // The schema adapter needs the data schema, not the logical schema, so it
+            // can't be created until after the first batch is read.
+            schema_adapter: None,
             finished: false,
             deletion_restorer: DeletionRestorer::new(deletion_vector, legacy_batch_size),
         })
@@ -196,6 +203,13 @@ impl Updater {
             );
         }
 
+        // Lazily create the schema adapter from the first batch's schema.
+        let schema_adapter = self
+            .schema_adapter
+            .get_or_insert_with(|| SchemaAdapter::new(batch.schema()));
+
+        let batch = schema_adapter.to_physical_batch(batch)?;
+
         let writer = self.writer.as_mut().unwrap();
         writer.write(&[batch]).await?;
 
diff --git a/rust/lance/src/dataset/utils.rs b/rust/lance/src/dataset/utils.rs
index 56792a9317d..5a459e3032e 100644
--- a/rust/lance/src/dataset/utils.rs
+++ b/rust/lance/src/dataset/utils.rs
@@ -163,6 +163,15 @@ impl SchemaAdapter {
         schema.fields().iter().any(|field| is_json_field(field))
     }
 
+    /// Convert a logical batch into a physical batch.
+    pub fn to_physical_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        if self.requires_physical_conversion() {
+            Ok(convert_json_columns(&batch)?)
+        } else {
+            Ok(batch)
+        }
+    }
+
     /// Convert a logical stream into a physical stream.
     pub fn to_physical_stream(
         &self,