Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,19 @@ impl BlockingDataset {
Ok(indexes)
}

pub fn update_config(
&mut self,
upsert_values: impl Iterator<Item = (String, String)>,
) -> Result<()> {
RT.block_on(self.inner.update_config(upsert_values))?;
Ok(())
}

pub fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
RT.block_on(self.inner.delete_config_keys(delete_keys))?;
Ok(())
}

pub fn close(&self) {}
}

Expand Down Expand Up @@ -991,6 +1004,91 @@ fn inner_list_indexes<'local>(
Ok(array_list)
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeGetConfig<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
) -> JObject<'local> {
ok_or_throw!(env, inner_get_config(&mut env, java_dataset))
}

fn inner_get_config<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
) -> Result<JObject<'local>> {
let config = {
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
dataset_guard.inner.config()?
};

let java_hashmap = env
.new_object("java/util/HashMap", "()V", &[])
.expect("Failed to create Java HashMap");

for (k, v) in config {
let java_key = env
.new_string(&k)
.expect("Failed to create Java String (key)");
let java_value = env
.new_string(&v)
.expect("Failed to create Java String (value)");

env.call_method(
&java_hashmap,
"put",
"(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
&[JValue::Object(&java_key), JValue::Object(&java_value)],
)
.expect("Failed to call HashMap.put()");
}

Ok(java_hashmap)
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeUpdateConfig(
mut env: JNIEnv,
java_dataset: JObject,
config_map: JObject,
) {
ok_or_throw_without_return!(env, inner_update_config(&mut env, java_dataset, config_map))
}

