Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
],
)

computation = client.computations.run_capsule(run_params)
computation = client.computations.run_pipeline(run_params)

# Wait for pipeline to finish.

Expand Down
40 changes: 10 additions & 30 deletions src/codeocean/capsule.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,33 +470,34 @@ class Capsules:
"""Client for interacting with Code Ocean capsule APIs."""

client: BaseUrlSession
_route: str = "capsules"

def get_capsule(self, capsule_id: str) -> Capsule:
"""Retrieve metadata for a specific capsule by its ID."""
res = self.client.get(f"capsules/{capsule_id}")
res = self.client.get(f"{self._route}/{capsule_id}")

return Capsule.from_dict(res.json())

def delete_capsule(self, capsule_id: str):
"""Delete a capsule permanently."""
self.client.delete(f"capsules/{capsule_id}")
self.client.delete(f"{self._route}/{capsule_id}")

def get_capsule_app_panel(self, capsule_id: str, version: Optional[int] = None) -> AppPanel:
"""Retrieve app panel information for a specific capsule by its ID."""
res = self.client.get(f"capsules/{capsule_id}/app_panel", params={"version": version} if version else None)
res = self.client.get(f"{self._route}/{capsule_id}/app_panel", params={"version": version} if version else None)

return AppPanel.from_dict(res.json())

def list_computations(self, capsule_id: str) -> list[Computation]:
"""Get all computations associated with a specific capsule."""
res = self.client.get(f"capsules/{capsule_id}/computations")
res = self.client.get(f"{self._route}/{capsule_id}/computations")

return [Computation.from_dict(c) for c in res.json()]

def update_permissions(self, capsule_id: str, permissions: Permissions):
"""Update permissions for a capsule."""
self.client.post(
f"capsules/{capsule_id}/permissions",
f"{self._route}/{capsule_id}/permissions",
json=permissions.to_dict(),
)

Expand All @@ -507,7 +508,7 @@ def attach_data_assets(
) -> list[DataAssetAttachResults]:
"""Attach one or more data assets to a capsule with optional mount paths."""
res = self.client.post(
f"capsules/{capsule_id}/data_assets",
f"{self._route}/{capsule_id}/data_assets",
json=[j.to_dict() for j in attach_params],
)

