Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions java/lance-jni/src/file_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::{Arc, Mutex};

Expand All @@ -20,8 +21,9 @@ use jni::{
};
use lance::io::ObjectStore;
use lance_core::cache::LanceCache;
use lance_core::datatypes::Schema;
use lance_core::datatypes::{BlobHandling, OnMissing, Projection, Schema};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_encoding::version::LanceFileVersion;
use lance_file::reader::{FileReader, FileReaderOptions, ReaderProjection};
use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry};
use lance_io::{
Expand Down Expand Up @@ -218,10 +220,10 @@ pub extern "system" fn Java_org_lance_file_LanceFileReader_readAllNative(
projected_names: JObject,
selection_ranges: JObject,
stream_addr: jlong,
blob_read_mode: jint,
) {
let result = (|| -> Result<()> {
let mut read_parameter = ReadBatchParams::default();
let mut reader_projection: Option<ReaderProjection> = None;
// We get reader here not from env.get_rust_field, because we need reader: MutexGuard<BlockingFileReader> has no relationship with the env lifecycle.
// If we get reader from env.get_rust_field, we can't use env (can't borrow again) until we drop the reader.
#[allow(unused_variables)]
Expand All @@ -239,17 +241,44 @@ pub extern "system" fn Java_org_lance_file_LanceFileReader_readAllNative(
};

let file_version = reader.inner.metadata().version();
let base_schema = Schema::try_from(reader.schema()?.as_ref())?;

if !projected_names.is_null() {
let schema = Schema::try_from(reader.schema()?.as_ref())?;
let column_names: Vec<String> = env.get_strings(&projected_names)?;
let names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
reader_projection = Some(ReaderProjection::from_column_names(
let blob_handling = if blob_read_mode == 1 {
BlobHandling::BlobsDescriptions
} else {
BlobHandling::AllBinary
};

let reader_projection = {
let mut projection =
Projection::empty(Arc::new(base_schema.clone())).with_blob_handling(blob_handling);

if !projected_names.is_null() {
let column_names: Vec<String> = env.get_strings(&projected_names)?;
projection = projection.union_columns(&column_names, OnMissing::Error)?;
} else {
projection = projection.union_predicate(|_| true);
}

let transformed_schema = projection.to_bare_schema();

let field_id_to_column_index = base_schema
.fields_pre_order()
.filter(|field| {
file_version < LanceFileVersion::V2_1
|| field.is_leaf()
|| field.is_packed_struct()
})
.enumerate()
.map(|(idx, field)| (field.id as u32, idx as u32))
.collect::<BTreeMap<_, _>>();

Some(ReaderProjection::from_field_ids(
file_version,
&schema,
names.as_slice(),
)?);
}
&transformed_schema,
&field_id_to_column_index,
)?)
};

if !selection_ranges.is_null() {
let mut ranges: Vec<Range<u64>> = Vec::new();
Expand Down
41 changes: 41 additions & 0 deletions java/src/main/java/org/lance/file/BlobReadMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.file;

/**
* Controls how blob-encoded columns are returned when reading a Lance file.
*
* <p>Blob columns can be read in two modes:
*
* <ul>
* <li>{@link #CONTENT} — materializes the full binary content (default)
* <li>{@link #DESCRIPTOR} — returns a struct with {@code position} and {@code size} fields
* </ul>
*/
public enum BlobReadMode {
  /** Materialize the full binary payload of each blob column (default behavior). */
  CONTENT(0),
  /** Return each blob column as a descriptor struct holding position and size. */
  DESCRIPTOR(1);

  // Integer code passed across the JNI boundary to the native reader.
  private final int value;

  BlobReadMode(int value) {
    this.value = value;
  }

  /** Returns the integer code understood by the native layer. */
  public int getValue() {
    return value;
  }
}
59 changes: 59 additions & 0 deletions java/src/main/java/org/lance/file/FileReadOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.file;

/**
* Options for reading a Lance file.
*
* <p>Use {@link #builder()} to create an instance. New options can be added here in the future
* without breaking existing callers.
*/
public class FileReadOptions {
  // How blob-encoded columns are materialized; never null (builder supplies a default).
  private final BlobReadMode blobReadMode;

  private FileReadOptions(BlobReadMode blobReadMode) {
    this.blobReadMode = blobReadMode;
  }

  /** Returns the blob read mode. Defaults to {@link BlobReadMode#CONTENT}. */
  public BlobReadMode getBlobReadMode() {
    return blobReadMode;
  }

  /** Creates a new builder with default options. */
  public static Builder builder() {
    return new Builder();
  }

  /** Builder for {@link FileReadOptions}; obtain via {@link FileReadOptions#builder()}. */
  public static class Builder {
    private BlobReadMode blobReadMode = BlobReadMode.CONTENT;

    private Builder() {}

    /**
     * Sets how blob-encoded columns are returned.
     *
     * @param blobReadMode {@link BlobReadMode#CONTENT} to materialize binary content, or {@link
     *     BlobReadMode#DESCRIPTOR} to return position/size descriptors
     */
    public Builder blobReadMode(BlobReadMode blobReadMode) {
      this.blobReadMode = blobReadMode;
      return this;
    }

    /** Builds an immutable {@link FileReadOptions} from the current builder state. */
    public FileReadOptions build() {
      return new FileReadOptions(blobReadMode);
    }
  }
}
36 changes: 32 additions & 4 deletions java/src/main/java/org/lance/file/LanceFileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ private native void readAllNative(
int batchSize,
@Nullable List<String> projectedNames,
@Nullable List<Range> ranges,
long streamMemoryAddress)
long streamMemoryAddress,
int blobReadMode)
throws IOException;

private LanceFileReader() {}
Expand Down Expand Up @@ -124,18 +125,45 @@ private Schema load_schema() throws IOException {
}

/**
 * Read all rows from the Lance file.
 *
 * <p>Blob-encoded columns are returned as materialized binary content. Use {@link #readAll(List,
 * List, int, FileReadOptions)} to control blob output format.
 *
 * @param projectedNames optional list of column names to project; if null, all columns are read
 * @param ranges optional array of ranges to read; if null, all rows are read
 * @param batchSize the maximum number of rows to read in a single batch
 * @return an ArrowReader for the Lance file
 */
public ArrowReader readAll(
    @Nullable List<String> projectedNames, @Nullable List<Range> ranges, int batchSize)
    throws IOException {
  // Delegate to the options overload with defaults (BlobReadMode.CONTENT).
  return readAll(projectedNames, ranges, batchSize, FileReadOptions.builder().build());
}

/**
 * Read all rows from the Lance file with additional read options.
 *
 * @param projectedNames optional list of column names to project; if null, all columns are read
 * @param ranges optional array of ranges to read; if null, all rows are read
 * @param batchSize the maximum number of rows to read in a single batch
 * @param options file read options controlling output format (e.g. blob handling)
 * @return an ArrowReader for the Lance file
 * @see FileReadOptions
 */
public ArrowReader readAll(
    @Nullable List<String> projectedNames,
    @Nullable List<Range> ranges,
    int batchSize,
    FileReadOptions options)
    throws IOException {
  // The FFI stream is populated by the native side, then imported; the
  // try-with-resources releases the stream struct after import.
  try (ArrowArrayStream ffiArrowArrayStream = ArrowArrayStream.allocateNew(allocator)) {
    readAllNative(
        batchSize,
        projectedNames,
        ranges,
        ffiArrowArrayStream.memoryAddress(),
        options.getBlobReadMode().getValue());
    return Data.importArrayStream(allocator, ffiArrowArrayStream);
  }
}
Expand Down
95 changes: 95 additions & 0 deletions java/src/test/java/org/lance/FileReaderWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@
*/
package org.lance;

import org.lance.file.BlobReadMode;
import org.lance.file.FileReadOptions;
import org.lance.file.LanceFileReader;
import org.lance.file.LanceFileWriter;
import org.lance.util.Range;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.LargeVarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -304,4 +309,94 @@ void testWriteNullSchemaMetadata(@TempDir Path tempDir) throws Exception {
}
}
}

/**
 * Writes a test Lance file containing one blob-encoded LargeBinary column named
 * "blob_data" with five rows of 100, 200, 300, 400 and 500 bytes respectively,
 * each row filled with its own index byte.
 */
private void writeBlobFile(String filePath, BufferAllocator allocator) throws Exception {
  // "lance-encoding:blob" marks the column for blob encoding on write.
  Map<String, String> blobMetadata =
      Collections.singletonMap("lance-encoding:blob", "true");

  FieldType blobType = new FieldType(true, ArrowType.LargeBinary.INSTANCE, null, blobMetadata);
  Field blobField = new Field("blob_data", blobType, Collections.emptyList());
  Schema schema = new Schema(Collections.singletonList(blobField), null);

  try (LanceFileWriter writer =
          LanceFileWriter.open(filePath, allocator, null, Collections.emptyMap());
      VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
    root.allocateNew();

    LargeVarBinaryVector blobVector = (LargeVarBinaryVector) root.getVector("blob_data");
    for (int row = 0; row < 5; row++) {
      byte[] payload = new byte[100 * (row + 1)];
      Arrays.fill(payload, (byte) row);
      blobVector.setSafe(row, payload);
    }

    root.setRowCount(5);
    writer.write(root);
  }
}

@Test
void testBlobDescriptorMode(@TempDir Path tempDir) throws Exception {
  String filePath = tempDir.resolve("test_blob.lance").toString();
  // try-with-resources: a manual allocator.close() at the end of the method is
  // skipped when any assertion fails, leaking the allocator and its buffers.
  try (BufferAllocator allocator = new RootAllocator()) {
    writeBlobFile(filePath, allocator);

    try (LanceFileReader reader = LanceFileReader.open(filePath, allocator)) {
      assertTrue(
          reader.schema().getFields().get(0).getMetadata().containsKey("lance-encoding:blob"),
          "Blob metadata should be preserved in schema");

      FileReadOptions options =
          FileReadOptions.builder().blobReadMode(BlobReadMode.DESCRIPTOR).build();
      try (ArrowReader batch =
          reader.readAll(Collections.singletonList("blob_data"), null, 10, options)) {
        assertTrue(batch.loadNextBatch());
        VectorSchemaRoot root = batch.getVectorSchemaRoot();
        assertEquals(5, root.getRowCount());

        // DESCRIPTOR mode replaces the binary payload with a {position, size} struct.
        FieldVector column = root.getVector("blob_data");
        assertTrue(
            column.getField().getType() instanceof ArrowType.Struct,
            "DESCRIPTOR mode should return Struct but got " + column.getField().getType());
        assertEquals(
            2,
            column.getField().getChildren().size(),
            "Struct should have 2 fields (position and size)");
      }
    }
  }
}

@Test
void testBlobContentMode(@TempDir Path tempDir) throws Exception {
  String filePath = tempDir.resolve("test_blob.lance").toString();
  // try-with-resources: a manual allocator.close() at the end of the method is
  // skipped when any assertion fails, leaking the allocator and its buffers.
  try (BufferAllocator allocator = new RootAllocator()) {
    writeBlobFile(filePath, allocator);

    try (LanceFileReader reader = LanceFileReader.open(filePath, allocator)) {
      // Default readAll (no BlobReadMode) should return materialized binary
      try (ArrowReader batch = reader.readAll(Collections.singletonList("blob_data"), null, 10)) {
        assertTrue(batch.loadNextBatch());
        VectorSchemaRoot root = batch.getVectorSchemaRoot();
        assertEquals(5, root.getRowCount());

        FieldVector column = root.getVector("blob_data");
        assertTrue(
            column.getField().getType() instanceof ArrowType.LargeBinary,
            "CONTENT mode should return LargeBinary but got " + column.getField().getType());

        // Payload lengths must match what writeBlobFile produced: 100 * (i + 1) bytes.
        LargeVarBinaryVector binaryVector = (LargeVarBinaryVector) column;
        for (int i = 0; i < 5; i++) {
          assertEquals(100 * (i + 1), binaryVector.get(i).length);
        }
      }
    }
  }
}
}