Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions rust/lance-core/src/utils/tempfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,14 @@ impl Deref for TempStdFile {
}
}

/// A temporary file that is exposed as an object store path
/// A unique path to a temporary file, exposed as an object store path
///
/// This is a wrapper around [`TempFile`] that exposes the path as an object store path.
/// It is useful when you need to create a temporary file that is only used as an object store path.
/// Unlike [`TempFile`], this does not create an empty file. We create a
/// temporary directory and then construct a path inside it, following the
/// same pattern as [`TempStdPath`]. This avoids holding an open file handle,
/// which on Windows would prevent atomic renames to the same path.
pub struct TempObjFile {
_tempfile: TempFile,
_tempdir: TempDir,
path: ObjPath,
}

Expand All @@ -293,10 +295,10 @@ impl std::ops::Deref for TempObjFile {

impl Default for TempObjFile {
fn default() -> Self {
let tempfile = TempFile::default();
let path = tempfile.obj_path();
let tempdir = TempDir::default();
let path = ObjPath::parse(format!("{}/some_file", tempdir.path_str())).unwrap();
Self {
_tempfile: tempfile,
_tempdir: tempdir,
path,
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-file/src/previous/page_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ mod tests {
.write(&mut writer, starting_field_id)
.await
.unwrap();
writer.shutdown().await.unwrap();
AsyncWriteExt::shutdown(&mut writer).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
Expand Down
48 changes: 23 additions & 25 deletions rust/lance-file/src/previous/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use lance_io::encodings::{
binary::BinaryEncoder, dictionary::DictionaryEncoder, plain::PlainEncoder, Encoder,
};
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::{WriteExt, Writer};
use object_store::path::Path;
use snafu::location;
Expand All @@ -47,10 +46,8 @@ pub trait ManifestProvider {
///
/// Note: the dictionaries have already been written by this point and the schema should
/// be populated with the dictionary lengths/offsets
async fn store_schema(
object_writer: &mut ObjectWriter,
schema: &Schema,
) -> Result<Option<usize>>;
async fn store_schema(object_writer: &mut dyn Writer, schema: &Schema)
-> Result<Option<usize>>;
}

/// Implementation of ManifestProvider that does not store the schema
Expand All @@ -60,7 +57,7 @@ pub(crate) struct NotSelfDescribing {}
#[cfg(test)]
#[async_trait]
impl ManifestProvider for NotSelfDescribing {
async fn store_schema(_: &mut ObjectWriter, _: &Schema) -> Result<Option<usize>> {
async fn store_schema(_: &mut dyn Writer, _: &Schema) -> Result<Option<usize>> {
Ok(None)
}
}
Expand All @@ -79,7 +76,7 @@ impl ManifestProvider for NotSelfDescribing {
/// file_writer.shutdown();
/// ```
pub struct FileWriter<M: ManifestProvider + Send + Sync> {
pub object_writer: ObjectWriter,
pub object_writer: Box<dyn Writer>,
schema: Schema,
batch_id: i32,
page_table: PageTable,
Expand Down Expand Up @@ -109,7 +106,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
}

pub fn with_object_writer(
object_writer: ObjectWriter,
object_writer: Box<dyn Writer>,
schema: Schema,
options: &FileWriterOptions,
) -> Result<Self> {
Expand Down Expand Up @@ -213,7 +210,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
.collect::<Result<Vec<_>>>()?;

Self::write_array(
&mut self.object_writer,
self.object_writer.as_mut(),
field,
&arrs,
self.batch_id,
Expand Down Expand Up @@ -253,7 +250,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {

pub async fn finish(&mut self) -> Result<usize> {
self.write_footer().await?;
self.object_writer.shutdown().await?;
Writer::shutdown(self.object_writer.as_mut()).await?;
let num_rows = self
.metadata
.batch_offsets
Expand Down Expand Up @@ -284,7 +281,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {

#[async_recursion]
async fn write_array(
object_writer: &mut ObjectWriter,
object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&ArrayRef],
batch_id: i32,
Expand Down Expand Up @@ -385,7 +382,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
}

async fn write_null_array(
object_writer: &mut ObjectWriter,
object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
Expand All @@ -399,7 +396,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {

/// Write fixed size array, including, primtiives, fixed size binary, and fixed size list.
async fn write_fixed_stride_array(
object_writer: &mut ObjectWriter,
object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
Expand All @@ -419,7 +416,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {

/// Write var-length binary arrays.
async fn write_binary_array(
object_writer: &mut ObjectWriter,
object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
Expand All @@ -435,7 +432,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
}

async fn write_dictionary_arr(
object_writer: &mut ObjectWriter,
object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
key_type: &DataType,
Expand All @@ -455,7 +452,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {

#[async_recursion]
async fn write_struct_array(
object_writer: &mut ObjectWriter,
object_writer: &mut dyn Writer,
field: &Field,
arrays: &[&StructArray],
batch_id: i32,
Expand Down Expand Up @@ -486,7 +483,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
}

async fn write_list_array(
object_writer: &mut ObjectWriter,
object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
Expand Down Expand Up @@ -534,7 +531,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
}

async fn write_large_list_array(
object_writer: &mut ObjectWriter,
object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
Expand Down Expand Up @@ -597,7 +594,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
let mut stats_page_table = PageTable::default();
for (i, field) in schema.fields.iter().enumerate() {
Self::write_array(
&mut self.object_writer,
self.object_writer.as_mut(),
field,
&[stats_batch.column(i)],
0, // Only one batch for statistics.
Expand All @@ -606,8 +603,9 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
.await?;
}

let page_table_position =
stats_page_table.write(&mut self.object_writer, 0).await?;
let page_table_position = stats_page_table
.write(self.object_writer.as_mut(), 0)
.await?;

Ok(Some(StatisticsMetadata {
schema,
Expand All @@ -624,7 +622,7 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
///
/// The offsets and lengths of the written buffers are stored in the given
/// schema so that the dictionaries can be loaded in the future.
async fn write_dictionaries(writer: &mut ObjectWriter, schema: &mut Schema) -> Result<()> {
async fn write_dictionaries(writer: &mut dyn Writer, schema: &mut Schema) -> Result<()> {
// Write dictionary values.
let max_field_id = schema.max_field_id().unwrap_or(-1);
for field_id in 0..max_field_id + 1 {
Expand Down Expand Up @@ -680,16 +678,16 @@ impl<M: ManifestProvider + Send + Sync> FileWriter<M> {
let field_id_offset = *self.schema.field_ids().iter().min().unwrap();
let pos = self
.page_table
.write(&mut self.object_writer, field_id_offset)
.write(self.object_writer.as_mut(), field_id_offset)
.await?;
self.metadata.page_table_position = pos;

// Step 2. Write statistics.
self.metadata.stats_metadata = self.write_statistics().await?;

// Step 3. Write manifest and dictionary values.
Self::write_dictionaries(&mut self.object_writer, &mut self.schema).await?;
let pos = M::store_schema(&mut self.object_writer, &self.schema).await?;
Self::write_dictionaries(self.object_writer.as_mut(), &mut self.schema).await?;
let pos = M::store_schema(self.object_writer.as_mut(), &self.schema).await?;

// Step 4. Write metadata.
self.metadata.manifest_position = pos;
Expand Down
19 changes: 10 additions & 9 deletions rust/lance-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use lance_encoding::encoder::{
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

Expand Down Expand Up @@ -114,7 +114,7 @@ const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024;
/// of every chunk so each column's pages can be read back and reassembled in
/// order.
struct PageMetadataSpill {
writer: ObjectWriter,
writer: Box<dyn Writer>,
object_store: Arc<ObjectStore>,
path: Path,
/// Current write position in the spill file.
Expand Down Expand Up @@ -177,7 +177,7 @@ impl PageMetadataSpill {
for col_idx in 0..self.column_buffers.len() {
self.flush_column(col_idx).await?;
}
self.writer.shutdown().await?;
Writer::shutdown(self.writer.as_mut()).await?;
Ok(())
}
}
Expand All @@ -204,7 +204,7 @@ enum PageSpillState {
}

pub struct FileWriter {
writer: ObjectWriter,
writer: Box<dyn Writer>,
schema: Option<LanceSchema>,
column_writers: Vec<Box<dyn FieldEncoder>>,
column_metadata: Vec<pbfile::ColumnMetadata>,
Expand All @@ -231,7 +231,7 @@ static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
impl FileWriter {
/// Create a new FileWriter with a desired output schema
pub fn try_new(
object_writer: ObjectWriter,
object_writer: Box<dyn Writer>,
schema: LanceSchema,
options: FileWriterOptions,
) -> Result<Self> {
Expand All @@ -244,7 +244,7 @@ impl FileWriter {
///
/// The output schema will be set based on the first batch of data to arrive.
/// If no data arrives and the writer is finished then the write will fail.
pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
pub fn new_lazy(object_writer: Box<dyn Writer>, options: FileWriterOptions) -> Self {
if let Some(format_version) = options.format_version {
if format_version.is_unstable()
&& WARNED_ON_UNSTABLE_API
Expand Down Expand Up @@ -304,7 +304,7 @@ impl FileWriter {
Ok(writer.finish().await? as usize)
}

async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
async fn do_write_buffer(writer: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> Result<()> {
writer.write_all(buf).await?;
let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
Expand Down Expand Up @@ -810,13 +810,14 @@ impl FileWriter {
self.writer.write_all(MAGIC).await?;

// 7. close the writer
self.writer.shutdown().await?;
Writer::shutdown(self.writer.as_mut()).await?;

Ok(self.rows_written)
}

pub async fn abort(&mut self) {
self.writer.abort().await;
// For multipart uploads, ObjectWriter's Drop impl will abort
// the upload when the writer is dropped.
}

pub async fn tell(&mut self) -> Result<u64> {
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tracing.workspace = true
url.workspace = true
path_abs.workspace = true
rand.workspace = true
tempfile.workspace = true

[dev-dependencies]
criterion.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions rust/lance-io/src/encodings/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ mod tests {

let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
let pos = encoder.encode(arrs.as_slice()).await.unwrap();
writer.shutdown().await.unwrap();
AsyncWriteExt::shutdown(&mut writer).await.unwrap();
Ok(pos)
}

Expand Down Expand Up @@ -562,7 +562,7 @@ mod tests {
object_writer.write_all(b"1234").await.unwrap();
let mut encoder = BinaryEncoder::new(&mut object_writer);
let pos = encoder.encode(&[&data]).await.unwrap();
object_writer.shutdown().await.unwrap();
AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();

let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
Expand Down Expand Up @@ -731,7 +731,7 @@ mod tests {

// let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
let pos = encoder.encode(&[&data]).await.unwrap();
object_writer.shutdown().await.unwrap();
AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();
pos
};

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-io/src/encodings/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ mod tests {
let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
let mut encoder = PlainEncoder::new(&mut object_writer, arr1.keys().data_type());
pos = encoder.encode(arrs.as_slice()).await.unwrap();
object_writer.shutdown().await.unwrap();
AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048, None)
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-io/src/encodings/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ mod tests {
let mut writer = tokio::fs::File::create(&path).await.unwrap();
let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
writer.shutdown().await.unwrap();
AsyncWriteExt::shutdown(&mut writer).await.unwrap();
}

let reader = LocalObjectReader::open_local_path(&path, 2048, None)
Expand Down
Loading