diff --git a/src/groundlight/client.py b/src/groundlight/client.py
index 65e0b97b..8c9237db 100644
--- a/src/groundlight/client.py
+++ b/src/groundlight/client.py
@@ -1,8 +1,9 @@
+import asyncio
 import logging
 import os
 import time
 from io import BufferedReader, BytesIO
-from typing import Optional, Union
+from typing import List, Optional, Union
 
 from model import Detector, ImageQuery, PaginatedDetectorList, PaginatedImageQueryList
 from openapi_client import Configuration
@@ -43,6 +44,7 @@ class Groundlight:
     """
 
     DEFAULT_WAIT: float = 30.0
+    DEFAULT_PATIENCE: float = 30.0
 
     POLLING_INITIAL_DELAY = 0.25
     POLLING_EXPONENTIAL_BACKOFF = 1.3  # This still has the nice backoff property that the max number of requests
@@ -170,6 +172,7 @@ def submit_image_query(  # noqa: PLR0913 # pylint: disable=too-many-arguments
         detector: Union[Detector, str],
         image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
         wait: Optional[float] = None,
+        patience_time: Optional[float] = None,
         human_review: Optional[str] = None,
         inspection_id: Optional[str] = None,
     ) -> ImageQuery:
@@ -193,16 +196,20 @@ def submit_image_query(  # noqa: PLR0913 # pylint: disable=too-many-arguments
         """
         if wait is None:
             wait = self.DEFAULT_WAIT
+        if patience_time is None:
+            patience_time = self.DEFAULT_PATIENCE
+        if wait > patience_time:
+            patience_time = wait
 
         detector_id = detector.id if isinstance(detector, Detector) else detector
 
         image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)
 
         params = {"detector_id": detector_id, "body": image_bytesio}
-        if wait == 0:
-            params["patience_time"] = self.DEFAULT_WAIT
+        if patience_time == 0:
+            params["patience_time"] = self.DEFAULT_PATIENCE
         else:
-            params["patience_time"] = wait
+            params["patience_time"] = patience_time
 
         if human_review is not None:
             params["human_review"] = human_review
