diff --git a/java/lance-jni/src/file_reader.rs b/java/lance-jni/src/file_reader.rs index ccaac121579..85da803295f 100644 --- a/java/lance-jni/src/file_reader.rs +++ b/java/lance-jni/src/file_reader.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::BTreeMap; use std::ops::Range; use std::sync::{Arc, Mutex}; @@ -20,8 +21,9 @@ use jni::{ }; use lance::io::ObjectStore; use lance_core::cache::LanceCache; -use lance_core::datatypes::Schema; +use lance_core::datatypes::{BlobHandling, OnMissing, Projection, Schema}; use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; +use lance_encoding::version::LanceFileVersion; use lance_file::reader::{FileReader, FileReaderOptions, ReaderProjection}; use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry}; use lance_io::{ @@ -218,10 +220,10 @@ pub extern "system" fn Java_org_lance_file_LanceFileReader_readAllNative( projected_names: JObject, selection_ranges: JObject, stream_addr: jlong, + blob_read_mode: jint, ) { let result = (|| -> Result<()> { let mut read_parameter = ReadBatchParams::default(); - let mut reader_projection: Option<ReaderProjection> = None; // We get reader here not from env.get_rust_field, because we need reader: MutexGuard has no relationship with the env lifecycle. // If we get reader from env.get_rust_field, we can't use env (can't borrow again) until we drop the reader. 
#[allow(unused_variables)] @@ -239,17 +241,44 @@ pub extern "system" fn Java_org_lance_file_LanceFileReader_readAllNative( }; let file_version = reader.inner.metadata().version(); + let base_schema = Schema::try_from(reader.schema()?.as_ref())?; - if !projected_names.is_null() { - let schema = Schema::try_from(reader.schema()?.as_ref())?; - let column_names: Vec<String> = env.get_strings(&projected_names)?; - let names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect(); - reader_projection = Some(ReaderProjection::from_column_names( + let blob_handling = if blob_read_mode == 1 { + BlobHandling::BlobsDescriptions + } else { + BlobHandling::AllBinary + }; + + let reader_projection = { + let mut projection = + Projection::empty(Arc::new(base_schema.clone())).with_blob_handling(blob_handling); + + if !projected_names.is_null() { + let column_names: Vec<String> = env.get_strings(&projected_names)?; + projection = projection.union_columns(&column_names, OnMissing::Error)?; + } else { + projection = projection.union_predicate(|_| true); + } + + let transformed_schema = projection.to_bare_schema(); + + let field_id_to_column_index = base_schema + .fields_pre_order() + .filter(|field| { + file_version < LanceFileVersion::V2_1 + || field.is_leaf() + || field.is_packed_struct() + }) + .enumerate() + .map(|(idx, field)| (field.id as u32, idx as u32)) + .collect::<BTreeMap<u32, u32>>(); + + Some(ReaderProjection::from_field_ids( file_version, - &schema, - names.as_slice(), - )?); - } + &transformed_schema, + &field_id_to_column_index, + )?) 
+ }; if !selection_ranges.is_null() { let mut ranges: Vec<Range<u64>> = Vec::new(); diff --git a/java/src/main/java/org/lance/file/BlobReadMode.java b/java/src/main/java/org/lance/file/BlobReadMode.java new file mode 100644 index 00000000000..d7be0381fbf --- /dev/null +++ b/java/src/main/java/org/lance/file/BlobReadMode.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.file; + +/** + * Controls how blob-encoded columns are returned when reading a Lance file. + * + *
<p>Blob columns can be read in two modes: + * + * <p> + *   • {@link #CONTENT} — materializes the full binary content (default) + *   • {@link #DESCRIPTOR} — returns a struct with {@code position} and {@code size} fields
+ */ +public enum BlobReadMode { + /** Return blob columns as materialized binary content (default). */ + CONTENT(0), + /** Return blob columns as descriptors (struct with position and size). */ + DESCRIPTOR(1); + + private final int value; + + BlobReadMode(int value) { + this.value = value; + } + + public int getValue() { + return value; + } +} diff --git a/java/src/main/java/org/lance/file/FileReadOptions.java b/java/src/main/java/org/lance/file/FileReadOptions.java new file mode 100644 index 00000000000..3d813c78eec --- /dev/null +++ b/java/src/main/java/org/lance/file/FileReadOptions.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.file; + +/** + * Options for reading a Lance file. + * + *
<p>
Use {@link #builder()} to create an instance. New options can be added here in the future + * without breaking existing callers. + */ +public class FileReadOptions { + private final BlobReadMode blobReadMode; + + private FileReadOptions(Builder builder) { + this.blobReadMode = builder.blobReadMode; + } + + /** Returns the blob read mode. Defaults to {@link BlobReadMode#CONTENT}. */ + public BlobReadMode getBlobReadMode() { + return blobReadMode; + } + + /** Creates a new builder with default options. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private BlobReadMode blobReadMode = BlobReadMode.CONTENT; + + private Builder() {} + + /** + * Sets how blob-encoded columns are returned. + * + * @param blobReadMode {@link BlobReadMode#CONTENT} to materialize binary content, or {@link + * BlobReadMode#DESCRIPTOR} to return position/size descriptors + */ + public Builder blobReadMode(BlobReadMode blobReadMode) { + this.blobReadMode = blobReadMode; + return this; + } + + public FileReadOptions build() { + return new FileReadOptions(this); + } + } +} diff --git a/java/src/main/java/org/lance/file/LanceFileReader.java b/java/src/main/java/org/lance/file/LanceFileReader.java index 9777e529f48..e3962eb539a 100644 --- a/java/src/main/java/org/lance/file/LanceFileReader.java +++ b/java/src/main/java/org/lance/file/LanceFileReader.java @@ -54,7 +54,8 @@ private native void readAllNative( int batchSize, @Nullable List<String> projectedNames, @Nullable List<Range> ranges, - long streamMemoryAddress) + long streamMemoryAddress, + int blobReadMode) throws IOException; private LanceFileReader() {} @@ -124,18 +125,45 @@ private Schema load_schema() throws IOException { } /** - * Read all rows from the Lance file + * Read all rows from the Lance file. + * + *
<p>
Blob-encoded columns are returned as materialized binary content. Use {@link #readAll(List, + * List, int, FileReadOptions)} to control blob output format. - * @param batchSize the maximum number of rows to read in a single batch * @param projectedNames optional list of column names to project; if null, all columns are read * @param ranges optional array of ranges to read; if null, all rows are read. + * @param batchSize the maximum number of rows to read in a single batch * @return an ArrowReader for the Lance file */ public ArrowReader readAll( @Nullable List<String> projectedNames, @Nullable List<Range> ranges, int batchSize) throws IOException { + return readAll(projectedNames, ranges, batchSize, FileReadOptions.builder().build()); + } + + /** + * Read all rows from the Lance file with additional read options. + * + * @param projectedNames optional list of column names to project; if null, all columns are read + * @param ranges optional array of ranges to read; if null, all rows are read. + * @param batchSize the maximum number of rows to read in a single batch + * @param options file read options controlling output format (e.g. 
blob handling) + * @return an ArrowReader for the Lance file + * @see FileReadOptions + */ + public ArrowReader readAll( + @Nullable List<String> projectedNames, + @Nullable List<Range> ranges, + int batchSize, + FileReadOptions options) + throws IOException { try (ArrowArrayStream ffiArrowArrayStream = ArrowArrayStream.allocateNew(allocator)) { - readAllNative(batchSize, projectedNames, ranges, ffiArrowArrayStream.memoryAddress()); + readAllNative( + batchSize, + projectedNames, + ranges, + ffiArrowArrayStream.memoryAddress(), + options.getBlobReadMode().getValue()); return Data.importArrayStream(allocator, ffiArrowArrayStream); } } diff --git a/java/src/test/java/org/lance/FileReaderWriterTest.java b/java/src/test/java/org/lance/FileReaderWriterTest.java index c645acdcaa2..a849a87c576 100644 --- a/java/src/test/java/org/lance/FileReaderWriterTest.java +++ b/java/src/test/java/org/lance/FileReaderWriterTest.java @@ -13,6 +13,8 @@ */ package org.lance; +import org.lance.file.BlobReadMode; +import org.lance.file.FileReadOptions; import org.lance.file.LanceFileReader; import org.lance.file.LanceFileWriter; import org.lance.util.Range; @@ -20,11 +22,14 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.LargeVarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.Text; import org.junit.jupiter.api.Assertions; @@ -304,4 +309,94 @@ void testWriteNullSchemaMetadata(@TempDir Path tempDir) throws Exception { } } } + + private void writeBlobFile(String filePath, BufferAllocator 
allocator) throws Exception { + Map<String, String> blobMetadata = new HashMap<>(); + blobMetadata.put("lance-encoding:blob", "true"); + + Field blobField = + new Field( + "blob_data", + new FieldType(true, ArrowType.LargeBinary.INSTANCE, null, blobMetadata), + Collections.emptyList()); + + Schema schema = new Schema(Collections.singletonList(blobField), null); + + try (LanceFileWriter writer = + LanceFileWriter.open(filePath, allocator, null, Collections.emptyMap())) { + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.allocateNew(); + + LargeVarBinaryVector blobVector = (LargeVarBinaryVector) root.getVector("blob_data"); + + for (int i = 0; i < 5; i++) { + byte[] data = new byte[100 * (i + 1)]; + Arrays.fill(data, (byte) i); + blobVector.setSafe(i, data); + } + + root.setRowCount(5); + writer.write(root); + } + } + } + + @Test + void testBlobDescriptorMode(@TempDir Path tempDir) throws Exception { + String filePath = tempDir.resolve("test_blob.lance").toString(); + BufferAllocator allocator = new RootAllocator(); + writeBlobFile(filePath, allocator); + + try (LanceFileReader reader = LanceFileReader.open(filePath, allocator)) { + assertTrue( + reader.schema().getFields().get(0).getMetadata().containsKey("lance-encoding:blob"), + "Blob metadata should be preserved in schema"); + + FileReadOptions options = + FileReadOptions.builder().blobReadMode(BlobReadMode.DESCRIPTOR).build(); + try (ArrowReader batch = + reader.readAll(Collections.singletonList("blob_data"), null, 10, options)) { + assertTrue(batch.loadNextBatch()); + VectorSchemaRoot root = batch.getVectorSchemaRoot(); + assertEquals(5, root.getRowCount()); + + FieldVector column = root.getVector("blob_data"); + assertTrue( + column.getField().getType() instanceof ArrowType.Struct, + "DESCRIPTOR mode should return Struct but got " + column.getField().getType()); + assertEquals( + 2, + column.getField().getChildren().size(), + "Struct should have 2 fields (position and size)"); + } + } + 
allocator.close(); + } + + @Test + void testBlobContentMode(@TempDir Path tempDir) throws Exception { + String filePath = tempDir.resolve("test_blob.lance").toString(); + BufferAllocator allocator = new RootAllocator(); + writeBlobFile(filePath, allocator); + + try (LanceFileReader reader = LanceFileReader.open(filePath, allocator)) { + // Default readAll (no BlobReadMode) should return materialized binary + try (ArrowReader batch = reader.readAll(Collections.singletonList("blob_data"), null, 10)) { + assertTrue(batch.loadNextBatch()); + VectorSchemaRoot root = batch.getVectorSchemaRoot(); + assertEquals(5, root.getRowCount()); + + FieldVector column = root.getVector("blob_data"); + assertTrue( + column.getField().getType() instanceof ArrowType.LargeBinary, + "CONTENT mode should return LargeBinary but got " + column.getField().getType()); + + LargeVarBinaryVector binaryVector = (LargeVarBinaryVector) column; + for (int i = 0; i < 5; i++) { + assertEquals(100 * (i + 1), binaryVector.get(i).length); + } + } + } + allocator.close(); + } }