diff --git a/examples/run_pipeline.py b/examples/run_pipeline.py index 1c338c6..8fe8681 100644 --- a/examples/run_pipeline.py +++ b/examples/run_pipeline.py @@ -38,7 +38,7 @@ ], ) -computation = client.computations.run_capsule(run_params) +computation = client.computations.run_pipeline(run_params) # Wait for pipeline to finish. diff --git a/src/codeocean/capsule.py b/src/codeocean/capsule.py index 758a670..55df092 100644 --- a/src/codeocean/capsule.py +++ b/src/codeocean/capsule.py @@ -470,33 +470,34 @@ class Capsules: """Client for interacting with Code Ocean capsule APIs.""" client: BaseUrlSession + _route: str = "capsules" def get_capsule(self, capsule_id: str) -> Capsule: """Retrieve metadata for a specific capsule by its ID.""" - res = self.client.get(f"capsules/{capsule_id}") + res = self.client.get(f"{self._route}/{capsule_id}") return Capsule.from_dict(res.json()) def delete_capsule(self, capsule_id: str): """Delete a capsule permanently.""" - self.client.delete(f"capsules/{capsule_id}") + self.client.delete(f"{self._route}/{capsule_id}") def get_capsule_app_panel(self, capsule_id: str, version: Optional[int] = None) -> AppPanel: """Retrieve app panel information for a specific capsule by its ID.""" - res = self.client.get(f"capsules/{capsule_id}/app_panel", params={"version": version} if version else None) + res = self.client.get(f"{self._route}/{capsule_id}/app_panel", params={"version": version} if version else None) return AppPanel.from_dict(res.json()) def list_computations(self, capsule_id: str) -> list[Computation]: """Get all computations associated with a specific capsule.""" - res = self.client.get(f"capsules/{capsule_id}/computations") + res = self.client.get(f"{self._route}/{capsule_id}/computations") return [Computation.from_dict(c) for c in res.json()] def update_permissions(self, capsule_id: str, permissions: Permissions): """Update permissions for a capsule.""" self.client.post( - f"capsules/{capsule_id}/permissions", + f"{self._route}/{capsule_id}/permissions", json=permissions.to_dict(), ) @@ -507,7 +508,7 @@ def attach_data_assets( ) -> list[DataAssetAttachResults]: """Attach one or more data assets to a capsule with optional mount paths.""" res = self.client.post( - f"capsules/{capsule_id}/data_assets", + f"{self._route}/{capsule_id}/data_assets", json=[j.to_dict() for j in attach_params], ) @@ -516,21 +517,21 @@ def attach_data_assets( def detach_data_assets(self, capsule_id: str, data_assets: list[str]): """Detach one or more data assets from a capsule by their IDs.""" self.client.delete( - f"capsules/{capsule_id}/data_assets/", + f"{self._route}/{capsule_id}/data_assets/", json=data_assets, ) def archive_capsule(self, capsule_id: str, archive: bool): """Archive or unarchive a capsule to control its visibility and accessibility.""" self.client.patch( - f"capsules/{capsule_id}/archive", + f"{self._route}/{capsule_id}/archive", params={"archive": archive}, ) def search_capsules(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults: """Search for capsules with filtering, sorting, and pagination options.""" - res = self.client.post("capsules/search", json=search_params.to_dict()) + res = self.client.post(f"{self._route}/search", json=search_params.to_dict()) return CapsuleSearchResults.from_dict(res.json()) @@ -547,24 +548,3 @@ def search_capsules_iterator(self, search_params: CapsuleSearchParams) -> Iterat return params["next_token"] = response.next_token - - def search_pipelines(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults: - """Search for pipelines with filtering, sorting, and pagination - options.""" - res = self.client.post("pipelines/search", json=search_params.to_dict()) - - return CapsuleSearchResults.from_dict(res.json()) - - def search_pipelines_iterator(self, search_params: CapsuleSearchParams) -> Iterator[Capsule]: - """Iterate through all pipelines matching search criteria with automatic pagination.""" - params = search_params.to_dict() - while True: - response = self.search_pipelines(search_params=CapsuleSearchParams(**params)) - - for result in response.results: - yield result - - if not response.has_more: - return - - params["next_token"] = response.next_token diff --git a/src/codeocean/client.py b/src/codeocean/client.py index 607c801..8caadf1 100644 --- a/src/codeocean/client.py +++ b/src/codeocean/client.py @@ -12,6 +12,7 @@ from codeocean.custom_metadata import CustomMetadataSchema from codeocean.data_asset import DataAssets from codeocean.error import Error +from codeocean.pipeline import Pipelines @dataclass @@ -55,6 +56,7 @@ def __post_init__(self): self.computations = Computations(client=self.session) self.custom_metadata = CustomMetadataSchema(client=self.session) self.data_assets = DataAssets(client=self.session) + self.pipelines = Pipelines(client=self.session) def _error_handler(self, response, *args, **kwargs): try: diff --git a/src/codeocean/computation.py b/src/codeocean/computation.py index b0d94eb..bc71a7d 100644 --- a/src/codeocean/computation.py +++ b/src/codeocean/computation.py @@ -274,6 +274,9 @@ def run_capsule(self, run_params: RunParams) -> Computation: return Computation.from_dict(res.json()) + # Alias for run_capsule + run_pipeline = run_capsule + def wait_until_completed( self, computation: Computation, diff --git a/src/codeocean/data_asset.py b/src/codeocean/data_asset.py index 89c0a95..5066bdd 100644 --- a/src/codeocean/data_asset.py +++ b/src/codeocean/data_asset.py @@ -300,6 +300,12 @@ class AWSS3Source: "description": "The S3 bucket from which the data asset will be created", }, ) + endpoint_name: Optional[str] = field( + default=None, + metadata={ + "description": "The name of the custom S3 endpoint where the bucket is stored", + }, + ) prefix: Optional[str] = field( default=None, metadata={ @@ -318,6 +324,13 @@ class AWSS3Source: "description": "When true, Code Ocean will access the source bucket without credentials", }, ) + use_input_bucket: Optional[bool] = field( + default=None, + metadata={ + "description": "When true, Code Ocean will try to create the dataset from an internal " + "input bucket. All properties are ignored except for prefix. Only allowed to Admin users.", + }, + ) @dataclass_json @@ -396,6 +409,10 @@ class AWSS3Target: bucket: str = field( metadata={"description": "The S3 bucket where the data asset will be stored"}, ) + endpoint_name: Optional[str] = field( + default=None, + metadata={"description": "The name of the custom S3 endpoint where the bucket is stored"}, + ) prefix: Optional[str] = field( default=None, metadata={ diff --git a/src/codeocean/pipeline.py b/src/codeocean/pipeline.py new file mode 100644 index 0000000..5a76ae5 --- /dev/null +++ b/src/codeocean/pipeline.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Iterator +from requests_toolbelt.sessions import BaseUrlSession + +from codeocean.capsule import ( + Capsule, + Capsules, + CapsuleSearchParams, + CapsuleSearchResults, + AppPanel, +) +from codeocean.components import Permissions +from codeocean.computation import Computation +from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults + + +@dataclass +class Pipelines: + """Client for interacting with Code Ocean pipeline APIs.""" + + client: BaseUrlSession + _capsules: Capsules = field(init=False, repr=False) + + def __post_init__(self): + self._capsules = Capsules(client=self.client, _route="pipelines") + + def get_pipeline(self, pipeline_id: str) -> Capsule: + """Retrieve metadata for a specific pipeline by its ID.""" + return self._capsules.get_capsule(pipeline_id) + + def delete_pipeline(self, pipeline_id: str): + """Delete a pipeline permanently.""" + return self._capsules.delete_capsule(pipeline_id) + + def get_pipeline_app_panel(self, pipeline_id: str, version: int | None = None) -> AppPanel: + """Retrieve app panel information for a specific pipeline by its ID.""" + return self._capsules.get_capsule_app_panel(pipeline_id, version) + + def list_computations(self, pipeline_id: str) -> list[Computation]: + """Get all computations associated with a specific pipeline.""" + return self._capsules.list_computations(pipeline_id) + + def update_permissions(self, pipeline_id: str, permissions: Permissions): + """Update permissions for a pipeline.""" + return self._capsules.update_permissions(pipeline_id, permissions) + + def attach_data_assets( + self, + pipeline_id: str, + attach_params: list[DataAssetAttachParams], + ) -> list[DataAssetAttachResults]: + """Attach one or more data assets to a pipeline with optional mount paths.""" + return self._capsules.attach_data_assets(pipeline_id, attach_params) + + def detach_data_assets(self, pipeline_id: str, data_assets: list[str]): + """Detach one or more data assets from a pipeline by their IDs.""" + return self._capsules.detach_data_assets(pipeline_id, data_assets) + + def archive_pipeline(self, pipeline_id: str, archive: bool): + """Archive or unarchive a pipeline to control its visibility and accessibility.""" + return self._capsules.archive_capsule(pipeline_id, archive) + + def search_pipelines(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults: + """Search for pipelines with filtering, sorting, and pagination options.""" + return self._capsules.search_capsules(search_params) + + def search_pipelines_iterator(self, search_params: CapsuleSearchParams) -> Iterator[Capsule]: + """Iterate through all pipelines matching search criteria with automatic pagination.""" + return self._capsules.search_capsules_iterator(search_params)