@@ -223,6 +230,164 @@ def submit_image_query(  # noqa: PLR0913 # pylint: disable=too-many-arguments
             image_query = self.wait_for_confident_result(image_query,
                                                          confidence_threshold=threshold, timeout_sec=wait)
         return self._fixup_image_query(image_query)
+
+    def ask_confident(
+        self,
+        detector: Union[Detector, str],
+        image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
+        patience_time: Optional[float] = None,
+    ) -> ImageQuery:
+        # Not yet differentiated from submit_image_query other than simplified parameter set
+        # BUGFIX: return the ImageQuery instead of silently discarding it (signature promises ImageQuery)
+        return self.submit_image_query(detector, image, wait=patience_time, patience_time=patience_time)
+
+    def ask_ml(
+        self,
+        detector: Union[Detector, str],
+        image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
+        wait: Optional[float] = None,
+        human_review: Optional[str] = None,
+        patience_time: Optional[float] = None,
+    ) -> ImageQuery:
+        # BUGFIX: patience_time was referenced below but missing from the signature (NameError);
+        # appended last so existing keyword/positional callers are unaffected.
+        if wait is None:
+            wait = self.DEFAULT_WAIT
+        if patience_time is None:
+            patience_time = self.DEFAULT_PATIENCE
+        if wait > patience_time:
+            patience_time = wait
+
+        detector_id = detector.id if isinstance(detector, Detector) else detector
+
+        image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)
+
+        params = {"detector_id": detector_id, "body": image_bytesio}
+        if patience_time == 0:
+            params["patience_time"] = self.DEFAULT_PATIENCE
+        else:
+            params["patience_time"] = patience_time
+
+        # human review is still available even within ask_ml
+        if human_review is not None:
+            params["human_review"] = human_review
+
+        raw_image_query = self.image_queries_api.submit_image_query(**params)
+        image_query = ImageQuery.parse_obj(raw_image_query.to_dict())
+
+        if wait:
+            image_query = self.wait_for_fast_ml_result(image_query, timeout_sec=wait)
+        return self._fixup_image_query(image_query)
+
+    def ask_async(
+        self,
+        detector: Union[Detector, str],
+        image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
+        patience_time: Optional[float] = None,
+    ) -> ImageQuery:
+        """Sends an image to Groundlight without waiting for a response.
+
+        :param detector: the Detector object, or string id of a detector like `det_12345`
+        :param image: The image, in several possible formats:
+          - filename (string) of a jpeg file
+          - byte array or BytesIO or BufferedReader with jpeg bytes
+          - numpy array with values 0-255 and dimensions (H,W,3) in BGR order
+            (Note OpenCV uses BGR not RGB. `img[:, :, ::-1]` will reverse the channels)
+          - PIL Image
+          Any binary format must be JPEG-encoded already. Any pixel format will get
+          converted to JPEG at high quality before sending to service.
+        :param patience_time: How long Groundlight will work to answer the query.
+        """
+        # BUGFIX: removed docstring text for a `human_review` parameter this method does not accept.
+        if patience_time is None:
+            patience_time = self.DEFAULT_PATIENCE
+
+        detector_id = detector.id if isinstance(detector, Detector) else detector
+
+        image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)
+
+        params = {"detector_id": detector_id, "body": image_bytesio}
+        ### This would require a corresponding backend change, but could save up to a couple seconds of time
+        ### waiting for the server response
+        ### alternatively, we could use the asyncio
+        params["async"] = True
+        if patience_time == 0:
+            params["patience_time"] = self.DEFAULT_PATIENCE
+        else:
+            params["patience_time"] = patience_time
+
+        raw_image_query = self.image_queries_api.submit_image_query(**params)  # best api call we have, still has delay
+        image_query = ImageQuery.parse_obj(raw_image_query.to_dict())
+        return self._fixup_image_query(image_query)
+
+    async def ask_async_alternate(
+        self,
+        detector: Union[Detector, str],
+        image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
+        patience_time: Optional[float] = None,
+    ) -> ImageQuery:
+        if patience_time is None:
+            patience_time = self.DEFAULT_PATIENCE
+
+        detector_id = detector.id if isinstance(detector, Detector) else detector
+
+        image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)
+
+        params = {"detector_id": detector_id, "body": image_bytesio}
+        if patience_time == 0:
+            params["patience_time"] = self.DEFAULT_PATIENCE
+        else:
+            params["patience_time"] = patience_time
+        ### This would still benefit from a backend change, but uses true async
+        # params["async"] = True
+        # NOTE(review): assumes the generated client exposes an awaitable submit_image_query — confirm
+        raw_image_query = await self.image_queries_api.submit_image_query(
+            **params
+        )  # best api call we have, still has delay
+        image_query = ImageQuery.parse_obj(raw_image_query.to_dict())
+        return self._fixup_image_query(image_query)
+
+    def ask_async_alternate_wrapper(
+        self,
+        detector: Union[Detector, str],
+        image_set: List[Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray]],
+        patience_time: Optional[float] = None,
+    ) -> List[ImageQuery]:
+        async def wrapper() -> List[ImageQuery]:
+            # BUGFIX: gather and keep the results; they were previously awaited and discarded
+            return await asyncio.gather(*(self.ask_async_alternate(detector, image, patience_time) for image in image_set))
+
+        # BUGFIX: return the List[ImageQuery] promised by the signature (was returning None)
+        return asyncio.run(wrapper())
+
+    def wait_for_fast_ml_result(
+        self,
+        image_query: Union[ImageQuery, str],
+        timeout_sec: float = 30.0,
+    ) -> ImageQuery:
+        # Convert from image_query_id to ImageQuery if needed.
+        if isinstance(image_query, str):
+            image_query = self.get_image_query(image_query)
+
+        start_time = time.time()
+        next_delay = self.POLLING_INITIAL_DELAY
+        target_delay = 0.0
+        image_query = self._fixup_image_query(image_query)
+        while True:
+            patience_so_far = time.time() - start_time
+            if iq_has_answer(image_query):  # Primary difference from wait_for_confident_result
+                logger.debug(f"Confident answer for {image_query} after {patience_so_far:.1f}s")
+                break
+            if patience_so_far >= timeout_sec:
+                logger.debug(f"Timeout after {timeout_sec:.0f}s waiting for {image_query}")
+                break
+            target_delay = min(patience_so_far + next_delay, timeout_sec)
+            sleep_time = max(target_delay - patience_so_far, 0)
+
+            time.sleep(sleep_time)
+            next_delay *= self.POLLING_EXPONENTIAL_BACKOFF
+            image_query = self.get_image_query(image_query.id)
+            image_query = self._fixup_image_query(image_query)
+        return image_query
+
     def wait_for_confident_result(
         self,
         image_query: Union[ImageQuery, str],
diff --git a/test/integration/test_groundlight.py b/test/integration/test_groundlight.py
index 5c836714..5374dc3b 100644
--- a/test/integration/test_groundlight.py
+++ b/test/integration/test_groundlight.py
@@ -236,6 +236,27 @@ def test_submit_image_query_pil(gl: Groundlight, detector: Detector):
     _image_query = gl.submit_image_query(detector=detector.id, image=black)
 
 
+def test_ask_confident(gl: Groundlight, detector: Detector):
+    _iq = gl.ask_confident(detector=detector.id, image="test/assets/cat.jpeg")
+    _iq = gl.ask_confident(detector=detector.id, image="test/assets/cat.jpeg", patience_time=10)
+    # TODO: Check that we get a proper error if 0 < patience time < minimum backend patience time
+
+
+def test_ask_ml(gl: Groundlight, detector: Detector):
+    _iq = gl.ask_ml(detector=detector.id, image="test/assets/cat.jpeg")
+    _iq = gl.ask_ml(detector=detector.id, image="test/assets/cat.jpeg", wait=10)
+    _iq = gl.ask_ml(
+        detector=detector.id, image="test/assets/cat.jpeg", wait=1
+    )  # should be legal as wait isn't subject to minimum backend patience time
+
+
+def test_ask_async(gl: Groundlight, detector: Detector):
+    _iq = gl.ask_async(detector=detector.id, image="test/assets/cat.jpeg")
+    _iq = gl.ask_async(detector=detector.id, image="test/assets/cat.jpeg", patience_time=10)
+    _iq = gl.ask_async(detector=detector.id, image="test/assets/cat.jpeg", patience_time=600)
+    # TODO: Check that we get a proper error if 0 < patience time < minimum backend patience time
+
+
 def test_list_image_queries(gl: Groundlight):
     image_queries = gl.list_image_queries()
     assert str(image_queries)