Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion docs/src/guide/distributed_write.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,100 @@ Output:
3 craig 55 5
4 dave 66 4
5 eve 77 3
```
```

## Update Columns

Currently, Lance supports the fragment level update columns ability to update existing columns in a distributed manner.

This operation performs a left-outer-hash-join with the right table (new data)
on the column specified by `left_on` and `right_on`. For every row in the current
fragment, the updated column value is:
1. If no matched row on the right side, the column value of the left side row.
2. If there is exactly one corresponding row on the right side, the column value
of the matching row.
3. If there are multiple corresponding rows, the column value of a random row.

```python
import lance
import pyarrow as pa

# Create initial dataset with two fragments
# First fragment
data1 = pa.table(
{
"id": [1, 2, 3, 4],
"name": ["Alice", "Bob", "Charlie", "David"],
"score": [85, 90, 75, 80],
}
)
dataset_uri = "./my_dataset.lance"
dataset = lance.write_dataset(data1, dataset_uri)

# Second fragment
data2 = pa.table(
{
"id": [5, 6, 7, 8],
"name": ["Eve", "Frank", "Grace", "Henry"],
"score": [88, 92, 78, 82],
}
)
dataset = lance.write_dataset(data2, dataset_uri, mode="append")

# Prepare update data for fragment 0 using 'id' as join key
update_data1 = pa.table(
{
"id": [1, 3],
"name": ["Alan", "Chase"],
"score": [95, 85],
}
)

# Prepare update data for fragment 1
update_data2 = pa.table(
{
"id": [5, 7],
"name": ["Eva", "Gracie"],
"score": [98, 88],
}
)

# Update fragment 0
fragment0 = dataset.get_fragment(0)
updated_fragment0, fields_modified0 = fragment0.update_columns(
update_data1, left_on="id", right_on="id"
)

# Update fragment 1
fragment1 = dataset.get_fragment(1)
updated_fragment1, fields_modified1 = fragment1.update_columns(
update_data2, left_on="id", right_on="id"
)

union_fields_modified = list(set(fields_modified0 + fields_modified1))
# Commit the changes for both fragments
op = lance.LanceOperation.Update(
updated_fragments=[updated_fragment0, updated_fragment1],
fields_modified=union_fields_modified,
)
updated_dataset = lance.LanceDataset.commit(
str(dataset_uri), op, read_version=dataset.version
)

# Verify the update
dataset = lance.dataset(dataset_uri)
print(dataset.to_table().to_pandas())
```

Output:
```
id name score
0 1 Alan 95
1 2 Bob 90
2 3 Chase 85
3 4 David 80
4 5 Eva 98
5 6 Frank 92
6 7 Gracie 88
7 8 Henry 82
```