Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def __init__(
):
uri = os.fspath(uri) if isinstance(uri, Path) else uri
self._uri = uri
self._storage_options = storage_options
self._ds = _Dataset(
uri,
version,
Expand All @@ -183,20 +184,23 @@ def __init__(
def __deserialize__(
cls,
uri: str,
storage_options: Optional[Dict[str, str]],
version: int,
manifest: bytes,
default_scan_options: Optional[Dict[str, Any]],
):
return cls(
uri,
version,
storage_options=storage_options,
serialized_manifest=manifest,
default_scan_options=default_scan_options,
)

def __reduce__(self):
return type(self).__deserialize__, (
self.uri,
self._storage_options,
self._ds.version(),
self._ds.serialized_manifest(),
self._default_scan_options,
Expand All @@ -205,23 +209,28 @@ def __reduce__(self):
def __getstate__(self):
return (
self.uri,
self._storage_options,
self._ds.version(),
self._ds.serialized_manifest(),
self._default_scan_options,
)

def __setstate__(self, state):
self._uri, version, manifest, default_scan_options = state
self._uri, self._storage_options, version, manifest, default_scan_options = (
state
)
self._ds = _Dataset(
self._uri,
version,
storage_options=self._storage_options,
manifest=manifest,
default_scan_options=default_scan_options,
)

def __copy__(self):
ds = LanceDataset.__new__(LanceDataset)
ds._uri = self._uri
ds._storage_options = self._storage_options
ds._ds = copy.copy(self._ds)
ds._default_scan_options = self._default_scan_options
return ds
Expand Down Expand Up @@ -2208,6 +2217,7 @@ def commit(
max_retries=max_retries,
)
ds = LanceDataset.__new__(LanceDataset)
ds._storage_options = storage_options
ds._ds = new_ds
ds._uri = new_ds.uri
ds._default_scan_options = None
Expand Down Expand Up @@ -3495,6 +3505,7 @@ def write_dataset(
inner_ds = _write_dataset(reader, uri, params)

ds = LanceDataset.__new__(LanceDataset)
ds._storage_options = storage_options
ds._ds = inner_ds
ds._uri = inner_ds.uri
ds._default_scan_options = None
Expand Down
12 changes: 12 additions & 0 deletions python/python/tests/test_s3_ddb.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,18 @@ def test_file_writer_reader(s3_bucket: str):
)


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
@pytest.mark.integration
@pytest.mark.skipif(not _RAY_AVAILABLE, reason="ray is not available")
def test_ray_read_lance(s3_bucket: str):
storage_options = copy.deepcopy(CONFIG)
table = pa.table({"a": [1, 2], "b": ["a", "b"]})
path = f"s3://{s3_bucket}/test_ray_read.lance"
lance.write_dataset(table, path, storage_options=storage_options)
ds = ray.data.read_lance(path, storage_options=storage_options, concurrency=1)
ds.take(1)


@pytest.mark.integration
def test_append_fragment(s3_bucket: str):
storage_options = copy.deepcopy(CONFIG)
Expand Down