diff --git a/examples/notebooks/beam-ml/custom_remote_inference.ipynb b/examples/notebooks/beam-ml/custom_remote_inference.ipynb index a29fb572adad..e510cd38e216 100644 --- a/examples/notebooks/beam-ml/custom_remote_inference.ipynb +++ b/examples/notebooks/beam-ml/custom_remote_inference.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": { "cellView": "form", "id": "paYiulysGrwR" @@ -137,7 +137,7 @@ "source": [ "!pip install --upgrade pip\n", "!pip install protobuf==3.19.4\n", - "!pip install apache-beam[interactive,gcp]>=2.40.0\n", + "!pip install apache-beam[interactive,gcp]>=2.65.0\n", "!pip install google-cloud-vision==3.1.1\n", "!pip install requests\n", "\n", @@ -192,21 +192,19 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": { "id": "gE0go8CpnTy3" }, "outputs": [], "source": [ - "from typing import List\n", "import io\n", - "import os\n", "import requests\n", "\n", "from google.cloud import vision\n", "from google.cloud.vision_v1.types import Feature\n", "import apache_beam as beam\n", - "from apache_beam.ml.inference.base import ModelHandler\n", + "from apache_beam.ml.inference.base import RemoteModelHandler\n", "from apache_beam.ml.inference.base import RunInference\n" ] }, @@ -222,7 +220,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": { "id": "_89eN_1QeYEd" }, @@ -257,29 +255,34 @@ "\n", "When you run remote inference, prepare to encounter, identify, and handle failure as gracefully as possible. We recommend using the following techniques:\n", "\n", - "* **Exponential backoff:** Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession.\n", + "* **Exponential backoff:** Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession. The `RemoteModelHandler` base class handles this logic, with the `retry_fn` argument determining which errors are retryable. For this example we will always retry. \n", "\n", - "* **Dead-letter queues:** Route failed inferences to a separate `PCollection` without failing the whole transform. Continue execution without failing the job (batch jobs' default behavior) or retrying indefinitely (streaming jobs' default behavior).\n", - "You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed." + "* **Dead-letter queues:** Route failed inferences to a separate `PCollection` without failing the whole transform. Continue execution without failing the job (batch jobs' default behavior) or retrying indefinitely (streaming jobs' default behavior). This is provided through the `with_exception_handling()` [option](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference.with_exception_handling) for RunInference. This produces tagged outputs for the failed inferences which can be handled separately from successful ones. You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "metadata": { "id": "LnaisJ_JiY_Q" }, "outputs": [], "source": [ - "class CloudVisionModelHandler(ModelHandler):\n", - " \"\"\"DoFn that accepts a batch of images as bytearray\n", - " and sends that batch to the Cloud Vision API for remote inference\"\"\"\n", - " def load_model(self):\n", + "def _always_retry(exception: Exception) -> bool:\n", + " return True\n", + "\n", + "class CloudVisionModelHandler(RemoteModelHandler):\n", + " def __init__(self):\n", + " \"\"\"DoFn that accepts a batch of images as bytearray\n", + " and sends that batch to the Cloud Vision API for remote inference\n", + " \"\"\"\n", + " super().__init__(namespace=\"CloudVisionModelHandler\", retry_filter=_always_retry)\n", + " def create_client(self):\n", " \"\"\"Initiate the Google Vision API client.\"\"\"\n", " client = vision.ImageAnnotatorClient()\n", " return client\n", "\n", - " def run_inference(self, batch, model, inference):\n", + " def request(self, batch, model, inference_args):\n", " feature = Feature()\n", " feature.type_ = Feature.Type.LABEL_DETECTION\n", "\n", @@ -333,7 +336,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 9, "metadata": { "colab": { "base_uri": "https://localhost:8080/" @@ -343,129 +346,123 @@ }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "('http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg', label_annotations {\n", - " mid: \"/m/083wq\"\n", - " description: \"Wheel\"\n", - " score: 0.977976143\n", - " topicality: 0.977976143\n", + " mid: \"/m/04_sv\"\n", + " description: \"Motorcycle\"\n", + " score: 0.9922548\n", + " topicality: 0.14033242\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h9mv\"\n", - " description: \"Tire\"\n", - " score: 0.977934957\n", - " topicality: 0.977934957\n", + " mid: \"/m/01prls\"\n", + " description: \"Land vehicle\"\n", + " score: 0.99086833\n", + " topicality: 0.0029524593\n", "}\n", "label_annotations {\n", - " mid: \"/m/043g5f\"\n", - " description: \"Fuel tank\"\n", - " score: 0.958490431\n", - " topicality: 0.958490431\n", + " mid: \"/m/0768fx\"\n", + " description: \"Automotive lighting\"\n", + " score: 0.9853215\n", + " topicality: 0.002913047\n", "}\n", "label_annotations {\n", - " mid: \"/m/05s2s\"\n", - " description: \"Plant\"\n", - " score: 0.95674181\n", - " topicality: 0.95674181\n", + " mid: \"/m/07yv9\"\n", + " description: \"Vehicle\"\n", + " score: 0.98517245\n", + " topicality: 0.010408105\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h8lk_j\"\n", - " description: \"Automotive fuel system\"\n", - " score: 0.941456497\n", - " topicality: 0.941456497\n", + " mid: \"/m/043g5f\"\n", + " description: \"Fuel tank\"\n", + " score: 0.9823826\n", + " topicality: 0.01933147\n", "}\n", "label_annotations {\n", - " mid: \"/m/07yv9\"\n", - " description: \"Vehicle\"\n", - " score: 0.936428607\n", - " topicality: 0.936428607\n", + " mid: \"/m/012f08\"\n", + " description: \"Motor vehicle\"\n", + " score: 0.97732854\n", + " topicality: 0.0009314301\n", "}\n", "label_annotations {\n", - " mid: \"/m/02qwkrn\"\n", - " description: \"Vehicle brake\"\n", - " score: 0.905624092\n", - " topicality: 0.905624092\n", + " mid: \"/m/0h9mv\"\n", + " description: \"Tire\"\n", + " score: 0.9735299\n", + " topicality: 0.0020883244\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h8pb3l\"\n", - " description: \"Automotive tire\"\n", - " score: 0.897686064\n", - " topicality: 0.897686064\n", + " mid: \"/m/083wq\"\n", + " description: \"Wheel\"\n", + " score: 0.9715105\n", + " topicality: 0.0028435893\n", "}\n", "label_annotations {\n", - " mid: \"/m/0768fx\"\n", - " description: \"Automotive lighting\"\n", - " score: 0.897505879\n", - " topicality: 0.897505879\n", + " mid: \"/m/0h8pb3l\"\n", + " description: \"Automotive Tire\"\n", + " score: 0.96993804\n", + " topicality: 5.827098e-05\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h8p7_l\"\n", - " description: \"Automotive exhaust\"\n", - " score: 0.877965152\n", - " topicality: 0.877965152\n", + " mid: \"/m/0h8ls87\"\n", + " description: \"Automotive Exterior\"\n", + " score: 0.9641536\n", + " topicality: 0.00045098987\n", "}\n", ")\n", "('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {\n", - " mid: \"/m/054_l\"\n", - " description: \"Mirror\"\n", - " score: 0.969698846\n", - " topicality: 0.969698846\n", + " mid: \"/m/02w3_ws\"\n", + " description: \"Personal care\"\n", + " score: 0.853392\n", + " topicality: 0.00013828959\n", "}\n", "label_annotations {\n", - " mid: \"/m/02jz0l\"\n", - " description: \"Tap\"\n", - " score: 0.962297797\n", - " topicality: 0.962297797\n", + " mid: \"/m/02pkr5\"\n", + " description: \"Plumbing fixture\"\n", + " score: 0.8383083\n", + " topicality: 0.012253191\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h8lr5r\"\n", - " description: \"Bathroom sink\"\n", - " score: 0.933002412\n", - " topicality: 0.933002412\n", + " mid: \"/m/0b_zf\"\n", + " description: \"Plumbing\"\n", + " score: 0.726803\n", + " topicality: 0.016276756\n", "}\n", "label_annotations {\n", - " mid: \"/m/0130jx\"\n", - " description: \"Sink\"\n", - " score: 0.930314779\n", - " topicality: 0.930314779\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/02pkr5\"\n", - " description: \"Plumbing fixture\"\n", - " score: 0.920037031\n", - " topicality: 0.920037031\n", + " mid: \"/m/01j2bj\"\n", + " description: \"Bathroom\"\n", + " score: 0.72486097\n", + " topicality: 0.35419264\n", "}\n", "label_annotations {\n", - " mid: \"/m/02dgv\"\n", - " description: \"Door\"\n", - " score: 0.890176594\n", - " topicality: 0.890176594\n", + " mid: \"/m/02jz0l\"\n", + " description: \"Tap\"\n", + " score: 0.6317307\n", + " topicality: 0.00705197\n", "}\n", "label_annotations {\n", - " mid: \"/m/09ggk\"\n", - " description: \"Purple\"\n", - " score: 0.878831089\n", - " topicality: 0.878831089\n", + " mid: \"/m/0130jx\"\n", + " description: \"Sink\"\n", + " score: 0.5732167\n", + " topicality: 0.07520393\n", "}\n", "label_annotations {\n", - " mid: \"/m/01j2bj\"\n", - " description: \"Bathroom\"\n", - " score: 0.866840482\n", - " topicality: 0.866840482\n", + " mid: \"/m/054_l\"\n", + " description: \"Mirror\"\n", + " score: 0.5680867\n", + " topicality: 0.08497098\n", "}\n", "label_annotations {\n", - " mid: \"/m/04wnmd\"\n", - " description: \"Fixture\"\n", - " score: 0.862223864\n", - " topicality: 0.862223864\n", + " mid: \"/m/0h8lr5r\"\n", + " description: \"Bathroom Sink\"\n", + " score: 0.557554\n", + " topicality: 0.007725588\n", "}\n", "label_annotations {\n", - " mid: \"/m/09qqq\"\n", - " description: \"Wall\"\n", - " score: 0.809348285\n", - " topicality: 0.809348285\n", + " mid: \"/m/03jvk\"\n", + " description: \"Household hardware\"\n", + " score: 0.5140049\n", + " topicality: 0.00064662547\n", "}\n", ")\n", "('http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg', error {\n", @@ -484,125 +481,113 @@ "}\n", ")\n", "('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {\n", - " mid: \"/m/054_l\"\n", - " description: \"Mirror\"\n", - " score: 0.969698846\n", - " topicality: 0.969698846\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/02jz0l\"\n", - " description: \"Tap\"\n", - " score: 0.962297797\n", - " topicality: 0.962297797\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/0h8lr5r\"\n", - " description: \"Bathroom sink\"\n", - " score: 0.933002412\n", - " topicality: 0.933002412\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/0130jx\"\n", - " description: \"Sink\"\n", - " score: 0.930314779\n", - " topicality: 0.930314779\n", + " mid: \"/m/02w3_ws\"\n", + " description: \"Personal care\"\n", + " score: 0.853392\n", + " topicality: 0.00013828959\n", "}\n", "label_annotations {\n", " mid: \"/m/02pkr5\"\n", " description: \"Plumbing fixture\"\n", - " score: 0.920037031\n", - " topicality: 0.920037031\n", + " score: 0.8383083\n", + " topicality: 0.012253191\n", "}\n", "label_annotations {\n", - " mid: \"/m/02dgv\"\n", - " description: \"Door\"\n", - " score: 0.890176594\n", - " topicality: 0.890176594\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/09ggk\"\n", - " description: \"Purple\"\n", - " score: 0.878831089\n", - " topicality: 0.878831089\n", + " mid: \"/m/0b_zf\"\n", + " description: \"Plumbing\"\n", + " score: 0.726803\n", + " topicality: 0.016276756\n", "}\n", "label_annotations {\n", " mid: \"/m/01j2bj\"\n", " description: \"Bathroom\"\n", - " score: 0.866840482\n", - " topicality: 0.866840482\n", + " score: 0.72486097\n", + " topicality: 0.35419264\n", "}\n", "label_annotations {\n", - " mid: \"/m/04wnmd\"\n", - " description: \"Fixture\"\n", - " score: 0.862223864\n", - " topicality: 0.862223864\n", + " mid: \"/m/02jz0l\"\n", + " description: \"Tap\"\n", + " score: 0.6317307\n", + " topicality: 0.00705197\n", "}\n", "label_annotations {\n", - " mid: \"/m/09qqq\"\n", - " description: \"Wall\"\n", - " score: 0.809348285\n", - " topicality: 0.809348285\n", + " mid: \"/m/0130jx\"\n", + " description: \"Sink\"\n", + " score: 0.5732167\n", + " topicality: 0.07520393\n", "}\n", - ")\n", - "('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {\n", + "label_annotations {\n", " mid: \"/m/054_l\"\n", " description: \"Mirror\"\n", - " score: 0.969698846\n", - " topicality: 0.969698846\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/02jz0l\"\n", - " description: \"Tap\"\n", - " score: 0.962297797\n", - " topicality: 0.962297797\n", + " score: 0.5680867\n", + " topicality: 0.08497098\n", "}\n", "label_annotations {\n", " mid: \"/m/0h8lr5r\"\n", - " description: \"Bathroom sink\"\n", - " score: 0.933002412\n", - " topicality: 0.933002412\n", + " description: \"Bathroom Sink\"\n", + " score: 0.557554\n", + " topicality: 0.007725588\n", "}\n", "label_annotations {\n", - " mid: \"/m/0130jx\"\n", - " description: \"Sink\"\n", - " score: 0.930314779\n", - " topicality: 0.930314779\n", + " mid: \"/m/03jvk\"\n", + " description: \"Household hardware\"\n", + " score: 0.5140049\n", + " topicality: 0.00064662547\n", + "}\n", + ")\n", + "('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {\n", + " mid: \"/m/02w3_ws\"\n", + " description: \"Personal care\"\n", + " score: 0.853392\n", + " topicality: 0.00013828959\n", "}\n", "label_annotations {\n", " mid: \"/m/02pkr5\"\n", " description: \"Plumbing fixture\"\n", - " score: 0.920037031\n", - " topicality: 0.920037031\n", + " score: 0.8383083\n", + " topicality: 0.012253191\n", "}\n", "label_annotations {\n", - " mid: \"/m/02dgv\"\n", - " description: \"Door\"\n", - " score: 0.890176594\n", - " topicality: 0.890176594\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/09ggk\"\n", - " description: \"Purple\"\n", - " score: 0.878831089\n", - " topicality: 0.878831089\n", + " mid: \"/m/0b_zf\"\n", + " description: \"Plumbing\"\n", + " score: 0.726803\n", + " topicality: 0.016276756\n", "}\n", "label_annotations {\n", " mid: \"/m/01j2bj\"\n", " description: \"Bathroom\"\n", - " score: 0.866840482\n", - " topicality: 0.866840482\n", + " score: 0.72486097\n", + " topicality: 0.35419264\n", + "}\n", + "label_annotations {\n", + " mid: \"/m/02jz0l\"\n", + " description: \"Tap\"\n", + " score: 0.6317307\n", + " topicality: 0.00705197\n", + "}\n", + "label_annotations {\n", + " mid: \"/m/0130jx\"\n", + " description: \"Sink\"\n", + " score: 0.5732167\n", + " topicality: 0.07520393\n", + "}\n", + "label_annotations {\n", + " mid: \"/m/054_l\"\n", + " description: \"Mirror\"\n", + " score: 0.5680867\n", + " topicality: 0.08497098\n", "}\n", "label_annotations {\n", - " mid: \"/m/04wnmd\"\n", - " description: \"Fixture\"\n", - " score: 0.862223864\n", - " topicality: 0.862223864\n", + " mid: \"/m/0h8lr5r\"\n", + " description: \"Bathroom Sink\"\n", + " score: 0.557554\n", + " topicality: 0.007725588\n", "}\n", "label_annotations {\n", - " mid: \"/m/09qqq\"\n", - " description: \"Wall\"\n", - " score: 0.809348285\n", - " topicality: 0.809348285\n", + " mid: \"/m/03jvk\"\n", + " description: \"Household hardware\"\n", + " score: 0.5140049\n", + " topicality: 0.00064662547\n", "}\n", ")\n" ] @@ -610,11 +595,12 @@ ], "source": [ "with beam.Pipeline() as pipeline:\n", - " _ = (pipeline | \"Create inputs\" >> beam.Create(image_urls)\n", + " main, failed = (pipeline | \"Create inputs\" >> beam.Create(image_urls)\n", " | \"Read images\" >> beam.Map(read_image)\n", - " | \"Inference\" >> RunInference(model_handler=CloudVisionModelHandler())\n", - " | \"Print image_url and annotation\" >> beam.Map(print)\n", - " )" + " | \"Inference\" >> RunInference(model_handler=CloudVisionModelHandler()).with_exception_handling()\n", + " )\n", + " _ = main | \"Print image_url and annotation\" >> beam.Map(print)\n", + " _ = failed.failed_inferences | \"Print failed inferences\" >> beam.Map(print)" ] }, {