Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions python/python/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,47 @@ def test_json_append(tmp_path: Path):
assert result["id"].to_pylist() == [2, 3, 4]


def test_json_add_columns(tmp_path: Path):
    """Test adding a JSON column to an existing dataset via add_columns."""

    dataset_path = tmp_path / "json_add_col.lance"

    # Build a base dataset that has no JSON column yet.
    base_table = pa.table(
        {
            "id": pa.array([1, 2, 3], type=pa.int32()),
            "name": pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
        }
    )
    dataset = lance.write_dataset(base_table, dataset_path)

    # Append a JSON column, feeding the new data through a record batch reader.
    names = base_table.column("name").to_pylist()
    payloads = [json.dumps({"greeting": f"hello {person}"}) for person in names]
    new_batch = pa.record_batch([pa.array(payloads, type=pa.json_())], ["metadata"])
    batch_schema = pa.schema([pa.field("metadata", pa.json_())])

    dataset.add_columns(iter([new_batch]), reader_schema=batch_schema)
    dataset = lance.dataset(dataset_path)

    # The schema should now include the JSON column with the expected type.
    assert dataset.schema.names == ["id", "name", "metadata"]
    check_json_type(dataset, "metadata")

    # Every JSON payload must round-trip unchanged.
    result = dataset.to_table()
    assert result.num_rows == 3
    for name, raw in zip(names, result.column("metadata").to_pylist()):
        assert json.loads(raw) == {"greeting": f"hello {name}"}

    # JSON scalar functions should be usable in filters on the new column.
    result = dataset.to_table(
        filter="json_get_string(metadata, 'greeting') = 'hello Alice'"
    )
    assert result.num_rows == 1
    assert result["id"][0].as_py() == 1


def test_json_merge_insert(tmp_path: Path):
"""Test merge_insert with JSON data."""

Expand Down
16 changes: 16 additions & 0 deletions rust/lance/src/dataset/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use super::fragment::FragmentReader;
use super::scanner::get_default_batch_size;
use super::write::{open_writer, GenericWriter};
use super::Dataset;
use crate::dataset::utils::SchemaAdapter;
use crate::dataset::FileFragment;

/// Update or insert a new column.
Expand Down Expand Up @@ -43,6 +44,9 @@ pub struct Updater {
/// The schema the new files will be written in. This only contains new columns.
write_schema: Option<Schema>,

/// The adapter to convert the logical data to physical data.
schema_adapter: Option<SchemaAdapter>,

finished: bool,

deletion_restorer: DeletionRestorer,
Expand Down Expand Up @@ -89,6 +93,9 @@ impl Updater {
writer: None,
write_schema,
final_schema,
// The schema adapter needs the data schema, not the logical schema, so it can't be
// created until after the first batch is read.
schema_adapter: None,
finished: false,
deletion_restorer: DeletionRestorer::new(deletion_vector, legacy_batch_size),
})
Expand Down Expand Up @@ -196,6 +203,15 @@ impl Updater {
);
}

let schema_adapter = if let Some(schema_adapter) = self.schema_adapter.as_ref() {
schema_adapter
} else {
self.schema_adapter = Some(SchemaAdapter::new(batch.schema()));
self.schema_adapter.as_ref().unwrap()
};

let batch = schema_adapter.to_physical_batch(batch)?;

let writer = self.writer.as_mut().unwrap();

writer.write(&[batch]).await?;
Expand Down
8 changes: 8 additions & 0 deletions rust/lance/src/dataset/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ impl SchemaAdapter {
schema.fields().iter().any(|field| is_json_field(field))
}

/// Convert a logical batch into its physical representation.
///
/// If the adapter's schema contains any JSON field (see
/// `requires_physical_conversion`), the JSON columns are rewritten to their
/// physical encoding; otherwise the batch is returned unchanged without
/// copying.
pub fn to_physical_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
    if self.requires_physical_conversion() {
        // `convert_json_columns` already returns a `Result`, so propagate it
        // directly instead of the redundant `Ok(expr?)` round-trip
        // (clippy: needless_question_mark).
        convert_json_columns(&batch)
    } else {
        // Fast path: nothing to convert, hand the batch back as-is.
        Ok(batch)
    }
}

/// Convert a logical stream into a physical stream.
pub fn to_physical_stream(
&self,
Expand Down
Loading