Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Add deployment table

Revision ID: 2a5defa5ddc0
Revises: 8106300be7aa
Create Date: 2026-03-03 12:01:00.000000

Phase: EXPAND
"""

from collections.abc import Sequence

import sqlalchemy as sa
import sqlmodel
from alembic import op
from langflow.utils import migration

# revision identifiers, used by Alembic.
revision: str = "2a5defa5ddc0" # pragma: allowlist secret
down_revision: str | None = "8106300be7aa" # pragma: allowlist secret
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

NAME_UNIQUE_CONSTRAINT = "uq_deployment_name_in_provider"
RESOURCE_KEY_UNIQUE_CONSTRAINT = "uq_deployment_resource_key_in_provider"


def upgrade() -> None:
    """Create the ``deployment`` table with its FKs, indexes and unique constraints.

    Idempotent: returns early when the table already exists (e.g. a partially
    applied earlier run).
    """
    conn = op.get_bind()
    if migration.table_exists("deployment", conn):
        return

    op.create_table(
        "deployment",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("resource_key", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
        sa.Column("user_id", sa.Uuid(), nullable=False),
        sa.Column("project_id", sa.Uuid(), nullable=False),
        sa.Column("deployment_provider_account_id", sa.Uuid(), nullable=False),
        sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
        sa.ForeignKeyConstraint(
            ["project_id"],
            ["folder.id"],
            name=op.f("fk_deployment_project_id_folder"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["deployment_provider_account_id"],
            ["deployment_provider_account.id"],
            name=op.f("fk_deployment_deployment_provider_account_id_deployment_provider_account"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["user_id"],
            ["user.id"],
            name=op.f("fk_deployment_user_id_user"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_deployment")),
    )

    # Non-unique lookup indexes first, then the two composite uniqueness rules
    # (name and resource_key are each unique per provider account).
    indexed_columns = (
        "name",
        "project_id",
        "deployment_provider_account_id",
        "resource_key",
        "user_id",
    )
    with op.batch_alter_table("deployment", schema=None) as batch_op:
        for column_name in indexed_columns:
            batch_op.create_index(batch_op.f(f"ix_deployment_{column_name}"), [column_name], unique=False)
        batch_op.create_unique_constraint(NAME_UNIQUE_CONSTRAINT, ["deployment_provider_account_id", "name"])
        batch_op.create_unique_constraint(
            RESOURCE_KEY_UNIQUE_CONSTRAINT, ["deployment_provider_account_id", "resource_key"]
        )


def downgrade() -> None:
    """Drop the ``deployment`` table, its indexes and unique constraints.

    Idempotent: does nothing when the table is already gone.
    """
    conn = op.get_bind()
    if not migration.table_exists("deployment", conn):
        return

    with op.batch_alter_table("deployment", schema=None) as batch_op:
        # Drop in reverse order of creation: constraints first, then indexes.
        batch_op.drop_constraint(RESOURCE_KEY_UNIQUE_CONSTRAINT, type_="unique")
        batch_op.drop_constraint(NAME_UNIQUE_CONSTRAINT, type_="unique")
        for column_name in ("user_id", "resource_key", "deployment_provider_account_id", "project_id", "name"):
            batch_op.drop_index(batch_op.f(f"ix_deployment_{column_name}"))

    op.drop_table("deployment")
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Add deployment provider account table

Revision ID: 8106300be7aa
Revises: 7d327cfafab6
Create Date: 2026-03-03 12:00:00.000000

Phase: EXPAND
"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op
from langflow.utils import migration
from sqlmodel.sql.sqltypes import AutoString

# revision identifiers, used by Alembic.
revision: str = "8106300be7aa" # pragma: allowlist secret
down_revision: str | None = "7d327cfafab6" # pragma: allowlist secret
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

TABLE_NAME = "deployment_provider_account"
UNIQUE_CONSTRAINT_NAME = "uq_deployment_provider_account_user_url_tenant"


def upgrade() -> None:
    """Create the ``deployment_provider_account`` table plus its lookup indexes.

    Idempotent: returns early when the table already exists.
    """
    conn = op.get_bind()
    if migration.table_exists(TABLE_NAME, conn):
        return

    op.create_table(
        TABLE_NAME,
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("user_id", sa.Uuid(), nullable=False),
        sa.Column("provider_tenant_id", AutoString(), nullable=True),
        sa.Column("provider_key", AutoString(), nullable=False),
        sa.Column("provider_url", AutoString(), nullable=False),
        sa.Column("api_key", AutoString(), nullable=False),  # MUST be stored encrypted
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
        sa.ForeignKeyConstraint(
            ["user_id"],
            ["user.id"],
            name=op.f("fk_deployment_provider_account_user_id_user"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_deployment_provider_account")),
        # One account per (user, provider URL, tenant) combination.
        sa.UniqueConstraint("user_id", "provider_url", "provider_tenant_id", name=UNIQUE_CONSTRAINT_NAME),
    )

    with op.batch_alter_table(TABLE_NAME, schema=None) as batch_op:
        for column_name in ("user_id", "provider_tenant_id", "provider_key"):
            batch_op.create_index(
                batch_op.f(f"ix_deployment_provider_account_{column_name}"), [column_name], unique=False
            )


def downgrade() -> None:
    """Drop the ``deployment_provider_account`` table and its indexes.

    Idempotent: does nothing when the table is already gone.
    """
    conn = op.get_bind()
    if not migration.table_exists(TABLE_NAME, conn):
        return

    with op.batch_alter_table(TABLE_NAME, schema=None) as batch_op:
        # Reverse order of creation.
        for column_name in ("provider_key", "provider_tenant_id", "user_id"):
            batch_op.drop_index(batch_op.f(f"ix_deployment_provider_account_{column_name}"))

    op.drop_table(TABLE_NAME)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .api_key import ApiKey
from .auth import SSOConfig, SSOUserProfile
from .deployment import Deployment
from .deployment_provider_account import DeploymentProviderAccount
from .file import File
from .flow import Flow
from .flow_version import FlowVersion
Expand All @@ -13,6 +15,8 @@

__all__ = [
"ApiKey",
"Deployment",
"DeploymentProviderAccount",
"File",
"Flow",
"FlowVersion",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .model import Deployment, DeploymentCreate, DeploymentRead, DeploymentUpdate

__all__ = ["Deployment", "DeploymentCreate", "DeploymentRead", "DeploymentUpdate"]
195 changes: 195 additions & 0 deletions src/backend/base/langflow/services/database/models/deployment/crud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING

from lfx.log.logger import logger
from sqlalchemy.exc import IntegrityError
from sqlmodel import col, delete, func, select

from langflow.services.database.models.deployment.model import Deployment
from langflow.services.database.utils import parse_uuid

if TYPE_CHECKING:
from uuid import UUID

from sqlmodel.ext.asyncio.session import AsyncSession


def _strip_or_raise(value: str, field_name: str) -> str:
"""Return *value* stripped of whitespace, or raise if blank."""
stripped = value.strip()
if not stripped:
msg = f"{field_name} must not be empty"
raise ValueError(msg)
return stripped


async def create_deployment(
    db: AsyncSession,
    *,
    user_id: UUID,
    project_id: UUID,
    deployment_provider_account_id: UUID,
    resource_key: str,
    name: str,
) -> Deployment:
    """Insert a new ``Deployment`` row and return the refreshed instance.

    Raises:
        ValueError: if ``resource_key`` or ``name`` is blank, or if the insert
            violates a database constraint (IntegrityError is translated).
    """
    # Validate eagerly for a clear error before constructing the model; the
    # Deployment model has its own validators as a second line of defense.
    clean_key = _strip_or_raise(resource_key, "resource_key")
    clean_name = _strip_or_raise(name, "name")

    deployment = Deployment(
        user_id=user_id,
        project_id=project_id,
        deployment_provider_account_id=deployment_provider_account_id,
        resource_key=clean_key,
        name=clean_name,
    )
    db.add(deployment)
    try:
        await db.flush()
    except IntegrityError as exc:
        await db.rollback()
        await logger.aerror("IntegrityError creating deployment: %s", exc)
        msg = f"Deployment conflicts with an existing record (resource_key={resource_key!r}, name={name!r})"
        raise ValueError(msg) from exc
    await db.refresh(deployment)
    return deployment


async def get_deployment_by_resource_key(
    db: AsyncSession,
    *,
    user_id: UUID,
    deployment_provider_account_id: UUID,
    resource_key: str,
) -> Deployment | None:
    """Return the deployment matching (user, provider account, resource_key), or ``None``."""
    query = (
        select(Deployment)
        .where(Deployment.user_id == user_id)
        .where(Deployment.deployment_provider_account_id == deployment_provider_account_id)
        .where(Deployment.resource_key == resource_key.strip())
    )
    result = await db.exec(query)
    return result.first()


async def get_deployment(
    db: AsyncSession,
    *,
    user_id: UUID,
    deployment_id: UUID | str,
) -> Deployment | None:
    """Return the user's deployment with the given id, or ``None`` if not found.

    ``deployment_id`` may be a ``UUID`` or a string; invalid strings raise via
    ``parse_uuid``.
    """
    deployment_uuid = parse_uuid(deployment_id, field_name="deployment_id")
    query = (
        select(Deployment)
        .where(Deployment.user_id == user_id)
        .where(Deployment.id == deployment_uuid)
    )
    result = await db.exec(query)
    return result.first()


async def update_deployment(
    db: AsyncSession,
    *,
    deployment: Deployment,
    name: str | None = None,
    project_id: UUID | None = None,
) -> Deployment:
    """Apply a partial update to *deployment* and return the refreshed row.

    Only non-``None`` arguments are applied; ``updated_at`` is bumped
    unconditionally, even when no field changed.

    Raises:
        ValueError: if ``name`` is blank, or if the update violates a
            database constraint (IntegrityError is translated).
    """
    new_name = None if name is None else _strip_or_raise(name, "name")
    if new_name is not None:
        deployment.name = new_name
    if project_id is not None:
        deployment.project_id = project_id
    deployment.updated_at = datetime.now(timezone.utc)

    db.add(deployment)
    try:
        await db.flush()
    except IntegrityError as exc:
        await db.rollback()
        await logger.aerror("IntegrityError updating deployment id=%s: %s", deployment.id, exc)
        msg = "Deployment update conflicts with an existing record"
        raise ValueError(msg) from exc
    await db.refresh(deployment)
    return deployment


async def list_deployments_page(
    db: AsyncSession,
    *,
    user_id: UUID,
    deployment_provider_account_id: UUID,
    offset: int,
    limit: int,
) -> list[Deployment]:
    """Return one page of a user's deployments for a provider account.

    Results are ordered newest-first (``created_at`` desc, ``id`` desc as a
    tiebreaker) so pagination stays stable when rows share a timestamp.

    Raises:
        ValueError: if ``offset`` is negative or ``limit`` is not positive.
    """
    if offset < 0:
        msg = "offset must be greater than or equal to 0"
        raise ValueError(msg)
    if limit <= 0:
        msg = "limit must be greater than 0"
        raise ValueError(msg)

    stmt = (
        select(Deployment)
        .where(
            Deployment.user_id == user_id,
            Deployment.deployment_provider_account_id == deployment_provider_account_id,
        )
        .order_by(col(Deployment.created_at).desc(), col(Deployment.id).desc())
        .offset(offset)
        .limit(limit)
    )
    return list((await db.exec(stmt)).all())


async def count_deployments_by_provider(
    db: AsyncSession,
    *,
    user_id: UUID,
    deployment_provider_account_id: UUID,
) -> int:
    """Count the user's deployments under a single provider account."""
    query = (
        select(func.count(Deployment.id))
        .where(Deployment.user_id == user_id)
        .where(Deployment.deployment_provider_account_id == deployment_provider_account_id)
    )
    total = (await db.exec(query)).one()
    return int(total)


