diff --git a/docs/src/guide/distributed_write.md b/docs/src/guide/distributed_write.md index 34bc52562a5..a58aec9d5d7 100644 --- a/docs/src/guide/distributed_write.md +++ b/docs/src/guide/distributed_write.md @@ -166,4 +166,100 @@ Output: 3 craig 55 5 4 dave 66 4 5 eve 77 3 -``` \ No newline at end of file +``` + +## Update Columns + +Currently, Lance supports the fragment level update columns ability to update existing columns in a distributed manner. + +This operation performs a left-outer-hash-join with the right table (new data) +on the column specified by `left_on` and `right_on`. For every row in the current +fragment, the updated column value is: +1. If no matched row on the right side, the column value of the left side row. +2. If there is exactly one corresponding row on the right side, the column value + of the matching row. +3. If there are multiple corresponding rows, the column value of a random row. + +```python +import lance +import pyarrow as pa + +# Create initial dataset with two fragments +# First fragment +data1 = pa.table( + { + "id": [1, 2, 3, 4], + "name": ["Alice", "Bob", "Charlie", "David"], + "score": [85, 90, 75, 80], + } +) +dataset_uri = "./my_dataset.lance" +dataset = lance.write_dataset(data1, dataset_uri) + +# Second fragment +data2 = pa.table( + { + "id": [5, 6, 7, 8], + "name": ["Eve", "Frank", "Grace", "Henry"], + "score": [88, 92, 78, 82], + } +) +dataset = lance.write_dataset(data2, dataset_uri, mode="append") + +# Prepare update data for fragment 0 using 'id' as join key +update_data1 = pa.table( + { + "id": [1, 3], + "name": ["Alan", "Chase"], + "score": [95, 85], + } +) + +# Prepare update data for fragment 1 +update_data2 = pa.table( + { + "id": [5, 7], + "name": ["Eva", "Gracie"], + "score": [98, 88], + } +) + +# Update fragment 0 +fragment0 = dataset.get_fragment(0) +updated_fragment0, fields_modified0 = fragment0.update_columns( + update_data1, left_on="id", right_on="id" +) + +# Update fragment 1 +fragment1 = dataset.get_fragment(1) +updated_fragment1, fields_modified1 = fragment1.update_columns( + update_data2, left_on="id", right_on="id" +) + +union_fields_modified = list(set(fields_modified0 + fields_modified1)) +# Commit the changes for both fragments +op = lance.LanceOperation.Update( + updated_fragments=[updated_fragment0, updated_fragment1], + fields_modified=union_fields_modified, +) +updated_dataset = lance.LanceDataset.commit( + str(dataset_uri), op, read_version=dataset.version +) + +# Verify the update +dataset = lance.dataset(dataset_uri) +print(dataset.to_table().to_pandas()) +``` + +Output: +``` + id name score +0 1 Alan 95 +1 2 Bob 90 +2 3 Chase 85 +3 4 David 80 +4 5 Eva 98 +5 6 Frank 92 +6 7 Gracie 88 +7 8 Henry 82 +``` \ No newline at end of file