Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 172 additions & 4 deletions src/groundlight/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import logging
import os
import time
from io import BufferedReader, BytesIO
from typing import Optional, Union
from typing import List, Optional, Union

from model import Detector, ImageQuery, PaginatedDetectorList, PaginatedImageQueryList
from openapi_client import Configuration
Expand Down Expand Up @@ -43,6 +44,7 @@ class Groundlight:
"""

DEFAULT_WAIT: float = 30.0
DEFAULT_PATIENCE: float = 30.0

POLLING_INITIAL_DELAY = 0.25
POLLING_EXPONENTIAL_BACKOFF = 1.3 # This still has the nice backoff property that the max number of requests
Expand Down Expand Up @@ -170,6 +172,7 @@ def submit_image_query( # noqa: PLR0913 # pylint: disable=too-many-arguments
detector: Union[Detector, str],
image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
wait: Optional[float] = None,
patience_time: Optional[float] = None,
human_review: Optional[str] = None,
inspection_id: Optional[str] = None,
) -> ImageQuery:
Expand All @@ -193,16 +196,20 @@ def submit_image_query( # noqa: PLR0913 # pylint: disable=too-many-arguments
"""
if wait is None:
wait = self.DEFAULT_WAIT
if patience_time is None:
patience_time = self.DEFAULT_PATIENCE
if wait > patience_time:
patience_time = wait

detector_id = detector.id if isinstance(detector, Detector) else detector

image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)

params = {"detector_id": detector_id, "body": image_bytesio}
if wait == 0:
params["patience_time"] = self.DEFAULT_WAIT
if patience_time == 0:
params["patience_time"] = self.DEFAULT_PATIENCE
else:
params["patience_time"] = wait
params["patience_time"] = patience_time

if human_review is not None:
params["human_review"] = human_review
Expand All @@ -223,6 +230,167 @@ def submit_image_query( # noqa: PLR0913 # pylint: disable=too-many-arguments
image_query = self.wait_for_confident_result(image_query, confidence_threshold=threshold, timeout_sec=wait)
return self._fixup_image_query(image_query)

def ask_confident(
    self,
    detector: Union[Detector, str],
    image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
    patience_time: Optional[float] = None,
) -> ImageQuery:
    """Submit an image query and wait until the answer is confident (or until
    `patience_time` elapses), whichever comes first.

    Thin wrapper over :meth:`submit_image_query` with a simplified parameter
    set: the same value is used both as the client-side wait and the
    server-side patience time.

    :param detector: the Detector object, or string id of a detector like `det_12345`
    :param image: the image to submit, in any format submit_image_query accepts
        (filename, jpeg bytes, BytesIO/BufferedReader, numpy BGR array, PIL Image).
    :param patience_time: how long to wait for a confident answer; ``None``
        falls back to the defaults inside submit_image_query.
    :return: the resulting ImageQuery.
    """
    # BUG FIX: the original dropped submit_image_query's result despite the
    # declared ImageQuery return type, so callers always got None.
    return self.submit_image_query(detector, image, wait=patience_time, patience_time=patience_time)

def ask_ml(
    self,
    detector: Union[Detector, str],
    image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
    wait: Optional[float] = None,
    human_review: Optional[str] = None,
) -> ImageQuery:
    """Submit an image query and wait only for the first (fast) ML answer,
    rather than a confident one.

    :param detector: the Detector object, or string id of a detector like `det_12345`
    :param image: the image to submit, in any format submit_image_query accepts
        (filename, jpeg bytes, BytesIO/BufferedReader, numpy BGR array, PIL Image).
    :param wait: how long to poll for the first ML answer; defaults to DEFAULT_WAIT.
        `wait` is deliberately not subject to the backend's minimum patience time.
    :param human_review: if not ``None``, forwarded to the service to control
        human-review routing.
    :return: the ImageQuery, populated with the first ML answer if one arrived
        within `wait` seconds.
    """
    if wait is None:
        wait = self.DEFAULT_WAIT
    # BUG FIX: the original referenced an undefined local `patience_time`
    # (NameError on every call). Derive it from `wait` the same way
    # submit_image_query does: at least DEFAULT_PATIENCE, stretched to cover
    # the requested wait.
    patience_time = max(wait, self.DEFAULT_PATIENCE)

    detector_id = detector.id if isinstance(detector, Detector) else detector

    image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)

    params = {"detector_id": detector_id, "body": image_bytesio}
    # A zero patience time is meaningless to the backend; fall back to the default.
    params["patience_time"] = patience_time if patience_time else self.DEFAULT_PATIENCE

    # Human-review routing is still available even within ask_ml.
    if human_review is not None:
        params["human_review"] = human_review

    raw_image_query = self.image_queries_api.submit_image_query(**params)
    image_query = ImageQuery.parse_obj(raw_image_query.to_dict())

    if wait:
        image_query = self.wait_for_fast_ml_result(image_query, timeout_sec=wait)
    return self._fixup_image_query(image_query)