async def delete_deployment_by_resource_key(
    db: AsyncSession,
    *,
    user_id: UUID,
    deployment_provider_account_id: UUID,
    resource_key: str,
) -> int:
    """Delete the deployment matching (user, provider account, resource_key).

    Returns the number of rows deleted — 0 when nothing matched, or when the
    database driver does not report a rowcount for DELETE.
    """
    stmt = delete(Deployment).where(
        Deployment.user_id == user_id,
        Deployment.deployment_provider_account_id == deployment_provider_account_id,
        Deployment.resource_key == resource_key.strip(),
    )
    result = await db.exec(stmt)
    deleted = result.rowcount
    if deleted is None:
        # Surface the missing rowcount loudly instead of silently returning 0.
        await logger.aerror(
            "DELETE rowcount was None for deployment resource_key=%r -- "
            "database driver may not support rowcount for DELETE statements",
            resource_key,
        )
    return int(deleted or 0)


async def delete_deployment_by_id(
    db: AsyncSession,
    *,
    user_id: UUID,
    deployment_id: UUID | str,
) -> int:
    """Delete the user's deployment with the given id.

    ``deployment_id`` may be a ``UUID`` or a string; invalid strings raise via
    ``parse_uuid``. Returns the number of rows deleted — 0 when nothing
    matched, or when the driver does not report a rowcount for DELETE.
    """
    deployment_uuid = parse_uuid(deployment_id, field_name="deployment_id")
    stmt = delete(Deployment).where(
        Deployment.user_id == user_id,
        Deployment.id == deployment_uuid,
    )
    result = await db.exec(stmt)
    deleted = result.rowcount
    if deleted is None:
        # Surface the missing rowcount loudly instead of silently returning 0.
        await logger.aerror(
            "DELETE rowcount was None for deployment id=%s -- "
            "database driver may not support rowcount for DELETE statements",
            deployment_uuid,
        )
    return int(deleted or 0)
Loading
Loading