Expand All @@ -516,21 +517,21 @@ def attach_data_assets(
def detach_data_assets(self, capsule_id: str, data_assets: list[str]):
"""Detach one or more data assets from a capsule by their IDs."""
self.client.delete(
f"capsules/{capsule_id}/data_assets/",
f"{self._route}/{capsule_id}/data_assets/",
json=data_assets,
)

def archive_capsule(self, capsule_id: str, archive: bool):
"""Archive or unarchive a capsule to control its visibility and accessibility."""
self.client.patch(
f"capsules/{capsule_id}/archive",
f"{self._route}/{capsule_id}/archive",
params={"archive": archive},
)

def search_capsules(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults:
"""Search for capsules with filtering, sorting, and pagination
options."""
res = self.client.post("capsules/search", json=search_params.to_dict())
res = self.client.post(f"{self._route}/search", json=search_params.to_dict())

return CapsuleSearchResults.from_dict(res.json())

Expand All @@ -547,24 +548,3 @@ def search_capsules_iterator(self, search_params: CapsuleSearchParams) -> Iterat
return

params["next_token"] = response.next_token

def search_pipelines(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults:
"""Search for pipelines with filtering, sorting, and pagination
options."""
res = self.client.post("pipelines/search", json=search_params.to_dict())

return CapsuleSearchResults.from_dict(res.json())

def search_pipelines_iterator(self, search_params: CapsuleSearchParams) -> Iterator[Capsule]:
"""Iterate through all pipelines matching search criteria with automatic pagination."""
params = search_params.to_dict()
while True:
response = self.search_pipelines(search_params=CapsuleSearchParams(**params))

for result in response.results:
yield result

if not response.has_more:
return

params["next_token"] = response.next_token
2 changes: 2 additions & 0 deletions src/codeocean/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from codeocean.custom_metadata import CustomMetadataSchema
from codeocean.data_asset import DataAssets
from codeocean.error import Error
from codeocean.pipeline import Pipelines


@dataclass
Expand Down Expand Up @@ -55,6 +56,7 @@ def __post_init__(self):
self.computations = Computations(client=self.session)
self.custom_metadata = CustomMetadataSchema(client=self.session)
self.data_assets = DataAssets(client=self.session)
self.pipelines = Pipelines(client=self.session)

def _error_handler(self, response, *args, **kwargs):
try:
Expand Down
3 changes: 3 additions & 0 deletions src/codeocean/computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ def run_capsule(self, run_params: RunParams) -> Computation:

return Computation.from_dict(res.json())

# Alias for run_capsule
run_pipeline = run_capsule

def wait_until_completed(
self,
computation: Computation,
Expand Down
17 changes: 17 additions & 0 deletions src/codeocean/data_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ class AWSS3Source:
"description": "The S3 bucket from which the data asset will be created",
},
)
endpoint_name: Optional[str] = field(
default=None,
metadata={
"description": "The name of the custom S3 endpoint where the bucket is stored",
},
)
prefix: Optional[str] = field(
default=None,
metadata={
Expand All @@ -318,6 +324,13 @@ class AWSS3Source:
"description": "When true, Code Ocean will access the source bucket without credentials",
},
)
use_input_bucket: Optional[bool] = field(
default=None,
metadata={
"description": "When true, Code Ocean will try to create the dataset from an internal "
"input bucket. All properties are ignored except for prefix. Only allowed to Admin users.",
},
)


@dataclass_json
Expand Down Expand Up @@ -396,6 +409,10 @@ class AWSS3Target:
bucket: str = field(
metadata={"description": "The S3 bucket where the data asset will be stored"},
)
endpoint_name: Optional[str] = field(
default=None,
metadata={"description": "The name of the custom S3 endpoint where the bucket is stored"},
)
prefix: Optional[str] = field(
default=None,
metadata={
Expand Down
71 changes: 71 additions & 0 deletions src/codeocean/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Iterator
from requests_toolbelt.sessions import BaseUrlSession

from codeocean.capsule import (
Capsule,
Capsules,
CapsuleSearchParams,
CapsuleSearchResults,
AppPanel,
)
from codeocean.components import Permissions
from codeocean.computation import Computation
from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults


@dataclass
class Pipelines:
"""Client for interacting with Code Ocean pipeline APIs."""

client: BaseUrlSession
_capsules: Capsules = field(init=False, repr=False)

def __post_init__(self):
self._capsules = Capsules(client=self.client, _route="pipelines")

def get_pipeline(self, pipeline_id: str) -> Capsule:
"""Retrieve metadata for a specific pipeline by its ID."""
return self._capsules.get_capsule(pipeline_id)

def delete_pipeline(self, pipeline_id: str):
"""Delete a pipeline permanently."""
return self._capsules.delete_capsule(pipeline_id)

def get_pipeline_app_panel(self, pipeline_id: str, version: int | None = None) -> AppPanel:
"""Retrieve app panel information for a specific pipeline by its ID."""
return self._capsules.get_capsule_app_panel(pipeline_id, version)

def list_computations(self, pipeline_id: str) -> list[Computation]:
"""Get all computations associated with a specific pipeline."""
return self._capsules.list_computations(pipeline_id)

def update_permissions(self, pipeline_id: str, permissions: Permissions):
"""Update permissions for a pipeline."""
return self._capsules.update_permissions(pipeline_id, permissions)

def attach_data_assets(
self,
pipeline_id: str,
attach_params: list[DataAssetAttachParams],
) -> list[DataAssetAttachResults]:
"""Attach one or more data assets to a pipeline with optional mount paths."""
return self._capsules.attach_data_assets(pipeline_id, attach_params)

def detach_data_assets(self, pipeline_id: str, data_assets: list[str]):
"""Detach one or more data assets from a pipeline by their IDs."""
return self._capsules.detach_data_assets(pipeline_id, data_assets)

def archive_pipeline(self, pipeline_id: str, archive: bool):
"""Archive or unarchive a pipeline to control its visibility and accessibility."""
return self._capsules.archive_capsule(pipeline_id, archive)

def search_pipelines(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults:
"""Search for pipelines with filtering, sorting, and pagination options."""
return self._capsules.search_capsules(search_params)

def search_pipelines_iterator(self, search_params: CapsuleSearchParams) -> Iterator[Capsule]:
"""Iterate through all pipelines matching search criteria with automatic pagination."""
return self._capsules.search_capsules_iterator(search_params)