Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions python/python/lance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import logging
import os
import warnings
from typing import TYPE_CHECKING, Dict, Optional, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from lance_namespace import DescribeTableRequest, LanceNamespace

from . import io, log
from .blob import BlobColumn, BlobFile
Expand Down Expand Up @@ -89,8 +91,8 @@ def dataset(
index_cache_size_bytes: Optional[int] = None,
read_params: Optional[Dict[str, any]] = None,
session: Optional[Session] = None,
namespace: Optional[any] = None,
table_id: Optional[list] = None,
namespace: Optional[LanceNamespace] = None,
table_id: Optional[List[str]] = None,
ignore_namespace_table_storage_options: bool = False,
s3_credentials_refresh_offset_seconds: Optional[int] = None,
) -> LanceDataset:
Expand Down Expand Up @@ -151,15 +153,13 @@ def dataset(
session : optional, lance.Session
A session to use for this dataset. This contains the caches used by the
across multiple datasets.
namespace : optional
namespace : optional, LanceNamespace
A namespace instance from which to fetch table location and storage options.
This can be any object with a describe_table(table_id, version) method
that returns a dict with 'location' and 'storage_options' keys.
For example, use lance_namespace.connect() from the lance_namespace package.
Use lance_namespace.connect() from the lance_namespace package.
Must be provided together with `table_id`. Cannot be used with `uri`.
When provided, the table location will be fetched automatically from the
namespace via describe_table().
table_id : optional, list of str
table_id : optional, List[str]
The table identifier when using a namespace (e.g., ["my_table"]).
Must be provided together with `namespace`. Cannot be used with `uri`.
ignore_namespace_table_storage_options : bool, default False
Expand Down Expand Up @@ -207,15 +207,17 @@ def dataset(
"Both 'namespace' and 'table_id' must be provided together."
)

table_info = namespace.describe_table(table_id=table_id, version=version)
uri = table_info.get("location")
request = DescribeTableRequest(id=table_id, version=version)
response = namespace.describe_table(request)

uri = response.location
if uri is None:
raise ValueError("Namespace did not return a 'location' for the table")

if ignore_namespace_table_storage_options:
namespace_storage_options = None
else:
namespace_storage_options = table_info.get("storage_options")
namespace_storage_options = response.storage_options

if namespace_storage_options:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
Expand Down
34 changes: 17 additions & 17 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import pyarrow as pa
import pyarrow.dataset
from lance_namespace import DescribeTableRequest, LanceNamespace
from pyarrow import RecordBatch, Schema

from lance.log import LOGGER
Expand Down Expand Up @@ -5096,8 +5097,8 @@ def write_dataset(
transaction_properties: Optional[Dict[str, str]] = None,
initial_bases: Optional[List[DatasetBasePath]] = None,
target_bases: Optional[List[str]] = None,
namespace: Optional[any] = None,
table_id: Optional[list] = None,
namespace: Optional[LanceNamespace] = None,
table_id: Optional[List[str]] = None,
) -> LanceDataset:
"""Write a given data_obj to the given uri

Expand Down Expand Up @@ -5190,13 +5191,13 @@ def write_dataset(

**CREATE mode**: References must match bases in `initial_bases`
**APPEND/OVERWRITE modes**: References must match bases in the existing manifest
namespace : optional, any
namespace : optional, LanceNamespace
A namespace instance from which to fetch table location and storage options.
Must be provided together with `table_id`. Cannot be used with `uri`.
When provided, the table location will be fetched automatically from the
namespace via describe_table(). Storage options will be automatically refreshed
before they expire.
table_id : optional, list of str
table_id : optional, List[str]
The table identifier when using a namespace (e.g., ["my_table"]).
Must be provided together with `namespace`. Cannot be used with `uri`.

Expand Down Expand Up @@ -5230,22 +5231,21 @@ def write_dataset(
"Both 'namespace' and 'table_id' must be provided together."
)

# Call describe_table to get location and storage options
table_info = namespace.describe_table(table_id=table_id, version=None)

# Extract location from namespace response
uri = table_info.get("location")
request = DescribeTableRequest(id=table_id, version=None)
response = namespace.describe_table(request)
uri = response.location
if not uri:
raise ValueError("Namespace did not return a table location")

# Merge initial storage options from describe_table with user-provided options
namespace_storage_options = table_info.get("storage_options", {})
if storage_options:
# User-provided options take precedence
merged_storage_options = {**namespace_storage_options, **storage_options}
else:
merged_storage_options = namespace_storage_options
storage_options = merged_storage_options
namespace_storage_options = response.storage_options
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it consistent with how lance.dataset handles the storage options

if namespace_storage_options:
# TODO: support dynamic storage options provider
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this I will put a separated PR, need to properly handle CreateEmptyTable case

if storage_options is None:
storage_options = namespace_storage_options
else:
merged_options = dict(storage_options)
merged_options.update(namespace_storage_options)
storage_options = merged_options
elif table_id is not None:
raise ValueError("Both 'namespace' and 'table_id' must be provided together.")

Expand Down
24 changes: 10 additions & 14 deletions python/python/lance/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
enabling automatic storage options refresh for namespace-managed tables.
"""

from typing import Dict
from typing import Dict, List

from lance_namespace import DescribeTableRequest, LanceNamespace

from .io import StorageOptionsProvider

Expand All @@ -25,10 +27,8 @@ class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider):

