Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 0 additions & 56 deletions bindings/java/src/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use std::str::FromStr;

use jni::objects::JByteArray;
use jni::objects::JClass;
use jni::objects::JObject;
use jni::objects::JString;
use jni::sys::{jbyteArray, jlong};
use jni::JNIEnv;

use opendal::layers::BlockingLayer;
use opendal::BlockingOperator;
use opendal::Operator;
use opendal::Scheme;

use crate::get_global_runtime;
use crate::jmap_to_hashmap;
use crate::make_operator_info;
use crate::Result;

#[no_mangle]
pub extern "system" fn Java_org_apache_opendal_BlockingOperator_constructor(
mut env: JNIEnv,
_: JClass,
scheme: JString,
map: JObject,
) -> jlong {
intern_constructor(&mut env, scheme, map).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_constructor(env: &mut JNIEnv, scheme: JString, map: JObject) -> Result<jlong> {
let scheme = Scheme::from_str(env.get_string(&scheme)?.to_str()?)?;
let map = jmap_to_hashmap(env, &map)?;
let mut op = Operator::via_map(scheme, map)?;
if !op.info().full_capability().blocking {
let _guard = unsafe { get_global_runtime() }.enter();
op = op.layer(BlockingLayer::create()?);
}
Ok(Box::into_raw(Box::new(op.blocking())) as jlong)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
Expand Down Expand Up @@ -161,27 +129,3 @@ fn intern_delete(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) ->
let path = env.get_string(&path)?;
Ok(op.delete(path.to_str()?)?)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_info<'local>(
mut env: JNIEnv<'local>,
_: JClass,
op: *mut BlockingOperator,
) -> JObject<'local> {
intern_info(&mut env, &mut *op).unwrap_or_else(|e| {
e.throw(&mut env);
JObject::null()
})
}

fn intern_info<'local>(
env: &mut JNIEnv<'local>,
op: &mut BlockingOperator,
) -> Result<JObject<'local>> {
let info = op.info();

make_operator_info(env, info)
}
27 changes: 13 additions & 14 deletions bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ unsafe fn get_global_runtime<'local>() -> &'local Runtime {
RUNTIME.get_unchecked()
}

fn usize_to_jlong(n: Option<usize>) -> jlong {
// usize is always >= 0, so we can use -1 to identify the empty value.
n.map_or(-1, |v| v as jlong)
}

fn jmap_to_hashmap(env: &mut JNIEnv, params: &JObject) -> Result<HashMap<String, String>> {
let map = JMap::from_env(env, params)?;
let mut iter = map.iter(env)?;
Expand Down Expand Up @@ -148,17 +153,15 @@ fn make_presigned_request<'a>(env: &mut JNIEnv<'a>, req: PresignedRequest) -> Re
}

fn make_operator_info<'a>(env: &mut JNIEnv<'a>, info: OperatorInfo) -> Result<JObject<'a>> {
let operator_info_class = env.find_class("org/apache/opendal/OperatorInfo")?;

let schema = env.new_string(info.scheme().to_string())?;
let root = env.new_string(info.root().to_string())?;
let name = env.new_string(info.name().to_string())?;
let full_capability_obj = make_capability(env, info.full_capability())?;
let native_capability_obj = make_capability(env, info.native_capability())?;

let operator_info_obj = env
let result = env
.new_object(
operator_info_class,
"org/apache/opendal/OperatorInfo",
"(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/opendal/Capability;Lorg/apache/opendal/Capability;)V",
&[
JValue::Object(&schema),
Expand All @@ -168,15 +171,12 @@ fn make_operator_info<'a>(env: &mut JNIEnv<'a>, info: OperatorInfo) -> Result<JO
JValue::Object(&native_capability_obj),
],
)?;

Ok(operator_info_obj)
Ok(result)
}

fn make_capability<'a>(env: &mut JNIEnv<'a>, cap: Capability) -> Result<JObject<'a>> {
let capability_class = env.find_class("org/apache/opendal/Capability")?;

