From 789f88657293764bbd230b67002358a2b491015c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 17 Mar 2025 09:18:57 -0700 Subject: [PATCH 1/3] Add JNI bindings for the file reader/writer --- Cargo.lock | 2 + java/core/lance-jni/Cargo.toml | 2 + java/core/lance-jni/src/error.rs | 24 ++- java/core/lance-jni/src/file_reader.rs | 202 ++++++++++++++++++ java/core/lance-jni/src/file_writer.rs | 152 +++++++++++++ java/core/lance-jni/src/lib.rs | 2 + .../lancedb/lance/file/LanceFileReader.java | 101 +++++++++ .../lancedb/lance/file/LanceFileWriter.java | 91 ++++++++ .../lancedb/lance/FileReaderWriterTest.java | 151 +++++++++++++ rust/lance-encoding/src/decoder.rs | 4 +- rust/lance-file/src/v2/reader.rs | 69 +++++- 11 files changed, 793 insertions(+), 7 deletions(-) create mode 100644 java/core/lance-jni/src/file_reader.rs create mode 100644 java/core/lance-jni/src/file_writer.rs create mode 100644 java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java create mode 100644 java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java create mode 100644 java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java diff --git a/Cargo.lock b/Cargo.lock index db147d501cc..46fa11d5049 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4084,10 +4084,12 @@ dependencies = [ "lance-core", "lance-datafusion", "lance-encoding", + "lance-file", "lance-index", "lance-io", "lance-linalg", "lazy_static", + "object_store", "serde", "serde_json", "snafu", diff --git a/java/core/lance-jni/Cargo.toml b/java/core/lance-jni/Cargo.toml index 636fe2def67..a26eee044a3 100644 --- a/java/core/lance-jni/Cargo.toml +++ b/java/core/lance-jni/Cargo.toml @@ -20,9 +20,11 @@ lance-linalg = { workspace = true } lance-index = { workspace = true } lance-io.workspace = true lance-core.workspace = true +lance-file.workspace = true arrow = { workspace = true, features = ["ffi"] } arrow-schema.workspace = true datafusion.workspace = true +object_store.workspace = true tokio.workspace = true 
jni = "0.21.1" snafu.workspace = true diff --git a/java/core/lance-jni/src/error.rs b/java/core/lance-jni/src/error.rs index 36f47ffb566..05454c6111b 100644 --- a/java/core/lance-jni/src/error.rs +++ b/java/core/lance-jni/src/error.rs @@ -19,12 +19,13 @@ use jni::{errors::Error as JniError, JNIEnv}; use lance::error::Error as LanceError; use serde_json::Error as JsonError; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum JavaExceptionClass { IllegalArgumentException, IOException, RuntimeException, UnsupportedOperationException, + AlreadyInException, } impl JavaExceptionClass { @@ -34,6 +35,8 @@ impl JavaExceptionClass { Self::IOException => "java/io/IOException", Self::RuntimeException => "java/lang/RuntimeException", Self::UnsupportedOperationException => "java/lang/UnsupportedOperationException", + // Included for display purposes. This is not a real exception. + Self::AlreadyInException => "AlreadyInException", } } } @@ -71,7 +74,18 @@ impl Error { Self::new(message, JavaExceptionClass::UnsupportedOperationException) } + pub fn in_exception() -> Self { + Self { + message: String::default(), + java_class: JavaExceptionClass::AlreadyInException, + } + } + pub fn throw(&self, env: &mut JNIEnv) { + if self.java_class == JavaExceptionClass::AlreadyInException { + // An exception is already in progress, so we don't need to throw another one. + return; + } if let Err(e) = env.throw_new(self.java_class.as_str(), &self.message) { eprintln!("Error when throwing Java exception: {:?}", e.to_string()); panic!("Error when throwing Java exception: {:?}", e); @@ -96,6 +110,7 @@ impl From for Error { | LanceError::InvalidInput { .. } => Self::input_error(err.to_string()), LanceError::IO { .. } => Self::io_error(err.to_string()), LanceError::NotSupported { .. } => Self::unsupported_error(err.to_string()), + LanceError::NotFound { .. 
} => Self::io_error(err.to_string()), _ => Self::runtime_error(err.to_string()), } } @@ -120,7 +135,12 @@ impl From for Error { impl From for Error { fn from(err: JniError) -> Self { - Self::runtime_error(err.to_string()) + match err { + // If we get this then it means that an exception was already in progress. We can't + // throw another one so we just return an error indicating that. + JniError::JavaException => Self::in_exception(), + _ => Self::runtime_error(err.to_string()), + } } } diff --git a/java/core/lance-jni/src/file_reader.rs b/java/core/lance-jni/src/file_reader.rs new file mode 100644 index 00000000000..5aba4186cbf --- /dev/null +++ b/java/core/lance-jni/src/file_reader.rs @@ -0,0 +1,202 @@ +use std::sync::Arc; + +use crate::{ + error::{Error, Result}, + traits::IntoJava, + RT, +}; +use arrow::{array::RecordBatchReader, ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream}; +use arrow_schema::SchemaRef; +use jni::{ + objects::{JObject, JString}, + sys::{jint, jlong}, + JNIEnv, +}; +use lance::io::ObjectStore; +use lance_core::cache::FileMetadataCache; +use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; +use lance_file::v2::reader::{FileReader, FileReaderOptions}; +use lance_io::{ + scheduler::{ScanScheduler, SchedulerConfig}, + ReadBatchParams, +}; +use object_store::path::Path; + +pub const NATIVE_READER: &str = "nativeFileReaderHandle"; + +#[derive(Clone, Debug)] +pub struct BlockingFileReader { + pub(crate) inner: Arc, +} + +impl BlockingFileReader { + pub fn create(file_reader: Arc) -> Self { + Self { inner: file_reader } + } + + pub fn open_stream( + &self, + batch_size: u32, + ) -> Result> { + Ok(self.inner.read_stream_projected_blocking( + ReadBatchParams::RangeFull, + batch_size, + None, + FilterExpression::no_filter(), + )?) 
+ } + + pub fn schema(&self) -> Result { + Ok(Arc::new(self.inner.schema().as_ref().into())) + } + + pub fn num_rows(&self) -> u64 { + self.inner.num_rows() + } +} + +impl IntoJava for BlockingFileReader { + fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> Result> { + attach_native_reader(env, self) + } +} + +fn attach_native_reader<'local>( + env: &mut JNIEnv<'local>, + reader: BlockingFileReader, +) -> Result> { + let j_reader = create_java_reader_object(env)?; + unsafe { env.set_rust_field(&j_reader, NATIVE_READER, reader) }?; + Ok(j_reader) +} + +fn create_java_reader_object<'a>(env: &mut JNIEnv<'a>) -> Result> { + let res = env.new_object("com/lancedb/lance/file/LanceFileReader", "()V", &[])?; + Ok(res) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_openNative<'local>( + mut env: JNIEnv<'local>, + _reader_class: JObject, + file_uri: JString, +) -> JObject<'local> { + ok_or_throw!(env, inner_open(&mut env, file_uri,)) +} + +fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result> { + let file_uri_str: String = env.get_string(&file_uri)?.into(); + + let reader = RT.block_on(async move { + let (obj_store, path) = ObjectStore::from_uri(&file_uri_str).await?; + let obj_store = Arc::new(obj_store); + let config = SchedulerConfig::max_bandwidth(&obj_store); + let scan_scheduler = ScanScheduler::new(obj_store, config); + + let file_scheduler = scan_scheduler.open_file(&Path::parse(&path)?).await?; + FileReader::try_open( + file_scheduler, + None, + Arc::::default(), + &FileMetadataCache::no_cache(), + FileReaderOptions::default(), + ) + .await + })?; + + let reader = BlockingFileReader::create(Arc::new(reader)); + + reader.into_java(env) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_closeNative<'local>( + mut env: JNIEnv<'local>, + reader: JObject, +) -> JObject<'local> { + let maybe_err = + unsafe { env.take_rust_field::<_, _, BlockingFileReader>(reader, 
NATIVE_READER) }; + match maybe_err { + Ok(_) => {} + // We were already closed, do nothing + Err(jni::errors::Error::NullPtr(_)) => {} + Err(err) => Error::from(err).throw(&mut env), + } + JObject::null() +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_numRowsNative<'local>( + mut env: JNIEnv<'local>, + reader: JObject, +) -> jlong { + match inner_num_rows(&mut env, reader) { + Ok(num_rows) => num_rows, + Err(e) => { + e.throw(&mut env); + 0 + } + } +} + +// If the reader is closed, the native handle will be null and we will get a JniError::NullPtr +// error when we call get_rust_field. Translate that into a more meaningful error. +fn unwrap_reader(val: std::result::Result) -> Result { + match val { + Ok(val) => Ok(val), + Err(jni::errors::Error::NullPtr(_)) => Err(Error::io_error( + "FileReader has already been closed".to_string(), + )), + err => Ok(err?), + } +} + +fn inner_num_rows<'local>(env: &mut JNIEnv<'local>, reader: JObject) -> Result { + let reader = unsafe { env.get_rust_field::<_, _, BlockingFileReader>(reader, NATIVE_READER) }; + let reader = unwrap_reader(reader)?; + Ok(reader.num_rows() as i64) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_populateSchemaNative( + mut env: JNIEnv, + reader: JObject, + schema_addr: jlong, +) { + ok_or_throw_without_return!(env, inner_populate_schema(&mut env, reader, schema_addr)); +} + +fn inner_populate_schema(env: &mut JNIEnv, reader: JObject, schema_addr: jlong) -> Result<()> { + let reader = unsafe { env.get_rust_field::<_, _, BlockingFileReader>(reader, NATIVE_READER) }; + let reader = unwrap_reader(reader)?; + let schema = reader.schema()?; + let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?; + unsafe { std::ptr::write_unaligned(schema_addr as *mut FFI_ArrowSchema, ffi_schema) } + Ok(()) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_readAllNative<'local>( + mut env: 
JNIEnv<'local>, + reader: JObject, + batch_size: jint, + stream_addr: jlong, +) { + if let Err(e) = inner_read_all(&mut env, reader, batch_size, stream_addr) { + e.throw(&mut env); + } +} + +fn inner_read_all<'local>( + env: &mut JNIEnv<'local>, + reader: JObject, + batch_size: jint, + stream_addr: jlong, +) -> Result<()> { + let reader = unsafe { env.get_rust_field::<_, _, BlockingFileReader>(reader, NATIVE_READER) }; + let reader = unwrap_reader(reader)?; + let arrow_stream = reader.open_stream(batch_size as u32)?; + let ffi_stream = FFI_ArrowArrayStream::new(arrow_stream); + unsafe { std::ptr::write_unaligned(stream_addr as *mut FFI_ArrowArrayStream, ffi_stream) } + Ok(()) +} diff --git a/java/core/lance-jni/src/file_writer.rs b/java/core/lance-jni/src/file_writer.rs new file mode 100644 index 00000000000..60c7aa94b07 --- /dev/null +++ b/java/core/lance-jni/src/file_writer.rs @@ -0,0 +1,152 @@ +use std::sync::{Arc, Mutex}; + +use crate::{ + error::{Error, Result}, + traits::IntoJava, + RT, +}; +use arrow::{ + array::{RecordBatch, StructArray}, + ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema}, +}; +use arrow_schema::DataType; +use jni::{ + objects::{JObject, JString}, + sys::jlong, + JNIEnv, +}; +use lance::io::ObjectStore; +use lance_file::{ + v2::writer::{FileWriter, FileWriterOptions}, + version::LanceFileVersion, +}; + +pub const NATIVE_WRITER: &str = "nativeFileWriterHandle"; + +#[derive(Clone)] +pub struct BlockingFileWriter { + pub(crate) inner: Arc>, +} + +impl BlockingFileWriter { + pub fn create(file_writer: FileWriter) -> Self { + Self { + inner: Arc::new(Mutex::new(file_writer)), + } + } +} + +impl IntoJava for BlockingFileWriter { + fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> Result> { + attach_native_writer(env, self) + } +} + +fn attach_native_writer<'local>( + env: &mut JNIEnv<'local>, + writer: BlockingFileWriter, +) -> Result> { + let j_writer = create_java_writer_object(env)?; + unsafe { env.set_rust_field(&j_writer, 
NATIVE_WRITER, writer) }?; + Ok(j_writer) +} + +fn create_java_writer_object<'a>(env: &mut JNIEnv<'a>) -> Result> { + let res = env.new_object("com/lancedb/lance/file/LanceFileWriter", "()V", &[])?; + Ok(res) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileWriter_openNative<'local>( + mut env: JNIEnv<'local>, + _writer_class: JObject, + file_uri: JString, +) -> JObject<'local> { + ok_or_throw!(env, inner_open(&mut env, file_uri,)) +} + +fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result> { + let file_uri_str: String = env.get_string(&file_uri)?.into(); + + let writer = RT.block_on(async move { + let (obj_store, path) = ObjectStore::from_uri(&file_uri_str).await?; + let obj_store = Arc::new(obj_store); + let obj_writer = obj_store.create(&path).await?; + + Result::Ok(FileWriter::new_lazy( + obj_writer, + FileWriterOptions { + format_version: Some(LanceFileVersion::V2_1), + ..Default::default() + }, + )) + })?; + + let writer = BlockingFileWriter::create(writer); + + writer.into_java(env) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileWriter_closeNative<'local>( + mut env: JNIEnv<'local>, + writer: JObject, +) -> JObject<'local> { + let maybe_err = + unsafe { env.take_rust_field::<_, _, BlockingFileWriter>(writer, NATIVE_WRITER) }; + let writer = match maybe_err { + Ok(writer) => Some(writer), + // We were already closed, do nothing + Err(jni::errors::Error::NullPtr(_)) => None, + Err(err) => { + Error::from(err).throw(&mut env); + None + } + }; + if let Some(writer) = writer { + match RT.block_on(writer.inner.lock().unwrap().finish()) { + Ok(_) => {} + Err(e) => { + Error::from(e).throw(&mut env); + } + } + } + JObject::null() +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileWriter_writeNative<'local>( + mut env: JNIEnv<'local>, + writer: JObject, + batch_address: jlong, + schema_address: jlong, +) -> JObject<'local> { + if let Err(e) = 
inner_write_batch(&mut env, writer, batch_address, schema_address) { + e.throw(&mut env); + return JObject::null(); + } + JObject::null() +} + +fn inner_write_batch<'local>( + env: &mut JNIEnv<'local>, + writer: JObject, + batch_address: jlong, + schema_address: jlong, +) -> Result<()> { + let c_array_ptr = batch_address as *mut FFI_ArrowArray; + let c_schema_ptr = schema_address as *mut FFI_ArrowSchema; + + let c_array = unsafe { FFI_ArrowArray::from_raw(c_array_ptr) }; + let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) }; + + let data_type = DataType::try_from(&c_schema)?; + let array_data = unsafe { from_ffi_and_data_type(c_array, data_type) }?; + let record_batch = RecordBatch::from(StructArray::from(array_data)); + + let writer = unsafe { env.get_rust_field::<_, _, BlockingFileWriter>(writer, NATIVE_WRITER) }?; + + let mut writer = writer.inner.lock().unwrap(); + RT.block_on(writer.write_batch(&record_batch))?; + Ok(()) +} diff --git a/java/core/lance-jni/src/lib.rs b/java/core/lance-jni/src/lib.rs index 84b7ba64972..437c3d4c00d 100644 --- a/java/core/lance-jni/src/lib.rs +++ b/java/core/lance-jni/src/lib.rs @@ -54,6 +54,8 @@ mod blocking_dataset; mod blocking_scanner; pub mod error; pub mod ffi; +mod file_reader; +mod file_writer; mod fragment; pub mod traits; pub mod utils; diff --git a/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java b/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java new file mode 100644 index 00000000000..83fc4737ed2 --- /dev/null +++ b/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java @@ -0,0 +1,101 @@ +package com.lancedb.lance.file; + +import java.io.IOException; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Schema; + +import com.lancedb.lance.JniLoader; + 
+public class LanceFileReader implements AutoCloseable { + + static { + JniLoader.ensureLoaded(); + } + + private long nativeFileReaderHandle; + + private BufferAllocator allocator; + private Schema schema; + + private static native LanceFileReader openNative(String fileUri) throws IOException; + + private native void closeNative(long nativeLanceFileReaderHandle) throws IOException; + + private native long numRowsNative() throws IOException; + + private native void populateSchemaNative(long arrowSchemaMemoryAddress); + + private native void readAllNative(int batchSize, long streamMemoryAddress) throws IOException; + + private LanceFileReader() { + } + + /** + * Open a LanceFileReader from a file URI + * + * @param path the URI to the Lance file + * @param allocator the Arrow BufferAllocator to use for the reader + * @return a new LanceFileReader + */ + public static LanceFileReader open(String path, BufferAllocator allocator) throws IOException { + LanceFileReader reader = openNative(path); + reader.allocator = allocator; + reader.schema = reader.load_schema(); + return reader; + } + + /** + * Close the LanceFileReader + * + * This method must be called to release resources when the reader is no longer + * needed. 
+ */ + @Override + public void close() throws Exception { + closeNative(nativeFileReaderHandle); + } + + /** + * Get the number of rows in the Lance file + * + * @return the number of rows in the Lance file + */ + public long numRows() throws IOException { + long numRows = numRowsNative(); + return numRows; + } + + /** + * Get the schema of the Lance file + * + * @return the schema of the Lance file + */ + public Schema schema() { + return schema; + } + + private Schema load_schema() throws IOException { + try (ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(allocator)) { + populateSchemaNative(ffiArrowSchema.memoryAddress()); + return Data.importSchema(allocator, ffiArrowSchema, null); + } + } + + /** + * Read all rows from the Lance file + * + * @param batchSize the maximum number of rows to read in a single batch + * @return an ArrowReader for the Lance file + */ + public ArrowReader readAll(int batchSize) throws IOException { + try (ArrowArrayStream ffiArrowArrayStream = ArrowArrayStream.allocateNew(allocator)) { + readAllNative(batchSize, ffiArrowArrayStream.memoryAddress()); + return Data.importArrayStream(allocator, ffiArrowArrayStream); + } + } +} \ No newline at end of file diff --git a/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java b/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java new file mode 100644 index 00000000000..bb06cbb0715 --- /dev/null +++ b/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java @@ -0,0 +1,91 @@ +package com.lancedb.lance.file; + +import java.io.IOException; + +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; + +import com.lancedb.lance.JniLoader; + +public class LanceFileWriter implements AutoCloseable { + + static { + JniLoader.ensureLoaded(); + } + 
+ private long nativeFileWriterHandle; + private BufferAllocator allocator; + private DictionaryProvider dictionaryProvider; + + private static native LanceFileWriter openNative(String fileUri) throws IOException; + + private native void closeNative(long nativeLanceFileReaderHandle) throws IOException; + + private native void writeNative( + long batchMemoryAddress, + long schemaMemoryAddress) throws IOException; + + private LanceFileWriter() { + } + + /** + * Open a LanceFileWriter to write to a given file URI + * + * @param path the URI of the file to write to + * @param allocator the BufferAllocator to use for the writer + * @param dictionaryProvider the DictionaryProvider to use for the writer + * @return a new LanceFileWriter + */ + public static LanceFileWriter open( + String path, + BufferAllocator allocator, + DictionaryProvider dictionaryProvider) + throws IOException { + LanceFileWriter writer = openNative(path); + writer.allocator = allocator; + writer.dictionaryProvider = dictionaryProvider; + return writer; + } + + /** + * Write a batch of data + * + * @param batch the batch of data to write + * @throws IOException if the batch cannot be written + */ + public void write(VectorSchemaRoot batch) throws IOException { + try (ArrowArray ffiArrowArray = ArrowArray.allocateNew(allocator); + ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(allocator)) { + Data.exportVectorSchemaRoot( + allocator, + batch, + dictionaryProvider, + ffiArrowArray, + ffiArrowSchema); + writeNative( + ffiArrowArray.memoryAddress(), + ffiArrowSchema.memoryAddress()); + } + } + + /** + * Close the LanceFileWriter + * + * This method must be called to release resources when the writer is no longer + * needed. + * + * This method will also flush all remaining data and write the footer to the + * file. 
+ * + * @throws Exception if the writer cannot be closed + */ + @Override + public void close() throws Exception { + closeNative(nativeFileWriterHandle); + } + +} diff --git a/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java b/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java new file mode 100644 index 00000000000..f5e25554540 --- /dev/null +++ b/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java @@ -0,0 +1,151 @@ +package com.lancedb.lance; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import com.lancedb.lance.file.LanceFileReader; +import com.lancedb.lance.file.LanceFileWriter; + +public class FileReaderWriterTest { + + @TempDir + private static Path tempDir; + + private VectorSchemaRoot createBatch(BufferAllocator allocator) throws IOException { + Schema schema = new Schema( + Arrays.asList(Field.nullable("x", new ArrowType.Int(64, true)), + Field.nullable("y", new ArrowType.Utf8())), + null); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + root.allocateNew(); + BigIntVector iVector = (BigIntVector) root.getVector("x"); + VarCharVector sVector = 
(VarCharVector) root.getVector("y"); + + for (int i = 0; i < 100; i++) { + iVector.setSafe(i, i); + sVector.setSafe(i, new Text("s-" + i)); + } + root.setRowCount(100); + + return root; + } + + void createSimpleFile(String filePath) throws Exception { + BufferAllocator allocator = new RootAllocator(); + try (LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null)) { + try (VectorSchemaRoot batch = createBatch(allocator)) { + writer.write(batch); + } + } + } + + @Test + void testBasicRead() throws Exception { + BufferAllocator allocator = new RootAllocator(); + String filePath = tempDir.resolve("basic_read.lance").toString(); + createSimpleFile(filePath); + LanceFileReader reader = LanceFileReader.open(filePath, allocator); + + Schema expectedSchema = new Schema( + Arrays.asList(Field.nullable("x", new ArrowType.Int(64, true)), + Field.nullable("y", new ArrowType.Utf8())), + null); + + assertEquals(100, reader.numRows()); + assertEquals(expectedSchema, reader.schema()); + + try (ArrowReader batches = reader.readAll(100)) { + assertTrue(batches.loadNextBatch()); + VectorSchemaRoot batch = batches.getVectorSchemaRoot(); + assertEquals(100, batch.getRowCount()); + assertEquals(2, batch.getSchema().getFields().size()); + assertFalse(batches.loadNextBatch()); + } + + try (ArrowReader batches = reader.readAll(15)) { + for (int i = 0; i < 100; i += 15) { + int expected = Math.min(15, 100 - i); + assertTrue(batches.loadNextBatch()); + VectorSchemaRoot batch = batches.getVectorSchemaRoot(); + assertEquals(expected, batch.getRowCount()); + assertEquals(2, batch.getSchema().getFields().size()); + } + assertFalse(batches.loadNextBatch()); + } + + reader.close(); + try { + reader.numRows(); + fail("Expected LanceException to be thrown"); + } catch (IOException e) { + assertEquals("FileReader has already been closed", e.getMessage()); + } + + // Ok to call schema after close + assertEquals(expectedSchema, reader.schema()); + + // close should be idempotent + 
reader.close(); + } + + @Test + void testBasicWrite() throws Exception { + String filePath = tempDir.resolve("basic_write.lance").toString(); + createSimpleFile(filePath); + } + + @Test + void testWriteNoData() throws Exception { + String filePath = tempDir.resolve("no_data.lance").toString(); + BufferAllocator allocator = new RootAllocator(); + + LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null); + + try { + writer.close(); + fail("Expected IllegalArgumentException to be thrown"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("no data provided")); + } + } + + @Test + void testInvalidPath() { + BufferAllocator allocator = new RootAllocator(); + try { + LanceFileReader.open("/tmp/does_not_exist.lance", allocator); + fail("Expected LanceException to be thrown"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Not found: tmp/does_not_exist.lance")); + } + try { + LanceFileReader.open("", allocator); + fail("Expected LanceException to be thrown"); + } catch (RuntimeException e) { + // expected, would be nice if it was an IOException, but it's not because + // lance throws a wrapped error :( + } catch (IOException e) { + fail("Expected RuntimeException to be thrown"); + } + } + +} diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index d0f19b036be..fdbffdadd60 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1923,7 +1923,7 @@ pub fn create_decode_iterator( should_validate: bool, is_structural: bool, messages: VecDeque>, -) -> Box { +) -> Box { let arrow_schema = Arc::new(ArrowSchema::from(schema)); let root_fields = arrow_schema.fields.clone(); if is_structural { @@ -2069,7 +2069,7 @@ pub fn schedule_and_decode_blocking( column_indices: Vec, target_schema: Arc, config: SchedulerDecoderConfig, -) -> Result> { +) -> Result> { if requested_rows.num_rows() == 0 { let arrow_schema = 
Arc::new(ArrowSchema::from(target_schema.as_ref())); return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema))); diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index 7f934062e1f..39d24013dc2 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -1055,7 +1055,7 @@ impl FileReader { batch_size: u32, projection: ReaderProjection, filter: FilterExpression, - ) -> Result> { + ) -> Result> { let column_infos = self.collect_columns_from_projection(&projection)?; debug!( "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}", @@ -1086,6 +1086,45 @@ impl FileReader { ) } + fn read_range_blocking( + &self, + range: Range, + batch_size: u32, + projection: ReaderProjection, + filter: FilterExpression, + ) -> Result> { + let column_infos = self.collect_columns_from_projection(&projection)?; + let num_rows = self.num_rows; + + debug!( + "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns", + range, + batch_size, + num_rows, + column_infos.len(), + projection.schema.fields.len(), + ); + + let config = SchedulerDecoderConfig { + batch_size, + cache: self.cache.clone(), + decoder_plugins: self.decoder_plugins.clone(), + io: self.scheduler.clone(), + should_validate: self.options.validate_on_decode, + }; + + let requested_rows = RequestedRows::Ranges(vec![range]); + + schedule_and_decode_blocking( + column_infos, + requested_rows, + filter, + projection.column_indices, + projection.schema, + config, + ) + } + /// Read data from the file as an iterator of record batches /// /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the @@ -1103,7 +1142,7 @@ impl FileReader { batch_size: u32, projection: Option, filter: FilterExpression, - ) -> Result> { + ) -> Result> { let projection = projection.unwrap_or_else(|| self.base_projection.clone()); Self::validate_projection(&projection, &self.metadata)?; 
let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| { @@ -1137,7 +1176,31 @@ impl FileReader { let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect(); self.take_rows_blocking(indices, batch_size, projection, filter) } - _ => todo!(), + ReadBatchParams::Range(range) => { + verify_bound(&params, range.end as u64, false)?; + self.read_range_blocking( + range.start as u64..range.end as u64, + batch_size, + projection, + filter, + ) + } + ReadBatchParams::RangeFrom(range) => { + verify_bound(&params, range.start as u64, true)?; + self.read_range_blocking( + range.start as u64..self.num_rows, + batch_size, + projection, + filter, + ) + } + ReadBatchParams::RangeTo(range) => { + verify_bound(&params, range.end as u64, false)?; + self.read_range_blocking(0..range.end as u64, batch_size, projection, filter) + } + ReadBatchParams::RangeFull => { + self.read_range_blocking(0..self.num_rows, batch_size, projection, filter) + } } } From e9b796a2352f2dd116d481e13a7bbb09c55e0ac8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Sun, 23 Mar 2025 08:51:12 -0700 Subject: [PATCH 2/3] Apply maven spotless --- .../lancedb/lance/file/LanceFileReader.java | 183 +++++++------ .../lancedb/lance/file/LanceFileWriter.java | 136 +++++----- .../lancedb/lance/FileReaderWriterTest.java | 253 ++++++++++-------- 3 files changed, 299 insertions(+), 273 deletions(-) diff --git a/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java b/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java index 83fc4737ed2..ca4d56e884b 100644 --- a/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java +++ b/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java @@ -1,6 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.lancedb.lance.file; -import java.io.IOException; +import com.lancedb.lance.JniLoader; import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.c.ArrowSchema; @@ -9,93 +22,91 @@ import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.types.pojo.Schema; -import com.lancedb.lance.JniLoader; +import java.io.IOException; public class LanceFileReader implements AutoCloseable { - static { - JniLoader.ensureLoaded(); - } - - private long nativeFileReaderHandle; - - private BufferAllocator allocator; - private Schema schema; - - private static native LanceFileReader openNative(String fileUri) throws IOException; - - private native void closeNative(long nativeLanceFileReaderHandle) throws IOException; - - private native long numRowsNative() throws IOException; - - private native void populateSchemaNative(long arrowSchemaMemoryAddress); - - private native void readAllNative(int batchSize, long streamMemoryAddress) throws IOException; - - private LanceFileReader() { - } - - /** - * Open a LanceFileReader from a file URI - * - * @param path the URI to the Lance file - * @param allocator the Arrow BufferAllocator to use for the reader - * @return a new LanceFileReader - */ - public static LanceFileReader open(String path, BufferAllocator allocator) throws IOException { - LanceFileReader reader = openNative(path); - reader.allocator = allocator; - reader.schema = reader.load_schema(); - return reader; - } - - /** - * Close the LanceFileReader - * - * This method must be called to release resources when the reader 
is no longer - * needed. - */ - @Override - public void close() throws Exception { - closeNative(nativeFileReaderHandle); - } - - /** - * Get the number of rows in the Lance file - * - * @return the number of rows in the Lance file - */ - public long numRows() throws IOException { - long numRows = numRowsNative(); - return numRows; - } - - /** - * Get the schema of the Lance file - * - * @return the schema of the Lance file - */ - public Schema schema() { - return schema; + static { + JniLoader.ensureLoaded(); + } + + private long nativeFileReaderHandle; + + private BufferAllocator allocator; + private Schema schema; + + private static native LanceFileReader openNative(String fileUri) throws IOException; + + private native void closeNative(long nativeLanceFileReaderHandle) throws IOException; + + private native long numRowsNative() throws IOException; + + private native void populateSchemaNative(long arrowSchemaMemoryAddress); + + private native void readAllNative(int batchSize, long streamMemoryAddress) throws IOException; + + private LanceFileReader() {} + + /** + * Open a LanceFileReader from a file URI + * + * @param path the URI to the Lance file + * @param allocator the Arrow BufferAllocator to use for the reader + * @return a new LanceFileReader + */ + public static LanceFileReader open(String path, BufferAllocator allocator) throws IOException { + LanceFileReader reader = openNative(path); + reader.allocator = allocator; + reader.schema = reader.load_schema(); + return reader; + } + + /** + * Close the LanceFileReader + * + *

This method must be called to release resources when the reader is no longer needed. + */ + @Override + public void close() throws Exception { + closeNative(nativeFileReaderHandle); + } + + /** + * Get the number of rows in the Lance file + * + * @return the number of rows in the Lance file + */ + public long numRows() throws IOException { + long numRows = numRowsNative(); + return numRows; + } + + /** + * Get the schema of the Lance file + * + * @return the schema of the Lance file + */ + public Schema schema() { + return schema; + } + + private Schema load_schema() throws IOException { + try (ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(allocator)) { + populateSchemaNative(ffiArrowSchema.memoryAddress()); + return Data.importSchema(allocator, ffiArrowSchema, null); } - - private Schema load_schema() throws IOException { - try (ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(allocator)) { - populateSchemaNative(ffiArrowSchema.memoryAddress()); - return Data.importSchema(allocator, ffiArrowSchema, null); - } - } - - /** - * Read all rows from the Lance file - * - * @param batchSize the maximum number of rows to read in a single batch - * @return an ArrowReader for the Lance file - */ - public ArrowReader readAll(int batchSize) throws IOException { - try (ArrowArrayStream ffiArrowArrayStream = ArrowArrayStream.allocateNew(allocator)) { - readAllNative(batchSize, ffiArrowArrayStream.memoryAddress()); - return Data.importArrayStream(allocator, ffiArrowArrayStream); - } + } + + /** + * Read all rows from the Lance file + * + * @param batchSize the maximum number of rows to read in a single batch + * @return an ArrowReader for the Lance file + */ + public ArrowReader readAll(int batchSize) throws IOException { + try (ArrowArrayStream ffiArrowArrayStream = ArrowArrayStream.allocateNew(allocator)) { + readAllNative(batchSize, ffiArrowArrayStream.memoryAddress()); + return Data.importArrayStream(allocator, ffiArrowArrayStream); } -} \ No newline at end of file 
+ } +} diff --git a/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java b/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java index bb06cbb0715..a8d469aef21 100644 --- a/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java +++ b/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java @@ -1,6 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.lancedb.lance.file; -import java.io.IOException; +import com.lancedb.lance.JniLoader; import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; @@ -9,83 +22,70 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; -import com.lancedb.lance.JniLoader; +import java.io.IOException; public class LanceFileWriter implements AutoCloseable { - static { - JniLoader.ensureLoaded(); - } + static { + JniLoader.ensureLoaded(); + } - private long nativeFileWriterHandle; - private BufferAllocator allocator; - private DictionaryProvider dictionaryProvider; + private long nativeFileWriterHandle; + private BufferAllocator allocator; + private DictionaryProvider dictionaryProvider; - private static native LanceFileWriter openNative(String fileUri) throws IOException; + private static native LanceFileWriter openNative(String fileUri) throws IOException; - private native void closeNative(long nativeLanceFileReaderHandle) throws IOException; + private native void closeNative(long 
nativeLanceFileReaderHandle) throws IOException; - private native void writeNative( - long batchMemoryAddress, - long schemaMemoryAddress) throws IOException; + private native void writeNative(long batchMemoryAddress, long schemaMemoryAddress) + throws IOException; - private LanceFileWriter() { - } - - /** - * Open a LanceFileWriter to write to a given file URI - * - * @param path the URI of the file to write to - * @param allocator the BufferAllocator to use for the writer - * @param dictionaryProvider the DictionaryProvider to use for the writer - * @return a new LanceFileWriter - */ - public static LanceFileWriter open( - String path, - BufferAllocator allocator, - DictionaryProvider dictionaryProvider) - throws IOException { - LanceFileWriter writer = openNative(path); - writer.allocator = allocator; - writer.dictionaryProvider = dictionaryProvider; - return writer; - } + private LanceFileWriter() {} - /** - * Write a batch of data - * - * @param batch the batch of data to write - * @throws IOException if the batch cannot be written - */ - public void write(VectorSchemaRoot batch) throws IOException { - try (ArrowArray ffiArrowArray = ArrowArray.allocateNew(allocator); - ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(allocator)) { - Data.exportVectorSchemaRoot( - allocator, - batch, - dictionaryProvider, - ffiArrowArray, - ffiArrowSchema); - writeNative( - ffiArrowArray.memoryAddress(), - ffiArrowSchema.memoryAddress()); - } - } + /** + * Open a LanceFileWriter to write to a given file URI + * + * @param path the URI of the file to write to + * @param allocator the BufferAllocator to use for the writer + * @param dictionaryProvider the DictionaryProvider to use for the writer + * @return a new LanceFileWriter + */ + public static LanceFileWriter open( + String path, BufferAllocator allocator, DictionaryProvider dictionaryProvider) + throws IOException { + LanceFileWriter writer = openNative(path); + writer.allocator = allocator; + 
writer.dictionaryProvider = dictionaryProvider; + return writer; + } - /** - * Close the LanceFileWriter - * - * This method must be called to release resources when the writer is no longer - * needed. - * - * This method will also flush all remaining data and write the footer to the - * file. - * - * @throws Exception if the writer cannot be closed - */ - @Override - public void close() throws Exception { - closeNative(nativeFileWriterHandle); + /** + * Write a batch of data + * + * @param batch the batch of data to write + * @throws IOException if the batch cannot be written + */ + public void write(VectorSchemaRoot batch) throws IOException { + try (ArrowArray ffiArrowArray = ArrowArray.allocateNew(allocator); + ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(allocator)) { + Data.exportVectorSchemaRoot( + allocator, batch, dictionaryProvider, ffiArrowArray, ffiArrowSchema); + writeNative(ffiArrowArray.memoryAddress(), ffiArrowSchema.memoryAddress()); } + } + /** + * Close the LanceFileWriter + * + *

This method must be called to release resources when the writer is no longer needed. + * + *

This method will also flush all remaining data and write the footer to the file. + * + * @throws Exception if the writer cannot be closed + */ + @Override + public void close() throws Exception { + closeNative(nativeFileWriterHandle); + } } diff --git a/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java b/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java index f5e25554540..4b19565de82 100644 --- a/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java +++ b/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java @@ -1,13 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package com.lancedb.lance; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Arrays; +import com.lancedb.lance.file.LanceFileReader; +import com.lancedb.lance.file.LanceFileWriter; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -22,130 +29,138 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import com.lancedb.lance.file.LanceFileReader; -import com.lancedb.lance.file.LanceFileWriter; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class FileReaderWriterTest { - @TempDir - private static Path tempDir; - - private VectorSchemaRoot createBatch(BufferAllocator allocator) throws IOException { - Schema schema = new Schema( - Arrays.asList(Field.nullable("x", new ArrowType.Int(64, true)), - Field.nullable("y", new ArrowType.Utf8())), - null); - VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); - root.allocateNew(); - BigIntVector iVector = (BigIntVector) root.getVector("x"); - VarCharVector sVector = (VarCharVector) root.getVector("y"); - - for (int i = 0; i < 100; i++) { - iVector.setSafe(i, i); - sVector.setSafe(i, new Text("s-" + i)); - } - root.setRowCount(100); - - return root; + @TempDir private static Path tempDir; + + private VectorSchemaRoot createBatch(BufferAllocator allocator) throws IOException { + Schema schema = + new Schema( + Arrays.asList( + Field.nullable("x", new ArrowType.Int(64, true)), + Field.nullable("y", new 
ArrowType.Utf8())), + null); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + root.allocateNew(); + BigIntVector iVector = (BigIntVector) root.getVector("x"); + VarCharVector sVector = (VarCharVector) root.getVector("y"); + + for (int i = 0; i < 100; i++) { + iVector.setSafe(i, i); + sVector.setSafe(i, new Text("s-" + i)); } + root.setRowCount(100); + + return root; + } - void createSimpleFile(String filePath) throws Exception { - BufferAllocator allocator = new RootAllocator(); - try (LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null)) { - try (VectorSchemaRoot batch = createBatch(allocator)) { - writer.write(batch); - } - } + void createSimpleFile(String filePath) throws Exception { + BufferAllocator allocator = new RootAllocator(); + try (LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null)) { + try (VectorSchemaRoot batch = createBatch(allocator)) { + writer.write(batch); + } + } + } + + @Test + void testBasicRead() throws Exception { + BufferAllocator allocator = new RootAllocator(); + String filePath = tempDir.resolve("basic_read.lance").toString(); + createSimpleFile(filePath); + LanceFileReader reader = LanceFileReader.open(filePath, allocator); + + Schema expectedSchema = + new Schema( + Arrays.asList( + Field.nullable("x", new ArrowType.Int(64, true)), + Field.nullable("y", new ArrowType.Utf8())), + null); + + assertEquals(100, reader.numRows()); + assertEquals(expectedSchema, reader.schema()); + + try (ArrowReader batches = reader.readAll(100)) { + assertTrue(batches.loadNextBatch()); + VectorSchemaRoot batch = batches.getVectorSchemaRoot(); + assertEquals(100, batch.getRowCount()); + assertEquals(2, batch.getSchema().getFields().size()); + assertFalse(batches.loadNextBatch()); } - @Test - void testBasicRead() throws Exception { - BufferAllocator allocator = new RootAllocator(); - String filePath = tempDir.resolve("basic_read.lance").toString(); - createSimpleFile(filePath); - 
LanceFileReader reader = LanceFileReader.open(filePath, allocator); - - Schema expectedSchema = new Schema( - Arrays.asList(Field.nullable("x", new ArrowType.Int(64, true)), - Field.nullable("y", new ArrowType.Utf8())), - null); - - assertEquals(100, reader.numRows()); - assertEquals(expectedSchema, reader.schema()); - - try (ArrowReader batches = reader.readAll(100)) { - assertTrue(batches.loadNextBatch()); - VectorSchemaRoot batch = batches.getVectorSchemaRoot(); - assertEquals(100, batch.getRowCount()); - assertEquals(2, batch.getSchema().getFields().size()); - assertFalse(batches.loadNextBatch()); - } - - try (ArrowReader batches = reader.readAll(15)) { - for (int i = 0; i < 100; i += 15) { - int expected = Math.min(15, 100 - i); - assertTrue(batches.loadNextBatch()); - VectorSchemaRoot batch = batches.getVectorSchemaRoot(); - assertEquals(expected, batch.getRowCount()); - assertEquals(2, batch.getSchema().getFields().size()); - } - assertFalse(batches.loadNextBatch()); - } - - reader.close(); - try { - reader.numRows(); - fail("Expected LanceException to be thrown"); - } catch (IOException e) { - assertEquals("FileReader has already been closed", e.getMessage()); - } - - // Ok to call schema after close - assertEquals(expectedSchema, reader.schema()); - - // close should be idempotent - reader.close(); + try (ArrowReader batches = reader.readAll(15)) { + for (int i = 0; i < 100; i += 15) { + int expected = Math.min(15, 100 - i); + assertTrue(batches.loadNextBatch()); + VectorSchemaRoot batch = batches.getVectorSchemaRoot(); + assertEquals(expected, batch.getRowCount()); + assertEquals(2, batch.getSchema().getFields().size()); + } + assertFalse(batches.loadNextBatch()); } - @Test - void testBasicWrite() throws Exception { - String filePath = tempDir.resolve("basic_write.lance").toString(); - createSimpleFile(filePath); + reader.close(); + try { + reader.numRows(); + fail("Expected LanceException to be thrown"); + } catch (IOException e) { + 
assertEquals("FileReader has already been closed", e.getMessage()); } - @Test - void testWriteNoData() throws Exception { - String filePath = tempDir.resolve("no_data.lance").toString(); - BufferAllocator allocator = new RootAllocator(); + // Ok to call schema after close + assertEquals(expectedSchema, reader.schema()); - LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null); + // close should be idempotent + reader.close(); + } - try { - writer.close(); - fail("Expected IllegalArgumentException to be thrown"); - } catch (IllegalArgumentException e) { - assertTrue(e.getMessage().contains("no data provided")); - } - } + @Test + void testBasicWrite() throws Exception { + String filePath = tempDir.resolve("basic_write.lance").toString(); + createSimpleFile(filePath); + } - @Test - void testInvalidPath() { - BufferAllocator allocator = new RootAllocator(); - try { - LanceFileReader.open("/tmp/does_not_exist.lance", allocator); - fail("Expected LanceException to be thrown"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Not found: tmp/does_not_exist.lance")); - } - try { - LanceFileReader.open("", allocator); - fail("Expected LanceException to be thrown"); - } catch (RuntimeException e) { - // expected, would be nice if it was an IOException, but it's not because - // lance throws a wrapped error :( - } catch (IOException e) { - fail("Expected RuntimeException to be thrown"); - } - } + @Test + void testWriteNoData() throws Exception { + String filePath = tempDir.resolve("no_data.lance").toString(); + BufferAllocator allocator = new RootAllocator(); + + LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null); + try { + writer.close(); + fail("Expected IllegalArgumentException to be thrown"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("no data provided")); + } + } + + @Test + void testInvalidPath() { + BufferAllocator allocator = new RootAllocator(); + try { + 
LanceFileReader.open("/tmp/does_not_exist.lance", allocator); + fail("Expected LanceException to be thrown"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Not found: tmp/does_not_exist.lance")); + } + try { + LanceFileReader.open("", allocator); + fail("Expected LanceException to be thrown"); + } catch (RuntimeException e) { + // expected, would be nice if it was an IOException, but it's not because + // lance throws a wrapped error :( + } catch (IOException e) { + fail("Expected RuntimeException to be thrown"); + } + } } From e099a4f8aa09e89d36ee3b50839713a58a4e9597 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 24 Mar 2025 10:30:56 -0700 Subject: [PATCH 3/3] Address clippy suggestions --- java/core/lance-jni/src/file_reader.rs | 14 +++++++------- java/core/lance-jni/src/file_writer.rs | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/java/core/lance-jni/src/file_reader.rs b/java/core/lance-jni/src/file_reader.rs index 5aba4186cbf..35ca848d441 100644 --- a/java/core/lance-jni/src/file_reader.rs +++ b/java/core/lance-jni/src/file_reader.rs @@ -126,8 +126,8 @@ pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_closeNative<' } #[no_mangle] -pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_numRowsNative<'local>( - mut env: JNIEnv<'local>, +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_numRowsNative( + mut env: JNIEnv<'_>, reader: JObject, ) -> jlong { match inner_num_rows(&mut env, reader) { @@ -151,7 +151,7 @@ fn unwrap_reader(val: std::result::Result) -> Result(env: &mut JNIEnv<'local>, reader: JObject) -> Result { +fn inner_num_rows(env: &mut JNIEnv<'_>, reader: JObject) -> Result { let reader = unsafe { env.get_rust_field::<_, _, BlockingFileReader>(reader, NATIVE_READER) }; let reader = unwrap_reader(reader)?; Ok(reader.num_rows() as i64) @@ -176,8 +176,8 @@ fn inner_populate_schema(env: &mut JNIEnv, reader: JObject, schema_addr: jlong) } #[no_mangle] -pub 
extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_readAllNative<'local>( - mut env: JNIEnv<'local>, +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_readAllNative( + mut env: JNIEnv<'_>, reader: JObject, batch_size: jint, stream_addr: jlong, @@ -187,8 +187,8 @@ pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_readAllNative } } -fn inner_read_all<'local>( - env: &mut JNIEnv<'local>, +fn inner_read_all( + env: &mut JNIEnv<'_>, reader: JObject, batch_size: jint, stream_addr: jlong, diff --git a/java/core/lance-jni/src/file_writer.rs b/java/core/lance-jni/src/file_writer.rs index 60c7aa94b07..98f6218c91a 100644 --- a/java/core/lance-jni/src/file_writer.rs +++ b/java/core/lance-jni/src/file_writer.rs @@ -128,8 +128,8 @@ pub extern "system" fn Java_com_lancedb_lance_file_LanceFileWriter_writeNative<' JObject::null() } -fn inner_write_batch<'local>( - env: &mut JNIEnv<'local>, +fn inner_write_batch( + env: &mut JNIEnv<'_>, writer: JObject, batch_address: jlong, schema_address: jlong,