Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion java/core/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,17 @@ impl IntoJava for Index {
JObject::null()
};

// Convert base_id from Option<u32> to Integer for Java
let base_id = if let Some(id) = self.base_id {
env.new_object("java/lang/Integer", "(I)V", &[JValue::Int(id as i32)])?
} else {
JObject::null()
};

// Create Index object
Ok(env.new_object(
"com/lancedb/lance/index/Index",
"(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;J[B[BILjava/time/Instant;)V",
"(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;J[B[BILjava/time/Instant;Ljava/lang/Integer;)V",
&[
JValue::Object(&uuid),
JValue::Object(&fields),
Expand All @@ -132,6 +139,7 @@ impl IntoJava for Index {
JValue::Object(&index_details),
JValue::Int(self.index_version),
JValue::Object(&created_at),
JValue::Object(&base_id),
],
)?)
}
Expand Down Expand Up @@ -232,6 +240,13 @@ impl FromJObjectWithEnv<Index> for JObject<'_> {
.i()? as u32;
Some(DateTime::from_timestamp(seconds, nanos).unwrap())
};
let base_id_obj = env.get_field(self, "baseId", "Ljava/lang/Integer;")?.l()?;
let base_id = if base_id_obj.is_null() {
None
} else {
let id_value = env.call_method(&base_id_obj, "intValue", "()I", &[])?.i()? as u32;
Some(id_value)
};

