Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
command: |
poetry run black --check .
- run:
name: Ruff Lint Check # See pyproject.tooml [tool.ruff]
name: Ruff Lint Check # See pyproject.toml [tool.ruff]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merci 🙏

command: |
poetry run ruff .
- run:
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repos:
- repo: local
hooks:
- id: system
name: flake8
name: ruff
entry: poetry run ruff nucleus
pass_filenames: false
language: system
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [0.16.1](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.16.1) - 2023-09-18

### Added
- Added `asynchronous` parameter for `slice.export_embeddings()` and `dataset.export_embeddings()` to allow embeddings to be exported asynchronously.

### Changed
- Changed `slice.export_embeddings()` and `dataset.export_embeddings()` to be asynchronous by default.

## [0.16.0](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.16.0) - 2023-09-18

### Removed
Expand Down
5 changes: 3 additions & 2 deletions nucleus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

__all__ = [
"AsyncJob",
"EmbeddingsExportJob",
"BoxAnnotation",
"BoxPrediction",
"CameraParams",
Expand Down Expand Up @@ -68,7 +69,7 @@
Segment,
SegmentationAnnotation,
)
from .async_job import AsyncJob
from .async_job import AsyncJob, EmbeddingsExportJob
from .camera_params import CameraParams
from .connection import Connection
from .constants import (
Expand Down Expand Up @@ -236,7 +237,7 @@ def models(self) -> List[Model]:
def jobs(
self,
) -> List[AsyncJob]:
"""Lists all jobs, see NucleusClinet.list_jobs(...) for advanced options
"""Lists all jobs, see NucleusClient.list_jobs(...) for advanced options

Returns:
List of all AsyncJobs
Expand Down
42 changes: 42 additions & 0 deletions nucleus/async_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ def sleep_until_complete(self, verbose_std_out=True):
if final_status["status"] == "Errored":
raise JobError(final_status, self)

@classmethod
def from_id(cls, job_id: str, client: "NucleusClient"): # type: ignore # noqa: F821
    """Creates a job instance from a specific job Id.

    Parameters:
        job_id: Defines the job Id
        client: The client to use for the request.

    Returns:
        An instance of ``cls`` — :class:`AsyncJob` or whichever subclass
        (e.g. ``EmbeddingsExportJob``) this classmethod was invoked on —
        hydrated from the job fetched by ``client.get_job``.
    """
    # Fetch the job record, then re-parse its attribute dict through
    # from_json so the returned object is of the invoking class rather
    # than the (possibly base) class get_job produced.
    job = client.get_job(job_id)
    return cls.from_json(job.__dict__, client)

@classmethod
def from_json(cls, payload: dict, client):
# TODO: make private
Expand All @@ -131,6 +145,34 @@ def from_json(cls, payload: dict, client):
)


class EmbeddingsExportJob(AsyncJob):
def result_urls(self, wait_for_completion=True) -> List[str]:
"""Gets a list of signed Scale URLs for each embedding batch.

Parameters:
wait_for_completion: Defines whether the call shall wait for
the job to complete. Defaults to True

Returns:
A list of signed Scale URLs which contain batches of embeddings.

The files contain a JSON array of embedding records with the following schema:
[{
"reference_id": str,
"embedding_vector": List[float]
}]
"""
if wait_for_completion:
self.sleep_until_complete(verbose_std_out=False)

status = self.status()

if status["status"] != "Completed":
raise JobError(status, self)

Comment on lines +170 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why raise a JobError if the job is not completed? perhaps it's still running?

Copy link
Contributor Author

@ntamas92 ntamas92 Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought process was that the usage pattern would be the following:

export_job = dataset.export_embeddings()
export_job.sleep_until_complete(False)
result = export_job.result_urls()

We could just wait for the result urls inside result_urls() also, but then I'd highlight it somehow that obtaining the results could run for a long time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, that makes sense, didn't notice the AsyncJob inheritance.
This is a neat idea, to have customized job result classes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should add a wait_for_completion parameter. It might even be the default to wait for the job to complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, let's do that

return status["message"]["result"] # type: ignore


class JobError(Exception):
def __init__(self, job_status: Dict[str, str], job: AsyncJob):
final_status_message = job_status["message"]
Expand Down
22 changes: 19 additions & 3 deletions nucleus/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import requests

