diff --git a/Cargo.lock b/Cargo.lock index db147d501cc..46fa11d5049 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4084,10 +4084,12 @@ dependencies = [ "lance-core", "lance-datafusion", "lance-encoding", + "lance-file", "lance-index", "lance-io", "lance-linalg", "lazy_static", + "object_store", "serde", "serde_json", "snafu", diff --git a/java/core/lance-jni/Cargo.toml b/java/core/lance-jni/Cargo.toml index 636fe2def67..a26eee044a3 100644 --- a/java/core/lance-jni/Cargo.toml +++ b/java/core/lance-jni/Cargo.toml @@ -20,9 +20,11 @@ lance-linalg = { workspace = true } lance-index = { workspace = true } lance-io.workspace = true lance-core.workspace = true +lance-file.workspace = true arrow = { workspace = true, features = ["ffi"] } arrow-schema.workspace = true datafusion.workspace = true +object_store.workspace = true tokio.workspace = true jni = "0.21.1" snafu.workspace = true diff --git a/java/core/lance-jni/src/error.rs b/java/core/lance-jni/src/error.rs index 36f47ffb566..05454c6111b 100644 --- a/java/core/lance-jni/src/error.rs +++ b/java/core/lance-jni/src/error.rs @@ -19,12 +19,13 @@ use jni::{errors::Error as JniError, JNIEnv}; use lance::error::Error as LanceError; use serde_json::Error as JsonError; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum JavaExceptionClass { IllegalArgumentException, IOException, RuntimeException, UnsupportedOperationException, + AlreadyInException, } impl JavaExceptionClass { @@ -34,6 +35,8 @@ impl JavaExceptionClass { Self::IOException => "java/io/IOException", Self::RuntimeException => "java/lang/RuntimeException", Self::UnsupportedOperationException => "java/lang/UnsupportedOperationException", + // Included for display purposes. This is not a real exception. 
+ Self::AlreadyInException => "AlreadyInException", } } } @@ -71,7 +74,18 @@ impl Error { Self::new(message, JavaExceptionClass::UnsupportedOperationException) } + pub fn in_exception() -> Self { + Self { + message: String::default(), + java_class: JavaExceptionClass::AlreadyInException, + } + } + pub fn throw(&self, env: &mut JNIEnv) { + if self.java_class == JavaExceptionClass::AlreadyInException { + // An exception is already in progress, so we don't need to throw another one. + return; + } if let Err(e) = env.throw_new(self.java_class.as_str(), &self.message) { eprintln!("Error when throwing Java exception: {:?}", e.to_string()); panic!("Error when throwing Java exception: {:?}", e); @@ -96,6 +110,7 @@ impl From for Error { | LanceError::InvalidInput { .. } => Self::input_error(err.to_string()), LanceError::IO { .. } => Self::io_error(err.to_string()), LanceError::NotSupported { .. } => Self::unsupported_error(err.to_string()), + LanceError::NotFound { .. } => Self::io_error(err.to_string()), _ => Self::runtime_error(err.to_string()), } } @@ -120,7 +135,12 @@ impl From for Error { impl From for Error { fn from(err: JniError) -> Self { - Self::runtime_error(err.to_string()) + match err { + // If we get this then it means that an exception was already in progress. We can't + // throw another one so we just return an error indicating that. 
+ JniError::JavaException => Self::in_exception(), + _ => Self::runtime_error(err.to_string()), + } } } diff --git a/java/core/lance-jni/src/file_reader.rs b/java/core/lance-jni/src/file_reader.rs new file mode 100644 index 00000000000..35ca848d441 --- /dev/null +++ b/java/core/lance-jni/src/file_reader.rs @@ -0,0 +1,202 @@ +use std::sync::Arc; + +use crate::{ + error::{Error, Result}, + traits::IntoJava, + RT, +}; +use arrow::{array::RecordBatchReader, ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream}; +use arrow_schema::SchemaRef; +use jni::{ + objects::{JObject, JString}, + sys::{jint, jlong}, + JNIEnv, +}; +use lance::io::ObjectStore; +use lance_core::cache::FileMetadataCache; +use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; +use lance_file::v2::reader::{FileReader, FileReaderOptions}; +use lance_io::{ + scheduler::{ScanScheduler, SchedulerConfig}, + ReadBatchParams, +}; +use object_store::path::Path; + +pub const NATIVE_READER: &str = "nativeFileReaderHandle"; + +#[derive(Clone, Debug)] +pub struct BlockingFileReader { + pub(crate) inner: Arc, +} + +impl BlockingFileReader { + pub fn create(file_reader: Arc) -> Self { + Self { inner: file_reader } + } + + pub fn open_stream( + &self, + batch_size: u32, + ) -> Result> { + Ok(self.inner.read_stream_projected_blocking( + ReadBatchParams::RangeFull, + batch_size, + None, + FilterExpression::no_filter(), + )?) 
+ } + + pub fn schema(&self) -> Result { + Ok(Arc::new(self.inner.schema().as_ref().into())) + } + + pub fn num_rows(&self) -> u64 { + self.inner.num_rows() + } +} + +impl IntoJava for BlockingFileReader { + fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> Result> { + attach_native_reader(env, self) + } +} + +fn attach_native_reader<'local>( + env: &mut JNIEnv<'local>, + reader: BlockingFileReader, +) -> Result> { + let j_reader = create_java_reader_object(env)?; + unsafe { env.set_rust_field(&j_reader, NATIVE_READER, reader) }?; + Ok(j_reader) +} + +fn create_java_reader_object<'a>(env: &mut JNIEnv<'a>) -> Result> { + let res = env.new_object("com/lancedb/lance/file/LanceFileReader", "()V", &[])?; + Ok(res) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_openNative<'local>( + mut env: JNIEnv<'local>, + _reader_class: JObject, + file_uri: JString, +) -> JObject<'local> { + ok_or_throw!(env, inner_open(&mut env, file_uri,)) +} + +fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result> { + let file_uri_str: String = env.get_string(&file_uri)?.into(); + + let reader = RT.block_on(async move { + let (obj_store, path) = ObjectStore::from_uri(&file_uri_str).await?; + let obj_store = Arc::new(obj_store); + let config = SchedulerConfig::max_bandwidth(&obj_store); + let scan_scheduler = ScanScheduler::new(obj_store, config); + + let file_scheduler = scan_scheduler.open_file(&Path::parse(&path)?).await?; + FileReader::try_open( + file_scheduler, + None, + Arc::::default(), + &FileMetadataCache::no_cache(), + FileReaderOptions::default(), + ) + .await + })?; + + let reader = BlockingFileReader::create(Arc::new(reader)); + + reader.into_java(env) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_closeNative<'local>( + mut env: JNIEnv<'local>, + reader: JObject, +) -> JObject<'local> { + let maybe_err = + unsafe { env.take_rust_field::<_, _, BlockingFileReader>(reader, 
NATIVE_READER) }; + match maybe_err { + Ok(_) => {} + // We were already closed, do nothing + Err(jni::errors::Error::NullPtr(_)) => {} + Err(err) => Error::from(err).throw(&mut env), + } + JObject::null() +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_numRowsNative( + mut env: JNIEnv<'_>, + reader: JObject, +) -> jlong { + match inner_num_rows(&mut env, reader) { + Ok(num_rows) => num_rows, + Err(e) => { + e.throw(&mut env); + 0 + } + } +} + +// If the reader is closed, the native handle will be null and we will get a JniError::NullPtr +// error when we call get_rust_field. Translate that into a more meaningful error. +fn unwrap_reader(val: std::result::Result) -> Result { + match val { + Ok(val) => Ok(val), + Err(jni::errors::Error::NullPtr(_)) => Err(Error::io_error( + "FileReader has already been closed".to_string(), + )), + err => Ok(err?), + } +} + +fn inner_num_rows(env: &mut JNIEnv<'_>, reader: JObject) -> Result { + let reader = unsafe { env.get_rust_field::<_, _, BlockingFileReader>(reader, NATIVE_READER) }; + let reader = unwrap_reader(reader)?; + Ok(reader.num_rows() as i64) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_populateSchemaNative( + mut env: JNIEnv, + reader: JObject, + schema_addr: jlong, +) { + ok_or_throw_without_return!(env, inner_populate_schema(&mut env, reader, schema_addr)); +} + +fn inner_populate_schema(env: &mut JNIEnv, reader: JObject, schema_addr: jlong) -> Result<()> { + let reader = unsafe { env.get_rust_field::<_, _, BlockingFileReader>(reader, NATIVE_READER) }; + let reader = unwrap_reader(reader)?; + let schema = reader.schema()?; + let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?; + unsafe { std::ptr::write_unaligned(schema_addr as *mut FFI_ArrowSchema, ffi_schema) } + Ok(()) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileReader_readAllNative( + mut env: JNIEnv<'_>, + reader: JObject, + batch_size: 
jint, + stream_addr: jlong, +) { + if let Err(e) = inner_read_all(&mut env, reader, batch_size, stream_addr) { + e.throw(&mut env); + } +} + +fn inner_read_all( + env: &mut JNIEnv<'_>, + reader: JObject, + batch_size: jint, + stream_addr: jlong, +) -> Result<()> { + let reader = unsafe { env.get_rust_field::<_, _, BlockingFileReader>(reader, NATIVE_READER) }; + let reader = unwrap_reader(reader)?; + let arrow_stream = reader.open_stream(batch_size as u32)?; + let ffi_stream = FFI_ArrowArrayStream::new(arrow_stream); + unsafe { std::ptr::write_unaligned(stream_addr as *mut FFI_ArrowArrayStream, ffi_stream) } + Ok(()) +} diff --git a/java/core/lance-jni/src/file_writer.rs b/java/core/lance-jni/src/file_writer.rs new file mode 100644 index 00000000000..98f6218c91a --- /dev/null +++ b/java/core/lance-jni/src/file_writer.rs @@ -0,0 +1,152 @@ +use std::sync::{Arc, Mutex}; + +use crate::{ + error::{Error, Result}, + traits::IntoJava, + RT, +}; +use arrow::{ + array::{RecordBatch, StructArray}, + ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema}, +}; +use arrow_schema::DataType; +use jni::{ + objects::{JObject, JString}, + sys::jlong, + JNIEnv, +}; +use lance::io::ObjectStore; +use lance_file::{ + v2::writer::{FileWriter, FileWriterOptions}, + version::LanceFileVersion, +}; + +pub const NATIVE_WRITER: &str = "nativeFileWriterHandle"; + +#[derive(Clone)] +pub struct BlockingFileWriter { + pub(crate) inner: Arc>, +} + +impl BlockingFileWriter { + pub fn create(file_writer: FileWriter) -> Self { + Self { + inner: Arc::new(Mutex::new(file_writer)), + } + } +} + +impl IntoJava for BlockingFileWriter { + fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> Result> { + attach_native_writer(env, self) + } +} + +fn attach_native_writer<'local>( + env: &mut JNIEnv<'local>, + writer: BlockingFileWriter, +) -> Result> { + let j_writer = create_java_writer_object(env)?; + unsafe { env.set_rust_field(&j_writer, NATIVE_WRITER, writer) }?; + Ok(j_writer) +} + +fn 
create_java_writer_object<'a>(env: &mut JNIEnv<'a>) -> Result> { + let res = env.new_object("com/lancedb/lance/file/LanceFileWriter", "()V", &[])?; + Ok(res) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileWriter_openNative<'local>( + mut env: JNIEnv<'local>, + _writer_class: JObject, + file_uri: JString, +) -> JObject<'local> { + ok_or_throw!(env, inner_open(&mut env, file_uri,)) +} + +fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result> { + let file_uri_str: String = env.get_string(&file_uri)?.into(); + + let writer = RT.block_on(async move { + let (obj_store, path) = ObjectStore::from_uri(&file_uri_str).await?; + let obj_store = Arc::new(obj_store); + let obj_writer = obj_store.create(&path).await?; + + Result::Ok(FileWriter::new_lazy( + obj_writer, + FileWriterOptions { + format_version: Some(LanceFileVersion::V2_1), + ..Default::default() + }, + )) + })?; + + let writer = BlockingFileWriter::create(writer); + + writer.into_java(env) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileWriter_closeNative<'local>( + mut env: JNIEnv<'local>, + writer: JObject, +) -> JObject<'local> { + let maybe_err = + unsafe { env.take_rust_field::<_, _, BlockingFileWriter>(writer, NATIVE_WRITER) }; + let writer = match maybe_err { + Ok(writer) => Some(writer), + // We were already closed, do nothing + Err(jni::errors::Error::NullPtr(_)) => None, + Err(err) => { + Error::from(err).throw(&mut env); + None + } + }; + if let Some(writer) = writer { + match RT.block_on(writer.inner.lock().unwrap().finish()) { + Ok(_) => {} + Err(e) => { + Error::from(e).throw(&mut env); + } + } + } + JObject::null() +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_file_LanceFileWriter_writeNative<'local>( + mut env: JNIEnv<'local>, + writer: JObject, + batch_address: jlong, + schema_address: jlong, +) -> JObject<'local> { + if let Err(e) = inner_write_batch(&mut env, writer, batch_address, 
schema_address) { + e.throw(&mut env); + return JObject::null(); + } + JObject::null() +} + +fn inner_write_batch( + env: &mut JNIEnv<'_>, + writer: JObject, + batch_address: jlong, + schema_address: jlong, +) -> Result<()> { + let c_array_ptr = batch_address as *mut FFI_ArrowArray; + let c_schema_ptr = schema_address as *mut FFI_ArrowSchema; + + let c_array = unsafe { FFI_ArrowArray::from_raw(c_array_ptr) }; + let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) }; + + let data_type = DataType::try_from(&c_schema)?; + let array_data = unsafe { from_ffi_and_data_type(c_array, data_type) }?; + let record_batch = RecordBatch::from(StructArray::from(array_data)); + + let writer = unsafe { env.get_rust_field::<_, _, BlockingFileWriter>(writer, NATIVE_WRITER) }?; + + let mut writer = writer.inner.lock().unwrap(); + RT.block_on(writer.write_batch(&record_batch))?; + Ok(()) +} diff --git a/java/core/lance-jni/src/lib.rs b/java/core/lance-jni/src/lib.rs index 84b7ba64972..437c3d4c00d 100644 --- a/java/core/lance-jni/src/lib.rs +++ b/java/core/lance-jni/src/lib.rs @@ -54,6 +54,8 @@ mod blocking_dataset; mod blocking_scanner; pub mod error; pub mod ffi; +mod file_reader; +mod file_writer; mod fragment; pub mod traits; pub mod utils; diff --git a/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java b/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java new file mode 100644 index 00000000000..ca4d56e884b --- /dev/null +++ b/java/core/src/main/java/com/lancedb/lance/file/LanceFileReader.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance.file; + +import com.lancedb.lance.JniLoader; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; + +public class LanceFileReader implements AutoCloseable { + + static { + JniLoader.ensureLoaded(); + } + + private long nativeFileReaderHandle; + + private BufferAllocator allocator; + private Schema schema; + + private static native LanceFileReader openNative(String fileUri) throws IOException; + + private native void closeNative(long nativeLanceFileReaderHandle) throws IOException; + + private native long numRowsNative() throws IOException; + + private native void populateSchemaNative(long arrowSchemaMemoryAddress); + + private native void readAllNative(int batchSize, long streamMemoryAddress) throws IOException; + + private LanceFileReader() {} + + /** + * Open a LanceFileReader from a file URI + * + * @param path the URI to the Lance file + * @param allocator the Arrow BufferAllocator to use for the reader + * @return a new LanceFileReader + */ + public static LanceFileReader open(String path, BufferAllocator allocator) throws IOException { + LanceFileReader reader = openNative(path); + reader.allocator = allocator; + reader.schema = reader.load_schema(); + return reader; + } + + /** + * Close the LanceFileReader + * + *

This method must be called to release resources when the reader is no longer needed. + */ + @Override + public void close() throws Exception { + closeNative(nativeFileReaderHandle); + } + + /** + * Get the number of rows in the Lance file + * + * @return the number of rows in the Lance file + */ + public long numRows() throws IOException { + long numRows = numRowsNative(); + return numRows; + } + + /** + * Get the schema of the Lance file + * + * @return the schema of the Lance file + */ + public Schema schema() { + return schema; + } + + private Schema load_schema() throws IOException { + try (ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(allocator)) { + populateSchemaNative(ffiArrowSchema.memoryAddress()); + return Data.importSchema(allocator, ffiArrowSchema, null); + } + } + + /** + * Read all rows from the Lance file + * + * @param batchSize the maximum number of rows to read in a single batch + * @return an ArrowReader for the Lance file + */ + public ArrowReader readAll(int batchSize) throws IOException { + try (ArrowArrayStream ffiArrowArrayStream = ArrowArrayStream.allocateNew(allocator)) { + readAllNative(batchSize, ffiArrowArrayStream.memoryAddress()); + return Data.importArrayStream(allocator, ffiArrowArrayStream); + } + } +} diff --git a/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java b/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java new file mode 100644 index 00000000000..a8d469aef21 --- /dev/null +++ b/java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lancedb.lance.file; + +import com.lancedb.lance.JniLoader; + +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; + +import java.io.IOException; + +public class LanceFileWriter implements AutoCloseable { + + static { + JniLoader.ensureLoaded(); + } + + private long nativeFileWriterHandle; + private BufferAllocator allocator; + private DictionaryProvider dictionaryProvider; + + private static native LanceFileWriter openNative(String fileUri) throws IOException; + + private native void closeNative(long nativeLanceFileReaderHandle) throws IOException; + + private native void writeNative(long batchMemoryAddress, long schemaMemoryAddress) + throws IOException; + + private LanceFileWriter() {} + + /** + * Open a LanceFileWriter to write to a given file URI + * + * @param path the URI of the file to write to + * @param allocator the BufferAllocator to use for the writer + * @param dictionaryProvider the DictionaryProvider to use for the writer + * @return a new LanceFileWriter + */ + public static LanceFileWriter open( + String path, BufferAllocator allocator, DictionaryProvider dictionaryProvider) + throws IOException { + LanceFileWriter writer = openNative(path); + writer.allocator = allocator; + writer.dictionaryProvider = dictionaryProvider; + return writer; + } + + /** + * Write a batch of data + * + * @param batch 
the batch of data to write + * @throws IOException if the batch cannot be written + */ + public void write(VectorSchemaRoot batch) throws IOException { + try (ArrowArray ffiArrowArray = ArrowArray.allocateNew(allocator); + ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(allocator)) { + Data.exportVectorSchemaRoot( + allocator, batch, dictionaryProvider, ffiArrowArray, ffiArrowSchema); + writeNative(ffiArrowArray.memoryAddress(), ffiArrowSchema.memoryAddress()); + } + } + + /** + * Close the LanceFileWriter + * + *

This method must be called to release resources when the writer is no longer needed. + * + *

This method will also flush all remaining data and write the footer to the file. + * + * @throws Exception if the writer cannot be closed + */ + @Override + public void close() throws Exception { + closeNative(nativeFileWriterHandle); + } +} diff --git a/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java b/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java new file mode 100644 index 00000000000..4b19565de82 --- /dev/null +++ b/java/core/src/test/java/com/lancedb/lance/FileReaderWriterTest.java @@ -0,0 +1,166 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.lancedb.lance; + +import com.lancedb.lance.file.LanceFileReader; +import com.lancedb.lance.file.LanceFileWriter; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class FileReaderWriterTest { + + @TempDir private static Path tempDir; + + private VectorSchemaRoot createBatch(BufferAllocator allocator) throws IOException { + Schema schema = + new Schema( + Arrays.asList( + Field.nullable("x", new ArrowType.Int(64, true)), + Field.nullable("y", new ArrowType.Utf8())), + null); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + root.allocateNew(); + BigIntVector iVector = (BigIntVector) root.getVector("x"); + VarCharVector sVector = (VarCharVector) root.getVector("y"); + + for (int i = 0; i < 100; i++) { + iVector.setSafe(i, i); + sVector.setSafe(i, new Text("s-" + i)); + } + root.setRowCount(100); + + return root; + } + + void createSimpleFile(String filePath) throws Exception { + BufferAllocator allocator = new RootAllocator(); + try (LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null)) { + try (VectorSchemaRoot batch = createBatch(allocator)) { + writer.write(batch); + } + 
} + } + + @Test + void testBasicRead() throws Exception { + BufferAllocator allocator = new RootAllocator(); + String filePath = tempDir.resolve("basic_read.lance").toString(); + createSimpleFile(filePath); + LanceFileReader reader = LanceFileReader.open(filePath, allocator); + + Schema expectedSchema = + new Schema( + Arrays.asList( + Field.nullable("x", new ArrowType.Int(64, true)), + Field.nullable("y", new ArrowType.Utf8())), + null); + + assertEquals(100, reader.numRows()); + assertEquals(expectedSchema, reader.schema()); + + try (ArrowReader batches = reader.readAll(100)) { + assertTrue(batches.loadNextBatch()); + VectorSchemaRoot batch = batches.getVectorSchemaRoot(); + assertEquals(100, batch.getRowCount()); + assertEquals(2, batch.getSchema().getFields().size()); + assertFalse(batches.loadNextBatch()); + } + + try (ArrowReader batches = reader.readAll(15)) { + for (int i = 0; i < 100; i += 15) { + int expected = Math.min(15, 100 - i); + assertTrue(batches.loadNextBatch()); + VectorSchemaRoot batch = batches.getVectorSchemaRoot(); + assertEquals(expected, batch.getRowCount()); + assertEquals(2, batch.getSchema().getFields().size()); + } + assertFalse(batches.loadNextBatch()); + } + + reader.close(); + try { + reader.numRows(); + fail("Expected LanceException to be thrown"); + } catch (IOException e) { + assertEquals("FileReader has already been closed", e.getMessage()); + } + + // Ok to call schema after close + assertEquals(expectedSchema, reader.schema()); + + // close should be idempotent + reader.close(); + } + + @Test + void testBasicWrite() throws Exception { + String filePath = tempDir.resolve("basic_write.lance").toString(); + createSimpleFile(filePath); + } + + @Test + void testWriteNoData() throws Exception { + String filePath = tempDir.resolve("no_data.lance").toString(); + BufferAllocator allocator = new RootAllocator(); + + LanceFileWriter writer = LanceFileWriter.open(filePath, allocator, null); + + try { + writer.close(); + fail("Expected 
IllegalArgumentException to be thrown"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("no data provided")); + } + } + + @Test + void testInvalidPath() { + BufferAllocator allocator = new RootAllocator(); + try { + LanceFileReader.open("/tmp/does_not_exist.lance", allocator); + fail("Expected LanceException to be thrown"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Not found: tmp/does_not_exist.lance")); + } + try { + LanceFileReader.open("", allocator); + fail("Expected LanceException to be thrown"); + } catch (RuntimeException e) { + // expected, would be nice if it was an IOException, but it's not because + // lance throws a wrapped error :( + } catch (IOException e) { + fail("Expected RuntimeException to be thrown"); + } + } +} diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index d0f19b036be..fdbffdadd60 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1923,7 +1923,7 @@ pub fn create_decode_iterator( should_validate: bool, is_structural: bool, messages: VecDeque>, -) -> Box { +) -> Box { let arrow_schema = Arc::new(ArrowSchema::from(schema)); let root_fields = arrow_schema.fields.clone(); if is_structural { @@ -2069,7 +2069,7 @@ pub fn schedule_and_decode_blocking( column_indices: Vec, target_schema: Arc, config: SchedulerDecoderConfig, -) -> Result> { +) -> Result> { if requested_rows.num_rows() == 0 { let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref())); return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema))); diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index 7f934062e1f..39d24013dc2 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -1055,7 +1055,7 @@ impl FileReader { batch_size: u32, projection: ReaderProjection, filter: FilterExpression, - ) -> Result> { + ) -> Result> { let column_infos = 
self.collect_columns_from_projection(&projection)?; debug!( "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}", @@ -1086,6 +1086,45 @@ impl FileReader { ) } + fn read_range_blocking( + &self, + range: Range, + batch_size: u32, + projection: ReaderProjection, + filter: FilterExpression, + ) -> Result> { + let column_infos = self.collect_columns_from_projection(&projection)?; + let num_rows = self.num_rows; + + debug!( + "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns", + range, + batch_size, + num_rows, + column_infos.len(), + projection.schema.fields.len(), + ); + + let config = SchedulerDecoderConfig { + batch_size, + cache: self.cache.clone(), + decoder_plugins: self.decoder_plugins.clone(), + io: self.scheduler.clone(), + should_validate: self.options.validate_on_decode, + }; + + let requested_rows = RequestedRows::Ranges(vec![range]); + + schedule_and_decode_blocking( + column_infos, + requested_rows, + filter, + projection.column_indices, + projection.schema, + config, + ) + } + /// Read data from the file as an iterator of record batches /// /// This is a blocking variant of [`Self::read_stream_projected`] that runs entirely in the @@ -1103,7 +1142,7 @@ impl FileReader { batch_size: u32, projection: Option, filter: FilterExpression, - ) -> Result> { + ) -> Result> { let projection = projection.unwrap_or_else(|| self.base_projection.clone()); Self::validate_projection(&projection, &self.metadata)?; let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| { @@ -1137,7 +1176,31 @@ impl FileReader { let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect(); self.take_rows_blocking(indices, batch_size, projection, filter) } - _ => todo!(), + ReadBatchParams::Range(range) => { + verify_bound(&params, range.end as u64, false)?; + self.read_range_blocking( + range.start as u64..range.end as u64, + batch_size, + projection, + filter, + ) + } +
ReadBatchParams::RangeFrom(range) => { + verify_bound(&params, range.start as u64, true)?; + self.read_range_blocking( + range.start as u64..self.num_rows, + batch_size, + projection, + filter, + ) + } + ReadBatchParams::RangeTo(range) => { + verify_bound(&params, range.end as u64, false)?; + self.read_range_blocking(0..range.end as u64, batch_size, projection, filter) + } + ReadBatchParams::RangeFull => { + self.read_range_blocking(0..self.num_rows, batch_size, projection, filter) + } } }