Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 70 additions & 9 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import pyarrow as pa
import pyarrow.dataset
from lance_namespace import DescribeTableRequest, LanceNamespace
from pyarrow import RecordBatch, Schema

from lance.log import LOGGER
Expand Down Expand Up @@ -71,9 +70,11 @@
from .util import _target_partition_size_to_num_partitions, td_to_micros

if TYPE_CHECKING:
from lance_namespace import LanceNamespace
from pyarrow._compute import Expression

from .commit import CommitLock
from .io import StorageOptionsProvider
from .progress import FragmentWriteProgress
from .types import ReaderLike

Expand Down Expand Up @@ -3062,6 +3063,7 @@ def commit(
read_version: Optional[int] = None,
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
enable_v2_manifest_paths: Optional[bool] = None,
detached: Optional[bool] = False,
max_retries: int = 20,
Expand Down Expand Up @@ -3106,6 +3108,8 @@ def commit(
storage_options : optional, dict
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
storage_options_provider : StorageOptionsProvider, optional
A provider for dynamic storage options with automatic credential refresh.
enable_v2_manifest_paths : bool, optional
If True, and this is a new dataset, uses the new V2 manifest paths.
These paths provide more efficient opening of datasets with many
Expand Down Expand Up @@ -3191,6 +3195,7 @@ def commit(
operation,
commit_lock,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
Expand All @@ -3202,6 +3207,7 @@ def commit(
read_version,
commit_lock,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
Expand All @@ -3227,6 +3233,7 @@ def commit_batch(
transactions: Sequence[Transaction],
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
enable_v2_manifest_paths: Optional[bool] = None,
detached: Optional[bool] = False,
max_retries: int = 20,
Expand Down Expand Up @@ -3255,6 +3262,8 @@ def commit_batch(
storage_options : optional, dict
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
storage_options_provider : StorageOptionsProvider, optional
A provider for dynamic storage options with automatic credential refresh.
enable_v2_manifest_paths : bool, optional
If True, and this is a new dataset, uses the new V2 manifest paths.
These paths provide more efficient opening of datasets with many
Expand Down Expand Up @@ -3301,6 +3310,7 @@ def commit_batch(
transactions,
commit_lock,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
Expand Down Expand Up @@ -5097,6 +5107,7 @@ def write_dataset(
target_bases: Optional[List[str]] = None,
namespace: Optional[LanceNamespace] = None,
table_id: Optional[List[str]] = None,
ignore_namespace_table_storage_options: bool = False,
) -> LanceDataset:
"""Write a given data_obj to the given uri

Expand Down Expand Up @@ -5198,15 +5209,22 @@ def write_dataset(
table_id : optional, List[str]
The table identifier when using a namespace (e.g., ["my_table"]).
Must be provided together with `namespace`. Cannot be used with `uri`.
ignore_namespace_table_storage_options : bool, default False
If True, ignore the storage options returned by the namespace and only use
the provided `storage_options` parameter. The storage options provider will
not be created, so credentials will not be automatically refreshed.
This is useful when you want to use your own credentials instead of the
namespace-provided credentials.

Notes
-----
When using `namespace` and `table_id`:
- The `uri` parameter is optional and will be fetched from the namespace
- A `LanceNamespaceStorageOptionsProvider` will be created automatically for
storage options refresh when the namespace returns storage options (unless
`ignore_namespace_table_storage_options=True`)
- Initial storage options returned by the namespace (`create_empty_table()` in
"create" mode, `describe_table()` in "append"/"overwrite" mode) will be merged
with any provided `storage_options`, with namespace values taking precedence
(unless `ignore_namespace_table_storage_options=True`)
"""
# Validate that user provides either uri OR (namespace + table_id), not both
has_uri = uri is not None
Expand All @@ -5229,23 +5247,62 @@ def write_dataset(
"Both 'namespace' and 'table_id' must be provided together."
)

request = DescribeTableRequest(id=table_id, version=None)
response = namespace.describe_table(request)
# Implement write_into_namespace logic in Python.
# This follows the same pattern as the Rust implementation:
# - "create" mode: calls namespace.create_empty_table()
# - "append" / "overwrite" mode: calls namespace.describe_table()
# - All modes: if the namespace returns storage options and they are not
#   ignored, create a storage options provider and merge the storage options

from lance_namespace import CreateEmptyTableRequest, DescribeTableRequest

from .namespace import LanceNamespaceStorageOptionsProvider

# Determine which namespace method to call based on mode
if mode == "create":
request = CreateEmptyTableRequest(
id=table_id, location=None, properties=None
)
response = namespace.create_empty_table(request)
elif mode in ("append", "overwrite"):
request = DescribeTableRequest(id=table_id, version=None)
response = namespace.describe_table(request)
else:
raise ValueError(f"Invalid mode: {mode}")

# Get table location from response
uri = response.location
if not uri:
raise ValueError("Namespace did not return a table location")
raise ValueError(
f"Namespace did not return a table location in {mode} response"
)

# Check if we should ignore namespace storage options
if ignore_namespace_table_storage_options:
namespace_storage_options = None
else:
namespace_storage_options = response.storage_options

namespace_storage_options = response.storage_options
# Set up storage options and provider
if namespace_storage_options:
# TODO: support dynamic storage options provider
# Create the storage options provider for automatic refresh
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=namespace, table_id=table_id
)

# Merge namespace storage options with any existing options
# Namespace options take precedence (same as Rust implementation)
if storage_options is None:
storage_options = namespace_storage_options
storage_options = dict(namespace_storage_options)
else:
merged_options = dict(storage_options)
merged_options.update(namespace_storage_options)
storage_options = merged_options
else:
storage_options_provider = None
elif table_id is not None:
raise ValueError("Both 'namespace' and 'table_id' must be provided together.")
else:
storage_options_provider = None

if use_legacy_format is not None:
warnings.warn(
Expand Down Expand Up @@ -5282,6 +5339,10 @@ def write_dataset(
"target_bases": target_bases,
}

# Add storage_options_provider if created from namespace
if storage_options_provider is not None:
params["storage_options_provider"] = storage_options_provider

if commit_lock:
if not callable(commit_lock):
raise TypeError(f"commit_lock must be a function, got {type(commit_lock)}")
Expand Down
6 changes: 6 additions & 0 deletions python/python/lance/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ def __init__(
data_cache_bytes: Optional[int] = None,
version: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider=None,
max_page_bytes: Optional[int] = None,
_inner_writer: Optional[_LanceFileWriter] = None,
**kwargs,
Expand All @@ -325,6 +326,10 @@ def __init__(
storage_options : optional, dict
Extra options to be used for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
storage_options_provider : optional, StorageOptionsProvider
A storage options provider that can fetch and refresh storage options
dynamically. This is useful for credentials that expire and need to be
refreshed automatically.
max_page_bytes : optional, int
The maximum size of a page in bytes, if a single array would create a
page larger than this then it will be split into multiple pages. The
Expand All @@ -341,6 +346,7 @@ def __init__(
data_cache_bytes=data_cache_bytes,
version=version,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
max_page_bytes=max_page_bytes,
**kwargs,
Comment thread
jackye1995 marked this conversation as resolved.
)
Expand Down
8 changes: 8 additions & 0 deletions python/python/lance/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,7 @@ def write_fragments(
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider=None,
enable_stable_row_ids: bool = False,
) -> Transaction: ...

Expand All @@ -882,6 +883,7 @@ def write_fragments(
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider=None,
enable_stable_row_ids: bool = False,
) -> List[FragmentMetadata]: ...

Expand All @@ -900,6 +902,7 @@ def write_fragments(
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider=None,
enable_stable_row_ids: bool = False,
) -> List[FragmentMetadata] | Transaction:
"""
Expand Down Expand Up @@ -949,6 +952,10 @@ def write_fragments(
storage_options : Optional[Dict[str, str]]
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
storage_options_provider : Optional[StorageOptionsProvider]
A storage options provider that can fetch and refresh storage options
dynamically. This is useful for credentials that expire and need to be
refreshed automatically.
enable_stable_row_ids: bool
Experimental: if set to true, the writer will use stable row ids.
These row ids are stable after compaction operations, but not after updates.
Expand Down Expand Up @@ -1001,6 +1008,7 @@ def write_fragments(
progress=progress,
data_storage_version=data_storage_version,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
enable_stable_row_ids=enable_stable_row_ids,
)

Expand Down
4 changes: 4 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ from ..fragment import (
DataFile,
FragmentMetadata,
)
from ..io import StorageOptionsProvider
from ..progress import FragmentWriteProgress as FragmentWriteProgress
from ..types import ReaderLike as ReaderLike
from ..udf import BatchUDF as BatchUDF
Expand Down Expand Up @@ -99,6 +100,7 @@ class LanceFileWriter:
data_cache_bytes: Optional[int],
version: Optional[str],
storage_options: Optional[Dict[str, str]],
storage_options_provider: Optional[StorageOptionsProvider],
keep_original_array: Optional[bool],
max_page_bytes: Optional[int],
): ...
Expand Down Expand Up @@ -345,6 +347,7 @@ class _Dataset:
read_version: Optional[int] = None,
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
enable_v2_manifest_paths: Optional[bool] = None,
detached: Optional[bool] = None,
max_retries: Optional[int] = None,
Expand All @@ -356,6 +359,7 @@ class _Dataset:
transactions: Sequence[Transaction],
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
enable_v2_manifest_paths: Optional[bool] = None,
detached: Optional[bool] = None,
max_retries: Optional[int] = None,
Expand Down
Loading