From 2d56fe32a7db10d979d160e058de4eecfa622b06 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 28 May 2025 11:51:25 -0400 Subject: [PATCH 1/4] Update Custom Remote Inference example to use RemoteModelHandler --- .../beam-ml/custom_remote_inference.ipynb | 365 +++++++++--------- 1 file changed, 177 insertions(+), 188 deletions(-) diff --git a/examples/notebooks/beam-ml/custom_remote_inference.ipynb b/examples/notebooks/beam-ml/custom_remote_inference.ipynb index a29fb572adad..6a5090517d6d 100644 --- a/examples/notebooks/beam-ml/custom_remote_inference.ipynb +++ b/examples/notebooks/beam-ml/custom_remote_inference.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": { "cellView": "form", "id": "paYiulysGrwR" @@ -137,7 +137,7 @@ "source": [ "!pip install --upgrade pip\n", "!pip install protobuf==3.19.4\n", - "!pip install apache-beam[interactive,gcp]>=2.40.0\n", + "!pip install apache-beam[interactive,gcp]==2.65.0\n", "!pip install google-cloud-vision==3.1.1\n", "!pip install requests\n", "\n", @@ -192,21 +192,19 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": { "id": "gE0go8CpnTy3" }, "outputs": [], "source": [ - "from typing import List\n", "import io\n", - "import os\n", "import requests\n", "\n", "from google.cloud import vision\n", "from google.cloud.vision_v1.types import Feature\n", "import apache_beam as beam\n", - "from apache_beam.ml.inference.base import ModelHandler\n", + "from apache_beam.ml.inference.base import RemoteModelHandler\n", "from apache_beam.ml.inference.base import RunInference\n" ] }, @@ -222,7 +220,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": { "id": "_89eN_1QeYEd" }, @@ -257,7 +255,7 @@ "\n", "When you run remote inference, prepare to encounter, identify, and handle failure as gracefully as possible. We recommend using the following techniques:\n", "\n", - "* **Exponential backoff:** Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession.\n", + "* **Exponential backoff:** Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession. The `RemoteModelHandler` base class handles this logic, with the `retry_fn` argument determining which errors are retryable. For this example we will always retry. \n", "\n", "* **Dead-letter queues:** Route failed inferences to a separate `PCollection` without failing the whole transform. Continue execution without failing the job (batch jobs' default behavior) or retrying indefinitely (streaming jobs' default behavior).\n", "You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed." 
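The `_always_retry` filter shown in the next hunk retries every exception. As a minimal sketch of a more selective policy, the filter below retries only errors that are plausibly transient; the specific `google.api_core` exception types are assumptions chosen for illustration, and any callable that takes an `Exception` and returns a `bool` can be passed as `retry_filter`.

from google.api_core import exceptions as gexc

def _retry_on_transient(exception: Exception) -> bool:
    # Retry quota exhaustion and temporary unavailability; give up on everything else.
    return isinstance(
        exception,
        (gexc.TooManyRequests, gexc.ServiceUnavailable, gexc.DeadlineExceeded))

# Passed to the handler the same way the notebook passes _always_retry, e.g.:
# super().__init__(namespace="CloudVisionModelHandler", retry_filter=_retry_on_transient)
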
@@ -265,21 +263,27 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "metadata": { "id": "LnaisJ_JiY_Q" }, "outputs": [], "source": [ - "class CloudVisionModelHandler(ModelHandler):\n", - " \"\"\"DoFn that accepts a batch of images as bytearray\n", - " and sends that batch to the Cloud Vision API for remote inference\"\"\"\n", - " def load_model(self):\n", + "def _always_retry(exception: Exception) -> bool:\n", + " return True\n", + "\n", + "class CloudVisionModelHandler(RemoteModelHandler):\n", + " def __init__(self):\n", + " \"\"\"DoFn that accepts a batch of images as bytearray\n", + " and sends that batch to the Cloud Vision API for remote inference\n", + " \"\"\"\n", + " super().__init__(namespace=\"CloudVisionModelHandler\", retry_filter=_always_retry)\n", + " def create_client(self):\n", " \"\"\"Initiate the Google Vision API client.\"\"\"\n", " client = vision.ImageAnnotatorClient()\n", " return client\n", "\n", - " def run_inference(self, batch, model, inference):\n", + " def request(self, batch, model, inference_args):\n", " feature = Feature()\n", " feature.type_ = Feature.Type.LABEL_DETECTION\n", "\n", @@ -333,7 +337,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 8, "metadata": { "colab": { "base_uri": "https://localhost:8080/" @@ -343,129 +347,123 @@ }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "('http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg', label_annotations {\n", - " mid: \"/m/083wq\"\n", - " description: \"Wheel\"\n", - " score: 0.977976143\n", - " topicality: 0.977976143\n", + " mid: \"/m/04_sv\"\n", + " description: \"Motorcycle\"\n", + " score: 0.9922548\n", + " topicality: 0.14033242\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h9mv\"\n", - " description: \"Tire\"\n", - " score: 0.977934957\n", - " topicality: 0.977934957\n", + " mid: \"/m/01prls\"\n", + " description: \"Land vehicle\"\n", + " score: 0.99086833\n", + " topicality: 0.0029524593\n", "}\n", "label_annotations {\n", - " mid: \"/m/043g5f\"\n", - " description: \"Fuel tank\"\n", - " score: 0.958490431\n", - " topicality: 0.958490431\n", + " mid: \"/m/0768fx\"\n", + " description: \"Automotive lighting\"\n", + " score: 0.9853215\n", + " topicality: 0.002913047\n", "}\n", "label_annotations {\n", - " mid: \"/m/05s2s\"\n", - " description: \"Plant\"\n", - " score: 0.95674181\n", - " topicality: 0.95674181\n", + " mid: \"/m/07yv9\"\n", + " description: \"Vehicle\"\n", + " score: 0.98517245\n", + " topicality: 0.010408105\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h8lk_j\"\n", - " description: \"Automotive fuel system\"\n", - " score: 0.941456497\n", - " topicality: 0.941456497\n", + " mid: \"/m/043g5f\"\n", + " description: \"Fuel tank\"\n", + " score: 0.9823826\n", + " topicality: 0.01933147\n", "}\n", "label_annotations {\n", - " mid: \"/m/07yv9\"\n", - " description: \"Vehicle\"\n", - " score: 0.936428607\n", - " topicality: 0.936428607\n", + " mid: \"/m/012f08\"\n", + " description: \"Motor vehicle\"\n", + " score: 0.97732854\n", + " topicality: 0.0009314301\n", "}\n", "label_annotations {\n", - " mid: \"/m/02qwkrn\"\n", - " description: \"Vehicle brake\"\n", - " score: 0.905624092\n", - " topicality: 0.905624092\n", + " mid: \"/m/0h9mv\"\n", + " description: \"Tire\"\n", + " score: 0.9735299\n", + " topicality: 0.0020883244\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h8pb3l\"\n", - " description: \"Automotive tire\"\n", - " score: 0.897686064\n", - " 
topicality: 0.897686064\n", + " mid: \"/m/083wq\"\n", + " description: \"Wheel\"\n", + " score: 0.9715105\n", + " topicality: 0.0028435893\n", "}\n", "label_annotations {\n", - " mid: \"/m/0768fx\"\n", - " description: \"Automotive lighting\"\n", - " score: 0.897505879\n", - " topicality: 0.897505879\n", + " mid: \"/m/0h8pb3l\"\n", + " description: \"Automotive Tire\"\n", + " score: 0.96993804\n", + " topicality: 5.827098e-05\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h8p7_l\"\n", - " description: \"Automotive exhaust\"\n", - " score: 0.877965152\n", - " topicality: 0.877965152\n", + " mid: \"/m/0h8ls87\"\n", + " description: \"Automotive Exterior\"\n", + " score: 0.9641536\n", + " topicality: 0.00045098987\n", "}\n", ")\n", "('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {\n", - " mid: \"/m/054_l\"\n", - " description: \"Mirror\"\n", - " score: 0.969698846\n", - " topicality: 0.969698846\n", + " mid: \"/m/02w3_ws\"\n", + " description: \"Personal care\"\n", + " score: 0.853392\n", + " topicality: 0.00013828959\n", "}\n", "label_annotations {\n", - " mid: \"/m/02jz0l\"\n", - " description: \"Tap\"\n", - " score: 0.962297797\n", - " topicality: 0.962297797\n", + " mid: \"/m/02pkr5\"\n", + " description: \"Plumbing fixture\"\n", + " score: 0.8383083\n", + " topicality: 0.012253191\n", "}\n", "label_annotations {\n", - " mid: \"/m/0h8lr5r\"\n", - " description: \"Bathroom sink\"\n", - " score: 0.933002412\n", - " topicality: 0.933002412\n", + " mid: \"/m/0b_zf\"\n", + " description: \"Plumbing\"\n", + " score: 0.726803\n", + " topicality: 0.016276756\n", "}\n", "label_annotations {\n", - " mid: \"/m/0130jx\"\n", - " description: \"Sink\"\n", - " score: 0.930314779\n", - " topicality: 0.930314779\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/02pkr5\"\n", - " description: \"Plumbing fixture\"\n", - " score: 0.920037031\n", - " topicality: 0.920037031\n", + " mid: \"/m/01j2bj\"\n", + " description: \"Bathroom\"\n", + " score: 0.72486097\n", + " topicality: 0.35419264\n", "}\n", "label_annotations {\n", - " mid: \"/m/02dgv\"\n", - " description: \"Door\"\n", - " score: 0.890176594\n", - " topicality: 0.890176594\n", + " mid: \"/m/02jz0l\"\n", + " description: \"Tap\"\n", + " score: 0.6317307\n", + " topicality: 0.00705197\n", "}\n", "label_annotations {\n", - " mid: \"/m/09ggk\"\n", - " description: \"Purple\"\n", - " score: 0.878831089\n", - " topicality: 0.878831089\n", + " mid: \"/m/0130jx\"\n", + " description: \"Sink\"\n", + " score: 0.5732167\n", + " topicality: 0.07520393\n", "}\n", "label_annotations {\n", - " mid: \"/m/01j2bj\"\n", - " description: \"Bathroom\"\n", - " score: 0.866840482\n", - " topicality: 0.866840482\n", + " mid: \"/m/054_l\"\n", + " description: \"Mirror\"\n", + " score: 0.5680867\n", + " topicality: 0.08497098\n", "}\n", "label_annotations {\n", - " mid: \"/m/04wnmd\"\n", - " description: \"Fixture\"\n", - " score: 0.862223864\n", - " topicality: 0.862223864\n", + " mid: \"/m/0h8lr5r\"\n", + " description: \"Bathroom Sink\"\n", + " score: 0.557554\n", + " topicality: 0.007725588\n", "}\n", "label_annotations {\n", - " mid: \"/m/09qqq\"\n", - " description: \"Wall\"\n", - " score: 0.809348285\n", - " topicality: 0.809348285\n", + " mid: \"/m/03jvk\"\n", + " description: \"Household hardware\"\n", + " score: 0.5140049\n", + " topicality: 0.00064662547\n", "}\n", ")\n", "('http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg', error {\n", @@ -484,125 +482,113 @@ "}\n", ")\n", 
"('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {\n", - " mid: \"/m/054_l\"\n", - " description: \"Mirror\"\n", - " score: 0.969698846\n", - " topicality: 0.969698846\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/02jz0l\"\n", - " description: \"Tap\"\n", - " score: 0.962297797\n", - " topicality: 0.962297797\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/0h8lr5r\"\n", - " description: \"Bathroom sink\"\n", - " score: 0.933002412\n", - " topicality: 0.933002412\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/0130jx\"\n", - " description: \"Sink\"\n", - " score: 0.930314779\n", - " topicality: 0.930314779\n", + " mid: \"/m/02w3_ws\"\n", + " description: \"Personal care\"\n", + " score: 0.853392\n", + " topicality: 0.00013828959\n", "}\n", "label_annotations {\n", " mid: \"/m/02pkr5\"\n", " description: \"Plumbing fixture\"\n", - " score: 0.920037031\n", - " topicality: 0.920037031\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/02dgv\"\n", - " description: \"Door\"\n", - " score: 0.890176594\n", - " topicality: 0.890176594\n", + " score: 0.8383083\n", + " topicality: 0.012253191\n", "}\n", "label_annotations {\n", - " mid: \"/m/09ggk\"\n", - " description: \"Purple\"\n", - " score: 0.878831089\n", - " topicality: 0.878831089\n", + " mid: \"/m/0b_zf\"\n", + " description: \"Plumbing\"\n", + " score: 0.726803\n", + " topicality: 0.016276756\n", "}\n", "label_annotations {\n", " mid: \"/m/01j2bj\"\n", " description: \"Bathroom\"\n", - " score: 0.866840482\n", - " topicality: 0.866840482\n", + " score: 0.72486097\n", + " topicality: 0.35419264\n", "}\n", "label_annotations {\n", - " mid: \"/m/04wnmd\"\n", - " description: \"Fixture\"\n", - " score: 0.862223864\n", - " topicality: 0.862223864\n", + " mid: \"/m/02jz0l\"\n", + " description: \"Tap\"\n", + " score: 0.6317307\n", + " topicality: 0.00705197\n", "}\n", "label_annotations {\n", - " mid: \"/m/09qqq\"\n", - " description: \"Wall\"\n", - " score: 0.809348285\n", - " topicality: 0.809348285\n", + " mid: \"/m/0130jx\"\n", + " description: \"Sink\"\n", + " score: 0.5732167\n", + " topicality: 0.07520393\n", "}\n", - ")\n", - "('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {\n", + "label_annotations {\n", " mid: \"/m/054_l\"\n", " description: \"Mirror\"\n", - " score: 0.969698846\n", - " topicality: 0.969698846\n", - "}\n", - "label_annotations {\n", - " mid: \"/m/02jz0l\"\n", - " description: \"Tap\"\n", - " score: 0.962297797\n", - " topicality: 0.962297797\n", + " score: 0.5680867\n", + " topicality: 0.08497098\n", "}\n", "label_annotations {\n", " mid: \"/m/0h8lr5r\"\n", - " description: \"Bathroom sink\"\n", - " score: 0.933002412\n", - " topicality: 0.933002412\n", + " description: \"Bathroom Sink\"\n", + " score: 0.557554\n", + " topicality: 0.007725588\n", "}\n", "label_annotations {\n", - " mid: \"/m/0130jx\"\n", - " description: \"Sink\"\n", - " score: 0.930314779\n", - " topicality: 0.930314779\n", + " mid: \"/m/03jvk\"\n", + " description: \"Household hardware\"\n", + " score: 0.5140049\n", + " topicality: 0.00064662547\n", + "}\n", + ")\n", + "('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {\n", + " mid: \"/m/02w3_ws\"\n", + " description: \"Personal care\"\n", + " score: 0.853392\n", + " topicality: 0.00013828959\n", "}\n", "label_annotations {\n", " mid: \"/m/02pkr5\"\n", " description: \"Plumbing fixture\"\n", - " score: 0.920037031\n", - " topicality: 0.920037031\n", - "}\n", - 
"label_annotations {\n", - " mid: \"/m/02dgv\"\n", - " description: \"Door\"\n", - " score: 0.890176594\n", - " topicality: 0.890176594\n", + " score: 0.8383083\n", + " topicality: 0.012253191\n", "}\n", "label_annotations {\n", - " mid: \"/m/09ggk\"\n", - " description: \"Purple\"\n", - " score: 0.878831089\n", - " topicality: 0.878831089\n", + " mid: \"/m/0b_zf\"\n", + " description: \"Plumbing\"\n", + " score: 0.726803\n", + " topicality: 0.016276756\n", "}\n", "label_annotations {\n", " mid: \"/m/01j2bj\"\n", " description: \"Bathroom\"\n", - " score: 0.866840482\n", - " topicality: 0.866840482\n", + " score: 0.72486097\n", + " topicality: 0.35419264\n", "}\n", "label_annotations {\n", - " mid: \"/m/04wnmd\"\n", - " description: \"Fixture\"\n", - " score: 0.862223864\n", - " topicality: 0.862223864\n", + " mid: \"/m/02jz0l\"\n", + " description: \"Tap\"\n", + " score: 0.6317307\n", + " topicality: 0.00705197\n", "}\n", "label_annotations {\n", - " mid: \"/m/09qqq\"\n", - " description: \"Wall\"\n", - " score: 0.809348285\n", - " topicality: 0.809348285\n", + " mid: \"/m/0130jx\"\n", + " description: \"Sink\"\n", + " score: 0.5732167\n", + " topicality: 0.07520393\n", + "}\n", + "label_annotations {\n", + " mid: \"/m/054_l\"\n", + " description: \"Mirror\"\n", + " score: 0.5680867\n", + " topicality: 0.08497098\n", + "}\n", + "label_annotations {\n", + " mid: \"/m/0h8lr5r\"\n", + " description: \"Bathroom Sink\"\n", + " score: 0.557554\n", + " topicality: 0.007725588\n", + "}\n", + "label_annotations {\n", + " mid: \"/m/03jvk\"\n", + " description: \"Household hardware\"\n", + " score: 0.5140049\n", + " topicality: 0.00064662547\n", "}\n", ")\n" ] @@ -642,18 +628,21 @@ "provenance": [] }, "kernelspec": { - "display_name": "Python 3", + "display_name": "beam101", "language": "python", "name": "python3" }, "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", "name": "python", - "version": "3.10.7 (main, Dec 7 2022, 13:34:16) [Clang 14.0.0 (clang-1400.0.29.102)]" - }, - "vscode": { - "interpreter": { - "hash": "40c55305dca37c951f6b497e2e996ca59c449c4502b9f8a4515c118ec923845d" - } + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.1" } }, "nbformat": 4, From 981491b085cbef61aa392da292011e26acbed1ce Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 28 May 2025 11:56:59 -0400 Subject: [PATCH 2/4] restore old kernel metadata --- .../beam-ml/custom_remote_inference.ipynb | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/examples/notebooks/beam-ml/custom_remote_inference.ipynb b/examples/notebooks/beam-ml/custom_remote_inference.ipynb index 6a5090517d6d..ae765259c561 100644 --- a/examples/notebooks/beam-ml/custom_remote_inference.ipynb +++ b/examples/notebooks/beam-ml/custom_remote_inference.ipynb @@ -628,21 +628,18 @@ "provenance": [] }, "kernelspec": { - "display_name": "beam101", + "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.1" + "version": "3.10.7 (main, Dec 7 2022, 13:34:16) [Clang 14.0.0 (clang-1400.0.29.102)]" + }, + "vscode": { + "interpreter": { + "hash": "40c55305dca37c951f6b497e2e996ca59c449c4502b9f8a4515c118ec923845d" + } } }, "nbformat": 4, From 
f114b79504ead5388b4d5365146e34bb4e1c237a Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Tue, 3 Jun 2025 09:47:00 -0400 Subject: [PATCH 3/4] Update examples/notebooks/beam-ml/custom_remote_inference.ipynb Co-authored-by: Danny McCormick --- examples/notebooks/beam-ml/custom_remote_inference.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/notebooks/beam-ml/custom_remote_inference.ipynb b/examples/notebooks/beam-ml/custom_remote_inference.ipynb index ae765259c561..12e5d0cf1fdd 100644 --- a/examples/notebooks/beam-ml/custom_remote_inference.ipynb +++ b/examples/notebooks/beam-ml/custom_remote_inference.ipynb @@ -137,7 +137,7 @@ "source": [ "!pip install --upgrade pip\n", "!pip install protobuf==3.19.4\n", - "!pip install apache-beam[interactive,gcp]==2.65.0\n", + "!pip install apache-beam[interactive,gcp]>=2.65.0\n", "!pip install google-cloud-vision==3.1.1\n", "!pip install requests\n", "\n", From ed663eebb9d3ceb841d0abb03955a2afc74458b5 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Tue, 3 Jun 2025 09:58:33 -0400 Subject: [PATCH 4/4] Add DLQ --- .../beam-ml/custom_remote_inference.ipynb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/examples/notebooks/beam-ml/custom_remote_inference.ipynb b/examples/notebooks/beam-ml/custom_remote_inference.ipynb index 12e5d0cf1fdd..e510cd38e216 100644 --- a/examples/notebooks/beam-ml/custom_remote_inference.ipynb +++ b/examples/notebooks/beam-ml/custom_remote_inference.ipynb @@ -192,7 +192,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 5, "metadata": { "id": "gE0go8CpnTy3" }, @@ -220,7 +220,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 6, "metadata": { "id": "_89eN_1QeYEd" }, @@ -257,8 +257,7 @@ "\n", "* **Exponential backoff:** Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession. The `RemoteModelHandler` base class handles this logic, with the `retry_fn` argument determining which errors are retryable. For this example we will always retry. \n", "\n", - "* **Dead-letter queues:** Route failed inferences to a separate `PCollection` without failing the whole transform. Continue execution without failing the job (batch jobs' default behavior) or retrying indefinitely (streaming jobs' default behavior).\n", - "You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed." + "* **Dead-letter queues:** Route failed inferences to a separate `PCollection` without failing the whole transform. Continue execution without failing the job (batch jobs' default behavior) or retrying indefinitely (streaming jobs' default behavior). This is provided through the `with_exception_handling()` [option](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference.with_exception_handling) for RunInference. This produces tagged outputs for the failed inferences which can be handled separately from successful ones. You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed." 
] }, { @@ -337,7 +336,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 9, "metadata": { "colab": { "base_uri": "https://localhost:8080/" @@ -596,11 +595,12 @@ ], "source": [ "with beam.Pipeline() as pipeline:\n", - " _ = (pipeline | \"Create inputs\" >> beam.Create(image_urls)\n", + " main, failed = (pipeline | \"Create inputs\" >> beam.Create(image_urls)\n", " | \"Read images\" >> beam.Map(read_image)\n", - " | \"Inference\" >> RunInference(model_handler=CloudVisionModelHandler())\n", - " | \"Print image_url and annotation\" >> beam.Map(print)\n", - " )" + " | \"Inference\" >> RunInference(model_handler=CloudVisionModelHandler()).with_exception_handling()\n", + " )\n", + " _ = main | \"Print image_url and annotation\" >> beam.Map(print)\n", + " _ = failed.failed_inferences | \"Print failed inferences\" >> beam.Map(print)" ] }, {
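
The updated dead-letter bullet suggests pushing failed inferences to temporary storage for later reprocessing rather than only printing them. Below is a minimal sketch of that, reusing the pipeline shape this patch introduces; the `gs://your-bucket/failed_inferences` path and the `str()` serialization are placeholder assumptions.

with beam.Pipeline() as pipeline:
    # Same tagged outputs as the cell above: `main` holds successful inferences,
    # `failed.failed_inferences` holds elements that raised during RunInference.
    main, failed = (pipeline
        | "Create inputs" >> beam.Create(image_urls)
        | "Read images" >> beam.Map(read_image)
        | "Inference" >> RunInference(model_handler=CloudVisionModelHandler()).with_exception_handling())
    _ = main | "Print image_url and annotation" >> beam.Map(print)
    # Route the dead-letter queue to storage so it can be inspected and reprocessed later.
    _ = (failed.failed_inferences
        | "Stringify failed inferences" >> beam.Map(str)
        | "Write failed inferences" >> beam.io.WriteToText("gs://your-bucket/failed_inferences"))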