Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions echo/frontend/src/components/aspect/AspectCard.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Trans } from "@lingui/react/macro";
import { cn } from "@/lib/utils";
import { cn, sanitizeImageUrl } from "@/lib/utils";
import { Box, Button, LoadingOverlay, Paper, Stack, Text } from "@mantine/core";
import { IconArrowsDiagonal } from "@tabler/icons-react";
import { useParams } from "react-router-dom";
Expand Down Expand Up @@ -47,7 +47,7 @@ export const AspectCard = ({
</Box>
{project.data?.image_generation_model !== "PLACEHOLDER" && (
<img
src={data.image_url ?? "/placeholder.png"}
src={sanitizeImageUrl(data.image_url ?? "/placeholder.png")}
alt={data.name ?? ""}
className="h-[200px] w-full object-cover"
/>
Expand Down
8 changes: 8 additions & 0 deletions echo/frontend/src/lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ export const checkPermissionError = async () => {
return "error" as const;
}
};

export const sanitizeImageUrl = (url: string) => {
// interim solution to fix image urls for local development
if (url.startsWith("http://minio:9000")) {
return url.replace("http://minio:9000", "http://localhost:9000");
}
return url;
};
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { Breadcrumbs } from "@/components/common/Breadcrumbs";
import { useMemo } from "react";
import { useCopyAspect } from "@/hooks/useCopyAspect";
import { CopyIconButton } from "@/components/common/CopyIconButton";
import { sanitizeImageUrl } from "@/lib/utils";

const dedupeQuotes = (quotes: QuoteAspect[]): QuoteAspect[] => {
const seen = new Set();
Expand Down Expand Up @@ -78,7 +79,7 @@ export const ProjectLibraryAspect = () => {
<LoadingOverlay visible={isLoading} />
{project.data?.image_generation_model !== "PLACEHOLDER" && (
<img
src={aspect?.image_url ?? "/placeholder.png"}
src={sanitizeImageUrl(aspect?.image_url ?? "/placeholder.png")}
alt={aspect?.name ?? ""}
className="h-[400px] w-full object-cover"
/>
Expand Down
3 changes: 2 additions & 1 deletion echo/server/dembrane/image_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging

from dembrane.s3 import save_to_s3_from_url
from dembrane.utils import generate_uuid
from dembrane.openai import client

logger = logging.getLogger("image_utils")
Expand Down Expand Up @@ -115,7 +116,7 @@ def generate_image(prompt: str) -> str:
image_url = response.data[0].url
if image_url:
logger.debug("saving the image and getting the public url")
image_url = save_to_s3_from_url(image_url)
image_url = save_to_s3_from_url(image_url, "images/" + generate_uuid(), public=True)
else:
image_url = None
except Exception as e:
Expand Down
10 changes: 9 additions & 1 deletion echo/server/dembrane/quote_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,17 @@ def generate_aspect_image(db: Session, aspect_id: str) -> AspectModel:
try:
if response:
image_url = response.data[0].url
try:
image_extension = str(image_url).split(".")[-1].split("?")[0]
except Exception as e:
logger.error(f"Error getting image extension: {e}")
image_extension = "png"

if image_url:
logger.debug("saving the image and getting the public url")
image_url = save_to_s3_from_url(image_url)
image_url = save_to_s3_from_url(
image_url, "images/" + generate_uuid() + "." + image_extension, public=True
)
else:
image_url = None
except Exception as e:
Expand Down
57 changes: 50 additions & 7 deletions echo/server/dembrane/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# mypy: disable-error-code="no-untyped-def"
from typing import List
from pathlib import Path

from celery import Celery, chain, chord, group, signals # type: ignore
from celery import Celery, chain, chord, group, signals, bootsteps # type: ignore
from sentry_sdk import capture_exception
from celery.signals import worker_ready, worker_shutdown # type: ignore
from celery.utils.log import get_task_logger # type: ignore

import dembrane.tasks_config
Expand Down Expand Up @@ -35,6 +37,36 @@
)
from dembrane.api.stateless import generate_summary

# File for validating worker readiness
READINESS_FILE = Path("/tmp/celery_ready")

# File for validating worker liveness
HEARTBEAT_FILE = Path("/tmp/celery_worker_heartbeat")


class LivenessProbe(bootsteps.StartStopStep):
requires = {"celery.worker.components:Timer"}

def __init__(self, parent, **kwargs):
super().__init__(parent, **kwargs)
self.requests = []
self.tref = None

def start(self, worker):
self.tref = worker.timer.call_repeatedly(
1.0,
self.update_heartbeat_file,
(worker,),
priority=10,
)

def stop(self, _worker):
HEARTBEAT_FILE.unlink(missing_ok=True)

def update_heartbeat_file(self, _worker):
HEARTBEAT_FILE.touch()


logger = get_task_logger("celery_tasks")

assert REDIS_URL, "REDIS_URL environment variable is not set"
Expand All @@ -53,6 +85,18 @@

celery_app.config_from_object(dembrane.tasks_config)

celery_app.steps["worker"].add(LivenessProbe)


@worker_ready.connect # type: ignore
def worker_ready(**_):
READINESS_FILE.touch()


@worker_shutdown.connect # type: ignore
def worker_shutdown(**_):
READINESS_FILE.unlink(missing_ok=True)


@signals.celeryd_init.connect
def init_sentry_celery(**_kwargs):
Expand Down Expand Up @@ -106,12 +150,7 @@ def task_transcribe_conversation_chunk(self, conversation_chunk_id: str):
raise self.retry(exc=e) from e


@celery_app.task(
bind=True,
retry_backoff=True,
ignore_result=True,
base=BaseTask,
)
@celery_app.task(bind=True, retry_backoff=True, ignore_result=True, base=BaseTask)
def task_transcribe_conversation_chunks(self, conversation_chunk_id: List[str]):
try:
task_signatures = [
Expand All @@ -135,6 +174,7 @@ def task_transcribe_conversation_chunks(self, conversation_chunk_id: List[str]):
retry_backoff=True,
ignore_result=False,
base=BaseTask,
queue="cpu",
)
def task_split_audio_chunk(self, chunk_id: str) -> List[str]:
"""
Expand Down Expand Up @@ -346,6 +386,7 @@ def task_generate_insight_extras_multiple(self, insight_ids: List[str], language
retry_kwargs={"max_retries": 3},
ignore_result=False,
base=BaseTask,
queue="cpu",
)
def task_initialize_insights(self, project_analysis_run_id: str) -> List[str]:
with DatabaseSession() as db:
Expand Down Expand Up @@ -450,6 +491,7 @@ def task_generate_view_extras(self, view_id: str, language: str):
retry_kwargs={"max_retries": 3},
ignore_result=False,
base=BaseTask,
queue="cpu",
)
def task_assign_aspect_centroid(self, aspect_id: str, language: str = "en"):
with DatabaseSession() as db:
Expand All @@ -467,6 +509,7 @@ def task_assign_aspect_centroid(self, aspect_id: str, language: str = "en"):
retry_kwargs={"max_retries": 3},
ignore_result=False,
base=BaseTask,
queue="cpu",
)
def task_cluster_quotes_using_aspect_centroids(self, view_id: str):
with DatabaseSession() as db:
Expand Down
25 changes: 15 additions & 10 deletions echo/server/dembrane/tasks_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@
enable_utc = True

worker_hijack_root_logger = False
worker_prefetch_multiplier = 1

task_queues = (
Queue("high", Exchange("high"), routing_key="high"),
Queue("normal", Exchange("normal"), routing_key="normal"),
Queue("low", Exchange("low"), routing_key="low"),
Queue("cpu", Exchange("cpu"), routing_key="cpu"),
)

task_default_queue = "normal"
task_default_exchange = "normal"
task_default_routing_key = "normal"

task_ignore_result = True
task_ignore_result = False

# TODO: configure later
# CELERY_ROUTES = {
# # -- HIGH PRIORITY QUEUE -- #
# 'myapp.tasks.check_payment_status': {'queue': 'high'},
# # -- LOW PRIORITY QUEUE -- #
# 'myapp.tasks.close_session': {'queue': 'low'},
# }
task_acks_late = True
task_reject_on_worker_lost = True

broker_connection_retry = True
broker_connection_retry_on_startup = True
broker_connection_max_retries = 5
broker_connection_timeout = 2

broker_transport_options = {
"visibility_timeout": 1800,
"socket_keepalive": True,
}
5 changes: 5 additions & 0 deletions echo/server/prod-worker-cpu.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/sh
echo "Starting worker"

POD_HOSTNAME=$(hostname)
celery -A dembrane.tasks worker -l INFO -n worker.${POD_HOSTNAME} -Q cpu
17 changes: 17 additions & 0 deletions echo/server/prod-worker-liveness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import sys
import time
from pathlib import Path

LIVENESS_FILE = Path("/tmp/celery_worker_heartbeat")
if not LIVENESS_FILE.is_file():
print("Celery liveness file NOT found.")
sys.exit(1)
stats = LIVENESS_FILE.stat()
heartbeat_timestamp = stats.st_mtime
current_timestamp = time.time()
time_diff = current_timestamp - heartbeat_timestamp
if time_diff > 60:
print("Celery Worker liveness file timestamp DOES NOT matches the given constraint.")
sys.exit(1)
print("Celery Worker liveness file found and timestamp matches the given constraint.")
sys.exit(0)
9 changes: 9 additions & 0 deletions echo/server/prod-worker-readiness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import sys
from pathlib import Path

READINESS_FILE = Path("/tmp/celery_ready")
if not READINESS_FILE.is_file():
print("Celery readiness file NOT found.")
sys.exit(1)
print("Celery readiness file found.")
sys.exit(0)
2 changes: 1 addition & 1 deletion echo/server/prod-worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
echo "Starting worker"

POD_HOSTNAME=$(hostname)
celery -A dembrane.tasks worker -l INFO -n worker.normal.${POD_HOSTNAME} -Q normal
celery -A dembrane.tasks worker -l INFO -n worker.${POD_HOSTNAME} -Q normal
1 change: 1 addition & 0 deletions echo/server/run-worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ WORKER_PIDS=()

# Launch the workers
launch_worker "worker.normal" "normal"
launch_worker "worker.cpu" "cpu"

# Launch Flower
# echo "Launching Flower"
Expand Down