Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions java/lance-jni/src/blocking_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ use std::sync::Arc;

use crate::error::{Error, Result};
use crate::ffi::JNIEnvExt;
use crate::traits::{import_vec_from_method, import_vec_to_rust};
use arrow::array::Float32Array;
use arrow::{ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream};
use arrow_schema::SchemaRef;
use jni::objects::{JObject, JString};
use jni::sys::{jboolean, jint, JNI_TRUE};
use jni::{sys::jlong, JNIEnv};
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
use lance_index::scalar::inverted::query::{
BooleanQuery as FtsBooleanQuery, BoostQuery as FtsBoostQuery, FtsQuery,
MatchQuery as FtsMatchQuery, MultiMatchQuery as FtsMultiMatchQuery, Occur as FtsOccur,
PhraseQuery as FtsPhraseQuery,
};
use lance_index::scalar::FullTextSearchQuery;
use lance_io::ffi::to_ffi_arrow_array_stream;
use lance_linalg::distance::DistanceType;

Expand Down Expand Up @@ -51,6 +58,141 @@ impl BlockingScanner {
}
}

fn build_full_text_search_query<'a>(env: &mut JNIEnv<'a>, java_obj: JObject) -> Result<FtsQuery> {
let type_obj = env
.call_method(
&java_obj,
"getType",
"()Lorg/lance/ipc/FullTextQuery$Type;",
&[],
)?
.l()?;
let type_name = env.get_string_from_method(&type_obj, "name")?;

match type_name.as_str() {
"MATCH" => {
let query_text = env.get_string_from_method(&java_obj, "getQueryText")?;
let column = env.get_string_from_method(&java_obj, "getColumn")?;
let boost = env.get_f32_from_method(&java_obj, "getBoost")?;
let fuzziness = env.get_optional_u32_from_method(&java_obj, "getFuzziness")?;
let max_expansions = env.get_int_as_usize_from_method(&java_obj, "getMaxExpansions")?;
let operator = env.get_fts_operator_from_method(&java_obj)?;
let prefix_length = env.get_u32_from_method(&java_obj, "getPrefixLength")?;

let mut query = FtsMatchQuery::new(query_text);
query = query.with_column(Some(column));
query = query
.with_boost(boost)
.with_fuzziness(fuzziness)
.with_max_expansions(max_expansions)
.with_operator(operator)
.with_prefix_length(prefix_length);

Ok(FtsQuery::Match(query))
}
"MATCH_PHRASE" => {
let query_text = env.get_string_from_method(&java_obj, "getQueryText")?;
let column = env.get_string_from_method(&java_obj, "getColumn")?;
let slop = env.get_u32_from_method(&java_obj, "getSlop")?;

let mut query = FtsPhraseQuery::new(query_text);
query = query.with_column(Some(column));
query = query.with_slop(slop);

Ok(FtsQuery::Phrase(query))
}
"MULTI_MATCH" => {
let query_text = env.get_string_from_method(&java_obj, "getQueryText")?;
let columns: Vec<String> =
import_vec_from_method(env, &java_obj, "getColumns", |env, elem| {
let jstr = JString::from(elem);
let value: String = env.get_string(&jstr)?.into();
Ok(value)
})?;

let boosts: Option<Vec<f32>> =
env.get_optional_from_method(&java_obj, "getBoosts", |env, list_obj| {
import_vec_to_rust(env, &list_obj, |env, elem| {
env.get_f32_from_method(&elem, "floatValue")
})
})?;
let operator = env.get_fts_operator_from_method(&java_obj)?;

let mut query = FtsMultiMatchQuery::try_new(query_text, columns)?;
if let Some(boosts) = boosts {
query = query.try_with_boosts(boosts)?;
}
query = query.with_operator(operator);

Ok(FtsQuery::MultiMatch(query))
}
"BOOST" => {
let positive_obj = env
.call_method(
&java_obj,
"getPositive",
"()Lorg/lance/ipc/FullTextQuery;",
&[],
)?
.l()?;
if positive_obj.is_null() {
return Err(Error::input_error(
"positive query must not be null in BOOST FullTextQuery".to_string(),
));
}
let negative_obj = env
.call_method(
&java_obj,
"getNegative",
"()Lorg/lance/ipc/FullTextQuery;",
&[],
)?
.l()?;
if negative_obj.is_null() {
return Err(Error::input_error(
"negative query must not be null in BOOST FullTextQuery".to_string(),
));
}

let positive = build_full_text_search_query(env, positive_obj)?;
let negative = build_full_text_search_query(env, negative_obj)?;
let negative_boost = env.get_f32_from_method(&java_obj, "getNegativeBoost")?;

let query = FtsBoostQuery::new(positive, negative, Some(negative_boost));
Ok(FtsQuery::Boost(query))
}
"BOOLEAN" => {
let clauses: Vec<(FtsOccur, FtsQuery)> =
import_vec_from_method(env, &java_obj, "getClauses", |env, clause_obj| {
let occur = env.get_occur_from_method(&clause_obj)?;

let query_obj = env
.call_method(
&clause_obj,
"getQuery",
"()Lorg/lance/ipc/FullTextQuery;",
&[],
)?
.l()?;
if query_obj.is_null() {
return Err(Error::input_error(
"BooleanClause query must not be null".to_string(),
));
}
let query = build_full_text_search_query(env, query_obj)?;
Ok((occur, query))
})?;

let boolean_query = FtsBooleanQuery::new(clauses);
Ok(FtsQuery::Boolean(boolean_query))
}
other => Err(Error::input_error(format!(
"Unsupported FullTextQuery type: {}",
other
))),
}
}

