From ee46f09a477c204e517790069e4f246a23ee8e65 Mon Sep 17 00:00:00 2001 From: jake-valsamis Date: Sat, 17 Jan 2026 18:34:43 -0500 Subject: [PATCH 1/4] 4.1 updates and pipelines route --- src/codeocean/capsule.py | 245 +------------------------ src/codeocean/client.py | 2 + src/codeocean/components.py | 216 ++++++++++++++++++++++ src/codeocean/computation.py | 13 ++ src/codeocean/data_asset.py | 19 +- src/codeocean/pipeline.py | 339 +++++++++++++++++++++++++++++++++++ 6 files changed, 595 insertions(+), 239 deletions(-) create mode 100644 src/codeocean/pipeline.py diff --git a/src/codeocean/capsule.py b/src/codeocean/capsule.py index 758a670..ddff71d 100644 --- a/src/codeocean/capsule.py +++ b/src/codeocean/capsule.py @@ -5,7 +5,13 @@ from typing import Optional, Iterator from requests_toolbelt.sessions import BaseUrlSession -from codeocean.components import Ownership, SortOrder, SearchFilter, Permissions +from codeocean.components import ( + Ownership, + SortOrder, + SearchFilter, + Permissions, + AppPanel, +) from codeocean.computation import Computation from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults from codeocean.enum import StrEnum @@ -26,29 +32,6 @@ class CapsuleSortBy(StrEnum): Name = "name" -class AppPanelDataAssetKind(StrEnum): - """The kind of data asset displayed in an app panel. - - - 'Internal' → Data stored inside Code Ocean. - - 'External' → Data stored external to Code Ocean. - - 'Combined' → Data containing multiple external data assets. - - In pipelines, a data asset can only be replaced with one of the same kind. - """ - - Internal = "internal" - External = "external" - Combined = "combined" - - -class AppPanelParameterType(StrEnum): - """The type of parameter displayed in an app panel.""" - - Text = "text" - List = "list" - File = "file" - - @dataclass_json @dataclass(frozen=True) class OriginalCapsuleInfo: @@ -272,199 +255,6 @@ class CapsuleSearchResults: ) -@dataclass_json -@dataclass(frozen=True) -class AppPanelCategories: - """Categories for a capsule's App Panel parameters.""" - - id: str = dataclass_field( - metadata={"description": "Unique identifier for the category."}, - ) - name: str = dataclass_field( - metadata={"description": "Human-readable name of the category."}, - ) - description: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Optional detailed description of the category."}, - ) - help_text: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Optional help text providing guidance or additional information about the category."}, - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelParameters: - """Parameters for a capsule's App Panel.""" - - name: str = dataclass_field( - metadata={"description": "Parameter label/display name."} - ) - type: AppPanelParameterType = dataclass_field( - metadata={"description": "Type of the parameter (text, list, file)."} - ) - category: Optional[str] = dataclass_field( - default=None, - metadata={"description": "ID of category the parameter belongs to."} - ) - param_name: Optional[str] = dataclass_field( - default=None, - metadata={"description": "The parameter name/argument key"} - ) - description: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Description of the parameter."} - ) - help_text: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Help text for the parameter."} - ) - value_type: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Value type of the parameter."} - ) - default_value: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Default value of the parameter."} - ) - required: Optional[bool] = dataclass_field( - default=None, - metadata={"description": "Indicates if the parameter is required."} - ) - hidden: Optional[bool] = dataclass_field( - default=None, - metadata={"description": "Indicates if the parameter is hidden."} - ) - minimum: Optional[float] = dataclass_field( - default=None, - metadata={"description": "Minimum value for the parameter."} - ) - maximum: Optional[float] = dataclass_field( - default=None, - metadata={"description": "Maximum value for the parameter."} - ) - pattern: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Regular expression pattern for the parameter."} - ) - value_options: Optional[list[str]] = dataclass_field( - default=None, - metadata={"description": "Allowed values for the parameter."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelGeneral: - """General information about a capsule's App Panel.""" - - title: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Title of the App Panel."} - ) - instructions: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Instructions for using the App Panel."} - ) - help_text: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Help text for the App Panel."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelDataAsset: - """Data asset parameter for the App Panel.""" - - id: str = dataclass_field( - metadata={"description": "Unique identifier for the data asset."} - ) - mount: str = dataclass_field( - metadata={"description": "Mount path of the data asset within the capsule. " - "Use this mount path to replace the currently attached data asset with your own"} - ) - name: str = dataclass_field( - metadata={"description": "Display name of the data asset."} - ) - kind: AppPanelDataAssetKind = dataclass_field( - metadata={"description": "Kind of the data asset (internal, external, combined)."} - ) - accessible: bool = dataclass_field( - metadata={"description": "Indicates if the data asset is accessible to the user."} - ) - description: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Optional description of the data asset parameter."} - ) - help_text: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Optional help text for the data asset parameter."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelResult: - """Selected result files to display once the computation is complete.""" - - file_name: str = dataclass_field( - metadata={"description": "Name of the result file."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelProcess: - """Pipeline process name and its corresponding app panel (for pipelines of capsules only)""" - - name: str = dataclass_field( - metadata={"description": "Name of the pipeline process."} - ) - categories: Optional[AppPanelCategories] = dataclass_field( - default=None, - metadata={"description": "Categories for the pipeline process's app panel parameters."} - ) - parameters: Optional[AppPanelParameters] = dataclass_field( - default=None, - metadata={"description": "Parameters for the pipeline process's app panel."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanel: - """App Panel configuration for a capsule or pipeline, including general info, data assets, - categories, parameters, and results. - """ - - general: Optional[AppPanelGeneral] = dataclass_field( - default=None, - metadata={"description": "General information about the App Panel."} - ) - data_assets: Optional[list[AppPanelDataAsset]] = dataclass_field( - default=None, - metadata={"description": "List of data assets used in the App Panel."} - ) - categories: Optional[list[AppPanelCategories]] = dataclass_field( - default=None, - metadata={"description": "Categories for organizing App Panel parameters."} - ) - parameters: Optional[list[AppPanelParameters]] = dataclass_field( - default=None, - metadata={"description": "Parameters for the App Panel."} - ) - results: Optional[list[AppPanelResult]] = dataclass_field( - default=None, - metadata={"description": "Result files to display after computation."} - ) - processes: Optional[list[AppPanelProcess]] = dataclass_field( - default=None, - metadata={"description": "Pipeline processes and their App Panels."} - ) - - @dataclass class Capsules: """Client for interacting with Code Ocean capsule APIs.""" @@ -547,24 +337,3 @@ def search_capsules_iterator(self, search_params: CapsuleSearchParams) -> Iterat return params["next_token"] = response.next_token - - def search_pipelines(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults: - """Search for pipelines with filtering, sorting, and pagination - options.""" - res = self.client.post("pipelines/search", json=search_params.to_dict()) - - return CapsuleSearchResults.from_dict(res.json()) - - def search_pipelines_iterator(self, search_params: CapsuleSearchParams) -> Iterator[Capsule]: - """Iterate through all pipelines matching search criteria with automatic pagination.""" - params = search_params.to_dict() - while True: - response = self.search_pipelines(search_params=CapsuleSearchParams(**params)) - - for result in response.results: - yield result - - if not response.has_more: - return - - params["next_token"] = response.next_token diff --git a/src/codeocean/client.py b/src/codeocean/client.py index 607c801..8caadf1 100644 --- a/src/codeocean/client.py +++ b/src/codeocean/client.py @@ -12,6 +12,7 @@ from codeocean.custom_metadata import CustomMetadataSchema from codeocean.data_asset import DataAssets from codeocean.error import Error +from codeocean.pipeline import Pipelines @dataclass @@ -55,6 +56,7 @@ def __post_init__(self): self.computations = Computations(client=self.session) self.custom_metadata = CustomMetadataSchema(client=self.session) self.data_assets = DataAssets(client=self.session) + self.pipelines = Pipelines(client=self.session) def _error_handler(self, response, *args, **kwargs): try: diff --git a/src/codeocean/components.py b/src/codeocean/components.py index f580d64..693422b 100644 --- a/src/codeocean/components.py +++ b/src/codeocean/components.py @@ -146,3 +146,219 @@ class Ownership(StrEnum): Private = "private" Shared = "shared" Created = "created" + + +class AppPanelDataAssetKind(StrEnum): + """The kind of data asset displayed in an app panel. + + - 'Internal' → Data stored inside Code Ocean. + - 'External' → Data stored external to Code Ocean. + - 'Combined' → Data containing multiple external data assets. + + In pipelines, a data asset can only be replaced with one of the same kind. + """ + + Internal = "internal" + External = "external" + Combined = "combined" + + +class AppPanelParameterType(StrEnum): + """The type of parameter displayed in an app panel.""" + + Text = "text" + List = "list" + File = "file" + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelCategories: + """Categories for a capsule's App Panel parameters.""" + + id: str = field( + metadata={"description": "Unique identifier for the category."}, + ) + name: str = field( + metadata={"description": "Human-readable name of the category."}, + ) + description: Optional[str] = field( + default=None, + metadata={"description": "Optional detailed description of the category."}, + ) + help_text: Optional[str] = field( + default=None, + metadata={"description": "Optional help text providing guidance or additional information about the category."}, + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelParameters: + """Parameters for a capsule's App Panel.""" + + name: str = field( + metadata={"description": "Parameter label/display name."} + ) + type: AppPanelParameterType = field( + metadata={"description": "Type of the parameter (text, list, file)."} + ) + category: Optional[str] = field( + default=None, + metadata={"description": "ID of category the parameter belongs to."} + ) + param_name: Optional[str] = field( + default=None, + metadata={"description": "The parameter name/argument key"} + ) + description: Optional[str] = field( + default=None, + metadata={"description": "Description of the parameter."} + ) + help_text: Optional[str] = field( + default=None, + metadata={"description": "Help text for the parameter."} + ) + value_type: Optional[str] = field( + default=None, + metadata={"description": "Value type of the parameter."} + ) + default_value: Optional[str] = field( + default=None, + metadata={"description": "Default value of the parameter."} + ) + required: Optional[bool] = field( + default=None, + metadata={"description": "Indicates if the parameter is required."} + ) + hidden: Optional[bool] = field( + default=None, + metadata={"description": "Indicates if the parameter is hidden."} + ) + minimum: Optional[float] = field( + default=None, + metadata={"description": "Minimum value for the parameter."} + ) + maximum: Optional[float] = field( + default=None, + metadata={"description": "Maximum value for the parameter."} + ) + pattern: Optional[str] = field( + default=None, + metadata={"description": "Regular expression pattern for the parameter."} + ) + value_options: Optional[list[str]] = field( + default=None, + metadata={"description": "Allowed values for the parameter."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelGeneral: + """General information about a capsule's App Panel.""" + + title: Optional[str] = field( + default=None, + metadata={"description": "Title of the App Panel."} + ) + instructions: Optional[str] = field( + default=None, + metadata={"description": "Instructions for using the App Panel."} + ) + help_text: Optional[str] = field( + default=None, + metadata={"description": "Help text for the App Panel."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelDataAsset: + """Data asset parameter for the App Panel.""" + + id: str = field( + metadata={"description": "Unique identifier for the data asset."} + ) + mount: str = field( + metadata={"description": "Mount path of the data asset within the capsule. " + "Use this mount path to replace the currently attached data asset with your own"} + ) + name: str = field( + metadata={"description": "Display name of the data asset."} + ) + kind: AppPanelDataAssetKind = field( + metadata={"description": "Kind of the data asset (internal, external, combined)."} + ) + accessible: bool = field( + metadata={"description": "Indicates if the data asset is accessible to the user."} + ) + description: Optional[str] = field( + default=None, + metadata={"description": "Optional description of the data asset parameter."} + ) + help_text: Optional[str] = field( + default=None, + metadata={"description": "Optional help text for the data asset parameter."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelResult: + """Selected result files to display once the computation is complete.""" + + file_name: str = field( + metadata={"description": "Name of the result file."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelProcess: + """Pipeline process name and its corresponding app panel (for pipelines of capsules only)""" + + name: str = field( + metadata={"description": "Name of the pipeline process."} + ) + categories: Optional[AppPanelCategories] = field( + default=None, + metadata={"description": "Categories for the pipeline process's app panel parameters."} + ) + parameters: Optional[AppPanelParameters] = field( + default=None, + metadata={"description": "Parameters for the pipeline process's app panel."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanel: + """App Panel configuration for a capsule or pipeline, including general info, data assets, + categories, parameters, and results. + """ + + general: Optional[AppPanelGeneral] = field( + default=None, + metadata={"description": "General information about the App Panel."} + ) + data_assets: Optional[list[AppPanelDataAsset]] = field( + default=None, + metadata={"description": "List of data assets used in the App Panel."} + ) + categories: Optional[list[AppPanelCategories]] = field( + default=None, + metadata={"description": "Categories for organizing App Panel parameters."} + ) + parameters: Optional[list[AppPanelParameters]] = field( + default=None, + metadata={"description": "Parameters for the App Panel."} + ) + results: Optional[list[AppPanelResult]] = field( + default=None, + metadata={"description": "Result files to display after computation."} + ) + processes: Optional[list[AppPanelProcess]] = field( + default=None, + metadata={"description": "Pipeline processes and their App Panels."} + ) diff --git a/src/codeocean/computation.py b/src/codeocean/computation.py index b0d94eb..7506d2f 100644 --- a/src/codeocean/computation.py +++ b/src/codeocean/computation.py @@ -274,6 +274,19 @@ def run_capsule(self, run_params: RunParams) -> Computation: return Computation.from_dict(res.json()) + def run_pipeline(self, run_params: RunParams) -> Computation: + """ + Execute a pipeline with specified parameters and data assets. + + Set run_params.pipeline_id and optionally provide data_assets, + processes (with process-specific parameters), and nextflow_profile configuration. + + This is a convenience method that calls the same endpoint as run_capsule. + """ + res = self.client.post("computations", json=run_params.to_dict()) + + return Computation.from_dict(res.json()) + def wait_until_completed( self, computation: Computation, diff --git a/src/codeocean/data_asset.py b/src/codeocean/data_asset.py index 89c0a95..f36e214 100644 --- a/src/codeocean/data_asset.py +++ b/src/codeocean/data_asset.py @@ -294,12 +294,18 @@ class DataAssetUpdateParams: @dataclass(frozen=True) class AWSS3Source: """AWS S3 source configuration for creating data assets.""" - + bucket: str = field( metadata={ "description": "The S3 bucket from which the data asset will be created", }, ) + endpoint_name: Optional[str] = field( + default=None, + metadata={ + "description": "The name of the custom S3 endpoint where the bucket is stored", + }, + ) prefix: Optional[str] = field( default=None, metadata={ @@ -318,6 +324,13 @@ class AWSS3Source: "description": "When true, Code Ocean will access the source bucket without credentials", }, ) + use_input_bucket: Optional[bool] = field( + default=None, + metadata={ + "description": "When true, Code Ocean will try to create the dataset from an internal " + "input bucket. All properties are ignored except for prefix. Only allowed to Admin users.", + }, + ) @dataclass_json @@ -396,6 +409,10 @@ class AWSS3Target: bucket: str = field( metadata={"description": "The S3 bucket where the data asset will be stored"}, ) + endpoint_name: Optional[str] = field( + default=None, + metadata={"description": "The name of the custom S3 endpoint where the bucket is stored"}, + ) prefix: Optional[str] = field( default=None, metadata={ diff --git a/src/codeocean/pipeline.py b/src/codeocean/pipeline.py new file mode 100644 index 0000000..30ab506 --- /dev/null +++ b/src/codeocean/pipeline.py @@ -0,0 +1,339 @@ +from __future__ import annotations + +from dataclasses import dataclass, field as dataclass_field +from dataclasses_json import dataclass_json +from typing import Iterator, Optional +from requests_toolbelt.sessions import BaseUrlSession + +from codeocean.components import ( + Ownership, + SortOrder, + SearchFilter, + Permissions, + AppPanel, +) +from codeocean.computation import Computation +from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults +from codeocean.enum import StrEnum + + +class PipelineStatus(StrEnum): + """Status of a pipeline indicating its release state.""" + + NonRelease = "non_release" + Release = "release" + + +class PipelineSortBy(StrEnum): + """Fields available for sorting pipeline search results.""" + + Created = "created" + LastAccessed = "last_accessed" + Name = "name" + + +@dataclass_json +@dataclass(frozen=True) +class OriginalPipelineInfo: + """Information about the original pipeline when this pipeline is duplicated from + another.""" + + id: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Original pipeline ID"}, + ) + major_version: Optional[int] = dataclass_field( + default=None, + metadata={"description": "Original pipeline major version"}, + ) + minor_version: Optional[int] = dataclass_field( + default=None, + metadata={"description": "Original pipeline minor version"}, + ) + name: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Original pipeline name"}, + ) + created: Optional[int] = dataclass_field( + default=None, + metadata={"description": "Original pipeline creation time (int64 timestamp)"}, + ) + public: Optional[bool] = dataclass_field( + default=None, + metadata={"description": "Indicates whether the original pipeline is public"}, + ) + + +@dataclass_json +@dataclass(frozen=True) +class Pipeline: + """Represents a Code Ocean pipeline with its metadata and properties.""" + + id: str = dataclass_field( + metadata={"description": "Pipeline ID"}, + ) + created: int = dataclass_field( + metadata={"description": "Pipeline creation time (int64 timestamp)"}, + ) + name: str = dataclass_field( + metadata={"description": "Pipeline display name"}, + ) + status: PipelineStatus = dataclass_field( + metadata={"description": "Status of the pipeline (non_release or release)"}, + ) + owner: str = dataclass_field( + metadata={"description": "Pipeline owner's ID"}, + ) + slug: str = dataclass_field( + metadata={"description": "Alternate pipeline ID (URL-friendly identifier)"}, + ) + last_accessed: Optional[int] = dataclass_field( + default=None, + metadata={"description": "Pipeline last accessed time (int64 timestamp)"}, + ) + article: Optional[dict] = dataclass_field( + default=None, + metadata={ + "description": "Pipeline article info with URL, ID, DOI, " + "citation, state, name, journal_name, and " + "publish_time" + }, + ) + cloned_from_url: Optional[str] = dataclass_field( + default=None, + metadata={"description": "URL to external Git repository linked to pipeline"}, + ) + description: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Pipeline description"}, + ) + field: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Pipeline research field"}, + ) + tags: Optional[list[str]] = dataclass_field( + default=None, + metadata={"description": "List of tags associated with the pipeline"}, + ) + original_pipeline: Optional[OriginalPipelineInfo] = dataclass_field( + default=None, + metadata={ + "description": "Original pipeline info when this pipeline is duplicated from another" + }, + ) + release_pipeline: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Release pipeline ID"}, + ) + submission: Optional[dict] = dataclass_field( + default=None, + metadata={ + "description": "Submission info with timestamp, commit hash, " + "verification_pipeline, verified status, and " + "verified_timestamp" + }, + ) + versions: Optional[list[dict]] = dataclass_field( + default=None, + metadata={ + "description": "Pipeline versions with major_version, minor_version, release_time, and DOI" + }, + ) + + +@dataclass_json +@dataclass(frozen=True) +class PipelineSearchParams: + """Parameters for searching pipelines with various filters and pagination + options.""" + + query: Optional[str] = dataclass_field( + default=None, + metadata={ + "description": """Search expression supporting free text and field:value filters. + Valid fields: + - id + - name + - doi + - tag + - field + - affiliation + - journal + - article + - author + + Free text: + - Matches across weighted fields (name, tags, description, authors, etc.) + + Syntax rules: + - Same field repeated = OR + - Different fields = AND + - Quotes = exact phrase + - No explicit OR operator + - No wildcards (*) + - Not case sensitive + + Notes: + - "description" is not directly searchable; it is covered by free-text matching. + + Examples: + - name:RNA-seq tag:genomics + - name:"single cell analysis" + - Synergy + - name:Synergy + """ + }, + ) + next_token: Optional[str] = dataclass_field( + default=None, + metadata={ + "description": "Token for next page of results from previous response" + }, + ) + offset: Optional[int] = dataclass_field( + default=None, + metadata={ + "description": "Starting index for search results (ignored if next_token is set)" + }, + ) + limit: Optional[int] = dataclass_field( + default=None, + metadata={ + "description": "Number of items to return (up to 1000, defaults to 100)" + }, + ) + sort_field: Optional[PipelineSortBy] = dataclass_field( + default=None, + metadata={"description": "Field to sort by (created, name, or last_accessed)"}, + ) + sort_order: Optional[SortOrder] = dataclass_field( + default=None, + metadata={ + "description": "Sort order ('asc' or 'desc') - must be provided with a sort_field parameter as well!" + }, + ) + ownership: Optional[Ownership] = dataclass_field( + default=None, + metadata={ + "description": "Filter by ownership ('private', 'created' or 'shared') - defaults to all accessible" + }, + ) + status: Optional[PipelineStatus] = dataclass_field( + default=None, + metadata={"description": "Filter by status (release or non_release) - defaults to all"}, + ) + favorite: Optional[bool] = dataclass_field( + default=None, + metadata={"description": "Search only favorite pipelines"}, + ) + archived: Optional[bool] = dataclass_field( + default=None, + metadata={"description": "Search only archived pipelines"}, + ) + filters: Optional[list[SearchFilter]] = dataclass_field( + default=None, + metadata={ + "description": "Additional field-level filters for name, description, tags, or custom fields" + }, + ) + + +@dataclass_json +@dataclass(frozen=True) +class PipelineSearchResults: + """Results from a pipeline search operation with pagination support.""" + + has_more: bool = dataclass_field( + metadata={"description": "Indicates if there are more results available"}, + ) + results: list[Pipeline] = dataclass_field( + metadata={"description": "Array of pipelines found matching the search criteria"}, + ) + next_token: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Token for fetching the next page of results"}, + ) + + +@dataclass +class Pipelines: + """Client for interacting with Code Ocean pipeline APIs.""" + + client: BaseUrlSession + + def get_pipeline(self, pipeline_id: str) -> Pipeline: + """Retrieve metadata for a specific pipeline by its ID.""" + res = self.client.get(f"pipelines/{pipeline_id}") + + return Pipeline.from_dict(res.json()) + + def delete_pipeline(self, pipeline_id: str): + """Delete a pipeline permanently.""" + self.client.delete(f"pipelines/{pipeline_id}") + + def get_pipeline_app_panel(self, pipeline_id: str, version: int | None = None) -> AppPanel: + """Retrieve app panel information for a specific pipeline by its ID.""" + res = self.client.get(f"pipelines/{pipeline_id}/app_panel", params={"version": version} if version else None) + + return AppPanel.from_dict(res.json()) + + def list_computations(self, pipeline_id: str) -> list[Computation]: + """Get all computations associated with a specific pipeline.""" + res = self.client.get(f"pipelines/{pipeline_id}/computations") + + return [Computation.from_dict(c) for c in res.json()] + + def update_permissions(self, pipeline_id: str, permissions: Permissions): + """Update permissions for a pipeline.""" + self.client.post( + f"pipelines/{pipeline_id}/permissions", + json=permissions.to_dict(), + ) + + def attach_data_assets( + self, + pipeline_id: str, + attach_params: list[DataAssetAttachParams], + ) -> list[DataAssetAttachResults]: + """Attach one or more data assets to a pipeline with optional mount paths.""" + res = self.client.post( + f"pipelines/{pipeline_id}/data_assets", + json=[j.to_dict() for j in attach_params], + ) + + return [DataAssetAttachResults.from_dict(c) for c in res.json()] + + def detach_data_assets(self, pipeline_id: str, data_assets: list[str]): + """Detach one or more data assets from a pipeline by their IDs.""" + self.client.delete( + f"pipelines/{pipeline_id}/data_assets/", + json=data_assets, + ) + + def archive_pipeline(self, pipeline_id: str, archive: bool): + """Archive or unarchive a pipeline to control its visibility and accessibility.""" + self.client.patch( + f"pipelines/{pipeline_id}/archive", + params={"archive": archive}, + ) + + def search_pipelines(self, search_params: PipelineSearchParams) -> PipelineSearchResults: + """Search for pipelines with filtering, sorting, and pagination + options.""" + res = self.client.post("pipelines/search", json=search_params.to_dict()) + + return PipelineSearchResults.from_dict(res.json()) + + def search_pipelines_iterator(self, search_params: PipelineSearchParams) -> Iterator[Pipeline]: + """Iterate through all pipelines matching search criteria with automatic pagination.""" + params = search_params.to_dict() + while True: + response = self.search_pipelines(search_params=PipelineSearchParams(**params)) + + for result in response.results: + yield result + + if not response.has_more: + return + + params["next_token"] = response.next_token From 295ee4194285a644f0010c1a7021011a173a2c4d Mon Sep 17 00:00:00 2001 From: jake-valsamis Date: Sat, 17 Jan 2026 18:38:59 -0500 Subject: [PATCH 2/4] fix formatting --- src/codeocean/data_asset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codeocean/data_asset.py b/src/codeocean/data_asset.py index f36e214..5066bdd 100644 --- a/src/codeocean/data_asset.py +++ b/src/codeocean/data_asset.py @@ -294,7 +294,7 @@ class DataAssetUpdateParams: @dataclass(frozen=True) class AWSS3Source: """AWS S3 source configuration for creating data assets.""" - + bucket: str = field( metadata={ "description": "The S3 bucket from which the data asset will be created", From a8faf3f971beecc8461ffb54961b0f6ad80109de Mon Sep 17 00:00:00 2001 From: Zvika Gart Date: Tue, 20 Jan 2026 20:46:47 +0000 Subject: [PATCH 3/4] refactor: Pipelines subclasses Capsules, move AppPanel back to capsule.py - Capsules now uses configurable _route for API paths - Pipelines extends Capsules with _route="pipelines" and method aliases - Moved AppPanel classes from components.py back to capsule.py - Added run_pipeline as alias for run_capsule in Computations Co-Authored-By: Claude Opus 4.5 --- examples/run_pipeline.py | 2 +- src/codeocean/capsule.py | 243 +++++++++++++++++++++++-- src/codeocean/components.py | 216 ---------------------- src/codeocean/computation.py | 14 +- src/codeocean/pipeline.py | 341 ++--------------------------------- 5 files changed, 241 insertions(+), 575 deletions(-) diff --git a/examples/run_pipeline.py b/examples/run_pipeline.py index 1c338c6..8fe8681 100644 --- a/examples/run_pipeline.py +++ b/examples/run_pipeline.py @@ -38,7 +38,7 @@ ], ) -computation = client.computations.run_capsule(run_params) +computation = client.computations.run_pipeline(run_params) # Wait for pipeline to finish. diff --git a/src/codeocean/capsule.py b/src/codeocean/capsule.py index ddff71d..55df092 100644 --- a/src/codeocean/capsule.py +++ b/src/codeocean/capsule.py @@ -5,13 +5,7 @@ from typing import Optional, Iterator from requests_toolbelt.sessions import BaseUrlSession -from codeocean.components import ( - Ownership, - SortOrder, - SearchFilter, - Permissions, - AppPanel, -) +from codeocean.components import Ownership, SortOrder, SearchFilter, Permissions from codeocean.computation import Computation from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults from codeocean.enum import StrEnum @@ -32,6 +26,29 @@ class CapsuleSortBy(StrEnum): Name = "name" +class AppPanelDataAssetKind(StrEnum): + """The kind of data asset displayed in an app panel. + + - 'Internal' → Data stored inside Code Ocean. + - 'External' → Data stored external to Code Ocean. + - 'Combined' → Data containing multiple external data assets. + + In pipelines, a data asset can only be replaced with one of the same kind. + """ + + Internal = "internal" + External = "external" + Combined = "combined" + + +class AppPanelParameterType(StrEnum): + """The type of parameter displayed in an app panel.""" + + Text = "text" + List = "list" + File = "file" + + @dataclass_json @dataclass(frozen=True) class OriginalCapsuleInfo: @@ -255,38 +272,232 @@ class CapsuleSearchResults: ) +@dataclass_json +@dataclass(frozen=True) +class AppPanelCategories: + """Categories for a capsule's App Panel parameters.""" + + id: str = dataclass_field( + metadata={"description": "Unique identifier for the category."}, + ) + name: str = dataclass_field( + metadata={"description": "Human-readable name of the category."}, + ) + description: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Optional detailed description of the category."}, + ) + help_text: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Optional help text providing guidance or additional information about the category."}, + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelParameters: + """Parameters for a capsule's App Panel.""" + + name: str = dataclass_field( + metadata={"description": "Parameter label/display name."} + ) + type: AppPanelParameterType = dataclass_field( + metadata={"description": "Type of the parameter (text, list, file)."} + ) + category: Optional[str] = dataclass_field( + default=None, + metadata={"description": "ID of category the parameter belongs to."} + ) + param_name: Optional[str] = dataclass_field( + default=None, + metadata={"description": "The parameter name/argument key"} + ) + description: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Description of the parameter."} + ) + help_text: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Help text for the parameter."} + ) + value_type: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Value type of the parameter."} + ) + default_value: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Default value of the parameter."} + ) + required: Optional[bool] = dataclass_field( + default=None, + metadata={"description": "Indicates if the parameter is required."} + ) + hidden: Optional[bool] = dataclass_field( + default=None, + metadata={"description": "Indicates if the parameter is hidden."} + ) + minimum: Optional[float] = dataclass_field( + default=None, + metadata={"description": "Minimum value for the parameter."} + ) + maximum: Optional[float] = dataclass_field( + default=None, + metadata={"description": "Maximum value for the parameter."} + ) + pattern: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Regular expression pattern for the parameter."} + ) + value_options: Optional[list[str]] = dataclass_field( + default=None, + metadata={"description": "Allowed values for the parameter."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelGeneral: + """General information about a capsule's App Panel.""" + + title: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Title of the App Panel."} + ) + instructions: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Instructions for using the App Panel."} + ) + help_text: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Help text for the App Panel."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelDataAsset: + """Data asset parameter for the App Panel.""" + + id: str = dataclass_field( + metadata={"description": "Unique identifier for the data asset."} + ) + mount: str = dataclass_field( + metadata={"description": "Mount path of the data asset within the capsule. " + "Use this mount path to replace the currently attached data asset with your own"} + ) + name: str = dataclass_field( + metadata={"description": "Display name of the data asset."} + ) + kind: AppPanelDataAssetKind = dataclass_field( + metadata={"description": "Kind of the data asset (internal, external, combined)."} + ) + accessible: bool = dataclass_field( + metadata={"description": "Indicates if the data asset is accessible to the user."} + ) + description: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Optional description of the data asset parameter."} + ) + help_text: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Optional help text for the data asset parameter."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelResult: + """Selected result files to display once the computation is complete.""" + + file_name: str = dataclass_field( + metadata={"description": "Name of the result file."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelProcess: + """Pipeline process name and its corresponding app panel (for pipelines of capsules only)""" + + name: str = dataclass_field( + metadata={"description": "Name of the pipeline process."} + ) + categories: Optional[AppPanelCategories] = dataclass_field( + default=None, + metadata={"description": "Categories for the pipeline process's app panel parameters."} + ) + parameters: Optional[AppPanelParameters] = dataclass_field( + default=None, + metadata={"description": "Parameters for the pipeline process's app panel."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanel: + """App Panel configuration for a capsule or pipeline, including general info, data assets, + categories, parameters, and results. + """ + + general: Optional[AppPanelGeneral] = dataclass_field( + default=None, + metadata={"description": "General information about the App Panel."} + ) + data_assets: Optional[list[AppPanelDataAsset]] = dataclass_field( + default=None, + metadata={"description": "List of data assets used in the App Panel."} + ) + categories: Optional[list[AppPanelCategories]] = dataclass_field( + default=None, + metadata={"description": "Categories for organizing App Panel parameters."} + ) + parameters: Optional[list[AppPanelParameters]] = dataclass_field( + default=None, + metadata={"description": "Parameters for the App Panel."} + ) + results: Optional[list[AppPanelResult]] = dataclass_field( + default=None, + metadata={"description": "Result files to display after computation."} + ) + processes: Optional[list[AppPanelProcess]] = dataclass_field( + default=None, + metadata={"description": "Pipeline processes and their App Panels."} + ) + + @dataclass class Capsules: """Client for interacting with Code Ocean capsule APIs.""" client: BaseUrlSession + _route: str = "capsules" def get_capsule(self, capsule_id: str) -> Capsule: """Retrieve metadata for a specific capsule by its ID.""" - res = self.client.get(f"capsules/{capsule_id}") + res = self.client.get(f"{self._route}/{capsule_id}") return Capsule.from_dict(res.json()) def delete_capsule(self, capsule_id: str): """Delete a capsule permanently.""" - self.client.delete(f"capsules/{capsule_id}") + self.client.delete(f"{self._route}/{capsule_id}") def get_capsule_app_panel(self, capsule_id: str, version: Optional[int] = None) -> AppPanel: """Retrieve app panel information for a specific capsule by its ID.""" - res = self.client.get(f"capsules/{capsule_id}/app_panel", params={"version": version} if version else None) + res = self.client.get(f"{self._route}/{capsule_id}/app_panel", params={"version": version} if version else None) return AppPanel.from_dict(res.json()) def list_computations(self, capsule_id: str) -> list[Computation]: """Get all computations associated with a specific capsule.""" - res = self.client.get(f"capsules/{capsule_id}/computations") + res = self.client.get(f"{self._route}/{capsule_id}/computations") return [Computation.from_dict(c) for c in res.json()] def update_permissions(self, capsule_id: str, permissions: Permissions): """Update permissions for a capsule.""" self.client.post( - f"capsules/{capsule_id}/permissions", + f"{self._route}/{capsule_id}/permissions", json=permissions.to_dict(), ) @@ -297,7 +508,7 @@ def attach_data_assets( ) -> list[DataAssetAttachResults]: """Attach one or more data assets to a capsule with optional mount paths.""" res = self.client.post( - f"capsules/{capsule_id}/data_assets", + f"{self._route}/{capsule_id}/data_assets", json=[j.to_dict() for j in attach_params], ) @@ -306,21 +517,21 @@ def attach_data_assets( def detach_data_assets(self, capsule_id: str, data_assets: list[str]): """Detach one or more data assets from a capsule by their IDs.""" self.client.delete( - f"capsules/{capsule_id}/data_assets/", + f"{self._route}/{capsule_id}/data_assets/", json=data_assets, ) def archive_capsule(self, capsule_id: str, archive: bool): """Archive or unarchive a capsule to control its visibility and accessibility.""" self.client.patch( - f"capsules/{capsule_id}/archive", + f"{self._route}/{capsule_id}/archive", params={"archive": archive}, ) def search_capsules(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults: """Search for capsules with filtering, sorting, and pagination options.""" - res = self.client.post("capsules/search", json=search_params.to_dict()) + res = self.client.post(f"{self._route}/search", json=search_params.to_dict()) return CapsuleSearchResults.from_dict(res.json()) diff --git a/src/codeocean/components.py b/src/codeocean/components.py index 693422b..f580d64 100644 --- a/src/codeocean/components.py +++ b/src/codeocean/components.py @@ -146,219 +146,3 @@ class Ownership(StrEnum): Private = "private" Shared = "shared" Created = "created" - - -class AppPanelDataAssetKind(StrEnum): - """The kind of data asset displayed in an app panel. - - - 'Internal' → Data stored inside Code Ocean. - - 'External' → Data stored external to Code Ocean. - - 'Combined' → Data containing multiple external data assets. - - In pipelines, a data asset can only be replaced with one of the same kind. - """ - - Internal = "internal" - External = "external" - Combined = "combined" - - -class AppPanelParameterType(StrEnum): - """The type of parameter displayed in an app panel.""" - - Text = "text" - List = "list" - File = "file" - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelCategories: - """Categories for a capsule's App Panel parameters.""" - - id: str = field( - metadata={"description": "Unique identifier for the category."}, - ) - name: str = field( - metadata={"description": "Human-readable name of the category."}, - ) - description: Optional[str] = field( - default=None, - metadata={"description": "Optional detailed description of the category."}, - ) - help_text: Optional[str] = field( - default=None, - metadata={"description": "Optional help text providing guidance or additional information about the category."}, - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelParameters: - """Parameters for a capsule's App Panel.""" - - name: str = field( - metadata={"description": "Parameter label/display name."} - ) - type: AppPanelParameterType = field( - metadata={"description": "Type of the parameter (text, list, file)."} - ) - category: Optional[str] = field( - default=None, - metadata={"description": "ID of category the parameter belongs to."} - ) - param_name: Optional[str] = field( - default=None, - metadata={"description": "The parameter name/argument key"} - ) - description: Optional[str] = field( - default=None, - metadata={"description": "Description of the parameter."} - ) - help_text: Optional[str] = field( - default=None, - metadata={"description": "Help text for the parameter."} - ) - value_type: Optional[str] = field( - default=None, - metadata={"description": "Value type of the parameter."} - ) - default_value: Optional[str] = field( - default=None, - metadata={"description": "Default value of the parameter."} - ) - required: Optional[bool] = field( - default=None, - metadata={"description": "Indicates if the parameter is required."} - ) - hidden: Optional[bool] = field( - default=None, - metadata={"description": "Indicates if the parameter is hidden."} - ) - minimum: Optional[float] = field( - default=None, - metadata={"description": "Minimum value for the parameter."} - ) - maximum: Optional[float] = field( - default=None, - metadata={"description": "Maximum value for the parameter."} - ) - pattern: Optional[str] = field( - default=None, - metadata={"description": "Regular expression pattern for the parameter."} - ) - value_options: Optional[list[str]] = field( - default=None, - metadata={"description": "Allowed values for the parameter."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelGeneral: - """General information about a capsule's App Panel.""" - - title: Optional[str] = field( - default=None, - metadata={"description": "Title of the App Panel."} - ) - instructions: Optional[str] = field( - default=None, - metadata={"description": "Instructions for using the App Panel."} - ) - help_text: Optional[str] = field( - default=None, - metadata={"description": "Help text for the App Panel."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelDataAsset: - """Data asset parameter for the App Panel.""" - - id: str = field( - metadata={"description": "Unique identifier for the data asset."} - ) - mount: str = field( - metadata={"description": "Mount path of the data asset within the capsule. " - "Use this mount path to replace the currently attached data asset with your own"} - ) - name: str = field( - metadata={"description": "Display name of the data asset."} - ) - kind: AppPanelDataAssetKind = field( - metadata={"description": "Kind of the data asset (internal, external, combined)."} - ) - accessible: bool = field( - metadata={"description": "Indicates if the data asset is accessible to the user."} - ) - description: Optional[str] = field( - default=None, - metadata={"description": "Optional description of the data asset parameter."} - ) - help_text: Optional[str] = field( - default=None, - metadata={"description": "Optional help text for the data asset parameter."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelResult: - """Selected result files to display once the computation is complete.""" - - file_name: str = field( - metadata={"description": "Name of the result file."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelProcess: - """Pipeline process name and its corresponding app panel (for pipelines of capsules only)""" - - name: str = field( - metadata={"description": "Name of the pipeline process."} - ) - categories: Optional[AppPanelCategories] = field( - default=None, - metadata={"description": "Categories for the pipeline process's app panel parameters."} - ) - parameters: Optional[AppPanelParameters] = field( - default=None, - metadata={"description": "Parameters for the pipeline process's app panel."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanel: - """App Panel configuration for a capsule or pipeline, including general info, data assets, - categories, parameters, and results. - """ - - general: Optional[AppPanelGeneral] = field( - default=None, - metadata={"description": "General information about the App Panel."} - ) - data_assets: Optional[list[AppPanelDataAsset]] = field( - default=None, - metadata={"description": "List of data assets used in the App Panel."} - ) - categories: Optional[list[AppPanelCategories]] = field( - default=None, - metadata={"description": "Categories for organizing App Panel parameters."} - ) - parameters: Optional[list[AppPanelParameters]] = field( - default=None, - metadata={"description": "Parameters for the App Panel."} - ) - results: Optional[list[AppPanelResult]] = field( - default=None, - metadata={"description": "Result files to display after computation."} - ) - processes: Optional[list[AppPanelProcess]] = field( - default=None, - metadata={"description": "Pipeline processes and their App Panels."} - ) diff --git a/src/codeocean/computation.py b/src/codeocean/computation.py index 7506d2f..bc71a7d 100644 --- a/src/codeocean/computation.py +++ b/src/codeocean/computation.py @@ -274,18 +274,8 @@ def run_capsule(self, run_params: RunParams) -> Computation: return Computation.from_dict(res.json()) - def run_pipeline(self, run_params: RunParams) -> Computation: - """ - Execute a pipeline with specified parameters and data assets. - - Set run_params.pipeline_id and optionally provide data_assets, - processes (with process-specific parameters), and nextflow_profile configuration. - - This is a convenience method that calls the same endpoint as run_capsule. - """ - res = self.client.post("computations", json=run_params.to_dict()) - - return Computation.from_dict(res.json()) + # Alias for run_capsule + run_pipeline = run_capsule def wait_until_completed( self, diff --git a/src/codeocean/pipeline.py b/src/codeocean/pipeline.py index 30ab506..7ae8685 100644 --- a/src/codeocean/pipeline.py +++ b/src/codeocean/pipeline.py @@ -1,339 +1,20 @@ from __future__ import annotations -from dataclasses import dataclass, field as dataclass_field -from dataclasses_json import dataclass_json -from typing import Iterator, Optional -from requests_toolbelt.sessions import BaseUrlSession +from dataclasses import dataclass -from codeocean.components import ( - Ownership, - SortOrder, - SearchFilter, - Permissions, - AppPanel, -) -from codeocean.computation import Computation -from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults -from codeocean.enum import StrEnum - - -class PipelineStatus(StrEnum): - """Status of a pipeline indicating its release state.""" - - NonRelease = "non_release" - Release = "release" - - -class PipelineSortBy(StrEnum): - """Fields available for sorting pipeline search results.""" - - Created = "created" - LastAccessed = "last_accessed" - Name = "name" - - -@dataclass_json -@dataclass(frozen=True) -class OriginalPipelineInfo: - """Information about the original pipeline when this pipeline is duplicated from - another.""" - - id: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Original pipeline ID"}, - ) - major_version: Optional[int] = dataclass_field( - default=None, - metadata={"description": "Original pipeline major version"}, - ) - minor_version: Optional[int] = dataclass_field( - default=None, - metadata={"description": "Original pipeline minor version"}, - ) - name: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Original pipeline name"}, - ) - created: Optional[int] = dataclass_field( - default=None, - metadata={"description": "Original pipeline creation time (int64 timestamp)"}, - ) - public: Optional[bool] = dataclass_field( - default=None, - metadata={"description": "Indicates whether the original pipeline is public"}, - ) - - -@dataclass_json -@dataclass(frozen=True) -class Pipeline: - """Represents a Code Ocean pipeline with its metadata and properties.""" - - id: str = dataclass_field( - metadata={"description": "Pipeline ID"}, - ) - created: int = dataclass_field( - metadata={"description": "Pipeline creation time (int64 timestamp)"}, - ) - name: str = dataclass_field( - metadata={"description": "Pipeline display name"}, - ) - status: PipelineStatus = dataclass_field( - metadata={"description": "Status of the pipeline (non_release or release)"}, - ) - owner: str = dataclass_field( - metadata={"description": "Pipeline owner's ID"}, - ) - slug: str = dataclass_field( - metadata={"description": "Alternate pipeline ID (URL-friendly identifier)"}, - ) - last_accessed: Optional[int] = dataclass_field( - default=None, - metadata={"description": "Pipeline last accessed time (int64 timestamp)"}, - ) - article: Optional[dict] = dataclass_field( - default=None, - metadata={ - "description": "Pipeline article info with URL, ID, DOI, " - "citation, state, name, journal_name, and " - "publish_time" - }, - ) - cloned_from_url: Optional[str] = dataclass_field( - default=None, - metadata={"description": "URL to external Git repository linked to pipeline"}, - ) - description: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Pipeline description"}, - ) - field: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Pipeline research field"}, - ) - tags: Optional[list[str]] = dataclass_field( - default=None, - metadata={"description": "List of tags associated with the pipeline"}, - ) - original_pipeline: Optional[OriginalPipelineInfo] = dataclass_field( - default=None, - metadata={ - "description": "Original pipeline info when this pipeline is duplicated from another" - }, - ) - release_pipeline: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Release pipeline ID"}, - ) - submission: Optional[dict] = dataclass_field( - default=None, - metadata={ - "description": "Submission info with timestamp, commit hash, " - "verification_pipeline, verified status, and " - "verified_timestamp" - }, - ) - versions: Optional[list[dict]] = dataclass_field( - default=None, - metadata={ - "description": "Pipeline versions with major_version, minor_version, release_time, and DOI" - }, - ) - - -@dataclass_json -@dataclass(frozen=True) -class PipelineSearchParams: - """Parameters for searching pipelines with various filters and pagination - options.""" - - query: Optional[str] = dataclass_field( - default=None, - metadata={ - "description": """Search expression supporting free text and field:value filters. - Valid fields: - - id - - name - - doi - - tag - - field - - affiliation - - journal - - article - - author - - Free text: - - Matches across weighted fields (name, tags, description, authors, etc.) - - Syntax rules: - - Same field repeated = OR - - Different fields = AND - - Quotes = exact phrase - - No explicit OR operator - - No wildcards (*) - - Not case sensitive - - Notes: - - "description" is not directly searchable; it is covered by free-text matching. - - Examples: - - name:RNA-seq tag:genomics - - name:"single cell analysis" - - Synergy - - name:Synergy - """ - }, - ) - next_token: Optional[str] = dataclass_field( - default=None, - metadata={ - "description": "Token for next page of results from previous response" - }, - ) - offset: Optional[int] = dataclass_field( - default=None, - metadata={ - "description": "Starting index for search results (ignored if next_token is set)" - }, - ) - limit: Optional[int] = dataclass_field( - default=None, - metadata={ - "description": "Number of items to return (up to 1000, defaults to 100)" - }, - ) - sort_field: Optional[PipelineSortBy] = dataclass_field( - default=None, - metadata={"description": "Field to sort by (created, name, or last_accessed)"}, - ) - sort_order: Optional[SortOrder] = dataclass_field( - default=None, - metadata={ - "description": "Sort order ('asc' or 'desc') - must be provided with a sort_field parameter as well!" - }, - ) - ownership: Optional[Ownership] = dataclass_field( - default=None, - metadata={ - "description": "Filter by ownership ('private', 'created' or 'shared') - defaults to all accessible" - }, - ) - status: Optional[PipelineStatus] = dataclass_field( - default=None, - metadata={"description": "Filter by status (release or non_release) - defaults to all"}, - ) - favorite: Optional[bool] = dataclass_field( - default=None, - metadata={"description": "Search only favorite pipelines"}, - ) - archived: Optional[bool] = dataclass_field( - default=None, - metadata={"description": "Search only archived pipelines"}, - ) - filters: Optional[list[SearchFilter]] = dataclass_field( - default=None, - metadata={ - "description": "Additional field-level filters for name, description, tags, or custom fields" - }, - ) - - -@dataclass_json -@dataclass(frozen=True) -class PipelineSearchResults: - """Results from a pipeline search operation with pagination support.""" - - has_more: bool = dataclass_field( - metadata={"description": "Indicates if there are more results available"}, - ) - results: list[Pipeline] = dataclass_field( - metadata={"description": "Array of pipelines found matching the search criteria"}, - ) - next_token: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Token for fetching the next page of results"}, - ) +from codeocean.capsule import Capsules @dataclass -class Pipelines: +class Pipelines(Capsules): """Client for interacting with Code Ocean pipeline APIs.""" - client: BaseUrlSession - - def get_pipeline(self, pipeline_id: str) -> Pipeline: - """Retrieve metadata for a specific pipeline by its ID.""" - res = self.client.get(f"pipelines/{pipeline_id}") - - return Pipeline.from_dict(res.json()) - - def delete_pipeline(self, pipeline_id: str): - """Delete a pipeline permanently.""" - self.client.delete(f"pipelines/{pipeline_id}") - - def get_pipeline_app_panel(self, pipeline_id: str, version: int | None = None) -> AppPanel: - """Retrieve app panel information for a specific pipeline by its ID.""" - res = self.client.get(f"pipelines/{pipeline_id}/app_panel", params={"version": version} if version else None) - - return AppPanel.from_dict(res.json()) - - def list_computations(self, pipeline_id: str) -> list[Computation]: - """Get all computations associated with a specific pipeline.""" - res = self.client.get(f"pipelines/{pipeline_id}/computations") - - return [Computation.from_dict(c) for c in res.json()] - - def update_permissions(self, pipeline_id: str, permissions: Permissions): - """Update permissions for a pipeline.""" - self.client.post( - f"pipelines/{pipeline_id}/permissions", - json=permissions.to_dict(), - ) - - def attach_data_assets( - self, - pipeline_id: str, - attach_params: list[DataAssetAttachParams], - ) -> list[DataAssetAttachResults]: - """Attach one or more data assets to a pipeline with optional mount paths.""" - res = self.client.post( - f"pipelines/{pipeline_id}/data_assets", - json=[j.to_dict() for j in attach_params], - ) - - return [DataAssetAttachResults.from_dict(c) for c in res.json()] - - def detach_data_assets(self, pipeline_id: str, data_assets: list[str]): - """Detach one or more data assets from a pipeline by their IDs.""" - self.client.delete( - f"pipelines/{pipeline_id}/data_assets/", - json=data_assets, - ) - - def archive_pipeline(self, pipeline_id: str, archive: bool): - """Archive or unarchive a pipeline to control its visibility and accessibility.""" - self.client.patch( - f"pipelines/{pipeline_id}/archive", - params={"archive": archive}, - ) - - def search_pipelines(self, search_params: PipelineSearchParams) -> PipelineSearchResults: - """Search for pipelines with filtering, sorting, and pagination - options.""" - res = self.client.post("pipelines/search", json=search_params.to_dict()) - - return PipelineSearchResults.from_dict(res.json()) - - def search_pipelines_iterator(self, search_params: PipelineSearchParams) -> Iterator[Pipeline]: - """Iterate through all pipelines matching search criteria with automatic pagination.""" - params = search_params.to_dict() - while True: - response = self.search_pipelines(search_params=PipelineSearchParams(**params)) - - for result in response.results: - yield result - - if not response.has_more: - return + _route: str = "pipelines" - params["next_token"] = response.next_token + # Aliases for pipeline-specific naming + get_pipeline = Capsules.get_capsule + delete_pipeline = Capsules.delete_capsule + get_pipeline_app_panel = Capsules.get_capsule_app_panel + archive_pipeline = Capsules.archive_capsule + search_pipelines = Capsules.search_capsules + search_pipelines_iterator = Capsules.search_capsules_iterator From 765987a7c25f9c9ab7403bab3931736b60d192fb Mon Sep 17 00:00:00 2001 From: Zvika Gart Date: Wed, 21 Jan 2026 11:15:06 +0000 Subject: [PATCH 4/4] refactor: use composition for Pipelines instead of inheritance Pipelines now holds an internal Capsules instance with _route="pipelines" and delegates to it. This avoids exposing Capsules methods on Pipelines. Co-Authored-By: Claude Opus 4.5 --- src/codeocean/pipeline.py | 73 +++++++++++++++++++++++++++++++++------ 1 file changed, 62 insertions(+), 11 deletions(-) diff --git a/src/codeocean/pipeline.py b/src/codeocean/pipeline.py index 7ae8685..5a76ae5 100644 --- a/src/codeocean/pipeline.py +++ b/src/codeocean/pipeline.py @@ -1,20 +1,71 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field +from typing import Iterator +from requests_toolbelt.sessions import BaseUrlSession -from codeocean.capsule import Capsules +from codeocean.capsule import ( + Capsule, + Capsules, + CapsuleSearchParams, + CapsuleSearchResults, + AppPanel, +) +from codeocean.components import Permissions +from codeocean.computation import Computation +from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults @dataclass -class Pipelines(Capsules): +class Pipelines: """Client for interacting with Code Ocean pipeline APIs.""" - _route: str = "pipelines" + client: BaseUrlSession + _capsules: Capsules = field(init=False, repr=False) - # Aliases for pipeline-specific naming - get_pipeline = Capsules.get_capsule - delete_pipeline = Capsules.delete_capsule - get_pipeline_app_panel = Capsules.get_capsule_app_panel - archive_pipeline = Capsules.archive_capsule - search_pipelines = Capsules.search_capsules - search_pipelines_iterator = Capsules.search_capsules_iterator + def __post_init__(self): + self._capsules = Capsules(client=self.client, _route="pipelines") + + def get_pipeline(self, pipeline_id: str) -> Capsule: + """Retrieve metadata for a specific pipeline by its ID.""" + return self._capsules.get_capsule(pipeline_id) + + def delete_pipeline(self, pipeline_id: str): + """Delete a pipeline permanently.""" + return self._capsules.delete_capsule(pipeline_id) + + def get_pipeline_app_panel(self, pipeline_id: str, version: int | None = None) -> AppPanel: + """Retrieve app panel information for a specific pipeline by its ID.""" + return self._capsules.get_capsule_app_panel(pipeline_id, version) + + def list_computations(self, pipeline_id: str) -> list[Computation]: + """Get all computations associated with a specific pipeline.""" + return self._capsules.list_computations(pipeline_id) + + def update_permissions(self, pipeline_id: str, permissions: Permissions): + """Update permissions for a pipeline.""" + return self._capsules.update_permissions(pipeline_id, permissions) + + def attach_data_assets( + self, + pipeline_id: str, + attach_params: list[DataAssetAttachParams], + ) -> list[DataAssetAttachResults]: + """Attach one or more data assets to a pipeline with optional mount paths.""" + return self._capsules.attach_data_assets(pipeline_id, attach_params) + + def detach_data_assets(self, pipeline_id: str, data_assets: list[str]): + """Detach one or more data assets from a pipeline by their IDs.""" + return self._capsules.detach_data_assets(pipeline_id, data_assets) + + def archive_pipeline(self, pipeline_id: str, archive: bool): + """Archive or unarchive a pipeline to control its visibility and accessibility.""" + return self._capsules.archive_capsule(pipeline_id, archive) + + def search_pipelines(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults: + """Search for pipelines with filtering, sorting, and pagination options.""" + return self._capsules.search_capsules(search_params) + + def search_pipelines_iterator(self, search_params: CapsuleSearchParams) -> Iterator[Capsule]: + """Iterate through all pipelines matching search criteria with automatic pagination.""" + return self._capsules.search_capsules_iterator(search_params)