diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index d98de0424ad..92b32f6dc6a 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -6,7 +6,7 @@ import logging import os import warnings -from typing import TYPE_CHECKING, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from . import io, log from .blob import Blob, BlobArray, BlobColumn, BlobFile, blob_array, blob_field @@ -99,6 +99,7 @@ def dataset( session: Optional[Session] = None, namespace: Optional[LanceNamespace] = None, table_id: Optional[List[str]] = None, + storage_options_provider: Optional[Any] = None, ) -> LanceDataset: """ Opens the Lance dataset from the address specified. @@ -166,6 +167,11 @@ def dataset( table_id : optional, List[str] The table identifier when using a namespace (e.g., ["my_table"]). Must be provided together with `namespace`. Cannot be used with `uri`. + storage_options_provider : optional + A storage options provider for automatic credential refresh. Must implement + `fetch_storage_options()` method that returns a dict of storage options. + If provided along with `namespace`, this takes precedence over the + namespace-created provider. Notes ----- @@ -191,7 +197,6 @@ def dataset( ) # Handle namespace resolution in Python - storage_options_provider = None if namespace is not None: if table_id is None: raise ValueError( @@ -208,9 +213,10 @@ def dataset( namespace_storage_options = response.storage_options if namespace_storage_options: - storage_options_provider = LanceNamespaceStorageOptionsProvider( - namespace=namespace, table_id=table_id - ) + if storage_options_provider is None: + storage_options_provider = LanceNamespaceStorageOptionsProvider( + namespace=namespace, table_id=table_id + ) if storage_options is None: storage_options = namespace_storage_options else: diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7863cb5d318..97a9a0c6775 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -437,6 +437,7 @@ def __init__( uri = os.fspath(uri) if isinstance(uri, Path) else uri self._uri = uri self._storage_options = storage_options + self._storage_options_provider = storage_options_provider # Handle deprecation warning for index_cache_size if index_cache_size is not None: @@ -526,11 +527,13 @@ def __setstate__(self, state): ) self._default_scan_options = default_scan_options self._read_params = read_params + self._storage_options_provider = None def __copy__(self): ds = LanceDataset.__new__(LanceDataset) ds._uri = self._uri ds._storage_options = self._storage_options + ds._storage_options_provider = self._storage_options_provider ds._ds = copy.copy(self._ds) ds._default_scan_options = self._default_scan_options ds._read_params = self._read_params.copy() if self._read_params else None @@ -625,6 +628,7 @@ def create_branch( ds._ds = new_ds ds._uri = new_ds.uri ds._storage_options = self._storage_options + ds._storage_options_provider = self._storage_options_provider ds._default_scan_options = self._default_scan_options ds._read_params = self._read_params return ds @@ -2283,6 +2287,27 @@ def storage_options_accessor(self): """ return self._ds.storage_options_accessor() + def new_file_session(self): + """ + Create a new file session for reading and writing files in this dataset. + + The file session will use the dataset's storage options and provider + for credential management, enabling automatic credential refresh for + long-running operations. + + Returns + ------- + LanceFileSession + A file session configured for this dataset's storage location. + """ + from lance.file import LanceFileSession + + return LanceFileSession( + base_path=self._uri, + storage_options=self.latest_storage_options(), + storage_options_provider=self._storage_options_provider, + ) + def checkout_version( self, version: int | str | Tuple[Optional[str], Optional[int]] ) -> "LanceDataset": @@ -3479,6 +3504,7 @@ def commit( ds = LanceDataset.__new__(LanceDataset) ds._storage_options = storage_options + ds._storage_options_provider = storage_options_provider ds._ds = new_ds ds._uri = new_ds.uri ds._default_scan_options = None @@ -3577,6 +3603,7 @@ def commit_batch( ds._ds = new_ds ds._uri = new_ds.uri ds._storage_options = storage_options + ds._storage_options_provider = storage_options_provider ds._default_scan_options = None ds._read_params = None return BulkCommitResult( @@ -5855,6 +5882,7 @@ def write_dataset( ds = LanceDataset.__new__(LanceDataset) ds._storage_options = storage_options + ds._storage_options_provider = None ds._ds = inner_ds ds._uri = inner_ds.uri ds._default_scan_options = None