Silent data loss during concurrent multi-worker appends #6595

@wellgeneric

Description

When multiple workers concurrently append to the same Lance table on Tencent Cloud COS (e.g., Ray Data batch processing that persists intermediate results to a Lance table), data can be lost silently: write operations return successfully, but the written rows are missing from the final manifest.

Background

Lance's concurrent write safety relies on put-if-not-exists atomic semantics during manifest commit. Support varies across storage backends:

| Backend | Atomic Write Mechanism | Status |
| --- | --- | --- |
| AWS S3 | If-None-Match: * (native since Aug 2024) | Safe |
| GCS | Native conditional PUT | Safe |
| Azure Blob | Native conditional PUT | Safe |
| S3 + DynamoDB | External Commit Store | Safe |
| Tencent COS | x-cos-forbid-overwrite: true | Conditional |

On COS, OpenDAL implements put-if-not-exists by sending the x-cos-forbid-overwrite: true HTTP header. However, COS only honors this header on buckets that have never had versioning enabled. Once versioning has been enabled on a bucket (even if it is later suspended), COS silently ignores the header and put-if-not-exists degrades to a plain PUT: the last writer wins, earlier writers' manifests are overwritten, and their data files become orphaned.
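To make the failure mode concrete, here is a minimal in-memory simulation of two writers racing to commit the same manifest version. It is only a sketch: `SimulatedStore`, `commit`, and `race` are hypothetical names, the `_versions/N.manifest` key layout is illustrative, and the retry loop is far simpler than Lance's real commit logic.

```python
class ConflictError(Exception):
    """Raised when a conditional put finds the key already present."""


class SimulatedStore:
    """Toy in-memory object store (hypothetical; not a real COS client)."""

    def __init__(self, honors_forbid_overwrite):
        self.honors_forbid_overwrite = honors_forbid_overwrite
        self.objects = {}

    def put_if_not_exists(self, key, value):
        if self.honors_forbid_overwrite and key in self.objects:
            raise ConflictError(key)
        # If the header is ignored, this is a plain PUT: last writer wins.
        self.objects[key] = value


def commit(store, base_fragments, new_fragment, version):
    """Try to commit `version`; on conflict, rebase onto the winner and retry."""
    fragments = list(base_fragments)
    while True:
        key = f"_versions/{version}.manifest"
        try:
            store.put_if_not_exists(key, fragments + [new_fragment])
            return version
        except ConflictError:
            fragments = list(store.objects[key])
            version += 1


def race(honors_forbid_overwrite):
    """Two writers both read version 1 and both try to commit version 2."""
    store = SimulatedStore(honors_forbid_overwrite)
    store.objects["_versions/1.manifest"] = ["frag-0"]
    base = store.objects["_versions/1.manifest"]
    commit(store, base, "frag-a", 2)
    commit(store, base, "frag-b", 2)
    latest = max(store.objects, key=lambda k: int(k.split("/")[1].split(".")[0]))
    return store.objects[latest]


print(race(True))   # ['frag-0', 'frag-a', 'frag-b']
print(race(False))  # ['frag-0', 'frag-b']
```

In the second run neither writer sees an error, yet `frag-a` is no longer referenced by the latest manifest, which matches the orphaned-data-file behavior described above.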

Reproduction Scenario

import ray
import lance
import pyarrow as pa

# Multiple Ray workers concurrently append to the same Lance table
@ray.remote
def process_and_append(batch_id):
    data = pa.table({"id": [batch_id], "value": [f"batch_{batch_id}"]})
    lance.write_dataset(
        data,
        "cos://my-bucket/pipeline/output_table",
        mode="append",
        storage_options={...},
    )

# 5 workers executing concurrently
ray.get([process_and_append.remote(i) for i in range(5)])

# Expected 5 rows, may get fewer with no errors
ds = lance.dataset("cos://my-bucket/pipeline/output_table", storage_options={...})
print(ds.count_rows())  # Could print 1-4 instead of 5

Root Cause

  1. OpenDAL's COS backend has an enable_versioning option in CosConfig (default: false). When it is false, the backend declares write_with_if_not_exists: true.
  2. Lance's ConditionalPutCommitHandler trusts the storage backend's capability declaration: if put-if-not-exists is reported as supported, no warning is emitted.
  3. Whether x-cos-forbid-overwrite actually works depends on whether the bucket has ever had versioning enabled, a runtime condition that cannot be detected at the configuration level.
  4. When put-if-not-exists silently fails, no exception is raised and the commit returns normally, but the data is effectively lost.
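Since the condition in point 3 is only observable at runtime, one possible workaround is to probe the bucket before trusting it. The sketch below is an assumption, not something Lance or OpenDAL does today: `probe_put_if_not_exists` and `FakeBucket` are hypothetical, and the `put_if_not_exists` / `delete` callables stand in for whatever client actually talks to COS.

```python
import uuid


def probe_put_if_not_exists(put_if_not_exists, delete, conflict_exc, prefix="_probe"):
    """Return True if a second conditional put to the same key is rejected.

    `put_if_not_exists(key, data)` and `delete(key)` are caller-supplied
    (hypothetical) wrappers around the storage client; `conflict_exc` is the
    exception type that client raises when the key already exists.
    """
    key = f"{prefix}/{uuid.uuid4().hex}"
    put_if_not_exists(key, b"first")
    try:
        put_if_not_exists(key, b"second")
    except conflict_exc:
        rejected = True
    else:
        rejected = False
    finally:
        delete(key)  # always clean up the probe object
    return rejected


class FakeBucket:
    """In-memory stand-in for a bucket, used only to exercise the probe."""

    def __init__(self, honors_header):
        self.honors_header = honors_header
        self.objects = {}

    def put_if_not_exists(self, key, data):
        if self.honors_header and key in self.objects:
            raise FileExistsError(key)
        self.objects[key] = data

    def delete(self, key):
        self.objects.pop(key, None)


healthy = FakeBucket(honors_header=True)
degraded = FakeBucket(honors_header=False)
print(probe_put_if_not_exists(healthy.put_if_not_exists, healthy.delete, FileExistsError))    # True
print(probe_put_if_not_exists(degraded.put_if_not_exists, degraded.delete, FileExistsError))  # False
```

A `False` result would indicate that the backend accepted an overwrite and that conditional-put commits should not be relied on for that bucket.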

Proposal: Extend External Commit Store to Support COS

Lance already has a mature External Commit Store mechanism (currently supporting DynamoDB via the s3+ddb:// protocol) that provides reliable concurrent write safety for S3 deployments.

The proposal is to extend this mechanism to the COS backend so that pluggable external stores (such as Redis, MySQL, or other distributed coordination services) can serve as manifest locks. This aligns with the direction discussed in #5849 regarding making ManifestStore a pluggable abstraction.

Specifically:

  • Generalize the existing DynamoDB Commit Store logic into a generic External Commit Store trait
  • Allow users to register custom commit store implementations for COS (or other backends where conditional put is unreliable)
  • Provide at least one out-of-the-box non-DynamoDB implementation (e.g., Redis-based or RDBMS-based) to lower the barrier for non-AWS users
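As a rough illustration of what an RDBMS-based commit store could look like, the sketch below uses a primary-key constraint to provide the put-if-not-exists guarantee. Everything here is hypothetical: `ExternalCommitStore`, `SqliteCommitStore`, and `CommitConflict` are not Lance APIs, and SQLite is only a stand-in for a shared database such as MySQL.

```python
import sqlite3
from abc import ABC, abstractmethod


class CommitConflict(Exception):
    """Raised when the requested version was already committed."""


class ExternalCommitStore(ABC):
    """Hypothetical interface; not Lance's actual trait."""

    @abstractmethod
    def try_commit(self, table_uri, version, manifest_path):
        """Record `version`, or raise CommitConflict if it is already taken."""

    @abstractmethod
    def latest(self, table_uri):
        """Return (version, manifest_path) of the newest commit, or None."""


class SqliteCommitStore(ExternalCommitStore):
    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        # The composite primary key is what makes each version claimable once.
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS commits ("
            " table_uri TEXT NOT NULL,"
            " version INTEGER NOT NULL,"
            " manifest_path TEXT NOT NULL,"
            " PRIMARY KEY (table_uri, version))"
        )

    def try_commit(self, table_uri, version, manifest_path):
        try:
            with self.conn:  # commits on success, rolls back on error
                self.conn.execute(
                    "INSERT INTO commits VALUES (?, ?, ?)",
                    (table_uri, version, manifest_path),
                )
        except sqlite3.IntegrityError as exc:
            raise CommitConflict(f"{table_uri} v{version} already committed") from exc

    def latest(self, table_uri):
        return self.conn.execute(
            "SELECT version, manifest_path FROM commits"
            " WHERE table_uri = ? ORDER BY version DESC LIMIT 1",
            (table_uri,),
        ).fetchone()


store = SqliteCommitStore()
uri = "cos://my-bucket/pipeline/output_table"
store.try_commit(uri, 2, "writer-a.manifest")
try:
    store.try_commit(uri, 2, "writer-b.manifest")  # loses the race
except CommitConflict:
    store.try_commit(uri, 3, "writer-b.manifest")  # retries on the next version
print(store.latest(uri))  # (3, 'writer-b.manifest')
```

The point of the design is that the losing writer gets an explicit conflict and can rebase and retry, instead of silently overwriting the winner's manifest.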

Environment

  • Lance (pylance): v4.0.0 (pylance 0.22.0)
  • OpenDAL: v0.55
  • Storage: Tencent Cloud COS
  • Scenario: Ray Data multi-worker batch processing with intermediate result persistence
