From e3060b33b37e7edf93abd5f0dbd60ea1af8a1468 Mon Sep 17 00:00:00 2001 From: xloya Date: Mon, 3 Nov 2025 14:48:23 +0800 Subject: [PATCH 1/3] add docs --- docs/src/guide/distributed_write.md | 69 ++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/docs/src/guide/distributed_write.md b/docs/src/guide/distributed_write.md index 34bc52562a5..109f2f9418b 100644 --- a/docs/src/guide/distributed_write.md +++ b/docs/src/guide/distributed_write.md @@ -166,4 +166,71 @@ Output: 3 craig 55 5 4 dave 66 4 5 eve 77 3 -``` \ No newline at end of file +``` + +## Update Columns + +Lance currently supports fragment-level column updates, which allow existing columns to be updated in a distributed manner. + +This operation performs a left-outer-hash-join with the right table (new data) +on the columns specified by `left_on` and `right_on`. For every row in the current +fragment, the updated column value is: +1. If there is no matching row on the right side, the column value of the left side row. +2. If there is exactly one corresponding row on the right side, the column value + of the matching row. +3. If there are multiple corresponding rows, the column value of a random row. 
+ +```python +import lance +import pyarrow as pa + +# Create initial dataset +data = pa.table( + { + "id": [1, 2, 3, 4], + "name": ["Alice", "Bob", "Charlie", "David"], + "score": [85, 90, 75, 80], + } +) +dataset_uri = "test_dataset_update_columns_custom_join_key" +dataset = lance.write_dataset(data, dataset_uri) + +# Prepare update data using 'id' as join key +# Note: We only update 'score', not 'id' itself +update_data = pa.table( + { + "id": [1, 3], + "name": ["Alan", "Chase"], + "score": [95, 85], + } +) + +# Get the fragment and update columns +fragment = dataset.get_fragment(0) +updated_fragment, fields_modified = fragment.update_columns( + update_data, left_on="id", right_on="id" +) + +# Commit the changes + +op = lance.LanceOperation.Update( + updated_fragments=[updated_fragment], + fields_modified=fields_modified, +) +updated_dataset = lance.LanceDataset.commit( + str(dataset_uri), op, read_version=dataset.version +) + +# Verify the update +result = updated_dataset.to_table().to_pydict() +print(result) +``` + +Output: +``` + id name score +0 1 Alan 95 +1 2 Bob 90 +2 3 Chase 85 +3 4 David 80 +``` \ No newline at end of file From 304aaf53027d6654033b2990fbd59db8b4e5f227 Mon Sep 17 00:00:00 2001 From: xloya Date: Mon, 3 Nov 2025 14:54:14 +0800 Subject: [PATCH 2/3] update --- docs/src/guide/distributed_write.md | 68 ++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/docs/src/guide/distributed_write.md b/docs/src/guide/distributed_write.md index 109f2f9418b..801c37c7a40 100644 --- a/docs/src/guide/distributed_write.md +++ b/docs/src/guide/distributed_write.md @@ -184,20 +184,30 @@ fragment, the updated column value is: import lance import pyarrow as pa -# Create initial dataset -data = pa.table( +# Create initial dataset with two fragments +# First fragment +data1 = pa.table( { "id": [1, 2, 3, 4], "name": ["Alice", "Bob", "Charlie", "David"], "score": [85, 90, 75, 80], } ) -dataset_uri = 
"test_dataset_update_columns_custom_join_key" -dataset = lance.write_dataset(data, dataset_uri) +dataset_uri = "./my_dataset.lance" +dataset = lance.write_dataset(data1, dataset_uri) -# Prepare update data using 'id' as join key -# Note: We only update 'score', not 'id' itself -update_data = pa.table( +# Second fragment +data2 = pa.table( + { + "id": [5, 6, 7, 8], + "name": ["Eve", "Frank", "Grace", "Henry"], + "score": [88, 92, 78, 82], + } +) +dataset = lance.write_dataset(data2, dataset_uri, mode="append") + +# Prepare update data for fragment 0 using 'id' as join key +update_data1 = pa.table( { "id": [1, 3], "name": ["Alan", "Chase"], @@ -205,16 +215,30 @@ update_data = pa.table( } ) -# Get the fragment and update columns -fragment = dataset.get_fragment(0) -updated_fragment, fields_modified = fragment.update_columns( - update_data, left_on="id", right_on="id" +# Prepare update data for fragment 1 +update_data2 = pa.table( + { + "id": [5, 7], + "name": ["Eva", "Gracie"], + "score": [98, 88], + } +) + +# Update fragment 0 +fragment0 = dataset.get_fragment(0) +updated_fragment0, fields_modified = fragment0.update_columns( + update_data1, left_on="id", right_on="id" ) -# Commit the changes +# Update fragment 1 +fragment1 = dataset.get_fragment(1) +updated_fragment1, _ = fragment1.update_columns( + update_data2, left_on="id", right_on="id" +) +# Commit the changes for both fragments op = lance.LanceOperation.Update( - updated_fragments=[updated_fragment], + updated_fragments=[updated_fragment0, updated_fragment1], fields_modified=fields_modified, ) updated_dataset = lance.LanceDataset.commit( @@ -222,15 +246,19 @@ updated_dataset = lance.LanceDataset.commit( ) # Verify the update -result = updated_dataset.to_table().to_pydict() -print(result) +dataset = lance.dataset(dataset_uri) +print(dataset.to_table().to_pandas()) ``` Output: ``` - id name score -0 1 Alan 95 -1 2 Bob 90 -2 3 Chase 85 -3 4 David 80 + id name score +0 1 Alan 95 +1 2 Bob 90 +2 3 Chase 85 +3 4 
David 80 +4 5 Eva 98 +5 6 Frank 92 +6 7 Gracie 88 +7 8 Henry 82 ``` \ No newline at end of file From 86350cbecbf46bec383a0338f72fa07d8903b1e1 Mon Sep 17 00:00:00 2001 From: xloya Date: Mon, 3 Nov 2025 15:03:10 +0800 Subject: [PATCH 3/3] update --- docs/src/guide/distributed_write.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/src/guide/distributed_write.md b/docs/src/guide/distributed_write.md index 801c37c7a40..a58aec9d5d7 100644 --- a/docs/src/guide/distributed_write.md +++ b/docs/src/guide/distributed_write.md @@ -226,20 +226,21 @@ update_data2 = pa.table( # Update fragment 0 fragment0 = dataset.get_fragment(0) -updated_fragment0, fields_modified = fragment0.update_columns( +updated_fragment0, fields_modified0 = fragment0.update_columns( update_data1, left_on="id", right_on="id" ) # Update fragment 1 fragment1 = dataset.get_fragment(1) -updated_fragment1, _ = fragment1.update_columns( +updated_fragment1, fields_modified1 = fragment1.update_columns( update_data2, left_on="id", right_on="id" ) +union_fields_modified = list(set(fields_modified0 + fields_modified1)) # Commit the changes for both fragments op = lance.LanceOperation.Update( updated_fragments=[updated_fragment0, updated_fragment1], - fields_modified=fields_modified, + fields_modified=union_fields_modified, ) updated_dataset = lance.LanceDataset.commit( str(dataset_uri), op, read_version=dataset.version