from nucleus.annotation_uploader import AnnotationUploader, PredictionUploader
from nucleus.async_job import AsyncJob
from nucleus.async_job import AsyncJob, EmbeddingsExportJob
from nucleus.prediction import Prediction, from_json
from nucleus.track import Track
from nucleus.url_utils import sanitize_string_args
Expand Down Expand Up @@ -1421,18 +1421,34 @@ def items_and_annotation_generator(

def export_embeddings(
self,
) -> List[Dict[str, Union[str, List[float]]]]:
asynchronous: bool = True,
) -> Union[List[Dict[str, Union[str, List[float]]]], EmbeddingsExportJob]:
"""Fetches a pd.DataFrame-ready list of dataset embeddings.

Parameters:
asynchronous: Whether or not to process the export asynchronously (and
return an :class:`EmbeddingsExportJob` object). Default is True.

Returns:
A list, where each item is a dict with two keys representing a row
If synchronous, a list where each item is a dict with two keys representing a row
in the dataset::

List[{
"reference_id": str,
"embedding_vector": List[float]
}]

Otherwise, returns an :class:`EmbeddingsExportJob` object.
"""
if asynchronous:
api_payload = self._client.make_request(
payload=None,
route=f"dataset/{self.id}/async_export_embeddings",
requests_command=requests.post,
)

return EmbeddingsExportJob.from_json(api_payload, self._client)

api_payload = self._client.make_request(
payload=None,
route=f"dataset/{self.id}/embeddings",
Expand Down
1 change: 1 addition & 0 deletions nucleus/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class CustomerJobTypes(str, Enum):
CLONE_DATASET = "cloneDataset"
METADATA_UPDATE = "metadataUpdate"
TRIGGER_EVALUATE = "triggerEvaluate"
EXPORT_EMBEDDINGS = "exportEmbeddings"

def __contains__(self, item):
try:
Expand Down
22 changes: 19 additions & 3 deletions nucleus/slice.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import requests

from nucleus.annotation import Annotation
from nucleus.async_job import AsyncJob
from nucleus.async_job import AsyncJob, EmbeddingsExportJob
from nucleus.constants import EXPORT_FOR_TRAINING_KEY, EXPORTED_ROWS, ITEMS_KEY
from nucleus.dataset_item import DatasetItem
from nucleus.errors import NucleusAPIError
Expand Down Expand Up @@ -600,17 +600,33 @@ def send_to_labeling(self, project_id: str):

def export_embeddings(
self,
) -> List[Dict[str, Union[str, List[float]]]]:
asynchronous: bool = True,
) -> Union[List[Dict[str, Union[str, List[float]]]], EmbeddingsExportJob]:
"""Fetches a pd.DataFrame-ready list of slice embeddings.

Parameters:
asynchronous: Whether or not to process the export asynchronously (and
return an :class:`EmbeddingsExportJob` object). Default is True.

Returns:
A list where each element is a columnar mapping::
If synchronous, a list where each element is a columnar mapping::

List[{
"reference_id": str,
"embedding_vector": List[float]
}]

Otherwise, returns an :class:`EmbeddingsExportJob` object.
"""
if asynchronous:
api_payload = self._client.make_request(
payload=None,
route=f"dataset/{self.id}/async_export_embeddings",
requests_command=requests.post,
)

return EmbeddingsExportJob.from_json(api_payload, self._client)

api_payload = self._client.make_request(
payload=None,
route=f"slice/{self.id}/embeddings",
Expand Down
4 changes: 2 additions & 2 deletions tests/test_autotag.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_export_embeddings(CLIENT):
if running_as_nucleus_pytest_user(CLIENT):
embeddings = Dataset(
DATASET_WITH_EMBEDDINGS, CLIENT
).export_embeddings()
).export_embeddings(asynchronous=False)
assert "embedding_vector" in embeddings[0]
assert "reference_id" in embeddings[0]

Expand Down Expand Up @@ -100,7 +100,7 @@ def test_dataset_export_autotag_tagged_items(CLIENT):
def test_export_slice_embeddings(CLIENT):
if running_as_nucleus_pytest_user(CLIENT):
test_slice = CLIENT.get_slice("slc_c8jwtmj372xg07g9v3k0")
embeddings = test_slice.export_embeddings()
embeddings = test_slice.export_embeddings(asynchronous=False)
assert "embedding_vector" in embeddings[0]
assert "reference_id" in embeddings[0]

Expand Down