Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 102 additions & 45 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,32 @@ pub struct BlockingDataset {
}

impl BlockingDataset {
/// Get the storage options provider that was used when opening this dataset
pub fn get_storage_options_provider(&self) -> Option<Arc<dyn StorageOptionsProvider>> {
self.inner.storage_options_provider()
/// Get the initial storage options used to open this dataset.
///
/// Returns the options that were provided when the dataset was opened,
/// without any refresh from the provider. Returns None if no storage options
/// were provided.
pub fn initial_storage_options(&self) -> Option<HashMap<String, String>> {
self.inner.initial_storage_options().cloned()
}

/// Get the latest storage options, potentially refreshed from the provider.
///
/// If a storage options provider was configured and credentials are expiring,
/// this will refresh them.
pub fn latest_storage_options(&self) -> Result<Option<HashMap<String, String>>> {
RT.block_on(async { self.inner.latest_storage_options().await })
.map(|opt| opt.map(|opts| opts.0))
.map_err(|e| Error::io_error(e.to_string()))
}

pub fn drop(uri: &str, storage_options: HashMap<String, String>) -> Result<()> {
RT.block_on(async move {
let registry = Arc::new(ObjectStoreRegistry::default());
let object_store_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
storage_options_accessor: Some(Arc::new(
lance::io::StorageOptionsAccessor::with_static_options(storage_options),
)),
..Default::default()
};
let (object_store, path) =
Expand Down Expand Up @@ -117,20 +133,29 @@ impl BlockingDataset {
storage_options: HashMap<String, String>,
serialized_manifest: Option<&[u8]>,
storage_options_provider: Option<Arc<dyn StorageOptionsProvider>>,
s3_credentials_refresh_offset_seconds: Option<u64>,
) -> Result<Self> {
let mut store_params = ObjectStoreParams {
// Create storage options accessor from storage_options and provider
let accessor = match (storage_options.is_empty(), storage_options_provider) {
(false, Some(provider)) => Some(Arc::new(
lance::io::StorageOptionsAccessor::with_initial_and_provider(
storage_options,
provider,
),
)),
(false, None) => Some(Arc::new(
lance::io::StorageOptionsAccessor::with_static_options(storage_options),
)),
(true, Some(provider)) => Some(Arc::new(
lance::io::StorageOptionsAccessor::with_provider(provider),
)),
(true, None) => None,
};

let store_params = ObjectStoreParams {
block_size: block_size.map(|size| size as usize),
storage_options: Some(storage_options.clone()),
storage_options_accessor: accessor,
..Default::default()
};
if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds {
store_params.s3_credentials_refresh_offset =
std::time::Duration::from_secs(offset_seconds);
}
if let Some(provider) = storage_options_provider.clone() {
store_params.storage_options_provider = Some(provider);
}
let params = ReadParams {
index_cache_size_bytes: index_cache_size_bytes as usize,
metadata_cache_size_bytes: metadata_cache_size_bytes as usize,
Expand All @@ -143,14 +168,6 @@ impl BlockingDataset {
if let Some(ver) = version {
builder = builder.with_version(ver as u64);
}
builder = builder.with_storage_options(storage_options);
if let Some(provider) = storage_options_provider.clone() {
builder = builder.with_storage_options_provider(provider)
}
if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds {
builder = builder
.with_s3_credentials_refresh_offset(std::time::Duration::from_secs(offset_seconds));
}

if let Some(serialized_manifest) = serialized_manifest {
builder = builder.with_serialized_manifest(serialized_manifest)?;
Expand All @@ -166,12 +183,19 @@ impl BlockingDataset {
read_version: Option<u64>,
storage_options: HashMap<String, String>,
) -> Result<Self> {
let accessor = if storage_options.is_empty() {
None
} else {
Some(Arc::new(
lance::io::StorageOptionsAccessor::with_static_options(storage_options),
))
};
let inner = RT.block_on(Dataset::commit(
uri,
operation,
read_version,
Some(ObjectStoreParams {
storage_options: Some(storage_options),
storage_options_accessor: accessor,
..Default::default()
}),
None,
Expand Down Expand Up @@ -335,7 +359,6 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
data_storage_version: JObject, // Optional<String>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
storage_options_obj: JObject, // Map<String, String>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
initial_bases: JObject,
target_bases: JObject,
) -> JObject<'local> {
Expand All @@ -353,7 +376,6 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
data_storage_version,
enable_v2_manifest_paths,
storage_options_obj,
s3_credentials_refresh_offset_seconds_obj,
initial_bases,
target_bases,
)
Expand All @@ -373,7 +395,6 @@ fn inner_create_with_ffi_schema<'local>(
data_storage_version: JObject, // Optional<String>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
storage_options_obj: JObject, // Map<String, String>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
initial_bases: JObject,
target_bases: JObject,
) -> Result<JObject<'local>> {
Expand All @@ -394,7 +415,6 @@ fn inner_create_with_ffi_schema<'local>(
enable_v2_manifest_paths,
storage_options_obj,
JObject::null(), // No provider for schema-only creation
s3_credentials_refresh_offset_seconds_obj,
initial_bases,
target_bases,
reader,
Expand Down Expand Up @@ -447,7 +467,6 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
data_storage_version: JObject, // Optional<String>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
storage_options_obj: JObject, // Map<String, String>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
initial_bases: JObject,
target_bases: JObject,
) -> JObject<'local> {
Expand All @@ -466,7 +485,6 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
data_storage_version,
storage_options_obj,
JObject::null(),
s3_credentials_refresh_offset_seconds_obj,
initial_bases,
target_bases
)
Expand All @@ -488,7 +506,6 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo
enable_v2_manifest_paths: JObject, // Optional<Boolean>
storage_options_obj: JObject, // Map<String, String>
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
initial_bases: JObject, // Optional<List<BasePath>>
target_bases: JObject, // Optional<List<String>>
) -> JObject<'local> {
Expand All @@ -507,7 +524,6 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo
enable_v2_manifest_paths,
storage_options_obj,
storage_options_provider_obj,
s3_credentials_refresh_offset_seconds_obj,
initial_bases,
target_bases
)
Expand All @@ -528,7 +544,6 @@ fn inner_create_with_ffi_stream<'local>(
enable_v2_manifest_paths: JObject, // Optional<Boolean>
storage_options_obj: JObject, // Map<String, String>
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
initial_bases: JObject, // Optional<List<BasePath>>
target_bases: JObject, // Optional<List<String>>
) -> Result<JObject<'local>> {
Expand All @@ -546,7 +561,6 @@ fn inner_create_with_ffi_stream<'local>(
enable_v2_manifest_paths,
storage_options_obj,
storage_options_provider_obj,
s3_credentials_refresh_offset_seconds_obj,
initial_bases,
target_bases,
reader,
Expand All @@ -566,7 +580,6 @@ fn create_dataset<'local>(
enable_v2_manifest_paths: JObject,
storage_options_obj: JObject,
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject,
initial_bases: JObject,
target_bases: JObject,
reader: impl RecordBatchReader + Send + 'static,
Expand All @@ -584,7 +597,6 @@ fn create_dataset<'local>(
Some(&enable_v2_manifest_paths),
&storage_options_obj,
&storage_options_provider_obj,
&s3_credentials_refresh_offset_seconds_obj,
&initial_bases,
&target_bases,
)?;
Expand Down Expand Up @@ -1061,7 +1073,6 @@ pub extern "system" fn Java_org_lance_Dataset_openNative<'local>(
storage_options_obj: JObject, // Map<String, String>
serialized_manifest: JObject, // Optional<ByteBuffer>
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -1075,7 +1086,6 @@ pub extern "system" fn Java_org_lance_Dataset_openNative<'local>(
storage_options_obj,
serialized_manifest,
storage_options_provider_obj,
s3_credentials_refresh_offset_seconds_obj
)
)
}
Expand All @@ -1091,7 +1101,6 @@ fn inner_open_native<'local>(
storage_options_obj: JObject, // Map<String, String>
serialized_manifest: JObject, // Optional<ByteBuffer>
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
) -> Result<JObject<'local>> {
let path_str: String = path.extract(env)?;
let version = env.get_int_opt(&version_obj)?;
Expand All @@ -1108,11 +1117,6 @@ fn inner_open_native<'local>(
let storage_options_provider_arc =
storage_options_provider.map(|v| Arc::new(v) as Arc<dyn StorageOptionsProvider>);