let capability = env.new_object(
capability_class,
"org/apache/opendal/Capability",
"(ZZZZZZZZZZZZZZZZZZJJJZZZZZZZZZZZZZZZJZ)V",
&[
JValue::Bool(cap.stat as jboolean),
Expand All @@ -197,9 +197,9 @@ fn make_capability<'a>(env: &mut JNIEnv<'a>, cap: Capability) -> Result<JObject<
JValue::Bool(cap.write_with_content_type as jboolean),
JValue::Bool(cap.write_with_content_disposition as jboolean),
JValue::Bool(cap.write_with_cache_control as jboolean),
JValue::Long(cap.write_multi_max_size.map_or(-1, |v| v as jlong)),
JValue::Long(cap.write_multi_min_size.map_or(-1, |v| v as jlong)),
JValue::Long(cap.write_multi_align_size.map_or(-1, |v| v as jlong)),
JValue::Long(usize_to_jlong(cap.write_multi_max_size)),
JValue::Long(usize_to_jlong(cap.write_multi_min_size)),
JValue::Long(usize_to_jlong(cap.write_multi_align_size)),
JValue::Bool(cap.create_dir as jboolean),
JValue::Bool(cap.delete as jboolean),
JValue::Bool(cap.copy as jboolean),
Expand All @@ -215,10 +215,9 @@ fn make_capability<'a>(env: &mut JNIEnv<'a>, cap: Capability) -> Result<JObject<
JValue::Bool(cap.presign_write as jboolean),
JValue::Bool(cap.batch as jboolean),
JValue::Bool(cap.batch_delete as jboolean),
JValue::Long(cap.batch_max_operations.map_or(-1, |v| v as jlong)),
JValue::Long(usize_to_jlong(cap.batch_max_operations)),
JValue::Bool(cap.blocking as jboolean),
],
)?;

