diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index 9f6016b8603..b637b7a362f 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -6,7 +6,9 @@ import logging import os import warnings -from typing import TYPE_CHECKING, Dict, Optional, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Union + +from lance_namespace import DescribeTableRequest, LanceNamespace from . import io, log from .blob import BlobColumn, BlobFile @@ -89,8 +91,8 @@ def dataset( index_cache_size_bytes: Optional[int] = None, read_params: Optional[Dict[str, any]] = None, session: Optional[Session] = None, - namespace: Optional[any] = None, - table_id: Optional[list] = None, + namespace: Optional[LanceNamespace] = None, + table_id: Optional[List[str]] = None, ignore_namespace_table_storage_options: bool = False, s3_credentials_refresh_offset_seconds: Optional[int] = None, ) -> LanceDataset: @@ -151,15 +153,13 @@ def dataset( session : optional, lance.Session A session to use for this dataset. This contains the caches used by the across multiple datasets. - namespace : optional + namespace : optional, LanceNamespace A namespace instance from which to fetch table location and storage options. - This can be any object with a describe_table(table_id, version) method - that returns a dict with 'location' and 'storage_options' keys. - For example, use lance_namespace.connect() from the lance_namespace package. + Use lance_namespace.connect() from the lance_namespace package. Must be provided together with `table_id`. Cannot be used with `uri`. When provided, the table location will be fetched automatically from the namespace via describe_table(). - table_id : optional, list of str + table_id : optional, List[str] The table identifier when using a namespace (e.g., ["my_table"]). Must be provided together with `namespace`. Cannot be used with `uri`. ignore_namespace_table_storage_options : bool, default False @@ -207,15 +207,17 @@ def dataset( "Both 'namespace' and 'table_id' must be provided together." ) - table_info = namespace.describe_table(table_id=table_id, version=version) - uri = table_info.get("location") + request = DescribeTableRequest(id=table_id, version=version) + response = namespace.describe_table(request) + + uri = response.location if uri is None: raise ValueError("Namespace did not return a 'location' for the table") if ignore_namespace_table_storage_options: namespace_storage_options = None else: - namespace_storage_options = table_info.get("storage_options") + namespace_storage_options = response.storage_options if namespace_storage_options: storage_options_provider = LanceNamespaceStorageOptionsProvider( diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 43d1c423903..a9e5ccb4285 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -34,6 +34,7 @@ import pyarrow as pa import pyarrow.dataset +from lance_namespace import DescribeTableRequest, LanceNamespace from pyarrow import RecordBatch, Schema from lance.log import LOGGER @@ -5096,8 +5097,8 @@ def write_dataset( transaction_properties: Optional[Dict[str, str]] = None, initial_bases: Optional[List[DatasetBasePath]] = None, target_bases: Optional[List[str]] = None, - namespace: Optional[any] = None, - table_id: Optional[list] = None, + namespace: Optional[LanceNamespace] = None, + table_id: Optional[List[str]] = None, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -5190,13 +5191,13 @@ def write_dataset( **CREATE mode**: References must match bases in `initial_bases` **APPEND/OVERWRITE modes**: References must match bases in the existing manifest - namespace : optional, any + namespace : optional, LanceNamespace A namespace instance from which to fetch table location and storage options. Must be provided together with `table_id`. Cannot be used with `uri`. When provided, the table location will be fetched automatically from the namespace via describe_table(). Storage options will be automatically refreshed before they expire. - table_id : optional, list of str + table_id : optional, List[str] The table identifier when using a namespace (e.g., ["my_table"]). Must be provided together with `namespace`. Cannot be used with `uri`. @@ -5230,22 +5231,21 @@ def write_dataset( "Both 'namespace' and 'table_id' must be provided together." ) - # Call describe_table to get location and storage options - table_info = namespace.describe_table(table_id=table_id, version=None) - - # Extract location from namespace response - uri = table_info.get("location") + request = DescribeTableRequest(id=table_id, version=None) + response = namespace.describe_table(request) + uri = response.location if not uri: raise ValueError("Namespace did not return a table location") - # Merge initial storage options from describe_table with user-provided options - namespace_storage_options = table_info.get("storage_options", {}) - if storage_options: - # User-provided options take precedence - merged_storage_options = {**namespace_storage_options, **storage_options} - else: - merged_storage_options = namespace_storage_options - storage_options = merged_storage_options + namespace_storage_options = response.storage_options + if namespace_storage_options: + # TODO: support dynamic storage options provider + if storage_options is None: + storage_options = namespace_storage_options + else: + merged_options = dict(storage_options) + merged_options.update(namespace_storage_options) + storage_options = merged_options elif table_id is not None: raise ValueError("Both 'namespace' and 'table_id' must be provided together.") diff --git a/python/python/lance/namespace.py b/python/python/lance/namespace.py index 2f8890f6aa2..43dc3221f81 100644 --- a/python/python/lance/namespace.py +++ b/python/python/lance/namespace.py @@ -7,7 +7,9 @@ enabling automatic storage options refresh for namespace-managed tables. """ -from typing import Dict +from typing import Dict, List + +from lance_namespace import DescribeTableRequest, LanceNamespace from .io import StorageOptionsProvider @@ -25,10 +27,8 @@ class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider): Parameters ---------- - namespace : any - The namespace instance to fetch storage options from. This can be any - object with a describe_table(table_id, version) method that returns a - dict with 'location' and 'storage_options' keys. For example, use + namespace : LanceNamespace + The namespace instance to fetch storage options from. Use lance_namespace.connect() from the lance_namespace PyPI package. table_id : List[str] The table identifier (e.g., ["workspace", "table_name"]) @@ -61,12 +61,12 @@ class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider): ) """ - def __init__(self, namespace, table_id: list): + def __init__(self, namespace: LanceNamespace, table_id: List[str]): """Initialize with namespace and table ID. Parameters ---------- - namespace : any + namespace : LanceNamespace The namespace instance with a describe_table() method table_id : List[str] The table identifier @@ -91,13 +91,9 @@ def fetch_storage_options(self) -> Dict[str, str]: RuntimeError If the namespace doesn't return storage options or expiration time """ - # Call namespace to describe the table and get storage options - table_info = self._namespace.describe_table( - table_id=self._table_id, version=None - ) - - # Extract storage options - should already be a flat dict of strings - storage_options = table_info.get("storage_options") + request = DescribeTableRequest(id=self._table_id, version=None) + response = self._namespace.describe_table(request) + storage_options = response.storage_options if storage_options is None: raise RuntimeError( "Namespace did not return storage_options. " diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py index 0a275880217..1ab33535330 100644 --- a/python/python/tests/test_namespace_integration.py +++ b/python/python/tests/test_namespace_integration.py @@ -14,11 +14,12 @@ import time import uuid from threading import Lock -from typing import Dict, Optional +from typing import Dict import lance import pyarrow as pa import pytest +from lance_namespace import DescribeTableResponse, LanceNamespace # These are all keys that are accepted by storage_options CONFIG = { @@ -67,7 +68,7 @@ def delete_bucket(s3, bucket_name): pass -class MockLanceNamespace: +class MockLanceNamespace(LanceNamespace): """ Mock namespace implementation that tracks credential refresh calls. @@ -116,9 +117,7 @@ def namespace_id(self) -> str: """Return a unique identifier for this namespace instance.""" return "MockLanceNamespace { }" - def describe_table( - self, table_id: list, version: Optional[int] = None - ) -> Dict[str, any]: + def describe_table(self, request) -> DescribeTableResponse: """ Describe a table and return storage options with incrementing credentials. @@ -127,18 +126,16 @@ def describe_table( Parameters ---------- - table_id : list - The table identifier (e.g., ["my_table"]) - version : Optional[int] - The table version (not used in this mock) + request : DescribeTableRequest + The describe table request. Returns ------- - Dict[str, any] - A dictionary with: - - location: The S3 URI of the table - - storage_options: Dict with AWS credentials and expires_at_millis + DescribeTableResponse + Response with location and storage_options """ + table_id = request.id + with self.lock: self.call_count += 1 count = self.call_count @@ -163,10 +160,10 @@ def describe_table( ) storage_options["expires_at_millis"] = str(expires_at_millis) - return { - "location": location, - "storage_options": storage_options, - } + return DescribeTableResponse( + location=location, + storage_options=storage_options, + ) @pytest.mark.integration