def ask_async(
    self,
    detector: Union[Detector, str],
    image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
    patience_time: Optional[float] = None,
) -> ImageQuery:
    """Sends an image to Groundlight without waiting for a response.

    :param detector: the Detector object, or string id of a detector like `det_12345`
    :param image: The image, in several possible formats:
      - filename (string) of a jpeg file
      - byte array or BytesIO or BufferedReader with jpeg bytes
      - numpy array with values 0-255 and dimensions (H,W,3) in BGR order
        (Note OpenCV uses BGR not RGB. `img[:, :, ::-1]` will reverse the channels)
      - PIL Image
      Any binary format must be JPEG-encoded already. Any pixel format will get
      converted to JPEG at high quality before sending to service.
    :param patience_time: How long Groundlight will work to answer the query.
        ``None`` (and 0) fall back to DEFAULT_PATIENCE.
    :return: the initial ImageQuery record, very likely not yet answered.
    """
    # DOC FIX: the original docstring documented a `human_review` parameter
    # that this method does not accept.
    if patience_time is None:
        patience_time = self.DEFAULT_PATIENCE

    detector_id = detector.id if isinstance(detector, Detector) else detector

    image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)

    params = {"detector_id": detector_id, "body": image_bytesio}
    ### This would require a corresponding backend change, but could save up to a couple seconds of time
    ### waiting for the server response
    ### alternatively, we could use the asyncio
    params["async"] = True
    if patience_time == 0:
        params["patience_time"] = self.DEFAULT_PATIENCE
    else:
        params["patience_time"] = patience_time

    raw_image_query = self.image_queries_api.submit_image_query(**params)  # best api call we have, still has delay
    image_query = ImageQuery.parse_obj(raw_image_query.to_dict())
    return self._fixup_image_query(image_query)

async def ask_async_alternate(
    self,
    detector: Union[Detector, str],
    image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
    patience_time: Optional[float] = None,
) -> ImageQuery:
    """Coroutine variant of ask_async: submit an image query and return the
    initial (likely unanswered) ImageQuery without polling for a result.

    :param detector: the Detector object, or string id of a detector like `det_12345`
    :param image: the image, in any format submit_image_query accepts.
    :param patience_time: how long Groundlight will work on the query;
        ``None`` (and 0) fall back to DEFAULT_PATIENCE.
    :return: the initial ImageQuery record.
    """
    # NOTE(review): this awaits image_queries_api.submit_image_query — confirm
    # the generated API client is actually awaitable in this code path.
    effective_patience = self.DEFAULT_PATIENCE if patience_time is None else patience_time

    if isinstance(detector, Detector):
        detector_id = detector.id
    else:
        detector_id = detector

    stream: ByteStreamWrapper = parse_supported_image_types(image)

    params = {
        "detector_id": detector_id,
        "body": stream,
        # A zero patience is meaningless; substitute the default.
        "patience_time": effective_patience if effective_patience != 0 else self.DEFAULT_PATIENCE,
    }
    ### This would still benefit from a backend change, but uses true async
    # params["async"] = True
    raw = await self.image_queries_api.submit_image_query(
        **params
    )  # best api call we have, still has delay
    return self._fixup_image_query(ImageQuery.parse_obj(raw.to_dict()))