Ok(capability)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
* accesses data synchronously.
*/
public class BlockingOperator extends NativeObject {
public final OperatorInfo info;

/**
* Construct an OpenDAL blocking operator:
*
Expand All @@ -37,8 +39,15 @@ public class BlockingOperator extends NativeObject {
* @param schema the name of the underneath service to access data from.
* @param map a map of properties to construct the underneath operator.
*/
public BlockingOperator(String schema, Map<String, String> map) {
super(constructor(schema, map));
public static BlockingOperator of(String schema, Map<String, String> map) {
Comment thread
tisonkun marked this conversation as resolved.
try (final Operator operator = Operator.of(schema, map)) {
return operator.blocking();
}
}

BlockingOperator(long nativeHandle, OperatorInfo info) {
super(nativeHandle);
this.info = info;
}

public void write(String path, String content) {
Expand All @@ -61,22 +70,14 @@ public Metadata stat(String path) {
return new Metadata(stat(nativeHandle, path));
}

public OperatorInfo info() {
return info(nativeHandle);
}

@Override
protected native void disposeInternal(long handle);

private static native long constructor(String schema, Map<String, String> map);

private static native void write(long nativeHandle, String path, byte[] content);

private static native byte[] read(long nativeHandle, String path);

private static native void delete(long nativeHandle, String path);

private static native long stat(long nativeHandle, String path);

private static native OperatorInfo info(long nativeHandle);
}
27 changes: 20 additions & 7 deletions bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ private static <T> CompletableFuture<T> take(long requestId) {
}
}

public final OperatorInfo info;

/**
* Construct an OpenDAL operator:
*
Expand All @@ -109,8 +111,21 @@ private static <T> CompletableFuture<T> take(long requestId) {
* @param schema the name of the underneath service to access data from.
* @param map a map of properties to construct the underneath operator.
*/
public Operator(String schema, Map<String, String> map) {
super(constructor(schema, map));
public static Operator of(String schema, Map<String, String> map) {
final long nativeHandle = constructor(schema, map);
final OperatorInfo info = makeOperatorInfo(nativeHandle);
return new Operator(nativeHandle, info);
}

Operator(long nativeHandle, OperatorInfo info) {
super(nativeHandle);
this.info = info;
}

public BlockingOperator blocking() {
final long nativeHandle = makeBlockingOp(this.nativeHandle);
final OperatorInfo info = this.info;
return new BlockingOperator(nativeHandle, info);
}

public CompletableFuture<Void> write(String path, String content) {
Expand Down Expand Up @@ -142,10 +157,6 @@ public CompletableFuture<byte[]> read(String path) {
return AsyncRegistry.take(requestId);
}

public OperatorInfo info() {
return info(nativeHandle);
}

public CompletableFuture<Void> presignRead(String path, Duration duration) {
final long requestId = presignRead(nativeHandle, path, duration.toNanos());
return AsyncRegistry.take(requestId);
Expand Down Expand Up @@ -187,5 +198,7 @@ public CompletableFuture<Void> delete(String path) {

private static native long presignStat(long nativeHandle, String path, long duration);

private static native OperatorInfo info(long nativeHandle);
private static native OperatorInfo makeOperatorInfo(long nativeHandle);

private static native long makeBlockingOp(long nativeHandle);
}
33 changes: 22 additions & 11 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use jni::objects::JObject;
use jni::objects::JString;
use jni::objects::JValue;
use jni::objects::JValueOwned;
use jni::sys::jlong;
use jni::sys::{jlong, jobject};
use jni::JNIEnv;
use opendal::layers::BlockingLayer;
use opendal::raw::PresignedRequest;
Expand Down Expand Up @@ -263,26 +263,37 @@ async fn do_delete(op: &mut Operator, path: String) -> Result<()> {
Ok(op.delete(&path).await?)
}

// # Safety
/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_makeBlockingOp(
_: JNIEnv,
_: JClass,
op: *mut Operator,
) -> jlong {
let op = unsafe { &mut *op };
Box::into_raw(Box::new(op.blocking())) as jlong
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_info<'local>(
mut env: JNIEnv<'local>,
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_makeOperatorInfo(
mut env: JNIEnv,
_: JClass,
op: *mut Operator,
) -> JObject<'local> {
intern_info(&mut env, op).unwrap_or_else(|e| {
) -> jobject {
intern_make_operator_info(&mut env, op).unwrap_or_else(|e| {
e.throw(&mut env);
JObject::null()
JObject::default().into_raw()
})
}

fn intern_info<'local>(env: &mut JNIEnv<'local>, op: *mut Operator) -> Result<JObject<'local>> {
fn intern_make_operator_info(env: &mut JNIEnv, op: *mut Operator) -> Result<jobject> {
let op = unsafe { &mut *op };

let info = op.info();
make_operator_info(env, info)
Ok(make_operator_info(env, op.info())?.into_raw())
}

/// # Safety
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
* under the License.
*/

package org.apache.opendal;
package org.apache.opendal.test;

import static org.assertj.core.api.Assertions.assertThat;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.apache.opendal.BlockingOperator;
import org.apache.opendal.Operator;
import org.apache.opendal.OperatorInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand All @@ -35,8 +38,8 @@ public void testBlockingOperatorInfo() {
final Map<String, String> conf = new HashMap<>();
conf.put("root", tempDir.toString());

try (final BlockingOperator op = new BlockingOperator("fs", conf)) {
final OperatorInfo info = op.info();
try (final BlockingOperator op = BlockingOperator.of("fs", conf)) {
final OperatorInfo info = op.info;
assertThat(info).isNotNull();
assertThat(info.scheme).isEqualTo("fs");

Expand All @@ -58,8 +61,8 @@ public void testBlockingOperatorInfo() {
public void testOperatorInfo() {
final Map<String, String> conf = new HashMap<>();
conf.put("root", "/opendal/");
try (final Operator op = new Operator("memory", conf)) {
final OperatorInfo info = op.info();
try (final Operator op = Operator.of("memory", conf)) {
final OperatorInfo info = op.info;
assertThat(info).isNotNull();
assertThat(info.scheme).isEqualTo("memory");

Expand Down
Loading