From f55936df714a325b34b5226cad7f05922570adbc Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Sat, 23 Nov 2024 20:40:47 +0800 Subject: [PATCH 1/4] fix: fix storage options for ray --- python/python/lance/dataset.py | 13 ++++++++++++- python/python/tests/test_ray.py | 21 +++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 2c30d0e8407..9cdecabc182 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -164,6 +164,7 @@ def __init__( ): uri = os.fspath(uri) if isinstance(uri, Path) else uri self._uri = uri + self._storage_options = storage_options self._ds = _Dataset( uri, version, @@ -180,6 +181,7 @@ def __init__( def __deserialize__( cls, uri: str, + storage_options: Optional[Dict[str, str]], version: int, manifest: bytes, default_scan_options: Optional[Dict[str, Any]], @@ -187,6 +189,7 @@ def __deserialize__( return cls( uri, version, + storage_options=storage_options, serialized_manifest=manifest, default_scan_options=default_scan_options, ) @@ -194,6 +197,7 @@ def __deserialize__( def __reduce__(self): return type(self).__deserialize__, ( self.uri, + self._storage_options, self._ds.version(), self._ds.serialized_manifest(), self._default_scan_options, @@ -202,16 +206,20 @@ def __reduce__(self): def __getstate__(self): return ( self.uri, + self._storage_options, self._ds.version(), self._ds.serialized_manifest(), self._default_scan_options, ) def __setstate__(self, state): - self._uri, version, manifest, default_scan_options = state + self._uri, self._storage_options, version, manifest, default_scan_options = ( + state + ) self._ds = _Dataset( self._uri, version, + storage_options=self._storage_options, manifest=manifest, default_scan_options=default_scan_options, ) @@ -219,6 +227,7 @@ def __setstate__(self, state): def __copy__(self): ds = LanceDataset.__new__(LanceDataset) ds._uri = self._uri + ds._storage_options = self._storage_options ds._ds = copy.copy(self._ds) ds._default_scan_options = self._default_scan_options return ds @@ -2198,6 +2207,7 @@ def commit( max_retries=max_retries, ) ds = LanceDataset.__new__(LanceDataset) + ds._storage_options = storage_options ds._ds = new_ds ds._uri = new_ds.uri ds._default_scan_options = None @@ -3366,6 +3376,7 @@ def write_dataset( inner_ds = _write_dataset(reader, uri, params) ds = LanceDataset.__new__(LanceDataset) + ds._storage_options = storage_options ds._ds = inner_ds ds._uri = inner_ds.uri ds._default_scan_options = None diff --git a/python/python/tests/test_ray.py b/python/python/tests/test_ray.py index b85f185affa..17c6ae86f50 100644 --- a/python/python/tests/test_ray.py +++ b/python/python/tests/test_ray.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright The Lance Authors +import copy from pathlib import Path import lance @@ -17,6 +18,15 @@ _register_hooks, ) +CONFIG = { + "allow_http": "true", + "aws_access_key_id": "ACCESSKEY", + "aws_secret_access_key": "SECRETKEY", + "aws_endpoint": "http://localhost:9000", + "dynamodb_endpoint": "http://localhost:8000", + "aws_region": "us-west-2", +} + # Use this hook until we have official DataSink in Ray. _register_hooks() @@ -116,3 +126,14 @@ def test_ray_empty_write_lance(tmp_path: Path): # empty write would not generate dataset. with pytest.raises(ValueError): lance.dataset(tmp_path) + + +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +@pytest.mark.integration +def test_ray_read_lance(s3_bucket: str): + storage_options = copy.deepcopy(CONFIG) + table = pa.table({"a": [1, 2], "b": ["a", "b"]}) + path = f"s3://{s3_bucket}/test_ray_read.lance" + lance.write_dataset(table, path, storage_options=storage_options) + ds = ray.data.read_lance(path, storage_options=storage_options, concurrency=1) + ds.take(1) From aafcaffafc67db91608cd9ece93a73dca20a921b Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 4 Dec 2024 08:21:07 +0800 Subject: [PATCH 2/4] move integration test --- python/python/tests/test_ray.py | 21 --------------------- python/python/tests/test_s3_ddb.py | 12 ++++++++++++ 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/python/python/tests/test_ray.py b/python/python/tests/test_ray.py index 17c6ae86f50..b85f185affa 100644 --- a/python/python/tests/test_ray.py +++ b/python/python/tests/test_ray.py @@ -1,7 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright The Lance Authors -import copy from pathlib import Path import lance @@ -18,15 +17,6 @@ _register_hooks, ) -CONFIG = { - "allow_http": "true", - "aws_access_key_id": "ACCESSKEY", - "aws_secret_access_key": "SECRETKEY", - "aws_endpoint": "http://localhost:9000", - "dynamodb_endpoint": "http://localhost:8000", - "aws_region": "us-west-2", -} - # Use this hook until we have official DataSink in Ray. _register_hooks() @@ -126,14 +116,3 @@ def test_ray_empty_write_lance(tmp_path: Path): # empty write would not generate dataset. with pytest.raises(ValueError): lance.dataset(tmp_path) - - -@pytest.mark.filterwarnings("ignore::DeprecationWarning") -@pytest.mark.integration -def test_ray_read_lance(s3_bucket: str): - storage_options = copy.deepcopy(CONFIG) - table = pa.table({"a": [1, 2], "b": ["a", "b"]}) - path = f"s3://{s3_bucket}/test_ray_read.lance" - lance.write_dataset(table, path, storage_options=storage_options) - ds = ray.data.read_lance(path, storage_options=storage_options, concurrency=1) - ds.take(1) diff --git a/python/python/tests/test_s3_ddb.py b/python/python/tests/test_s3_ddb.py index 9e006fec60e..c5055d82c42 100644 --- a/python/python/tests/test_s3_ddb.py +++ b/python/python/tests/test_s3_ddb.py @@ -287,3 +287,15 @@ def test_file_writer_reader(s3_bucket: str): bytes(reader.read_global_buffer(global_buffer_pos)).decode() == global_buffer_text ) + + +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +@pytest.mark.integration +@pytest.mark.skipif(not _RAY_AVAILABLE, reason="ray is not available") +def test_ray_read_lance(s3_bucket: str): + storage_options = copy.deepcopy(CONFIG) + table = pa.table({"a": [1, 2], "b": ["a", "b"]}) + path = f"s3://{s3_bucket}/test_ray_read.lance" + lance.write_dataset(table, path, storage_options=storage_options) + ds = ray.data.read_lance(path, storage_options=storage_options, concurrency=1) + ds.take(1) From 1e0528195f2b1e89339cbe6958ea0ece1cb9da51 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 4 Dec 2024 11:55:09 +0800 Subject: [PATCH 3/4] Update test_s3_ddb.py --- python/python/tests/test_s3_ddb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/python/tests/test_s3_ddb.py b/python/python/tests/test_s3_ddb.py index 790ec50c1af..aab5d4152bf 100644 --- a/python/python/tests/test_s3_ddb.py +++ b/python/python/tests/test_s3_ddb.py @@ -306,4 +306,4 @@ def test_append_fragment(s3_bucket: str): table = pa.table({"a": [1, 2], "b": ["a", "b"]}) lance.fragment.LanceFragment.create( f"s3://{s3_bucket}/test_append.lance", table, storage_options=storage_options - ) \ No newline at end of file + ) From dbdfe36e5d925ca0b607d108325b0f161f2f5bf6 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 4 Dec 2024 13:36:31 +0800 Subject: [PATCH 4/4] Update test_s3_ddb.py --- python/python/tests/test_s3_ddb.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/python/tests/test_s3_ddb.py b/python/python/tests/test_s3_ddb.py index aab5d4152bf..adabc740e45 100644 --- a/python/python/tests/test_s3_ddb.py +++ b/python/python/tests/test_s3_ddb.py @@ -300,6 +300,7 @@ def test_ray_read_lance(s3_bucket: str): ds = ray.data.read_lance(path, storage_options=storage_options, concurrency=1) ds.take(1) + @pytest.mark.integration def test_append_fragment(s3_bucket: str): storage_options = copy.deepcopy(CONFIG)