def ask_async_alternate_wrapper(
    self,
    detector: Union[Detector, str],
    image_set: List[Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray]],
    patience_time: Optional[float] = None,
) -> List[ImageQuery]:
    """Submit several image queries concurrently via ask_async_alternate and
    return the resulting ImageQuery records.

    :param detector: the Detector object, or string id of a detector like `det_12345`
    :param image_set: the images to submit, each in any supported format.
    :param patience_time: forwarded to each ask_async_alternate call.
    :return: the ImageQuery for each image, in the same order as image_set.
    """

    # BUG FIX: the original awaited each task but discarded the results and
    # returned None, despite the declared List[ImageQuery] return type.
    async def _submit_all() -> List[ImageQuery]:
        # gather preserves input order and runs the submissions concurrently.
        return await asyncio.gather(
            *(self.ask_async_alternate(detector, image, patience_time) for image in image_set)
        )

    return asyncio.run(_submit_all())

def wait_for_fast_ml_result(
    self,
    image_query: Union[ImageQuery, str],
    timeout_sec: float = 30.0,
) -> ImageQuery:
    """Poll an image query until it has any ML answer, or until timeout_sec
    elapses. Like wait_for_confident_result, except it stops at the first
    available answer rather than a confident one.

    :param image_query: the ImageQuery, or its string id.
    :param timeout_sec: how long to keep polling before giving up.
    :return: the (possibly still unanswered) ImageQuery, refreshed.
    """
    # Accept a bare image-query id and resolve it first.
    if isinstance(image_query, str):
        image_query = self.get_image_query(image_query)

    started_at = time.time()
    delay = self.POLLING_INITIAL_DELAY
    image_query = self._fixup_image_query(image_query)
    while not iq_has_answer(image_query):  # Primary difference from wait_for_confident_result
        elapsed = time.time() - started_at
        if elapsed >= timeout_sec:
            logger.debug(f"Timeout after {timeout_sec:.0f}s waiting for {image_query}")
            return image_query
        # Sleep for the backoff delay, clipped so we never sleep past the deadline.
        time.sleep(max(min(elapsed + delay, timeout_sec) - elapsed, 0))
        delay *= self.POLLING_EXPONENTIAL_BACKOFF
        image_query = self._fixup_image_query(self.get_image_query(image_query.id))
    elapsed = time.time() - started_at
    logger.debug(f"Confident answer for {image_query} after {elapsed:.1f}s")
    return image_query

def wait_for_confident_result(
self,
image_query: Union[ImageQuery, str],
Expand Down
21 changes: 21 additions & 0 deletions test/integration/test_groundlight.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,27 @@ def test_submit_image_query_pil(gl: Groundlight, detector: Detector):
_image_query = gl.submit_image_query(detector=detector.id, image=black)


def test_ask_confident(gl: Groundlight, detector: Detector):
    """Smoke-test ask_confident with the default and an explicit patience_time."""
    cat_image = "test/assets/cat.jpeg"
    _iq = gl.ask_confident(detector=detector.id, image=cat_image)
    _iq = gl.ask_confident(detector=detector.id, image=cat_image, patience_time=10)
    # TODO: Check that we get a proper error if 0 < patience time < minimum backend patience time


def test_ask_ml(gl: Groundlight, detector: Detector):
    """Smoke-test ask_ml with default and explicit wait values."""
    cat_image = "test/assets/cat.jpeg"
    _iq = gl.ask_ml(detector=detector.id, image=cat_image)
    _iq = gl.ask_ml(detector=detector.id, image=cat_image, wait=10)
    # A very short wait should be legal, as wait isn't subject to the
    # minimum backend patience time.
    _iq = gl.ask_ml(detector=detector.id, image=cat_image, wait=1)


def test_ask_async(gl: Groundlight, detector: Detector):
    """Smoke-test ask_async with default, short, and long patience_time values."""
    cat_image = "test/assets/cat.jpeg"
    _iq = gl.ask_async(detector=detector.id, image=cat_image)
    for patience in (10, 600):
        _iq = gl.ask_async(detector=detector.id, image=cat_image, patience_time=patience)
    # TODO: Check that we get a proper error if 0 < patience time < minimum backend patience time


def test_list_image_queries(gl: Groundlight):
image_queries = gl.list_image_queries()
assert str(image_queries)
Expand Down