// Extract s3_credentials_refresh_offset_seconds
let s3_credentials_refresh_offset_seconds = env
.get_long_opt(&s3_credentials_refresh_offset_seconds_obj)?
.map(|v| v as u64);

let serialized_manifest = env.get_bytes_opt(&serialized_manifest)?;
let dataset = BlockingDataset::open(
&path_str,
Expand All @@ -1123,7 +1127,6 @@ fn inner_open_native<'local>(
storage_options,
serialized_manifest,
storage_options_provider_arc,
s3_credentials_refresh_offset_seconds,
)?;
dataset.into_java(env)
}
Expand Down Expand Up @@ -1319,6 +1322,58 @@ fn inner_latest_version_id(env: &mut JNIEnv, java_dataset: JObject) -> Result<u6
dataset_guard.latest_version()
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeGetInitialStorageOptions<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
) -> JObject<'local> {
ok_or_throw!(
env,
inner_get_initial_storage_options(&mut env, java_dataset)
)
}

fn inner_get_initial_storage_options<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
) -> Result<JObject<'local>> {
let storage_options = {
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
dataset_guard.initial_storage_options()
};
match storage_options {
Some(opts) => opts.into_java(env),
None => Ok(JObject::null()),
}
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeGetLatestStorageOptions<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
) -> JObject<'local> {
ok_or_throw!(
env,
inner_get_latest_storage_options(&mut env, java_dataset)
)
}