Parameters
----------
namespace : any
The namespace instance to fetch storage options from. This can be any
object with a describe_table(table_id, version) method that returns a
dict with 'location' and 'storage_options' keys. For example, use
namespace : LanceNamespace
The namespace instance to fetch storage options from. Use
lance_namespace.connect() from the lance_namespace PyPI package.
table_id : List[str]
The table identifier (e.g., ["workspace", "table_name"])
Expand Down Expand Up @@ -61,12 +61,12 @@ class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider):
)
"""

def __init__(self, namespace, table_id: list):
def __init__(self, namespace: LanceNamespace, table_id: List[str]):
"""Initialize with namespace and table ID.

Parameters
----------
namespace : any
namespace : LanceNamespace
The namespace instance with a describe_table() method
table_id : List[str]
The table identifier
Expand All @@ -91,13 +91,9 @@ def fetch_storage_options(self) -> Dict[str, str]:
RuntimeError
If the namespace doesn't return storage options or expiration time
"""
# Call namespace to describe the table and get storage options
table_info = self._namespace.describe_table(
table_id=self._table_id, version=None
)

# Extract storage options - should already be a flat dict of strings
storage_options = table_info.get("storage_options")
request = DescribeTableRequest(id=self._table_id, version=None)
response = self._namespace.describe_table(request)
storage_options = response.storage_options
if storage_options is None:
raise RuntimeError(
"Namespace did not return storage_options. "
Expand Down
31 changes: 14 additions & 17 deletions python/python/tests/test_namespace_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
import time
import uuid
from threading import Lock
from typing import Dict, Optional
from typing import Dict

import lance
import pyarrow as pa
import pytest
from lance_namespace import DescribeTableResponse, LanceNamespace

# These are all keys that are accepted by storage_options
CONFIG = {
Expand Down Expand Up @@ -67,7 +68,7 @@ def delete_bucket(s3, bucket_name):
pass


class MockLanceNamespace:
class MockLanceNamespace(LanceNamespace):
"""
Mock namespace implementation that tracks credential refresh calls.

Expand Down Expand Up @@ -116,9 +117,7 @@ def namespace_id(self) -> str:
"""Return a unique identifier for this namespace instance."""
return "MockLanceNamespace { }"

def describe_table(
self, table_id: list, version: Optional[int] = None
) -> Dict[str, any]:
def describe_table(self, request) -> DescribeTableResponse:
"""
Describe a table and return storage options with incrementing credentials.

Expand All @@ -127,18 +126,16 @@ def describe_table(

Parameters
----------
table_id : list
The table identifier (e.g., ["my_table"])
version : Optional[int]
The table version (not used in this mock)
request : DescribeTableRequest
The describe table request.

Returns
-------
Dict[str, any]
A dictionary with:
- location: The S3 URI of the table
- storage_options: Dict with AWS credentials and expires_at_millis
DescribeTableResponse
Response with location and storage_options
"""
table_id = request.id

with self.lock:
self.call_count += 1
count = self.call_count
Expand All @@ -163,10 +160,10 @@ def describe_table(
)
storage_options["expires_at_millis"] = str(expires_at_millis)

return {
"location": location,
"storage_options": storage_options,
}
return DescribeTableResponse(
location=location,
storage_options=storage_options,
)


@pytest.mark.integration
Expand Down