///////////////////
// Write Methods //
///////////////////
Expand All @@ -67,6 +209,7 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_createScanner<'local>(
limit_obj: JObject, // Optional<Integer>
offset_obj: JObject, // Optional<Integer>
query_obj: JObject, // Optional<Query>
fts_query_obj: JObject, // Optional<FullTextQuery>
with_row_id: jboolean, // boolean
with_row_address: jboolean, // boolean
batch_readahead: jint, // int
Expand All @@ -85,6 +228,7 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_createScanner<'local>(
limit_obj,
offset_obj,
query_obj,
fts_query_obj,
with_row_id,
with_row_address,
batch_readahead,
Expand All @@ -105,6 +249,7 @@ fn inner_create_scanner<'local>(
limit_obj: JObject,
offset_obj: JObject,
query_obj: JObject,
fts_query_obj: JObject,
with_row_id: jboolean,
with_row_address: jboolean,
batch_readahead: jint,
Expand Down Expand Up @@ -204,6 +349,13 @@ fn inner_create_scanner<'local>(
Ok(())
})?;

env.get_optional(&fts_query_obj, |env, java_obj| {
let fts_query = build_full_text_search_query(env, java_obj)?;
let full_text_query = FullTextSearchQuery::new_query(fts_query);
scanner.full_text_search(full_text_query)?;
Ok(())
})?;

scanner.batch_readahead(batch_readahead as usize);

env.get_optional(&column_orderings, |env, java_obj| {
Expand Down
41 changes: 41 additions & 0 deletions java/lance-jni/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::Error;
use jni::objects::{JByteBuffer, JFloatArray, JObjectArray, JString};
use jni::sys::jobjectArray;
use jni::{objects::JObject, JNIEnv};
use lance_index::scalar::inverted::query::{Occur, Operator};

/// Extend JNIEnv with helper functions.
pub trait JNIEnvExt {
Expand Down Expand Up @@ -97,6 +98,8 @@ pub trait JNIEnvExt {
obj: &JObject,
method_name: &str,
) -> Result<Option<u32>>;
// Get an f32 by calling the named Java method, which must return a primitive `float` (JNI signature `()F`).
fn get_f32_from_method(&mut self, obj: &JObject, method_name: &str) -> Result<f32>;

fn get_optional_integer_from_method<T>(
&mut self,
Expand Down Expand Up @@ -146,6 +149,10 @@ pub trait JNIEnvExt {
fn get_optional<T, F>(&mut self, obj: &JObject, f: F) -> Result<Option<T>>
where
F: FnOnce(&mut JNIEnv, JObject) -> Result<T>;

fn get_fts_operator_from_method(&mut self, obj: &JObject) -> Result<Operator>;

fn get_occur_from_method(&mut self, obj: &JObject) -> Result<Occur>;
}

impl JNIEnvExt for JNIEnv<'_> {
Expand Down Expand Up @@ -278,6 +285,34 @@ impl JNIEnvExt for JNIEnv<'_> {
})
}

fn get_fts_operator_from_method(&mut self, obj: &JObject) -> Result<Operator> {
let operator_obj = self
.call_method(
obj,
"getOperator",
"()Lorg/lance/ipc/FullTextQuery$Operator;",
&[],
)?
.l()?;
let operator_str = self.get_string_from_method(&operator_obj, "name")?;
Operator::try_from(operator_str.as_str())
.map_err(|e| Error::io_error(format!("Invalid operator: {:?}", e)))
}

fn get_occur_from_method(&mut self, obj: &JObject) -> Result<Occur> {
let occur_obj = self
.call_method(
obj,
"getOccur",
"()Lorg/lance/ipc/FullTextQuery$Occur;",
&[],
)?
.l()?;
let occur_str = self.get_string_from_method(&occur_obj, "name")?;
Occur::try_from(occur_str.as_str())
.map_err(|e| Error::io_error(format!("Invalid occur: {:?}", e)))
}

fn get_string_from_method(&mut self, obj: &JObject, method_name: &str) -> Result<String> {
let string_obj = self
.call_method(obj, method_name, "()Ljava/lang/String;", &[])?
Expand Down Expand Up @@ -335,6 +370,12 @@ impl JNIEnvExt for JNIEnv<'_> {
self.get_optional_integer_from_method(obj, method_name)
}

/// Invokes the no-argument Java method `method_name` on `obj` using the primitive-float
/// descriptor `()F` and returns the result as an `f32`.
fn get_f32_from_method(&mut self, obj: &JObject, method_name: &str) -> Result<f32> {
    let returned = self.call_method(obj, method_name, "()F", &[])?;
    Ok(returned.f()?)
}

fn get_optional_integer_from_method<T>(
&mut self,
obj: &JObject,
Expand Down
Loading
Loading