fn inner_get_latest_storage_options<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
) -> Result<JObject<'local>> {
let storage_options = {
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
dataset_guard.latest_storage_options()?
};
match storage_options {
Some(opts) => opts.into_java(env),
None => Ok(JObject::null()),
}
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeCheckoutLatest(
mut env: JNIEnv,
Expand Down Expand Up @@ -2178,7 +2233,9 @@ fn transform_jstorage_options(
Ok(storage_options
.map(|options| {
Some(ObjectStoreParams {
storage_options: Some(options),
storage_options_accessor: Some(Arc::new(
lance::io::StorageOptionsAccessor::with_static_options(options),
)),
..Default::default()
})
})
Expand Down
4 changes: 3 additions & 1 deletion java/lance-jni/src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ fn inner_open<'local>(
let storage_options = to_rust_map(env, &jmap)?;
let reader = RT.block_on(async move {
let object_params = ObjectStoreParams {
storage_options: Some(storage_options),
storage_options_accessor: Some(Arc::new(
lance::io::StorageOptionsAccessor::with_static_options(storage_options),
)),
..Default::default()
};
let (obj_store, path) = ObjectStore::from_uri_and_params(
Expand Down
4 changes: 3 additions & 1 deletion java/lance-jni/src/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ fn inner_open<'local>(

let writer = RT.block_on(async move {
let object_params = ObjectStoreParams {
storage_options: Some(storage_options),
storage_options_accessor: Some(Arc::new(
lance::io::StorageOptionsAccessor::with_static_options(storage_options),
)),
..Default::default()
};
let (obj_store, path) = ObjectStore::from_uri_and_params(
Expand Down
Loading
Loading