Dataset creation via WriteMode::Create is not atomic #4451

@andrea-reale


Summary

When multiple threads (or tasks) concurrently write to a non-existent Lance dataset using WriteMode::Create, a race condition occurs. Instead of one write succeeding and the others failing, multiple writes can succeed, leading to unpredictable data overwrites. The creation operation is not atomic.

Scenario

N threads (or tasks) concurrently attempt to write to a not-yet-existing Lance dataset using WriteMode::Create

Expected behavior

Exactly one thread succeeds; the other N-1 fail with lance::Error::DatasetAlreadyExists { .. }

Observed behavior

Multiple threads succeed with their writes

Affects versions

  • 0.32.1

Steps to reproduce

The following code consistently reproduces the issue. It spawns 100 tasks that all attempt to create the same dataset. The assertions at the end demonstrate the failure.

use std::path::Path;
use std::sync::Arc;

use arrow::array::{RecordBatch, RecordBatchIterator};
use arrow::datatypes::{DataType, Field, Schema};
use lance::Result;
use lance::dataset::{Dataset, WriteParams};
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    if Path::new("/tmp/test_dataset").exists() {
        std::fs::remove_dir_all("/tmp/test_dataset")?;
    }

    let dataset_path = Url::parse("file:///tmp/test_dataset")?;

    let schema = Arc::new(Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]));

    let empty_batch = RecordBatch::new_empty(schema.clone());

    // Spawn N parallel tasks that write to the initially non-existent dataset
    // with WriteMode::Create.
    //
    // The expectation is that one write will succeed and the others will fail
    // with DatasetAlreadyExists error.
    let num_tasks: usize = 100;
    println!("Spawning {num_tasks} tasks writing to the dataset at {dataset_path}");
    let tasks: Vec<_> = (0..num_tasks)
        .map(|_| {
            let path = dataset_path.clone();
            let reader =
                RecordBatchIterator::new(vec![Ok(empty_batch.clone())].into_iter(), schema.clone());

            let write_params = WriteParams {
                mode: lance::dataset::WriteMode::Create,
                ..Default::default()
            };

            tokio::spawn(async move {
                Dataset::write(reader, path.as_str(), Some(write_params)).await
            })
        })
        .collect();

    let results = futures::future::join_all(tasks).await;

    // Gather stats about the outcome of the parallel writes
    let already_exists_failures = results
        .iter()
        .filter(|r| matches!(r, Ok(Err(lance::Error::DatasetAlreadyExists { .. }))))
        .count();

    let successful_writes = results.iter().filter(|r| matches!(r, Ok(Ok(_)))).count();

    println!(
        "Summary:\n - Total writes attempted: {num_tasks}\n - Successful writes: {successful_writes}\n - Failures by DatasetAlreadyExists: {already_exists_failures}"
    );

    assert_eq!(
        1, successful_writes,
        "Expected exactly one successful write"
    );
    assert_eq!(
        num_tasks - 1,
        already_exists_failures,
        "Expected all other tasks to fail with DatasetAlreadyExists"
    );

    Ok(())
}

Why it matters

Data written by thread A can be silently overwritten by a later-scheduled thread B, even though the expectation is that only the first write would succeed.

Effectively, the dataset creation operation cannot be used as an atomic primitive.

For example, a common pattern is to attempt creation and fall back to opening the dataset if it already exists. This bug makes that pattern unreliable, as it can lead to silent data corruption when one writer's data is overwritten by another's. In pseudo-rust:

// ...
let create = WriteParams { mode: WriteMode::Create, ..Default::default() };
let dataset = match Dataset::write(empty_batch, path, Some(create)).await {
    Ok(d) => d,
    Err(Error::DatasetAlreadyExists { .. }) => Dataset::open(path).await?,
    Err(e) => return Err(e),
};
// Append the real payload once the dataset is known to exist.
let append = WriteParams { mode: WriteMode::Append, ..Default::default() };
Dataset::write(meaningful_data, path, Some(append)).await?;
// ...
