diff --git a/echo/frontend/src/components/aspect/AspectCard.tsx b/echo/frontend/src/components/aspect/AspectCard.tsx
index 0fbd4c72..dd8ffde2 100644
--- a/echo/frontend/src/components/aspect/AspectCard.tsx
+++ b/echo/frontend/src/components/aspect/AspectCard.tsx
@@ -1,5 +1,5 @@
import { Trans } from "@lingui/react/macro";
-import { cn } from "@/lib/utils";
+import { cn, sanitizeImageUrl } from "@/lib/utils";
import { Box, Button, LoadingOverlay, Paper, Stack, Text } from "@mantine/core";
import { IconArrowsDiagonal } from "@tabler/icons-react";
import { useParams } from "react-router-dom";
@@ -47,7 +47,7 @@ export const AspectCard = ({
{project.data?.image_generation_model !== "PLACEHOLDER" && (
diff --git a/echo/frontend/src/lib/utils.ts b/echo/frontend/src/lib/utils.ts
index f7881a4a..125d566a 100644
--- a/echo/frontend/src/lib/utils.ts
+++ b/echo/frontend/src/lib/utils.ts
@@ -33,3 +33,11 @@ export const checkPermissionError = async () => {
return "error" as const;
}
};
+
+/**
+ * Interim workaround for local development: rewrites image URLs that start
+ * with `http://minio:9000` so they point at `http://localhost:9000` instead.
+ * Any other URL is returned unchanged.
+ *
+ * @param url - Image URL to rewrite if it has the `http://minio:9000` prefix.
+ * @returns The URL with the prefix swapped, or the input itself.
+ */
+export const sanitizeImageUrl = (url: string) => {
+  const minioOrigin = "http://minio:9000";
+  const localOrigin = "http://localhost:9000";
+
+  if (!url.startsWith(minioOrigin)) {
+    return url;
+  }
+  // The prefix is known to sit at index 0, so swapping it is a plain slice.
+  return localOrigin + url.slice(minioOrigin.length);
+};
diff --git a/echo/frontend/src/routes/project/library/ProjectLibraryAspect.tsx b/echo/frontend/src/routes/project/library/ProjectLibraryAspect.tsx
index 83ea20b0..4f1b3add 100644
--- a/echo/frontend/src/routes/project/library/ProjectLibraryAspect.tsx
+++ b/echo/frontend/src/routes/project/library/ProjectLibraryAspect.tsx
@@ -16,6 +16,7 @@ import { Breadcrumbs } from "@/components/common/Breadcrumbs";
import { useMemo } from "react";
import { useCopyAspect } from "@/hooks/useCopyAspect";
import { CopyIconButton } from "@/components/common/CopyIconButton";
+import { sanitizeImageUrl } from "@/lib/utils";
const dedupeQuotes = (quotes: QuoteAspect[]): QuoteAspect[] => {
const seen = new Set();
@@ -78,7 +79,7 @@ export const ProjectLibraryAspect = () => {
{project.data?.image_generation_model !== "PLACEHOLDER" && (
diff --git a/echo/server/dembrane/image_utils.py b/echo/server/dembrane/image_utils.py
index 7b7bd469..bfec2f0a 100644
--- a/echo/server/dembrane/image_utils.py
+++ b/echo/server/dembrane/image_utils.py
@@ -2,6 +2,7 @@
import logging
from dembrane.s3 import save_to_s3_from_url
+from dembrane.utils import generate_uuid
from dembrane.openai import client
logger = logging.getLogger("image_utils")
@@ -115,7 +116,7 @@ def generate_image(prompt: str) -> str:
image_url = response.data[0].url
if image_url:
logger.debug("saving the image and getting the public url")
- image_url = save_to_s3_from_url(image_url)
+ image_url = save_to_s3_from_url(image_url, "images/" + generate_uuid(), public=True)
else:
image_url = None
except Exception as e:
diff --git a/echo/server/dembrane/quote_utils.py b/echo/server/dembrane/quote_utils.py
index fa0464a7..e722eec7 100644
--- a/echo/server/dembrane/quote_utils.py
+++ b/echo/server/dembrane/quote_utils.py
@@ -803,9 +803,17 @@ def generate_aspect_image(db: Session, aspect_id: str) -> AspectModel:
try:
if response:
image_url = response.data[0].url
+ try:
+ image_extension = str(image_url).split(".")[-1].split("?")[0]
+ except Exception as e:
+ logger.error(f"Error getting image extension: {e}")
+ image_extension = "png"
+
if image_url:
logger.debug("saving the image and getting the public url")
- image_url = save_to_s3_from_url(image_url)
+ image_url = save_to_s3_from_url(
+ image_url, "images/" + generate_uuid() + "." + image_extension, public=True
+ )
else:
image_url = None
except Exception as e:
diff --git a/echo/server/dembrane/tasks.py b/echo/server/dembrane/tasks.py
index 3ed9f3e9..6105700f 100644
--- a/echo/server/dembrane/tasks.py
+++ b/echo/server/dembrane/tasks.py
@@ -1,8 +1,10 @@
# mypy: disable-error-code="no-untyped-def"
from typing import List
+from pathlib import Path
-from celery import Celery, chain, chord, group, signals # type: ignore
+from celery import Celery, chain, chord, group, signals, bootsteps # type: ignore
from sentry_sdk import capture_exception
+from celery.signals import worker_ready, worker_shutdown # type: ignore
from celery.utils.log import get_task_logger # type: ignore
import dembrane.tasks_config
@@ -35,6 +37,36 @@
)
from dembrane.api.stateless import generate_summary
+# Readiness marker: created by the worker_ready signal handler and removed by the
+# worker_shutdown handler. prod-worker-readiness.py checks for the same path.
+READINESS_FILE = Path("/tmp/celery_ready")
+
+# Liveness heartbeat: touched every second by LivenessProbe while the worker runs.
+# prod-worker-liveness.py fails if this file is missing or older than 60 seconds.
+HEARTBEAT_FILE = Path("/tmp/celery_worker_heartbeat")
+
+
+class LivenessProbe(bootsteps.StartStopStep):
+    """Worker bootstep that keeps HEARTBEAT_FILE fresh while the worker runs.
+
+    ``start`` schedules ``update_heartbeat_file`` on the worker's timer once a
+    second; ``stop`` cancels that schedule and removes the file.
+    """
+
+    # Declared dependency on the Timer component, because ``start`` uses ``worker.timer``.
+    requires = {"celery.worker.components:Timer"}
+
+    def __init__(self, parent, **kwargs):
+        super().__init__(parent, **kwargs)
+        self.requests = []  # not read anywhere in this class; kept for compatibility
+        self.tref = None  # entry returned by call_repeatedly(); None while not scheduled
+
+    def start(self, worker):
+        """Schedule a heartbeat-file touch every 1.0 seconds on ``worker.timer``."""
+        self.tref = worker.timer.call_repeatedly(
+            1.0,
+            self.update_heartbeat_file,
+            (worker,),
+            priority=10,
+        )
+
+    def stop(self, _worker):
+        """Cancel the repeating touch, then delete the heartbeat file."""
+        # Cancel first: otherwise a callback that is still scheduled could
+        # re-create the file right after it has been removed.
+        if self.tref is not None:
+            self.tref.cancel()
+            self.tref = None
+        HEARTBEAT_FILE.unlink(missing_ok=True)
+
+    def update_heartbeat_file(self, _worker):
+        """Timer callback: create the file or bump its modification time."""
+        HEARTBEAT_FILE.touch()
+
+
logger = get_task_logger("celery_tasks")
assert REDIS_URL, "REDIS_URL environment variable is not set"
@@ -53,6 +85,18 @@
celery_app.config_from_object(dembrane.tasks_config)
+# Add LivenessProbe to the app's "worker" bootsteps.
+celery_app.steps["worker"].add(LivenessProbe)
+
+
+@worker_ready.connect  # type: ignore
+def worker_ready(**_kwargs):
+    """Create READINESS_FILE when the ``worker_ready`` signal fires.
+
+    The keyword arguments passed with the signal are accepted and ignored.
+
+    NOTE(review): this ``def`` rebinds the module-level name ``worker_ready``
+    (imported from ``celery.signals``), so code further down in this module
+    cannot use that name to reach the signal.
+    """
+    READINESS_FILE.touch()
+
+
+@worker_shutdown.connect  # type: ignore
+def worker_shutdown(**_kwargs):
+    """Remove READINESS_FILE when the ``worker_shutdown`` signal fires.
+
+    The keyword arguments passed with the signal are accepted and ignored; a
+    file that is already gone is not an error (``missing_ok=True``).
+
+    NOTE(review): this ``def`` rebinds the module-level name ``worker_shutdown``
+    (imported from ``celery.signals``), so code further down in this module
+    cannot use that name to reach the signal.
+    """
+    READINESS_FILE.unlink(missing_ok=True)
+
@signals.celeryd_init.connect
def init_sentry_celery(**_kwargs):
@@ -106,12 +150,7 @@ def task_transcribe_conversation_chunk(self, conversation_chunk_id: str):
raise self.retry(exc=e) from e
-@celery_app.task(
- bind=True,
- retry_backoff=True,
- ignore_result=True,
- base=BaseTask,
-)
+@celery_app.task(bind=True, retry_backoff=True, ignore_result=True, base=BaseTask)
def task_transcribe_conversation_chunks(self, conversation_chunk_id: List[str]):
try:
task_signatures = [
@@ -135,6 +174,7 @@ def task_transcribe_conversation_chunks(self, conversation_chunk_id: List[str]):
retry_backoff=True,
ignore_result=False,
base=BaseTask,
+ queue="cpu",
)
def task_split_audio_chunk(self, chunk_id: str) -> List[str]:
"""
@@ -346,6 +386,7 @@ def task_generate_insight_extras_multiple(self, insight_ids: List[str], language
retry_kwargs={"max_retries": 3},
ignore_result=False,
base=BaseTask,
+ queue="cpu",
)
def task_initialize_insights(self, project_analysis_run_id: str) -> List[str]:
with DatabaseSession() as db:
@@ -450,6 +491,7 @@ def task_generate_view_extras(self, view_id: str, language: str):
retry_kwargs={"max_retries": 3},
ignore_result=False,
base=BaseTask,
+ queue="cpu",
)
def task_assign_aspect_centroid(self, aspect_id: str, language: str = "en"):
with DatabaseSession() as db:
@@ -467,6 +509,7 @@ def task_assign_aspect_centroid(self, aspect_id: str, language: str = "en"):
retry_kwargs={"max_retries": 3},
ignore_result=False,
base=BaseTask,
+ queue="cpu",
)
def task_cluster_quotes_using_aspect_centroids(self, view_id: str):
with DatabaseSession() as db:
diff --git a/echo/server/dembrane/tasks_config.py b/echo/server/dembrane/tasks_config.py
index b1fb8003..c6817302 100644
--- a/echo/server/dembrane/tasks_config.py
+++ b/echo/server/dembrane/tasks_config.py
@@ -3,23 +3,28 @@
enable_utc = True
worker_hijack_root_logger = False
+worker_prefetch_multiplier = 1
task_queues = (
- Queue("high", Exchange("high"), routing_key="high"),
Queue("normal", Exchange("normal"), routing_key="normal"),
- Queue("low", Exchange("low"), routing_key="low"),
+ Queue("cpu", Exchange("cpu"), routing_key="cpu"),
)
task_default_queue = "normal"
task_default_exchange = "normal"
task_default_routing_key = "normal"
-task_ignore_result = True
+task_ignore_result = False
-# TODO: configure later
-# CELERY_ROUTES = {
-# # -- HIGH PRIORITY QUEUE -- #
-# 'myapp.tasks.check_payment_status': {'queue': 'high'},
-# # -- LOW PRIORITY QUEUE -- #
-# 'myapp.tasks.close_session': {'queue': 'low'},
-# }
+# Delivery settings. Per the Celery configuration reference, late acks acknowledge a
+# message after the task has run rather than before, and reject_on_worker_lost
+# requeues a task whose worker process exited abruptly, so a task may run more than once.
+# NOTE(review): confirm the tasks in dembrane.tasks are safe to re-run.
+task_acks_late = True
+task_reject_on_worker_lost = True
+
+# Broker connection handling: retry (including at startup), give up after 5 retries.
+# NOTE(review): the timeout is presumably in seconds; verify against the Celery docs.
+broker_connection_retry = True
+broker_connection_retry_on_startup = True
+broker_connection_max_retries = 5
+broker_connection_timeout = 2
+
+# NOTE(review): visibility_timeout is presumably in seconds (1800 = 30 min). With late
+# acks, a task that runs longer than this may be redelivered by a Redis broker; confirm
+# that the longest-running task fits inside the window.
+broker_transport_options = {
+ "visibility_timeout": 1800,
+ "socket_keepalive": True,
+}
diff --git a/echo/server/prod-worker-cpu.sh b/echo/server/prod-worker-cpu.sh
new file mode 100644
index 00000000..7189b892
--- /dev/null
+++ b/echo/server/prod-worker-cpu.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+# Starts a Celery worker for dembrane.tasks that consumes only the "cpu" queue.
+echo "Starting worker"
+
+# The node name embeds the output of `hostname` (worker.<hostname>).
+# NOTE(review): celery runs as a child of this shell; consider `exec celery ...` if the
+# worker must receive termination signals directly. Confirm how this script is launched.
+POD_HOSTNAME=$(hostname)
+celery -A dembrane.tasks worker -l INFO -n worker.${POD_HOSTNAME} -Q cpu
\ No newline at end of file
diff --git a/echo/server/prod-worker-liveness.py b/echo/server/prod-worker-liveness.py
new file mode 100755
index 00000000..23760ba0
--- /dev/null
+++ b/echo/server/prod-worker-liveness.py
@@ -0,0 +1,17 @@
+import sys
+import time
+from pathlib import Path
+
+# Liveness check for the Celery worker, reported through the process exit status:
+#   exit 1 - the heartbeat file is missing, or was last modified more than 60 seconds ago
+#   exit 0 - the heartbeat file exists and is recent enough
+LIVENESS_FILE = Path("/tmp/celery_worker_heartbeat")
+
+if not LIVENESS_FILE.is_file():
+    print("Celery liveness file NOT found.")
+    sys.exit(1)
+
+# Age of the heartbeat in seconds, measured from the file's modification time.
+heartbeat_mtime = LIVENESS_FILE.stat().st_mtime
+heartbeat_age = time.time() - heartbeat_mtime
+if heartbeat_age > 60:
+    print("Celery Worker liveness file timestamp DOES NOT matches the given constraint.")
+    sys.exit(1)
+
+print("Celery Worker liveness file found and timestamp matches the given constraint.")
+sys.exit(0)
diff --git a/echo/server/prod-worker-readiness.py b/echo/server/prod-worker-readiness.py
new file mode 100755
index 00000000..aee37a41
--- /dev/null
+++ b/echo/server/prod-worker-readiness.py
@@ -0,0 +1,9 @@
+import sys
+from pathlib import Path
+
+# Readiness check for the Celery worker: exits 0 when the marker file exists, 1 otherwise.
+READINESS_FILE = Path("/tmp/celery_ready")
+
+is_ready = READINESS_FILE.is_file()
+print("Celery readiness file found." if is_ready else "Celery readiness file NOT found.")
+sys.exit(0 if is_ready else 1)
diff --git a/echo/server/prod-worker.sh b/echo/server/prod-worker.sh
index 08eb3275..a76d9944 100755
--- a/echo/server/prod-worker.sh
+++ b/echo/server/prod-worker.sh
@@ -2,4 +2,4 @@
echo "Starting worker"
POD_HOSTNAME=$(hostname)
-celery -A dembrane.tasks worker -l INFO -n worker.normal.${POD_HOSTNAME} -Q normal
\ No newline at end of file
+celery -A dembrane.tasks worker -l INFO -n worker.${POD_HOSTNAME} -Q normal
\ No newline at end of file
diff --git a/echo/server/run-worker.sh b/echo/server/run-worker.sh
index 41fa31b1..78931f06 100755
--- a/echo/server/run-worker.sh
+++ b/echo/server/run-worker.sh
@@ -29,6 +29,7 @@ WORKER_PIDS=()
# Launch the workers
launch_worker "worker.normal" "normal"
+launch_worker "worker.cpu" "cpu"
# Launch Flower
# echo "Launching Flower"