diff --git a/.circleci/config.yml b/.circleci/config.yml index 04af178c..43278ba1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -39,7 +39,7 @@ jobs: command: | poetry run black --check . - run: - name: Ruff Lint Check # See pyproject.tooml [tool.ruff] + name: Ruff Lint Check # See pyproject.toml [tool.ruff] command: | poetry run ruff . - run: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ebce0f67..e48280b3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,7 +11,7 @@ repos: - repo: local hooks: - id: system - name: flake8 + name: ruff entry: poetry run ruff nucleus pass_filenames: false language: system diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d70a6ca..b066f563 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.16.1](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.16.1) - 2023-09-18 + +### Added +- Added `asynchronous` parameter for `slice.export_embeddings()` and `dataset.export_embeddings()` to allow embeddings to be exported asynchronously. + +### Changed +- Changed `slice.export_embeddings()` and `dataset.export_embeddings()` to be asynchronous by default.
+ ## [0.16.0](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.16.0) - 2023-09-18 ### Removed diff --git a/nucleus/__init__.py b/nucleus/__init__.py index 72e6c772..5f876dad 100644 --- a/nucleus/__init__.py +++ b/nucleus/__init__.py @@ -2,6 +2,7 @@ __all__ = [ "AsyncJob", + "EmbeddingsExportJob", "BoxAnnotation", "BoxPrediction", "CameraParams", @@ -68,7 +69,7 @@ Segment, SegmentationAnnotation, ) -from .async_job import AsyncJob +from .async_job import AsyncJob, EmbeddingsExportJob from .camera_params import CameraParams from .connection import Connection from .constants import ( @@ -236,7 +237,7 @@ def models(self) -> List[Model]: def jobs( self, ) -> List[AsyncJob]: - """Lists all jobs, see NucleusClinet.list_jobs(...) for advanced options + """Lists all jobs, see NucleusClient.list_jobs(...) for advanced options Returns: List of all AsyncJobs diff --git a/nucleus/async_job.py b/nucleus/async_job.py index 06fe6855..7256afa2 100644 --- a/nucleus/async_job.py +++ b/nucleus/async_job.py @@ -119,6 +119,20 @@ def sleep_until_complete(self, verbose_std_out=True): if final_status["status"] == "Errored": raise JobError(final_status, self) + @classmethod + def from_id(cls, job_id: str, client: "NucleusClient"): # type: ignore # noqa: F821 + """Creates a job instance from a specific job Id. + + Parameters: + job_id: Defines the job Id + client: The client to use for the request. + + Returns: + The specific AsyncJob (or inherited) instance. + """ + job = client.get_job(job_id) + return cls.from_json(job.__dict__, client) + @classmethod def from_json(cls, payload: dict, client): # TODO: make private @@ -131,6 +145,34 @@ def from_json(cls, payload: dict, client): ) +class EmbeddingsExportJob(AsyncJob): + def result_urls(self, wait_for_completion=True) -> List[str]: + """Gets a list of signed Scale URLs for each embedding batch. + + Parameters: + wait_for_completion: Defines whether the call shall wait for + the job to complete.
Defaults to True + + Returns: + A list of signed Scale URLs which contain batches of embeddings. + + The files contain a JSON array of embedding records with the following schema: + [{ + "reference_id": str, + "embedding_vector": List[float] + }] + """ + if wait_for_completion: + self.sleep_until_complete(verbose_std_out=False) + + status = self.status() + + if status["status"] != "Completed": + raise JobError(status, self) + + return status["message"]["result"] # type: ignore + + class JobError(Exception): def __init__(self, job_status: Dict[str, str], job: AsyncJob): final_status_message = job_status["message"] diff --git a/nucleus/dataset.py b/nucleus/dataset.py index 5569856d..5f76e630 100644 --- a/nucleus/dataset.py +++ b/nucleus/dataset.py @@ -15,7 +15,7 @@ import requests from nucleus.annotation_uploader import AnnotationUploader, PredictionUploader -from nucleus.async_job import AsyncJob +from nucleus.async_job import AsyncJob, EmbeddingsExportJob from nucleus.prediction import Prediction, from_json from nucleus.track import Track from nucleus.url_utils import sanitize_string_args @@ -1421,18 +1421,34 @@ def items_and_annotation_generator( def export_embeddings( self, - ) -> List[Dict[str, Union[str, List[float]]]]: + asynchronous: bool = True, + ) -> Union[List[Dict[str, Union[str, List[float]]]], EmbeddingsExportJob]: """Fetches a pd.DataFrame-ready list of dataset embeddings. + Parameters: + asynchronous: Whether or not to process the export asynchronously (and + return an :class:`EmbeddingsExportJob` object). Default is True. + Returns: - A list, where each item is a dict with two keys representing a row + If synchronous, a list where each item is a dict with two keys representing a row in the dataset:: List[{ "reference_id": str, "embedding_vector": List[float] }] + + Otherwise, returns an :class:`EmbeddingsExportJob` object. 
""" + if asynchronous: + api_payload = self._client.make_request( + payload=None, + route=f"dataset/{self.id}/async_export_embeddings", + requests_command=requests.post, + ) + + return EmbeddingsExportJob.from_json(api_payload, self._client) + api_payload = self._client.make_request( payload=None, route=f"dataset/{self.id}/embeddings", diff --git a/nucleus/job.py b/nucleus/job.py index 7e64ca6c..d05d5e05 100644 --- a/nucleus/job.py +++ b/nucleus/job.py @@ -27,6 +27,7 @@ class CustomerJobTypes(str, Enum): CLONE_DATASET = "cloneDataset" METADATA_UPDATE = "metadataUpdate" TRIGGER_EVALUATE = "triggerEvaluate" + EXPORT_EMBEDDINGS = "exportEmbeddings" def __contains__(self, item): try: diff --git a/nucleus/slice.py b/nucleus/slice.py index 47a5c0ca..647129c5 100644 --- a/nucleus/slice.py +++ b/nucleus/slice.py @@ -7,7 +7,7 @@ import requests from nucleus.annotation import Annotation -from nucleus.async_job import AsyncJob +from nucleus.async_job import AsyncJob, EmbeddingsExportJob from nucleus.constants import EXPORT_FOR_TRAINING_KEY, EXPORTED_ROWS, ITEMS_KEY from nucleus.dataset_item import DatasetItem from nucleus.errors import NucleusAPIError @@ -600,17 +600,33 @@ def send_to_labeling(self, project_id: str): def export_embeddings( self, - ) -> List[Dict[str, Union[str, List[float]]]]: + asynchronous: bool = True, + ) -> Union[List[Dict[str, Union[str, List[float]]]], EmbeddingsExportJob]: """Fetches a pd.DataFrame-ready list of slice embeddings. + Parameters: + asynchronous: Whether or not to process the export asynchronously (and + return an :class:`EmbeddingsExportJob` object). Default is True. + Returns: - A list where each element is a columnar mapping:: + If synchronous, a list where each element is a columnar mapping:: List[{ "reference_id": str, "embedding_vector": List[float] }] + + Otherwise, returns an :class:`EmbeddingsExportJob` object. 
""" + if asynchronous: + api_payload = self._client.make_request( + payload=None, + route=f"dataset/{self.id}/async_export_embeddings", + requests_command=requests.post, + ) + + return EmbeddingsExportJob.from_json(api_payload, self._client) + api_payload = self._client.make_request( payload=None, route=f"slice/{self.id}/embeddings", diff --git a/tests/test_autotag.py b/tests/test_autotag.py index 2480a328..3518d37f 100644 --- a/tests/test_autotag.py +++ b/tests/test_autotag.py @@ -60,7 +60,7 @@ def test_export_embeddings(CLIENT): if running_as_nucleus_pytest_user(CLIENT): embeddings = Dataset( DATASET_WITH_EMBEDDINGS, CLIENT - ).export_embeddings() + ).export_embeddings(asynchronous=False) assert "embedding_vector" in embeddings[0] assert "reference_id" in embeddings[0] @@ -100,7 +100,7 @@ def test_dataset_export_autotag_tagged_items(CLIENT): def test_export_slice_embeddings(CLIENT): if running_as_nucleus_pytest_user(CLIENT): test_slice = CLIENT.get_slice("slc_c8jwtmj372xg07g9v3k0") - embeddings = test_slice.export_embeddings() + embeddings = test_slice.export_embeddings(asynchronous=False) assert "embedding_vector" in embeddings[0] assert "reference_id" in embeddings[0]