fn inner_update_config(env: &mut JNIEnv, java_dataset: JObject, config_map: JObject) -> Result<()> {
let jmap = JMap::from_env(env, &config_map)?;
let config = to_rust_map(env, &jmap)?;
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
dataset_guard.update_config(config.into_iter())?;
Ok(())
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeDeleteConfigKeys(
mut env: JNIEnv,
java_dataset: JObject,
config_keys: JObject,
) {
ok_or_throw_without_return!(
env,
inner_delete_config_keys(&mut env, java_dataset, config_keys)
)
}

fn inner_delete_config_keys(
env: &mut JNIEnv,
java_dataset: JObject,
config_keys: JObject,
) -> Result<()> {
let keys: Vec<String> = env.get_strings(&config_keys)?;
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
let key_slice: &[&str] = &keys.iter().map(String::as_str).collect::<Vec<_>>();
dataset_guard.delete_config_keys(key_slice)?;
Ok(())
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeTake(
mut env: JNIEnv,
Expand Down
13 changes: 1 addition & 12 deletions java/core/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,7 @@ pub fn extract_storage_options(
storage_options_obj: &JObject,
) -> Result<HashMap<String, String>> {
let jmap = JMap::from_env(env, storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
let mut iter = jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key_jstring = JString::from(key);
let value_jstring = JString::from(value);
let key_string: String = env.get_string(&key_jstring)?.into();
let value_string: String = env.get_string(&value_jstring)?.into();
map.insert(key_string, value_string);
}
Ok::<_, Error>(map)
})?;
let storage_options: HashMap<String, String> = to_rust_map(env, &jmap)?;
Ok(storage_options)
}

Expand Down
49 changes: 47 additions & 2 deletions java/core/src/main/java/com/lancedb/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -526,8 +528,8 @@ public Dataset checkoutVersion(long version) {
* Checks out a specific tag of the dataset. If the underlying version is already checked out, it
* returns a new Java Dataset object pointing to the same underlying Rust Dataset object
*
* @param tag
* @return
* @param tag the tag to check out
* @return a new Dataset instance with the specified tag checked out
*/
public Dataset checkoutTag(String tag) {
Preconditions.checkArgument(tag != null, "Tag can not be null");
Expand Down Expand Up @@ -675,6 +677,49 @@ public List<String> listIndexes() {

private native List<String> nativeListIndexes();

/**
* Get the table config of the dataset.
*
* @return the table config
*/
public Map<String, String> getConfig() {
Comment thread
jackye1995 marked this conversation as resolved.
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
return nativeGetConfig();
}
}

private native Map<String, String> nativeGetConfig();

/**
* Update the config of the dataset. This operation will only overwrite and NOT delete the
* existing config.
*
* @param tableConfig the config to update
*/
public void updateConfig(Map<String, String> tableConfig) {
try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
nativeUpdateConfig(tableConfig);
}
}

private native void nativeUpdateConfig(Map<String, String> config);

/**
* Delete the config keys of the dataset.
*
* @param deleteKeys the config keys to delete
*/
public void deleteConfigKeys(Set<String> deleteKeys) {
try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
nativeDeleteConfigKeys(new ArrayList<>(deleteKeys));
}
}

private native void nativeDeleteConfigKeys(List<String> deleteKeys);

/**
* Closes this dataset and releases any system resources associated with it. If the dataset is
* already closed, then invoking this method has no effect.
Expand Down
73 changes: 73 additions & 0 deletions java/core/src/test/java/com/lancedb/lance/DatasetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -763,4 +763,77 @@ void testDeleteRows() {
}
}
}

@Test
Comment thread
jackye1995 marked this conversation as resolved.
void testUpdateConfig() {
String testMethodName = new Object() {}.getClass().getEnclosingMethod().getName();
String datasetPath = tempDir.resolve(testMethodName).toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
dataset = testDataset.createEmptyDataset();
assertEquals(1, dataset.version());
Map<String, String> originalConfig = dataset.getConfig();
Map<String, String> updateConfig = new HashMap<>();
updateConfig.put("key1", "value1");
updateConfig.put("key2", "value2");
dataset.updateConfig(updateConfig);
originalConfig.putAll(updateConfig);
assertEquals(2, dataset.version());
Map<String, String> currentConfig = dataset.getConfig();
for (String configKey : currentConfig.keySet()) {
assertEquals(currentConfig.get(configKey), originalConfig.get(configKey));
}
assertEquals(originalConfig.size(), currentConfig.size());

Map<String, String> updateConfig2 = new HashMap<>();
updateConfig2.put("key1", "value3");
dataset.updateConfig(updateConfig2);
currentConfig = dataset.getConfig();
originalConfig.putAll(updateConfig2);
assertEquals(3, dataset.version());
for (String configKey : currentConfig.keySet()) {
assertEquals(currentConfig.get(configKey), originalConfig.get(configKey));
}
assertEquals(originalConfig.size(), currentConfig.size());
}
}

@Test
void testDeleteConfigKeys() {
String testMethodName = new Object() {}.getClass().getEnclosingMethod().getName();
String datasetPath = tempDir.resolve(testMethodName).toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
dataset = testDataset.createEmptyDataset();
assertEquals(1, dataset.version());
Map<String, String> originalConfig = dataset.getConfig();
Map<String, String> config = new HashMap<>();
config.put("key1", "value1");
config.put("key2", "value2");
dataset.updateConfig(config);
assertEquals(2, dataset.version());
Map<String, String> currentConfig = dataset.getConfig();
assertTrue(currentConfig.keySet().containsAll(config.keySet()));
assertEquals(originalConfig.size() + 2, currentConfig.size());

Set<String> deleteKeys = new HashSet<>();
deleteKeys.add("key1");
dataset.deleteConfigKeys(deleteKeys);
assertEquals(3, dataset.version());
originalConfig = currentConfig;
currentConfig = dataset.getConfig();
assertEquals(originalConfig.size() - 1, currentConfig.size());
assertTrue(currentConfig.containsKey("key2"));
assertFalse(currentConfig.containsKey("key1"));
deleteKeys.add("key2");
dataset.deleteConfigKeys(deleteKeys);
assertEquals(4, dataset.version());
currentConfig = dataset.getConfig();
assertEquals(originalConfig.size() - 2, currentConfig.size());
assertFalse(currentConfig.containsKey("key2"));
assertFalse(currentConfig.containsKey("key1"));
}
}
}