Ok(Index {
uuid,
Expand All @@ -242,6 +257,7 @@ impl FromJObjectWithEnv<Index> for JObject<'_> {
index_details,
index_version,
created_at,
base_id,
})
}
}
Expand Down
24 changes: 20 additions & 4 deletions java/core/src/main/java/com/lancedb/lance/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class Index {
private final byte[] indexDetails;
private final int indexVersion;
private final Instant createdAt;
private final Integer baseId;

private Index(
UUID uuid,
Expand All @@ -44,7 +45,8 @@ private Index(
byte[] fragmentBitmap,
byte[] indexDetails,
int indexVersion,
Instant createdAt) {
Instant createdAt,
Integer baseId) {
this.uuid = uuid;
this.fields = fields;
this.name = name;
Expand All @@ -53,6 +55,7 @@ private Index(
this.indexDetails = indexDetails;
this.indexVersion = indexVersion;
this.createdAt = createdAt;
this.baseId = baseId;
}

public UUID uuid() {
Expand Down Expand Up @@ -94,6 +97,10 @@ public Optional<byte[]> indexDetails() {
return Optional.ofNullable(indexDetails);
}

public Optional<Integer> baseId() {
return Optional.ofNullable(baseId);
}

/**
* Get the index version.
*
Expand Down Expand Up @@ -124,12 +131,13 @@ public boolean equals(Object o) {
&& Objects.equals(name, index.name)
&& Arrays.equals(fragmentBitmap, index.fragmentBitmap)
&& Arrays.equals(indexDetails, index.indexDetails)
&& Objects.equals(createdAt, index.createdAt);
&& Objects.equals(createdAt, index.createdAt)
&& Objects.equals(baseId, index.baseId);
}

@Override
public int hashCode() {
int result = Objects.hash(uuid, fields, name, datasetVersion, indexVersion, createdAt);
int result = Objects.hash(uuid, fields, name, datasetVersion, indexVersion, createdAt, baseId);
result = 31 * result + Arrays.hashCode(fragmentBitmap);
result = 31 * result + Arrays.hashCode(indexDetails);
return result;
Expand All @@ -144,6 +152,7 @@ public String toString() {
.append("datasetVersion", datasetVersion)
.append("indexVersion", indexVersion)
.append("createdAt", createdAt)
.append("baseId", baseId)
.toString();
}

Expand All @@ -166,6 +175,7 @@ public static class Builder {
private byte[] indexDetails;
private int indexVersion;
private Instant createdAt;
private Integer baseId;

private Builder() {}

Expand Down Expand Up @@ -209,6 +219,11 @@ public Builder createdAt(Instant createdAt) {
return this;
}

public Builder baseId(Integer baseId) {
this.baseId = baseId;
return this;
}

public Index build() {
return new Index(
uuid,
Expand All @@ -218,7 +233,8 @@ public Index build() {
fragmentBitmap,
indexDetails,
indexVersion,
createdAt);
createdAt,
baseId);
}
}
}
4 changes: 4 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ message IndexMetadata {
// This field is optional for backward compatibility. For existing indices created before
// this field was added, this will be None/null.
optional uint64 created_at = 8;

// The base path index of the index files. Used when the index is imported or referred from another dataset.
// Lance uses it as key of the base_paths field in Manifest to determine the actual base path of the index files.
optional uint32 base_id = 9;
}

// Index Section, containing a list of index metadata for one dataset version.
Expand Down
3 changes: 2 additions & 1 deletion python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3340,6 +3340,7 @@ class Index:
fragment_ids: Set[int]
index_version: int
created_at: Optional[datetime] = None
base_id: Optional[int] = None


class AutoCleanupConfig(TypedDict):
Expand Down Expand Up @@ -3676,7 +3677,7 @@ class Rewrite(BaseOperation):
"""

groups: Iterable[LanceOperation.RewriteGroup]
rewritten_indices: Iterable[LanceOperation.RewrittenIndex]
rewritten_indices: Iterable[LanceOperation.RewrittenIndex]

def __post_init__(self):
all_frags = [old for group in self.groups for old in group.old_fragments]
Expand Down
2 changes: 2 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,8 @@ impl Dataset {
dict.set_item("fields", field_names).unwrap();
dict.set_item("version", idx.dataset_version).unwrap();
dict.set_item("fragment_ids", fragment_set).unwrap();
dict.set_item("base_id", idx.base_id.map(|id| id as i64))
.unwrap();
dict.into_py_any(py)
})
.collect::<PyResult<Vec<_>>>()
Expand Down
7 changes: 7 additions & 0 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ impl FromPyObject<'_> for PyLance<Index> {
.map(|id| id.extract::<u32>())
.collect::<PyResult<RoaringBitmap>>()?,
);
let base_id: Option<u32> = ob
.getattr("base_id")?
.extract::<Option<i64>>()?
.map(|id| id as u32);

Ok(Self(Index {
uuid: Uuid::parse_str(&uuid).map_err(|e| PyValueError::new_err(e.to_string()))?,
Expand All @@ -48,6 +52,7 @@ impl FromPyObject<'_> for PyLance<Index> {
index_details: None,
index_version,
created_at,
base_id,
}))
}
}
Expand Down Expand Up @@ -78,6 +83,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Index> {
},
);
let created_at = self.0.created_at;
let base_id = self.0.base_id.map(|id| id as i64);

let cls = namespace
.getattr("Index")
Expand All @@ -90,6 +96,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Index> {
fragment_ids,
index_version,
created_at,
base_id,
))
}
}
Expand Down
7 changes: 7 additions & 0 deletions rust/lance-table/src/format/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use uuid::Uuid;

use super::pb;
use lance_core::{Error, Result};

/// Index metadata
#[derive(Debug, Clone, PartialEq)]
pub struct Index {
Expand Down Expand Up @@ -47,6 +48,10 @@ pub struct Index {
/// This field is optional for backward compatibility. For existing indices created before
/// this field was added, this will be None.
pub created_at: Option<DateTime<Utc>>,

/// The base path index of the index files. Used when the index is imported or referred from another dataset.
/// Lance uses it as key of the base_paths field in Manifest to determine the actual base path of the index files.
pub base_id: Option<u32>,
}

impl Index {
Expand Down Expand Up @@ -102,6 +107,7 @@ impl TryFrom<pb::IndexMetadata> for Index {
DateTime::from_timestamp_millis(ts as i64)
.expect("Invalid timestamp in index metadata")
}),
base_id: proto.base_id,
})
}
}
Expand All @@ -127,6 +133,7 @@ impl From<&Index> for pb::IndexMetadata {
index_details: idx.index_details.clone(),
index_version: Some(idx.index_version),
created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64),
base_id: idx.base_id,
}
}
}
20 changes: 11 additions & 9 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,16 @@ impl Manifest {
}
}

/// Performs a shallow_clone of the manifest entirely in memory without:
/// - Any persistent storage operations
/// - Modifications to the original data
pub fn shallow_clone(
&self,
ref_name: Option<String>,
ref_path: String,
ref_base_id: u32,
Comment thread
jackye1995 marked this conversation as resolved.
transaction_file: String,
) -> Self {
let new_base_id = self.base_paths.keys().max().map(|id| *id + 1).unwrap_or(0);
let cloned_fragments = self
.fragments
.as_ref()
Expand All @@ -202,13 +205,13 @@ impl Manifest {
.files
.into_iter()
.map(|mut file| {
file.base_id = Some(new_base_id);
file.base_id = Some(ref_base_id);
file
})
.collect();

if let Some(mut deletion) = cloned_fragment.deletion_file.take() {
deletion.base_id = Some(new_base_id);
deletion.base_id = Some(ref_base_id);
cloned_fragment.deletion_file = Some(deletion);
}

Expand All @@ -223,12 +226,11 @@ impl Manifest {
writer_version: self.writer_version.clone(),
fragments: Arc::new(cloned_fragments),
version_aux_data: self.version_aux_data,
// TODO: apply shallow clone to indexes
index_section: None,
index_section: None, // These will be set on commit
timestamp_nanos: self.timestamp_nanos,
reader_feature_flags: self.reader_feature_flags,
tag: None,
writer_feature_flags: self.writer_feature_flags,
reader_feature_flags: 0, // These will be set on commit
writer_feature_flags: 0, // These will be set on commit
max_fragment_id: self.max_fragment_id,
transaction_file: Some(transaction_file),
fragment_offsets: self.fragment_offsets.clone(),
Expand All @@ -239,12 +241,12 @@ impl Manifest {
base_paths: {
let mut base_paths = self.base_paths.clone();
let base_path = BasePath {
id: new_base_id,
id: ref_base_id,
name: ref_name,
is_dataset_root: true,
path: ref_path,
};
base_paths.insert(new_base_id, base_path);
base_paths.insert(ref_base_id, base_path);
base_paths
},
}
Expand Down
26 changes: 26 additions & 0 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,32 @@ impl Dataset {
}
}

/// Resolve the directory that holds the files of `index`.
///
/// * When `index.base_id` is `None`, the result is `INDICES_DIR` under this
///   dataset's own `base` path.
/// * When `index.base_id` is set, the id is looked up in the manifest's
///   `base_paths`. If the matching entry is flagged `is_dataset_root`,
///   `INDICES_DIR` is appended to its path; otherwise the entry's path is
///   returned unchanged.
///
/// # Errors
///
/// Returns an invalid-input error when `base_id` has no entry in
/// `base_paths`, and propagates any failure from parsing the entry's path.
pub(crate) fn indice_files_dir(&self, index: &Index) -> Result<Path> {
    // Guard clause: no base_id means the index lives under this dataset.
    let base_id = match index.base_id.as_ref() {
        Some(id) => id,
        None => return Ok(self.base.child(INDICES_DIR)),
    };

    let entry = self.manifest.base_paths.get(base_id).ok_or_else(|| {
        Error::invalid_input(
            format!(
                "base_path id {} not found for index {}",
                base_id, index.uuid
            ),
            location!(),
        )
    })?;

    let root = Path::parse(entry.path.as_str())?;
    Ok(if entry.is_dataset_root {
        root.child(INDICES_DIR)
    } else {
        // A non-root base path is taken to already point at the indices directory.
        root
    })
}

pub fn session(&self) -> Arc<Session> {
self.session.clone()
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/optimize/remapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> {
index_details: curr_index_meta.index_details.clone(),
index_version: curr_index_meta.index_version,
created_at: curr_index_meta.created_at,
base_id: None,
};

let transaction = Transaction::new(
Expand Down
Loading
Loading