diff --git a/examples/notebooks/beam-ml/spanner_product_catalog_embeddings.ipynb b/examples/notebooks/beam-ml/spanner_product_catalog_embeddings.ipynb new file mode 100644 index 000000000000..55d2a3946bfb --- /dev/null +++ b/examples/notebooks/beam-ml/spanner_product_catalog_embeddings.ipynb @@ -0,0 +1,2313 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "ZNxqxc73tIEL" + }, + "outputs": [], + "source": [ + "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "4Ws_jQRmtOmv" + }, + "source": [ + "# Vector Embedding Ingestion with Apache Beam and Cloud Spanner\n", + "\n", + "\n", + " \n", + " \n", + "
\n", + " Run in Google Colab\n", + " \n", + " View source on GitHub\n", + "
" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Y8-IIxkptVFL" + }, + "source": [ + "\n", + "# Introduction\n", + "\n", + "This Colab demonstrates how to generate embeddings from data and ingest them into [Cloud Spanner](https://cloud.google.com/spanner). We'll use Apache Beam and Dataflow for scalable data processing.\n", + "\n", + "## Example: Furniture Product Catalog\n", + "\n", + "We'll work with a sample e-commerce dataset representing a furniture product catalog. Each product has:\n", + "\n", + "* **Structured fields:** `id`, `name`, `category`, `price`\n", + "* **Detailed text descriptions:** Longer text describing the product's features.\n", + "* **Additional metadata:** `material`, `dimensions`\n", + "\n", + "## Pipeline Overview\n", + "We will build a pipeline to:\n", + "1. Read product data\n", + "2. Convert unstructured product data, to embeddable `Chunk`[1] type\n", + "2. Generate Embeddings: Use a pre-trained Hugging Face model (via MLTransform) to create vector embeddings\n", + "3. Write embeddings and metadata to Spanner table\n", + "\n", + "Here's a visualization of the data flow:\n", + "\n", + "| Stage | Data Representation | Notes |\n", + "| :------------------------ | :------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------- |\n", + "| **1. Ingest Data** | `{`
` \"id\": \"desk-001\",`
` \"name\": \"Modern Desk\",`
` \"description\": \"Sleek...\",`
` \"category\": \"Desks\",`
` ...`
`}` | Supports:
- Batch sources (e.g., files, databases)<br>
- Streaming sources (e.g., Pub/Sub). |\n", + "| **2. Convert to Chunks** | `Chunk(`
  `id=\"desk-001\",`
  `content=Content(`
    `text=\"Modern Desk\"`
   `),`
  `metadata={...}`
`)` | - `Chunk` is the structured input for generating and ingesting embeddings.
- `chunk.content.text` is the field that is embedded.
- Converting to `Chunk` does not mean breaking data into smaller pieces; it simply organizes your data in a standard format for the embedding pipeline.<br>
- `Chunk` allows data to flow seamlessly throughout embedding pipelines. |\n", + "| **3. Generate Embeddings**| `Chunk(`
  `id=\"desk-001\",`
  `embedding=[-0.1, 0.6, ...],`
`...)` | Supports:
- Local Hugging Face models
- Remote Vertex AI models
- Custom embedding implementations. |\n", + "| **4. Write to Spanner** | **Spanner Table (Example Row):**
`id: desk-001`
`embedding: [-0.1, 0.6, ...]`
`name: \"Modern Desk\"`<br>
`Other fields ...` | Supports:
- Custom schemas
- Conflict resolution strategies for handling updates |\n", + "\n", + "\n", + "[1]: Chunk represents an embeddable unit of input. It specifies which fields should be embedded and which fields should be treated as metadata. Converting to Chunk does not necessarily mean breaking your text into smaller pieces - it's primarily about structuring your data for the embedding pipeline. For very long texts that exceed the embedding model's maximum input size, you can optionally [use Langchain TextSplitters](https://beam.apache.org/releases/pydoc/2.63.0/apache_beam.ml.rag.chunking.langchain.html) to break the text into smaller `Chunk`'s.\n", + "\n", + "\n", + "## Execution Environments\n", + "\n", + "This notebook demonstrates two execution environments:\n", + "\n", + "1. **DirectRunner (Local Execution)**: All examples in this notebook run on DirectRunner by default, which executes the pipeline locally. This is ideal for development, testing, and processing small datasets.\n", + "\n", + "2. **DataflowRunner (Distributed Execution)**: The [Run on Dataflow](#scrollTo=Quick_Start_Run_on_Dataflow) section demonstrates how to execute the same pipeline on Google Cloud Dataflow for scalable, distributed processing. This is recommended for production workloads and large datasets.\n", + "\n", + "All examples in this notebook can be adapted to run on Dataflow by following the pattern shown in the \"Run on Dataflow\" section.\n", + "\n", + "# Setup and Prerequisites\n", + "\n", + "This example requires:\n", + "1. A Cloud Spanner instance\n", + "2. Apache Beam 2.70.0 or later\n", + "\n", + "## Install Packages and Dependencies\n", + "\n", + "First, let's install the Python packages required for the embedding and ingestion pipeline:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "WdqT4-h1tKUS" + }, + "outputs": [], + "source": [ + "# Apache Beam with GCP support\n", + "!pip install apache_beam[interactive,gcp]>=2.70.0 --quiet\n", + "# Huggingface sentence-transformers for embedding models\n", + "!pip install sentence-transformers --quiet\n", + "!pip show apache-beam" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "e9-blURmxeEc" + }, + "source": [ + "## Database Setup\n", + "\n", + "To connect to Cloud Spanner, you'll need:\n", + "1. GCP project ID where the Spanner instance is located\n", + "2. Spanner instance ID\n", + "3. Database ID (Database will be created if it doesn't exist)\n", + "\n", + "Replace these placeholder values with your actual Cloud Spanner details:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "trYKbTzDxEzJ" + }, + "outputs": [], + "source": [ + "PROJECT_ID = \"\" # @param {type:'string'}\n", + "INSTANCE_ID = \"\" # @param {type:'string'}\n", + "DATABASE_ID = \"\" # @param {type:'string'}" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "2v-3DUjwx6OQ" + }, + "source": [ + "## Authenticate to Google Cloud\n", + "\n", + "To connect to the Cloud Spanner instance, we need to set up authentication. \n", + "\n", + "**Why multiple authentication steps?**\n", + "\n", + "The Spanner I/O connector uses a cross-language Java transform under the hood. This means:\n", + "1. `auth.authenticate_user()` authenticates the Python environment\n", + "2. 
`gcloud auth application-default login` writes credentials to disk where the Java runtime can access them\n", + "\n", + "**Recommended: Use a Service Account**\n", + "\n", + "For production workloads or to avoid interactive login prompts, we recommend using a service account with appropriate Spanner permissions:\n", + "\n", + "1. Create a service account with the `Cloud Spanner Database User` role (or `Cloud Spanner Database Admin` if creating tables)\n", + "2. Download the JSON key file\n", + "3. Set the environment variable: `export GOOGLE_APPLICATION_CREDENTIALS=\"/path/to/service-account-key.json\"`\n", + "\n", + "When using a service account, both Python and Java runtimes will automatically pick up the credentials, and you can skip the interactive authentication below." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "9Iq58UZHxvxj" + }, + "outputs": [], + "source": [ + "import sys\n", + "if 'google.colab' in sys.modules:\n", + "from google.colab import auth\n", + "# Authenticates Python SDK\n", + "auth.authenticate_user(project_id=PROJECT_ID)\n", + "\n", + "# Writes application default credentials to disk for Java cross-language transforms\n", + "!gcloud auth application-default login\n", + "\n", + "!gcloud config set project {PROJECT_ID}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "VMv18QXUyAIr" + }, + "outputs": [], + "source": [ + "# @title Spanner Helper Functions for Creating Tables and Verifying Data\n", + "\n", + "from google.cloud import spanner\n", + "from google.api_core.exceptions import NotFound, AlreadyExists\n", + "import time\n", + "\n", + "def get_spanner_client(project_id: str) -> spanner.Client:\n", + " \"\"\"Creates a Spanner client.\"\"\"\n", + " return spanner.Client(project=project_id)\n", + "\n", + "\n", + "def ensure_instance_exists(\n", + " client: spanner.Client,\n", + " instance_id: str\n", + "):\n", + " \"\"\"Ensure Spanner instance exists, raise an error if it doesn't.\n", + "\n", + " Args:\n", + " client: Spanner client\n", + " instance_id: Instance ID to check\n", + "\n", + " Returns:\n", + " The Spanner Instance object.\n", + "\n", + " Raises:\n", + " NotFound: If the instance does not exist.\n", + " \"\"\"\n", + " instance = client.instance(instance_id)\n", + "\n", + " try:\n", + " # Attempt to load the instance metadata\n", + " instance.reload()\n", + " print(f\"✓ Spanner Instance '{instance_id}' exists\")\n", + " return instance\n", + " except NotFound:\n", + " # Instance does not exist\n", + " raise NotFound(\n", + " f\"Error: Spanner Instance '{instance_id}' not found. 
\"\n", + " \"Please create the instance before running this script.\"\n", + " )\n", + "\n", + "def ensure_database_exists(\n", + " client: spanner.Client,\n", + " instance_id: str,\n", + " database_id: str,\n", + " ddl_statements: list = None\n", + "):\n", + " \"\"\"Ensure database exists, create if it doesn't.\n", + "\n", + " Args:\n", + " client: Spanner client\n", + " instance_id: Instance ID to get\n", + " database_id: Database ID to create or get\n", + " ddl_statements: Optional DDL statements for table creation\n", + "\n", + " Returns:\n", + " Database instance\n", + " \"\"\"\n", + " instance = ensure_instance_exists(client, instance_id)\n", + " database = instance.database(database_id)\n", + "\n", + " try:\n", + " # Try to get existing database\n", + " database.reload()\n", + " print(f\"✓ Database '{database_id}' already exists\")\n", + " return database\n", + " except NotFound:\n", + " # Create new database\n", + " print(f\"Creating database '{database_id}'...\")\n", + " operation = database.create()\n", + " operation.result(timeout=120)\n", + " print(f\"✓ Database '{database_id}' created successfully\")\n", + " return database\n", + "\n", + "def create_or_replace_table(\n", + " client: spanner.Client,\n", + " instance_id: str,\n", + " database_id: str,\n", + " table_name: str,\n", + " table_ddl: str\n", + "):\n", + " \"\"\"Create or replace a table in Spanner.\n", + "\n", + " Args:\n", + " client: Spanner client\n", + " instance_id: Instance ID to get\n", + " database_id: Database ID\n", + " table_name: Table name to create\n", + " table_ddl: Complete CREATE TABLE DDL statement\n", + " \"\"\"\n", + " instance = ensure_instance_exists(client, instance_id)\n", + " database = instance.database(database_id)\n", + "\n", + " # Drop table if exists\n", + " try:\n", + " print(f\"Dropping table '{table_name}' if it exists...\")\n", + " operation = database.update_ddl([f\"DROP TABLE {table_name}\"])\n", + " operation.result(timeout=120)\n", + " print(f\"✓ Dropped existing table '{table_name}'\")\n", + " time.sleep(2) # Wait for drop to complete\n", + " except Exception as e:\n", + " if \"NOT_FOUND\" not in str(e):\n", + " print(f\"Note: Table may not exist (this is normal): {e}\")\n", + "\n", + " # Create table\n", + " print(f\"Creating table '{table_name}'...\")\n", + " operation = database.update_ddl([table_ddl])\n", + " operation.result(timeout=120)\n", + " print(f\"✓ Table '{table_name}' created successfully\")\n", + "\n", + "def verify_embeddings_spanner(\n", + " client: spanner.Client,\n", + " instance_id: str,\n", + " database_id: str,\n", + " table_name: str\n", + "):\n", + " \"\"\"Query and display all rows from a Spanner table.\n", + "\n", + " Args:\n", + " client: Spanner client\n", + " instance_id: Instance ID to get\n", + " database_id: Database ID\n", + " table_name: Table name to query\n", + " \"\"\"\n", + " instance = ensure_instance_exists(client, instance_id)\n", + " database = instance.database(database_id)\n", + "\n", + " with database.snapshot() as snapshot:\n", + " results = snapshot.execute_sql(f\"SELECT * FROM {table_name}\")\n", + " rows = list(results)\n", + "\n", + " print(f\"\\nFound {len(rows)} products in '{table_name}':\")\n", + " print(\"-\" * 80)\n", + "\n", + " if not rows:\n", + " print(\"Table is empty.\")\n", + " print(\"-\" * 80)\n", + " else:\n", + " # Print each row\n", + " for row in rows:\n", + " for i, value in enumerate(row):\n", + " # Limit embedding display to first 5 values\n", + " if isinstance(value, list) and len(value) > 5:\n", + " 
print(f\"Column {i}: [{value[0]:.4f}, {value[1]:.4f}, ..., {value[-1]:.4f}] (length: {len(value)})\")\n", + " else:\n", + " print(f\"Column {i}: {value}\")\n", + " print(\"-\" * 80)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "5-T1tJncyXpt" + }, + "source": [ + "## Create Sample Product Catalog Data\n", + "\n", + "We'll create a typical e-commerce catalog where you want to:\n", + "- Generate embeddings for product text\n", + "- Store vectors alongside product data\n", + "- Enable vector similarity features\n", + "\n", + "Example product:\n", + "```python\n", + "{\n", + " \"id\": \"desk-001\",\n", + " \"name\": \"Modern Minimalist Desk\",\n", + " \"description\": \"Sleek minimalist desk with clean lines and a spacious work surface. \"\n", + " \"Features cable management system and sturdy steel frame. \"\n", + " \"Perfect for contemporary home offices and workspaces.\",\n", + " \"category\": \"Desks\",\n", + " \"price\": 399.99,\n", + " \"material\": \"Engineered Wood, Steel\",\n", + " \"dimensions\": \"60W x 30D x 29H inches\"\n", + "}\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "cellView": "form", + "id": "HpZ6tzlZyUKj" + }, + "outputs": [], + "source": [ + "#@title Create sample data\n", + "PRODUCTS_DATA = [\n", + " {\n", + " \"id\": \"desk-001\",\n", + " \"name\": \"Modern Minimalist Desk\",\n", + " \"description\": \"Sleek minimalist desk with clean lines and a spacious work surface. \"\n", + " \"Features cable management system and sturdy steel frame. \"\n", + " \"Perfect for contemporary home offices and workspaces.\",\n", + " \"category\": \"Desks\",\n", + " \"price\": 399.99,\n", + " \"material\": \"Engineered Wood, Steel\",\n", + " \"dimensions\": \"60W x 30D x 29H inches\"\n", + " },\n", + " {\n", + " \"id\": \"chair-001\",\n", + " \"name\": \"Ergonomic Mesh Office Chair\",\n", + " \"description\": \"Premium ergonomic office chair with breathable mesh back, \"\n", + " \"adjustable lumbar support, and 4D armrests. Features synchronized \"\n", + " \"tilt mechanism and memory foam seat cushion. Ideal for long work hours.\",\n", + " \"category\": \"Office Chairs\",\n", + " \"price\": 299.99,\n", + " \"material\": \"Mesh, Metal, Premium Foam\",\n", + " \"dimensions\": \"26W x 26D x 48H inches\"\n", + " },\n", + " {\n", + " \"id\": \"sofa-001\",\n", + " \"name\": \"Contemporary Sectional Sofa\",\n", + " \"description\": \"Modern L-shaped sectional with chaise lounge. Upholstered in premium \"\n", + " \"performance fabric. Features deep seats, plush cushions, and solid \"\n", + " \"wood legs. Perfect for modern living rooms.\",\n", + " \"category\": \"Sofas\",\n", + " \"price\": 1299.99,\n", + " \"material\": \"Performance Fabric, Solid Wood\",\n", + " \"dimensions\": \"112W x 65D x 34H inches\"\n", + " },\n", + " {\n", + " \"id\": \"table-001\",\n", + " \"name\": \"Rustic Dining Table\",\n", + " \"description\": \"Farmhouse-style dining table with solid wood construction. \"\n", + " \"Features distressed finish and trestle base. Seats 6-8 people \"\n", + " \"comfortably. Perfect for family gatherings.\",\n", + " \"category\": \"Dining Tables\",\n", + " \"price\": 899.99,\n", + " \"material\": \"Solid Pine Wood\",\n", + " \"dimensions\": \"72W x 42D x 30H inches\"\n", + " },\n", + " {\n", + " \"id\": \"bed-001\",\n", + " \"name\": \"Platform Storage Bed\",\n", + " \"description\": \"Modern queen platform bed with integrated storage drawers. \"\n", + " \"Features upholstered headboard and durable wood slat support. 
\"\n", + " \"No box spring needed. Perfect for maximizing bedroom space.\",\n", + " \"category\": \"Beds\",\n", + " \"price\": 799.99,\n", + " \"material\": \"Engineered Wood, Linen Fabric\",\n", + " \"dimensions\": \"65W x 86D x 48H inches\"\n", + " }\n", + "]\n", + "print(f\"\"\"✓ Created PRODUCTS_DATA with {len(PRODUCTS_DATA)} records\"\"\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "rqh8EY_cyljn" + }, + "source": [ + "## Importing Pipeline Components\n", + "\n", + "We import the following components to configure our embedding ingestion pipeline:\n", + "- `apache_beam.ml.rag.types.Chunk`, the structured input for generating and ingesting embeddings\n", + "- `apache_beam.ml.rag.ingestion.spanner.SpannerVectorWriterConfig` for configuring write behavior\n", + "- `apache_beam.ml.rag.ingestion.spanner.SpannerColumnSpecsBuilder` for custom schema mapping\n", + "- `apache_beam.ml.rag.ingestion.base.VectorDatabaseWriteTransform` to perform the write step" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xOywsu1lzhm6" + }, + "outputs": [], + "source": [ + "from apache_beam.ml.rag.ingestion.spanner import SpannerVectorWriterConfig\n", + "from apache_beam.ml.rag.ingestion.spanner import SpannerColumnSpecsBuilder\n", + "from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform\n", + "from apache_beam.ml.rag.types import Chunk, Content\n", + "from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings\n", + "\n", + "import apache_beam as beam\n", + "from apache_beam.options.pipeline_options import PipelineOptions\n", + "from apache_beam.ml.transforms.base import MLTransform" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "hK8X1hLMzBPb" + }, + "source": [ + "# What's next?\n", + "\n", + "This colab covers several use cases that you can explore based on your needs after completing the Setup and Prerequisites:\n", + "\n", + "🔰 **New to vector embeddings?**\n", + "- [Start with Quick Start](#scrollTo=Quick_Start_Basic_Vector_Ingestion)\n", + "- Uses simple out-of-box schema\n", + "- Perfect for initial testing\n", + "\n", + "🚀 **Need to scale to large datasets?**\n", + "- [Go to Run on Dataflow](#scrollTo=Quick_Start_Run_on_Dataflow)\n", + "- Learn how to execute the same pipeline at scale\n", + "- Fully managed\n", + "- Process large datasets efficiently\n", + "\n", + "🎯 **Have a specific schema?**\n", + "- [Go to Custom Schema](#scrollTo=Custom_Schema_with_Column_Mapping)\n", + "- Learn to use different column names\n", + "- Map metadata to individual columns\n", + "\n", + "🔄 **Need to update embeddings?**\n", + "- [Check out Updating Embeddings](#scrollTo=Update_Embeddings_and_Metadata_with_Write_Mode)\n", + "- Handle conflicts\n", + "- Selective field updates\n", + "\n", + "🔗 **Need to generate and Store Embeddings for Existing Spanner Table?**\n", + "- [See Database Integration](#scrollTo=Adding_Embeddings_to_Existing_Database_Records)\n", + "- Read data from your Spanner table.\n", + "- Generate embeddings for the relevant fields.\n", + "- Update your table (or a related table) with the generated embeddings.\n", + "\n", + "🤖 **Want to use Google's AI models?**\n", + "- [Try Vertex AI Embeddings](#scrollTo=Generate_Embeddings_with_VertexAI_Text_Embeddings)\n", + "- Use Google's powerful embedding models\n", + "- Seamlessly integrate with other Google Cloud services\n", + "\n", + "🔄 Need real-time embedding updates?\n", + "\n", + "- [Try Streaming Embeddings from 
PubSub](#scrollTo=Streaming_Embeddings_Updates_from_PubSub)\n", + "- Process continuous data streams\n", + "- Update embeddings in real-time as information changes" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "0SWE68Nqywv-" + }, + "source": [ + "# Quick Start: Basic Vector Ingestion\n", + "\n", + "This section shows the simplest way to generate embeddings and store them in Cloud Spanner.\n", + "\n", + "## Create table with default schema\n", + "\n", + "Before running the pipeline, we need a table to store our embeddings:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "d6lHAldOy2qb" + }, + "outputs": [], + "source": [ + "table_name = \"default_product_embeddings\"\n", + "table_ddl = f\"\"\"\n", + "CREATE TABLE {table_name} (\n", + " id STRING(1024) NOT NULL,\n", + " embedding ARRAY(vector_length=>384),\n", + " content STRING(MAX),\n", + " metadata JSON\n", + ") PRIMARY KEY (id)\n", + "\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "lyYd5lyuyhJw" + }, + "outputs": [], + "source": [ + "client = get_spanner_client(PROJECT_ID)\n", + "ensure_database_exists(client, INSTANCE_ID, DATABASE_ID)\n", + "create_or_replace_table(client, INSTANCE_ID, DATABASE_ID, table_name, table_ddl)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ptGVmEELzPZg" + }, + "source": [ + "## Configure Pipeline Components\n", + "\n", + "Now define the components that control the pipeline behavior:\n", + "\n", + "### Convert ingested product data to embeddable Chunks\n", + "- Our data is ingested as product dictionaries\n", + "- Embedding generation and ingestion processes `Chunks`\n", + "- We convert each product dictionary to a `Chunk` to configure what text to embed and what to treat as metadata" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "VW8FktqQyzYu" + }, + "outputs": [], + "source": [ + "from typing import Dict, Any\n", + "\n", + "def create_chunk(product: Dict[str, Any]) -> Chunk:\n", + " \"\"\"Convert a product dictionary into an embeddable object.\"\"\"\n", + " return Chunk(\n", + " content=Content(\n", + " text=f\"{product['name']}: {product['description']}\"\n", + " ),\n", + " id=product['id'],\n", + " metadata=product,\n", + " )\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "sJ8SDDw7zw_h" + }, + "source": [ + "### Generate embeddings with HuggingFace SentenceTransformer\n", + "\n", + "We use a local pre-trained Hugging Face model to create vector embeddings from the product descriptions." 
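+    "\n",
+    "\n",
+    "The `all-MiniLM-L6-v2` model produces 384-dimensional vectors, which is why the table above declares `vector_length=>384`. If you swap in a different model, a quick optional sanity check is to load it directly and print its output dimension (a sketch; it assumes the `sentence-transformers` package installed by the pip cell above):\n",
+    "\n",
+    "```python\n",
+    "from sentence_transformers import SentenceTransformer\n",
+    "\n",
+    "# Load the same model that HuggingfaceTextEmbeddings will use and inspect its dimension.\n",
+    "model = SentenceTransformer(\"sentence-transformers/all-MiniLM-L6-v2\")\n",
+    "print(model.get_sentence_embedding_dimension())  # 384, must match vector_length in the DDL\n",
+    "```\n"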
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "4QHRDrbOzng4" + }, + "outputs": [], + "source": [ + "huggingface_embedder = HuggingfaceTextEmbeddings(\n", + " model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "9UPyExqtz-9C" + }, + "source": [ + "### Write to Cloud Spanner\n", + "\n", + "The default SpannerVectorWriterConfig maps Chunk fields to database columns as:\n", + "\n", + "| Database Column | Chunk Field | Description |\n", + "|----------------|-------------|-------------|\n", + "| id | chunk.id | Unique identifier |\n", + "| embedding | chunk.embedding.dense_embedding | Vector as ARRAY |\n", + "| content | chunk.content.text | Text that was embedded |\n", + "| metadata | chunk.metadata | Additional data as JSON |" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "wo1goRJNz7DM" + }, + "outputs": [], + "source": [ + "spanner_writer_config = SpannerVectorWriterConfig(\n", + " project_id=PROJECT_ID,\n", + " instance_id=INSTANCE_ID,\n", + " database_id=DATABASE_ID,\n", + " table_name=table_name\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "FIHAqaW40Zez" + }, + "source": [ + "## Assemble and Run Pipeline\n", + "\n", + "Now we can create our pipeline that:\n", + "1. Takes our product data\n", + "2. Converts each product to a Chunk\n", + "3. Generates embeddings for each Chunk\n", + "4. Stores everything in Cloud Spanner\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "raXTBuV60Vuk" + }, + "outputs": [], + "source": [ + "import tempfile\n", + "\n", + "# Executing on DirectRunner (local execution)\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | 'Create Products' >> beam.Create(PRODUCTS_DATA)\n", + " | 'Convert to Chunks' >> beam.Map(create_chunk)\n", + " | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(huggingface_embedder)\n", + " | 'Write to Spanner' >> VectorDatabaseWriteTransform(\n", + " spanner_writer_config\n", + " )\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "7yEhthZaGJGf" + }, + "source": [ + "## Verify Embeddings\n", + "Let's check what was written to our Cloud Spanner table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "P5AbwAB30bga" + }, + "outputs": [], + "source": [ + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "uF5II-qaGVH-" + }, + "source": [ + "## Quick Start Summary\n", + "\n", + "In this section, you learned how to:\n", + "- Convert product data to the Chunk format expected by embedding pipelines\n", + "- Generate embeddings using a HuggingFace model\n", + "- Configure and run a basic embedding ingestion pipeline\n", + "- Store embeddings and metadata in Cloud Spanner\n", + "\n", + "This basic pattern forms the foundation for all the advanced use cases covered in the following sections." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "47Ggr8O3Ghk1" + }, + "source": [ + "# Quick Start: Run on Dataflow\n", + "\n", + "This section demonstrates how to launch the Quick Start embedding pipeline on Google Cloud Dataflow from the colab. 
While previous examples used DirectRunner for local execution, Dataflow provides a fully managed, distributed execution environment that is:\n", + "- Scalable: Automatically scales to handle large datasets\n", + "- Fault-tolerant: Handles worker failures and ensures exactly-once processing\n", + "- Fully managed: No need to provision or manage infrastructure\n", + "\n", + "For more in-depth documentation to package your pipeline into a python file and launch a DataFlow job from the command line see [Create Dataflow pipeline using Python](https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-python)." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "HvVMkiTRGpY6" + }, + "source": [ + "## Create the Cloud Spanner table with default schema\n", + "\n", + "Before running the pipeline, we need a table to store our embeddings:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "d7iJAhHPGN9I" + }, + "outputs": [], + "source": [ + "table_name = \"default_dataflow_product_embeddings\"\n", + "table_ddl = f\"\"\"\n", + "CREATE TABLE {table_name} (\n", + " id STRING(1024) NOT NULL,\n", + " embedding ARRAY(vector_length=>384),\n", + " content STRING(MAX),\n", + " metadata JSON\n", + ") PRIMARY KEY (id)\n", + "\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "CLz6VWsMGsuJ" + }, + "outputs": [], + "source": [ + "client = get_spanner_client(PROJECT_ID)\n", + "ensure_database_exists(client, INSTANCE_ID, DATABASE_ID)\n", + "create_or_replace_table(client, INSTANCE_ID, DATABASE_ID, table_name, table_ddl)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "gMf1mIYzGx60" + }, + "source": [ + "## Save our Pipeline to a python file\n", + "\n", + "To launch our pipeline job on DataFlow, we\n", + "1. Add command line arguments for passing pipeline options\n", + "2. Save our pipeline code to a local file `basic_ingestion_pipeline.py`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "x-9VZEpgGttI" + }, + "outputs": [], + "source": [ + "file_content = \"\"\"\n", + "import apache_beam as beam\n", + "from apache_beam.options.pipeline_options import PipelineOptions\n", + "import argparse\n", + "import tempfile\n", + "\n", + "from apache_beam.ml.transforms.base import MLTransform\n", + "from apache_beam.ml.rag.types import Chunk, Content\n", + "from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform\n", + "from apache_beam.ml.rag.ingestion.spanner import SpannerVectorWriterConfig\n", + "from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings\n", + "from apache_beam.options.pipeline_options import SetupOptions\n", + "\n", + "PRODUCTS_DATA = [\n", + " {\n", + " \"id\": \"desk-001\",\n", + " \"name\": \"Modern Minimalist Desk\",\n", + " \"description\": \"Sleek minimalist desk with clean lines and a spacious work surface. \"\n", + " \"Features cable management system and sturdy steel frame. \"\n", + " \"Perfect for contemporary home offices and workspaces.\",\n", + " \"category\": \"Desks\",\n", + " \"price\": 399.99,\n", + " \"material\": \"Engineered Wood, Steel\",\n", + " \"dimensions\": \"60W x 30D x 29H inches\"\n", + " },\n", + " {\n", + " \"id\": \"chair-001\",\n", + " \"name\": \"Ergonomic Mesh Office Chair\",\n", + " \"description\": \"Premium ergonomic office chair with breathable mesh back, \"\n", + " \"adjustable lumbar support, and 4D armrests. 
Features synchronized \"\n", + " \"tilt mechanism and memory foam seat cushion. Ideal for long work hours.\",\n", + " \"category\": \"Office Chairs\",\n", + " \"price\": 299.99,\n", + " \"material\": \"Mesh, Metal, Premium Foam\",\n", + " \"dimensions\": \"26W x 26D x 48H inches\"\n", + " }\n", + "]\n", + "\n", + "def run(argv=None):\n", + " parser = argparse.ArgumentParser()\n", + " parser.add_argument('--instance_id', required=True, help='Spanner instance ID')\n", + " parser.add_argument('--database_id', required=True, help='Spanner database ID')\n", + " parser.add_argument('--table_name', required=True, help='Spanner table name')\n", + "\n", + " known_args, pipeline_args = parser.parse_known_args(argv)\n", + "\n", + " pipeline_options = PipelineOptions(pipeline_args)\n", + " project_id = pipeline_options.get_all_options()['project']\n", + "\n", + " with beam.Pipeline(options=pipeline_options) as p:\n", + " _ = (\n", + " p\n", + " | 'Create Products' >> beam.Create(PRODUCTS_DATA)\n", + " | 'Convert to Chunks' >> beam.Map(lambda product: Chunk(\n", + " content=Content(\n", + " text=f\"{product['name']}: {product['description']}\"\n", + " ),\n", + " id=product['id'],\n", + " metadata=product,\n", + " ))\n", + " | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n", + " | 'Write to Spanner' >> VectorDatabaseWriteTransform(\n", + " SpannerVectorWriterConfig(\n", + " project_id=project_id,\n", + " instance_id=known_args.instance_id,\n", + " database_id=known_args.database_id,\n", + " table_name=known_args.table_name\n", + " )\n", + " )\n", + " )\n", + "\n", + "if __name__ == '__main__':\n", + " run()\n", + "\"\"\"\n", + "\n", + "with open(\"basic_ingestion_pipeline.py\", \"w\") as f:\n", + " f.write(file_content)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "KpSQlMzGHZN1" + }, + "source": [ + "## Configure the Pipeline options\n", + "To run the pipeline on DataFlow we need\n", + "- A gcs bucket for staging DataFlow files. Replace ``: the name of a valid Google Cloud Storage bucket.\n", + "- Optionally set the Google Cloud region that you want to run Dataflow in. Replace `` with the desired location.\n", + "- Optionally provide `NETWORK` and `SUBNETWORK` for dataflow workers to run on.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "_dP6EMvAHC5o" + }, + "outputs": [], + "source": [ + "BUCKET_NAME = '' # @param {type:'string'}\n", + "REGION = '' # @param {type:'string'}\n", + "NETWORK = '' # @param {type:'string'}\n", + "SUBNETWORK = '' # @param {type:'string'}" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "_VCCyA8LH9M9" + }, + "source": [ + "## Provide additional Python dependencies to be installed on Worker VM's\n", + "\n", + "We are making use of the HuggingFace `sentence-transformers` package to generate embeddings. 
Since this package is not installed on Worker VM's by default, we create a requirements.txt file with the additional dependencies to be installed on worker VM's.\n", + "\n", + "See [Managing Python Pipeline Dependencies](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/) for more details.\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "bSXTRm83H_Zo" + }, + "outputs": [], + "source": [ + "!echo \"sentence-transformers\" > ./requirements.txt\n", + "!cat ./requirements.txt" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "rDOfSZW1ICja" + }, + "source": [ + "## Run Pipeline on Dataflow\n", + "\n", + "We launch the pipeline via the command line, passing\n", + "- Cloud Spanner pipeline arguments defined in `basic_ingestion_pipeline.py`\n", + "- GCP Project ID\n", + "- Job Region\n", + "- The runner (DataflowRunner)\n", + "- Temp and Staging GCS locations for Pipeline artifacts\n", + "- Requirement file location for additional dependencies\n", + "- (Optional) The VPC network and Subnetwork that has access to the Cloud Spanner instance\n", + "\n", + "Once the job is launched, you can monitor its progress in the Google Cloud Console:\n", + "1. Go to https://console.cloud.google.com/dataflow/jobs\n", + "2. Select your project\n", + "3. Click on the job named \"spanner-dataflow-basic-embedding-ingest\"\n", + "4. View detailed execution graphs, logs, and metrics" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "Wk94NE8zHrEs" + }, + "outputs": [], + "source": [ + "command_parts = [\n", + " \"python ./basic_ingestion_pipeline.py\",\n", + " f\"--project={PROJECT_ID}\",\n", + " f\"--instance_id={INSTANCE_ID}\",\n", + " f\"--database_id={DATABASE_ID}\",\n", + " f\"--table_name={table_name}\",\n", + " f\"--job_name=spanner-dataflow-basic-embedding-ingest\",\n", + " f\"--region={REGION}\",\n", + " \"--runner=DataflowRunner\",\n", + " f\"--temp_location=gs://{BUCKET_NAME}/temp\",\n", + " f\"--staging_location=gs://{BUCKET_NAME}/staging\",\n", + " \"--disk_size_gb=50\",\n", + " \"--requirements_file=requirements.txt\"\n", + "]\n", + "\n", + "if NETWORK:\n", + " command_parts.append(f\"--network={NETWORK}\")\n", + "\n", + "if SUBNETWORK:\n", + " command_parts.append(f\"--subnetwork=regions/{REGION}/subnetworks/{SUBNETWORK}\")\n", + "\n", + "final_command = \" \".join(command_parts)\n", + "import logging\n", + "logging.getLogger().setLevel(logging.INFO)\n", + "print(\"Generated command:\\n\", final_command)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "a8tc5DX5HupO" + }, + "outputs": [], + "source": [ + "# Launch pipeline with generated command\n", + "!{final_command}" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "i-UnbYAMJRd0" + }, + "source": [ + "## Verify the Written Embeddings\n", + "\n", + "Once the dataflow job is complete we check what was written to our Cloud Spanner table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "jKLcmeBkIngu" + }, + "outputs": [], + "source": [ + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "oIntMACQJm7p" + }, + "source": [ + "# Advanced Use Cases\n", + "\n", + "This section demonstrates more complex scenarios for using Spanner with Apache Beam for vector embeddings.\n", + "\n", + "🎯 **Have a specific schema?**\n", + "- [Go to Custom 
Schema](#scrollTo=Custom_Schema_with_Column_Mapping)\n", + "- Learn to use different column names and transform values\n", + "- Map metadata to individual columns\n", + "\n", + "🔄 **Need to update embeddings?**\n", + "- [Check out Updating Embeddings](#scrollTo=Update_Embeddings_and_Metadata_with_Write_Mode)\n", + "- Handle conflicts\n", + "- Selective field updates\n", + "\n", + "🔗 **Need to generate and Store Embeddings for Existing Cloud Spanner Data??**\n", + "- [See Database Integration](#scrollTo=Adding_Embeddings_to_Existing_Database_Records)\n", + "- Read data from your Cloud Spanner table.\n", + "- Generate embeddings for the relevant fields.\n", + "- Update your table (or a related table) with the generated embeddings.\n", + "\n", + "🤖 **Want to use Google's AI models?**\n", + "- [Try Vertex AI Embeddings](#scrollTo=Generate_Embeddings_with_VertexAI_Text_Embeddings)\n", + "- Use Google's powerful embedding models\n", + "- Seamlessly integrate with other Google Cloud services\n", + "\n", + "🔄 Need real-time embedding updates?\n", + "\n", + "- [Try Streaming Embeddings from PubSub](#scrollTo=Streaming_Embeddings_Updates_from_PubSub)\n", + "- Process continuous data streams\n", + "- Update embeddings in real-time as information changes\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ck2gshzxJ0dd" + }, + "source": [ + "## Custom Schema with Column Mapping\n", + "\n", + "In this example, we'll create a custom schema that:\n", + "- Uses different column names\n", + "- Maps metadata to individual columns\n", + "- Uses functions to transform values\n", + "\n", + "### ColumnSpec and SpannerColumnSpecsBuilder\n", + "\n", + "\n", + "ColumnSpec specifies how to map data to a database column. For example:\n", + "```python\n", + "from apache_beam.ml.rag.ingestion.spanner import ColumnSpec\n", + "\n", + "ColumnSpec(\n", + " column_name=\"price\", # Database column\n", + " python_type=float, # Python Type for the value\n", + " value_fn=lambda c: c.metadata['price'], # Extract price from Chunk\n", + ")\n", + "```\n", + "In this example `value_fn` extracts price from metadata, `python_type` indicates that the extracted value is of type float, `column_name` inserts it into the Spanner column price.\n", + "\n", + "`SpannerColumnSpecBuilder` offers a fluent api for adding column specs:\n", + "```python\n", + "specs = (\n", + " SpannerColumnSpecsBuilder()\n", + " .with_id_spec() # Default id spec map Chunk.id to Spanner column \"id\" as a string\n", + " .with_embedding_spec() # Default embedding spec maps Chunk.embedding.dense_embedding to Spanner column \"embedding\" of type list\n", + " .with_content_spec() # Default content spec maps Chunk.content.text to Spanner column \"content\"\n", + " .add_metadata_field(field=\"source\", python_type=str) # Extracts the \"source\" field from Chunk.metadata and inserts into Spanner column \"source\" as string type.\n", + " .with_metadata_spec() # Default metadata spec inserts entire Chunk.metadata to spanner as JSON.\n", + " .build()\n", + ")\n", + "\n", + "```\n", + "\n", + "### Create Custom Schema Table" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "KLkWgmlJJdPB" + }, + "outputs": [], + "source": [ + "table_name = \"custom_product_embeddings\"\n", + "table_ddl = f\"\"\"\n", + "CREATE TABLE {table_name} (\n", + " product_id STRING(1024) NOT NULL,\n", + " vector_embedding ARRAY(vector_length=>384),\n", + " product_name STRING(MAX),\n", + " description STRING(MAX),\n", + " price FLOAT64,\n", + " 
category STRING(MAX),\n", + " display_text STRING(MAX),\n", + " model_name STRING(MAX),\n", + " created_at TIMESTAMP\n", + ") PRIMARY KEY (product_id)\n", + "\"\"\"\n", + "client = get_spanner_client(PROJECT_ID)\n", + "ensure_database_exists(client, INSTANCE_ID, DATABASE_ID)\n", + "create_or_replace_table(client, INSTANCE_ID, DATABASE_ID, table_name, table_ddl)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "UOsnbsdBodQm" + }, + "source": [ + "### Configure Column Specs\n", + "\n", + "We extract fields from our `Chunk` and map them to our database schema." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "4wqVkr8oJ4rm" + }, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "\n", + "column_specs = (\n", + " SpannerColumnSpecsBuilder()\n", + " .with_id_spec(column_name='product_id')\n", + " .with_embedding_spec(column_name='vector_embedding')\n", + " .with_content_spec(column_name='description')\n", + " .add_metadata_field('name', str, column_name='product_name')\n", + " .add_metadata_field('price', float, column_name='price')\n", + " .add_metadata_field('category', str, column_name='category')\n", + " .add_column(\n", + " column_name='display_text',\n", + " python_type=str,\n", + " value_fn=lambda chunk: f\"{chunk.metadata['name']} - ${chunk.metadata['price']:.2f}\"\n", + " )\n", + " .add_column(\n", + " column_name='model_name',\n", + " python_type=str,\n", + " value_fn=lambda _: \"all-MiniLM-L6-v2\"\n", + " )\n", + " .add_column(\n", + " column_name='created_at',\n", + " python_type=str,\n", + " value_fn=lambda _: datetime.now().isoformat()+'Z'\n", + " )\n", + " .build()\n", + ")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "cJ-N8LWFounI" + }, + "source": [ + "### Run Pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xn7ncf2ios0Y" + }, + "outputs": [], + "source": [ + "import tempfile\n", + "\n", + "# Executing on DirectRunner (local execution)\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | 'Create Products' >> beam.Create(PRODUCTS_DATA)\n", + " | 'Convert to Chunks' >> beam.Map(lambda product_dict: Chunk(Content(text=f\"{product_dict['name']}: {product_dict['description']}\"), id=product_dict[\"id\"], metadata=product_dict))\n", + " | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n", + " | 'Write to Spanner' >> VectorDatabaseWriteTransform(\n", + " SpannerVectorWriterConfig(\n", + " project_id=PROJECT_ID,\n", + " instance_id=INSTANCE_ID,\n", + " database_id=DATABASE_ID,\n", + " table_name=table_name,\n", + " column_specs=column_specs\n", + " )\n", + " )\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "2Pq2vgiGp-pC" + }, + "source": [ + "## Verify Embeddings\n", + "Let's check what was written to our Cloud Spanner table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "jdvJFgkmpla1" + }, + "outputs": [], + "source": [ + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "6rvFPMoEqSln" + }, + "source": [ + "# Update Embeddings and Metadata with Write Mode \n", + "\n", + "This section demonstrates how to handle periodic updates to product descriptions and their embeddings using the default schema. 
We'll show how embeddings and metadata get updated when product descriptions change.\n", + "\n", + "Spanner supports different write modes for handling updates:\n", + "- `INSERT`: Fail if row exists\n", + "- `UPDATE`: Fail if row doesn't exist \n", + "- `REPLACE`: Delete then insert\n", + "- `INSERT_OR_UPDATE`: Insert or update if exists (default)\n", + "Any of these can be selected via the `write_mode` `SpannerVectorWriterConfig` argument\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "pRMy4s5SqWKC" + }, + "source": [ + "### Create table with desired schema\n", + "\n", + "Let's use the same default schema as in Quick Start:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "34CNLo9iqhtH" + }, + "outputs": [], + "source": [ + "table_name = \"mutable_product_embeddings\"\n", + "table_ddl = f\"\"\"\n", + "CREATE TABLE {table_name} (\n", + " id STRING(1024) NOT NULL,\n", + " embedding ARRAY(vector_length=>384),\n", + " content STRING(MAX),\n", + " metadata JSON,\n", + " created_at TIMESTAMP,\n", + " last_updated TIMESTAMP\n", + ") PRIMARY KEY (id)\n", + "\"\"\"\n", + "client = get_spanner_client(PROJECT_ID)\n", + "ensure_database_exists(client, INSTANCE_ID, DATABASE_ID)\n", + "create_or_replace_table(client, INSTANCE_ID, DATABASE_ID, table_name, table_ddl)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "RuZnsrC6qnzH" + }, + "source": [ + "### Sample Data: Day 1 vs Day 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "_wfsapt9p_Zr" + }, + "outputs": [], + "source": [ + "PRODUCTS_DATA_DAY1 = [\n", + " {\n", + " \"id\": \"desk-001\",\n", + " \"name\": \"Modern Minimalist Desk\",\n", + " \"description\": \"Sleek minimalist desk with clean lines and a spacious work surface. \"\n", + " \"Features cable management system and sturdy steel frame.\",\n", + " \"category\": \"Desks\",\n", + " \"price\": 399.99,\n", + " \"update_timestamp\": \"2024-02-18\"\n", + " }\n", + "]\n", + "\n", + "PRODUCTS_DATA_DAY2 = [\n", + " {\n", + " \"id\": \"desk-001\", # Same ID as Day 1\n", + " \"name\": \"Modern Minimalist Desk\",\n", + " \"description\": \"Updated: Sleek minimalist desk with built-in wireless charging. \"\n", + " \"Features cable management system, sturdy steel frame, and Qi charging pad. 
\"\n", + " \"Perfect for modern tech-enabled workspaces.\",\n", + " \"category\": \"Smart Desks\", # Category changed\n", + " \"price\": 449.99, # Price increased\n", + " \"update_timestamp\": \"2024-02-19\"\n", + " }\n", + "]" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8sArrr9kqujJ" + }, + "source": [ + "### Configure Pipeline Components\n", + "#### Writer with `write_mode` specified" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "nh4siHpPqplb" + }, + "outputs": [], + "source": [ + "# Day 1 data\n", + "config_day1 = SpannerVectorWriterConfig(\n", + " project_id=PROJECT_ID,\n", + " instance_id=INSTANCE_ID,\n", + " database_id=DATABASE_ID,\n", + " table_name=table_name,\n", + " write_mode='INSERT',\n", + " column_specs=SpannerColumnSpecsBuilder()\n", + " .with_defaults()\n", + " .add_column(\n", + " column_name='created_at',\n", + " python_type=str,\n", + " value_fn=lambda _: datetime.now().isoformat()+'Z'\n", + " )\n", + " .add_column(\n", + " column_name='last_updated',\n", + " python_type=str,\n", + " value_fn=lambda _: datetime.now().isoformat()+'Z'\n", + " ).build()\n", + ")\n", + "\n", + "# Day 2 update\n", + "config_day2 = SpannerVectorWriterConfig(\n", + " project_id=PROJECT_ID,\n", + " instance_id=INSTANCE_ID,\n", + " database_id=DATABASE_ID,\n", + " table_name=table_name,\n", + " write_mode='UPDATE', # 'UPDATE' to fail if doesn't exist\n", + " column_specs=SpannerColumnSpecsBuilder()\n", + " .with_defaults()\n", + " .add_column(\n", + " column_name='last_updated',\n", + " python_type=str,\n", + " value_fn=lambda _: datetime.now().isoformat()+'Z'\n", + " ).build()\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "GngBtsnXrpaz" + }, + "source": [ + "Run Day 1 Pipeline\n", + "\n", + "First, let's ingest our initial product data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "CdUVFmZxrZzG" + }, + "outputs": [], + "source": [ + "import tempfile\n", + "\n", + "# Executing on DirectRunner (local execution)\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | 'Create Products' >> beam.Create(PRODUCTS_DATA_DAY1)\n", + " | 'Convert to Chunks' >> beam.Map(lambda product_dict: Chunk(Content(text=f\"{product_dict['name']}: {product_dict['description']}\"), id=product_dict[\"id\"], metadata=product_dict))\n", + " | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n", + " | 'Write to Spanner' >> VectorDatabaseWriteTransform(\n", + " config_day1\n", + " )\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "X0wqqiwxrliM" + }, + "outputs": [], + "source": [ + "print(\"\\nAfter Day 1 ingestion:\")\n", + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "L4Rirduyr6n1" + }, + "source": [ + "### Run Day 2 Pipeline\n", + "\n", + "Now let's process our updated product data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "XB8FPpCDr3Ss" + }, + "outputs": [], + "source": [ + "import tempfile\n", + "\n", + "# Executing on DirectRunner (local execution)\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | 'Create Products' >> beam.Create(PRODUCTS_DATA_DAY2)\n", + " | 'Convert to Chunks' >> beam.Map(lambda product_dict: 
Chunk(Content(text=f\"{product_dict['name']}: {product_dict['description']}\"), id=product_dict[\"id\"], metadata=product_dict))\n", + " | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n", + " | 'Write to Spanner' >> VectorDatabaseWriteTransform(\n", + " config_day2\n", + " )\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "NAnqICTlr-Hd" + }, + "outputs": [], + "source": [ + "print(\"\\nAfter Day 2 ingestion:\")\n", + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "lj-MATEfsFgZ" + }, + "source": [ + "### What Changed?\n", + "\n", + "Key points to notice:\n", + "\n", + "1. The embedding vector changed because the product description was updated\n", + "2. The metadata JSON field contains the updated category, price, and timestamp\n", + "3. The content field reflects the new description\n", + "4. The original ID remained the same\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "R9THY7pysvWT" + }, + "source": [ + "## Adding Embeddings to Existing Database Records \n", + "\n", + "This section demonstrates how to:\n", + "1. Read existing product data from a database\n", + "2. Generate embeddings for that data\n", + "3. Write the embeddings back to the database" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "wkZnrxQRsCc9" + }, + "outputs": [], + "source": [ + "table_name = \"existing_product_embeddings\"\n", + "table_ddl = f\"\"\"\n", + "CREATE TABLE {table_name} (\n", + " id STRING(1024) NOT NULL,\n", + " embedding ARRAY(vector_length=>384),\n", + " content STRING(MAX),\n", + " description STRING(MAX),\n", + " created_at TIMESTAMP,\n", + " last_updated TIMESTAMP\n", + ") PRIMARY KEY (id)\n", + "\"\"\"\n", + "client = get_spanner_client(PROJECT_ID)\n", + "ensure_database_exists(client, INSTANCE_ID, DATABASE_ID)\n", + "create_or_replace_table(client, INSTANCE_ID, DATABASE_ID, table_name, table_ddl)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Ksb6nKzUtO0M" + }, + "source": [ + "Lets first ingest some unembedded data into our table.\n", + "\n", + "Note this just reuses SpannerVectorWriter to easily ingest unembeded data." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "2mZMoiIhtNIq" + }, + "outputs": [], + "source": [ + "import tempfile\n", + "\n", + "data = PRODUCTS_DATA.copy()\n", + "\n", + "# Executing on DirectRunner (local execution)\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | 'Create Products' >> beam.Create(PRODUCTS_DATA)\n", + " | 'Convert to Chunks' >> beam.Map(lambda product_dict: Chunk(Content(text=f\"{product_dict['name']}: {product_dict['description']}\"), id=product_dict[\"id\"], metadata=product_dict))\n", + " | 'Write to Spanner' >> VectorDatabaseWriteTransform(\n", + " SpannerVectorWriterConfig(\n", + " PROJECT_ID,\n", + " INSTANCE_ID,\n", + " DATABASE_ID,\n", + " table_name,\n", + " column_specs=(\n", + " SpannerColumnSpecsBuilder()\n", + " .with_id_spec()\n", + " .with_content_spec()\n", + " .add_metadata_field(\"description\", str)\n", + " .add_column(\n", + " column_name='created_at',\n", + " python_type=str,\n", + " value_fn=lambda _: datetime.now().isoformat()+'Z'\n", + " )\n", + " .add_column(\n", + " column_name='last_updated',\n", + " python_type=str,\n", + " value_fn=lambda _: datetime.now().isoformat()+'Z'\n", + " ).build())\n", + " )\n", + " )\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "gdEkUNZKvlVI" + }, + "source": [ + "Lets look at the current state of our table. Notice there are no embeddings (Column 1)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "psFJUoh9s5OY" + }, + "outputs": [], + "source": [ + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "4dqUri8rvjK8" + }, + "outputs": [], + "source": [ + "from apache_beam.io.gcp import spanner\n", + "\n", + "from typing import NamedTuple\n", + "from apache_beam import coders\n", + "\n", + "class SpannerRow(NamedTuple):\n", + " id: str\n", + " content: str\n", + "\n", + "def spanner_row_to_chunk(spanner_row):\n", + " return Chunk(\n", + " content= Content(spanner_row.content),\n", + " id=spanner_row.id\n", + " )\n", + "\n", + "coders.registry.register_coder(SpannerRow, coders.RowCoder)\n", + "\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | \"Read Unembedded data\" >> spanner.ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DATABASE_ID, row_type=SpannerRow, sql=f\"select id, content from {table_name}\")\n", + " | \"Spanner Row to Chunk\" >> beam.Map(spanner_row_to_chunk)\n", + " | \"Generate Embeddings\" >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n", + " | \"Update Spanner with embeddings\" >> VectorDatabaseWriteTransform(\n", + " SpannerVectorWriterConfig(\n", + " PROJECT_ID,\n", + " INSTANCE_ID,\n", + " DATABASE_ID,\n", + " table_name,\n", + " column_specs=SpannerColumnSpecsBuilder().with_id_spec().with_embedding_spec().build()\n", + " )\n", + " )\n", + " )\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "cnOsSQ6q96XB" + }, + "source": [ + "Now we confirm that our Spanner table was updated with embeddings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "wgZxLpyQ7lSc" + }, + "outputs": [], + "source": [ + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "hs08aIY--E1I" + }, + "source": [ + "What Happened?\n", 
+ "1. We started with a table containing product data but no embeddings\n", + "2. Read the id and content from existing records using ReadFromSpanner\n", + "3. Converted Spanner rows to Chunks, using the spanner id column as our Chunk id, and Spanner content column as our Chunk content to be embedded\n", + "4. Generated embeddings using our model\n", + "5. Wrote back to the same table, updating only the embedding field,\n", + "preserving all other fields (price, etc.)\n", + "\n", + "This pattern is useful when:\n", + "\n", + "- You have an existing product database\n", + "- You want to add embeddings without disrupting current data\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "O-lIoBUZ-qIa" + }, + "source": [ + "## Generate Embeddings with VertexAI Text Embeddings\n", + "\n", + "This section demonstrates how to use use the Vertex AI text-embeddings API to generate text embeddings that use Googles large generative artificial intelligence (AI) models.\n", + "\n", + "Vertex AI models are subject to [Rate Limits and Quotas](https://cloud.google.com/vertex-ai/generative-ai/docs/quotas#view-the-quotas-by-region-and-by-model) and Dataflow automatically retries throttled requests with exponential backoff.\n", + "\n", + "\n", + "For more information, see [Get text embeddings](https://cloud.google.com/vertex-ai/docs/generative-ai/embeddings/get-text-embeddings) in the Vertex AI documentation.\n", + "\n", + "### Authenticate with Google Cloud\n", + "To use the Vertex AI API, we authenticate with Google Cloud.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "jzJvOJ5V8Ccd" + }, + "outputs": [], + "source": [ + "import sys\n", + "if 'google.colab' in sys.modules:\n", + " from google.colab import auth\n", + " auth.authenticate_user(project_id=PROJECT_ID)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "iyW6Qiaj-5H1" + }, + "outputs": [], + "source": [ + "table_name = \"vertex_product_embeddings\"\n", + "table_ddl = f\"\"\"\n", + "CREATE TABLE {table_name} (\n", + " id STRING(1024) NOT NULL,\n", + " embedding ARRAY(vector_length=>768),\n", + " content STRING(MAX),\n", + " metadata JSON\n", + ") PRIMARY KEY (id)\n", + "\"\"\"\n", + "client = get_spanner_client(PROJECT_ID)\n", + "ensure_database_exists(client, INSTANCE_ID, DATABASE_ID)\n", + "create_or_replace_table(client, INSTANCE_ID, DATABASE_ID, table_name, table_ddl)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Zx0QoZMx_V6h" + }, + "source": [ + "### Configure Embedding Handler\n", + "\n", + "Import the `VertexAITextEmbeddings` handler, and specify the desired `textembedding-gecko` model." 
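+    "\n",
+    "\n",
+    "The code cell below uses the `text-embedding-005` model, which returns 768-dimensional vectors; that is why the table above declares `vector_length=>768`. To confirm the dimension for a given Vertex AI model before running the pipeline, you can call it directly (a sketch; it assumes the `google-cloud-aiplatform` package is available and the Vertex AI API is enabled in your project):\n",
+    "\n",
+    "```python\n",
+    "import vertexai\n",
+    "from vertexai.language_models import TextEmbeddingModel\n",
+    "\n",
+    "# Initialize the Vertex AI SDK against the same project used for Spanner.\n",
+    "vertexai.init(project=PROJECT_ID)\n",
+    "\n",
+    "# Request a single embedding and inspect its length.\n",
+    "model = TextEmbeddingModel.from_pretrained(\"text-embedding-005\")\n",
+    "embedding = model.get_embeddings([\"Modern Minimalist Desk\"])[0]\n",
+    "print(len(embedding.values))  # 768, must match vector_length in the DDL\n",
+    "```\n"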
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "YekQCo8k_EwH" + }, + "outputs": [], + "source": [ + "from apache_beam.ml.rag.embeddings.vertex_ai import VertexAITextEmbeddings\n", + "\n", + "vertexai_embedder = VertexAITextEmbeddings(model_name=\"text-embedding-005\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "TIYPpGmz_Xmq" + }, + "outputs": [], + "source": [ + "import tempfile\n", + "\n", + "# Executing on DirectRunner (local execution)\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | 'Create Products' >> beam.Create(PRODUCTS_DATA)\n", + " | 'Convert to Chunks' >> beam.Map(create_chunk)\n", + " | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(vertexai_embedder)\n", + " | 'Write to Spanner' >> VectorDatabaseWriteTransform(\n", + " SpannerVectorWriterConfig(\n", + " project_id=PROJECT_ID,\n", + " instance_id=INSTANCE_ID,\n", + " database_id=DATABASE_ID,\n", + " table_name=table_name\n", + " )\n", + " )\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "B73oZ0GH_iWm" + }, + "outputs": [], + "source": [ + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "rBgQx9iEJpBc" + }, + "source": [ + "## Streaming Embeddings Updates from PubSub\n", + "\n", + "This section demonstrates how to build a real-time embedding pipeline that continuously processes product updates and maintains fresh embeddings in Spanner. This approach is ideal data that changes frequently.\n", + "\n", + "### Authenticate with Google Cloud\n", + "To use the PubSub, we authenticate with Google Cloud.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "N8GdsWyqJe6r" + }, + "outputs": [], + "source": [ + "import sys\n", + "if 'google.colab' in sys.modules:\n", + " from google.colab import auth\n", + " auth.authenticate_user(project_id=PROJECT_ID)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "NKtWAjMbJ0AU" + }, + "source": [ + "### Setting Up PubSub Resources\n", + "\n", + "First, let's set up the necessary PubSub topics and subscriptions:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "CFdv3oVEJxwB" + }, + "outputs": [], + "source": [ + "from google.cloud import pubsub_v1\n", + "from google.api_core.exceptions import AlreadyExists\n", + "import json\n", + "\n", + "# Define pubsub topic\n", + "TOPIC = \"\" # @param {type:'string'}\n", + "\n", + "# Create publisher client and topic\n", + "publisher = pubsub_v1.PublisherClient()\n", + "topic_path = publisher.topic_path(PROJECT_ID, TOPIC)\n", + "try:\n", + " topic = publisher.create_topic(request={\"name\": topic_path})\n", + " print(f\"Created topic: {topic.name}\")\n", + "except AlreadyExists:\n", + " print(f\"Topic {topic_path} already exists.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "2wJUqXDSJ62y" + }, + "source": [ + "### Create Spanner Table for Streaming Updates\n", + "\n", + "Next, create a table to store the embedded data." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "Lc1mmxn_J8lm" + }, + "outputs": [], + "source": [ + "table_name = \"streaming_product_embeddings\"\n", + "table_ddl = f\"\"\"\n", + "CREATE TABLE {table_name} (\n", + " id STRING(1024) NOT NULL,\n", + " embedding ARRAY(vector_length=>384),\n", + " content STRING(MAX),\n", + " metadata JSON\n", + ") PRIMARY KEY (id)\n", + "\"\"\"\n", + "client = get_spanner_client(PROJECT_ID)\n", + "ensure_database_exists(client, INSTANCE_ID, DATABASE_ID)\n", + "create_or_replace_table(client, INSTANCE_ID, DATABASE_ID, table_name, table_ddl)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "LSb9CZY_KMIa" + }, + "source": [ + "### Configure the Pipeline options\n", + "To run the pipeline on DataFlow we need\n", + "- A gcs bucket for staging DataFlow files. Replace ``: the name of a valid Google Cloud Storage bucket. Don't include a gs:// prefix or trailing slashes\n", + "- Optionally set the Google Cloud region that you want to run Dataflow in. Replace `` with the desired location\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "OTU6fIkPKG8e" + }, + "outputs": [], + "source": [ + "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions, GoogleCloudOptions, WorkerOptions\n", + "\n", + "options = PipelineOptions()\n", + "options.view_as(StandardOptions).streaming = True\n", + "\n", + "# Provide required pipeline options for the Dataflow Runner.\n", + "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n", + "\n", + "# Set the Google Cloud region that you want to run Dataflow in.\n", + "REGION = '' # @param {type:'string'}\n", + "options.view_as(GoogleCloudOptions).region = REGION\n", + "\n", + "NETWORK = '' # @param {type:'string'}\n", + "if NETWORK:\n", + " options.view_as(WorkerOptions).network = NETWORK\n", + "\n", + "SUBNETWORK = '' # @param {type:'string'}\n", + "if SUBNETWORK:\n", + " options.view_as(WorkerOptions).subnetwork = f\"regions/{REGION}/subnetworks/{SUBNETWORK}\"\n", + "\n", + "options.view_as(GoogleCloudOptions).project = PROJECT_ID\n", + "\n", + "BUCKET_NAME = '' # @param {type:'string'}\n", + "dataflow_gcs_location = \"gs://%s/dataflow\" % BUCKET_NAME\n", + "\n", + "# The Dataflow staging location. This location is used to stage the Dataflow pipeline and the SDK binary.\n", + "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location\n", + "\n", + "# The Dataflow temp location. 
This location is used to store temporary files or intermediate results before outputting to the sink.\n", + "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location\n", + "\n", + "import random\n", + "options.view_as(GoogleCloudOptions).job_name = f\"spanner-streaming-embedding-ingest{random.randint(0,1000)}\"\n", + "\n", + "# options.view_as(SetupOptions).save_main_session = True\n", + "options.view_as(SetupOptions).requirements_file = \"./requirements.txt\"\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "LnzIGrBmKbLr" + }, + "outputs": [], + "source": [ + "!echo \"sentence-transformers\" > ./requirements.txt\n", + "!cat ./requirements.txt\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "O8gMMTT1KgHr" + }, + "source": [ + "### Provide additional Python dependencies to be installed on Worker VM's\n", + "\n", + "We are making use of the HuggingFace `sentence-transformers` package to generate embeddings. Since this package is not installed on Worker VM's by default, we create a requirements.txt file with the additional dependencies to be installed on worker VM's.\n", + "\n", + "See [Managing Python Pipeline Dependencies](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/) for more details.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "KwhEaKR_KicY" + }, + "source": [ + "### Configure and Run Pipeline\n", + "\n", + "Our pipeline contains these key components:\n", + "\n", + "1. **Source**: Continuously reads messages from PubSub\n", + "3. **Transformation**: Converts JSON messages to Chunk objects for embedding\n", + "4. **ML Processing**: Generates embeddings using HuggingFace models\n", + "5. **Sink**: Writes results to Spanner (INSERT_OR_UPDATE)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "tV_e_v2zKfGK" + }, + "outputs": [], + "source": [ + "def parse_message(message):\n", + " #Parse a message containing product data.\n", + " product_json = json.loads(message.decode('utf-8'))\n", + " return Chunk(\n", + " content=Content(\n", + " text=f\"{product_json.get('name', '')}: {product_json.get('description', '')}\"\n", + " ),\n", + " id=product_json.get('id', ''),\n", + " metadata=product_json\n", + " )\n", + "\n", + "pipeline = beam.Pipeline(options=options)\n", + "# Streaming pipeline\n", + "_ = (\n", + " pipeline\n", + " | \"Read from PubSub\" >> beam.io.ReadFromPubSub(\n", + " topic=f\"projects/{PROJECT_ID}/topics/{TOPIC}\"\n", + " )\n", + " | \"Parse Messages\" >> beam.Map(parse_message)\n", + " | \"Generate Embeddings\" >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(HuggingfaceTextEmbeddings(model_name=\"sentence-transformers/all-MiniLM-L6-v2\"))\n", + " | \"Write to Spanner\" >> VectorDatabaseWriteTransform(\n", + " SpannerVectorWriterConfig(\n", + " PROJECT_ID,\n", + " INSTANCE_ID,\n", + " DATABASE_ID,\n", + " table_name\n", + " )\n", + " )\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "We4dWPJoNIh0" + }, + "source": [ + "### Create Publisher Subprocess\n", + "The publisher simulates real-time product updates by:\n", + "- Publishing sample product data to the PubSub topic every 5 seconds\n", + "- Modifying prices and descriptions to represent changes\n", + "- Adding timestamps to track update times\n", + "- Running for 25 minutes in the background while our pipeline processes the data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + 
"metadata": { + "id": "X_gyk-8RNKaw" + }, + "outputs": [], + "source": [ + "#@title Define PubSub publisher function\n", + "import threading\n", + "import time\n", + "import json\n", + "import logging\n", + "from google.cloud import pubsub_v1\n", + "import datetime\n", + "import os\n", + "import sys\n", + "log_file = os.path.join(os.getcwd(), \"publisher_log.txt\")\n", + "\n", + "print(f\"Log file will be created at: {log_file}\")\n", + "\n", + "def publisher_function(project_id, topic):\n", + " \"\"\"Function that publishes sample product updates to a PubSub topic.\n", + "\n", + " This function runs in a separate thread and continuously publishes\n", + " messages to simulate real-time product updates.\n", + " \"\"\"\n", + " time.sleep(300)\n", + " thread_id = threading.current_thread().ident\n", + "\n", + " process_log_file = os.path.join(os.getcwd(), f\"publisher_{thread_id}.log\")\n", + "\n", + " file_handler = logging.FileHandler(process_log_file)\n", + " file_handler.setFormatter(logging.Formatter('%(asctime)s - ThreadID:%(thread)d - %(levelname)s - %(message)s'))\n", + "\n", + " logger = logging.getLogger(f\"worker.{thread_id}\")\n", + " logger.setLevel(logging.INFO)\n", + " logger.addHandler(file_handler)\n", + "\n", + " logger.info(f\"Publisher thread started with ID: {thread_id}\")\n", + " file_handler.flush()\n", + "\n", + " publisher = pubsub_v1.PublisherClient()\n", + " topic_path = publisher.topic_path(project_id, topic)\n", + "\n", + " logger.info(\"Starting to publish messages...\")\n", + " file_handler.flush()\n", + " for i in range(300):\n", + " message_index = i % len(PRODUCTS_DATA)\n", + " message = PRODUCTS_DATA[message_index].copy()\n", + "\n", + "\n", + " dynamic_factor = 1.05 + (0.1 * ((i % 20) / 20))\n", + " message[\"price\"] = round(message[\"price\"] * dynamic_factor, 2)\n", + " message[\"description\"] = f\"PRICE UPDATE (factor: {dynamic_factor:.3f}): \" + message[\"description\"]\n", + "\n", + " message[\"published_at\"] = datetime.datetime.now().isoformat()\n", + "\n", + " data = json.dumps(message).encode('utf-8')\n", + " publish_future = publisher.publish(topic_path, data)\n", + "\n", + " try:\n", + " logger.info(f\"Publishing message {message}\")\n", + " file_handler.flush()\n", + " message_id = publish_future.result()\n", + " logger.info(f\"Published message {i+1}: {message['id']} (Message ID: {message_id})\")\n", + " file_handler.flush()\n", + " except Exception as e:\n", + " logger.error(f\"Error publishing message: {e}\")\n", + " file_handler.flush()\n", + "\n", + " time.sleep(5)\n", + "\n", + " logger.info(\"Finished publishing all messages.\")\n", + " file_handler.flush()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "gAH8VRONNNsf" + }, + "source": [ + "#### Start publishing to PubSub in background" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "QJCnWbpuMtNM" + }, + "outputs": [], + "source": [ + "# Launch publisher in a separate thread\n", + "print(\"Starting publisher thread in 5 minutes...\")\n", + "publisher_thread = threading.Thread(\n", + " target=publisher_function,\n", + " args=(PROJECT_ID, TOPIC),\n", + " daemon=True\n", + ")\n", + "publisher_thread.start()\n", + "print(f\"Publisher thread started with ID: {publisher_thread.ident}\")\n", + "print(f\"Publisher thread logging to file: publisher_{publisher_thread.ident}.log\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "rkIxfZrkNRiO" + }, + "source": [ + "### Run Pipeline on Dataflow\n", + "\n", + "We launch the 
pipeline to run remotely on Dataflow. Once the job is launched, you can monitor its progress in the Google Cloud Console:\n", + "1. Go to https://console.cloud.google.com/dataflow/jobs\n", + "2. Select your project\n", + "3. Click on the job named \"spanner-streaming-embedding-ingest\"\n", + "4. View detailed execution graphs, logs, and metrics\n", + "\n", + "**Note**: This streaming pipeline runs indefinitely until manually stopped. Be sure to monitor usage and terminate the job in the [dataflow job console](https://console.cloud.google.com/dataflow/jobs) when finished testing to avoid unnecessary costs.\n", + "\n", + "### What to Expect\n", + "After running this pipeline, you should see:\n", + "- Continuous updates to product embeddings in the Spanner table\n", + "- Price and description changes reflected in the metadata\n", + "- New embeddings generated for updated product descriptions\n", + "- Timestamps showing when each record was last modified" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "skC7fJLhNS9F" + }, + "outputs": [], + "source": [ + "# Run pipeline\n", + "pipeline_result = pipeline.run_async()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "IhfW8YJ8No0O" + }, + "outputs": [], + "source": [ + "pipeline_result" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "5wvD0bceN5h1" + }, + "source": [ + "### Verify data\n", + "Monitor your job in https://console.cloud.google.com/dataflow/jobs. Once it the workers have started processing requests verify that data has been written" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "yTBkM5DzN4vt" + }, + "outputs": [], + "source": [ + "verify_embeddings_spanner(client,INSTANCE_ID, DATABASE_ID, table_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "22FYTQ9tOShP" + }, + "source": [ + "Finally, stop your streaming job to tear down the resources." + ] + } + ], + "metadata": { + "colab": { + "provenance": [], + "runtime_attributes": { + "runtime_version": "2025.07" + }, + "toc_visible": true + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}