From 5237ccd01d036027dc98432f162573ed9a5d9066 Mon Sep 17 00:00:00 2001 From: Sameer Pashikanti Date: Thu, 17 Jul 2025 10:35:57 +0200 Subject: [PATCH] link runpod views pipeline --- .../directus/sync/collections/operations.json | 300 ++++++++++++++++++ .../fields/aspect/aspect_segment.json | 31 ++ .../fields/aspect_segment/aspect.json | 4 +- .../relations/aspect_segment/aspect.json | 4 +- echo/server/dembrane/config.py | 11 + echo/server/dembrane/tasks.py | 66 +++- 6 files changed, 406 insertions(+), 10 deletions(-) create mode 100644 echo/directus/sync/snapshot/fields/aspect/aspect_segment.json diff --git a/echo/directus/sync/collections/operations.json b/echo/directus/sync/collections/operations.json index eb705bf5..d0d8f621 100644 --- a/echo/directus/sync/collections/operations.json +++ b/echo/directus/sync/collections/operations.json @@ -19,6 +19,26 @@ "flow": "17703446-fef0-49e9-bdc4-385db1311137", "_syncId": "5bea9e07-e6f0-41c6-821d-b426dec3427c" }, + { + "name": "Check Language", + "key": "check_language", + "type": "condition", + "position_x": 20, + "position_y": 1, + "options": { + "filter": { + "$trigger": { + "language": { + "_eq": "en" + } + } + } + }, + "resolve": "9390ed2f-7dc6-4a6a-83da-2d87d478261d", + "reject": "4795cc24-3f3c-4be9-9844-24552da522fa", + "flow": "17703446-fef0-49e9-bdc4-385db1311137", + "_syncId": "af7a671d-78fa-4d07-adad-c3bfabc4617c" + }, { "name": "Check Report Language", "key": "check_report_language", @@ -45,6 +65,32 @@ "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", "_syncId": "d154ee34-0931-43f3-aaca-9168f88fa04b" }, + { + "name": "Check Report Language", + "key": "check_report_language", + "type": "item-read", + "position_x": 73, + "position_y": 1, + "options": { + "query": { + "filter": { + "project_id": { + "id": { + "_eq": "{{$trigger.payload.project_id.id}}" + } + } + }, + "fields": [ + "language" + ] + }, + "collection": "project_report" + }, + "resolve": "ca1ffbc5-cfce-4fb4-8f15-c128ea407d41", + "reject": null, + "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", + "_syncId": "c5d2aff3-1a1a-4ea2-8e39-5189409b7dc1" + }, { "name": "Dutch", "key": "dutch", @@ -65,6 +111,26 @@ "flow": "17703446-fef0-49e9-bdc4-385db1311137", "_syncId": "eaeb2c39-32e0-428d-ad03-ff0e6052adcc" }, + { + "name": "Dutch", + "key": "dutch", + "type": "condition", + "position_x": 20, + "position_y": 20, + "options": { + "filter": { + "$trigger": { + "language": { + "_eq": "nl" + } + } + } + }, + "resolve": "ea78ec02-364d-4f18-80f8-ea5ac4c787ed", + "reject": "eb6f8253-647f-4fb1-9010-e93594ba065e", + "flow": "17703446-fef0-49e9-bdc4-385db1311137", + "_syncId": "4795cc24-3f3c-4be9-9844-24552da522fa" + }, { "name": "Email Send Operation Failed", "key": "email_send_operation_failed", @@ -79,6 +145,20 @@ "flow": "17703446-fef0-49e9-bdc4-385db1311137", "_syncId": "920bd181-b2a2-4f0d-94dc-3b1a08c3f4ef" }, + { + "name": "Email Send Operation Failed", + "key": "email_send_operation_failed", + "type": "log", + "position_x": 58, + "position_y": 1, + "options": { + "message": "An email could not be sent due to some error: {{$last}}" + }, + "resolve": null, + "reject": null, + "flow": "17703446-fef0-49e9-bdc4-385db1311137", + "_syncId": "2b24450b-6a2e-4452-aba1-9814d17fef42" + }, { "name": "Email Send Operation Failed Dutch", "key": "email_send_operation_failed_dutch", @@ -93,6 +173,20 @@ "flow": "17703446-fef0-49e9-bdc4-385db1311137", "_syncId": "84c38ea6-5d15-429f-8c24-9485d54ba7be" }, + { + "name": "Email Send Operation Failed Dutch", + "key": "email_send_operation_failed_dutch", + "type": "log", + "position_x": 58, + "position_y": 20, + "options": { + "message": "An email could not be sent due to some error: {{$last}}" + }, + "resolve": null, + "reject": null, + "flow": "17703446-fef0-49e9-bdc4-385db1311137", + "_syncId": "615a54cd-a72e-41ad-9403-9577c80280d6" + }, { "name": "failed", "key": "failed", @@ -107,6 +201,20 @@ "flow": "17703446-fef0-49e9-bdc4-385db1311137", "_syncId": "8d8d787a-dbc4-44f9-9ab4-28e3f3d5f31c" }, + { + "name": "failed", + "key": "failed", + "type": "log", + "position_x": 20, + "position_y": 39, + "options": { + "message": "language detection failed" + }, + "resolve": null, + "reject": null, + "flow": "17703446-fef0-49e9-bdc4-385db1311137", + "_syncId": "eb6f8253-647f-4fb1-9010-e93594ba065e" + }, { "name": "Filter Emails", "key": "filter_emails", @@ -121,6 +229,47 @@ "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", "_syncId": "efb3982e-5703-4c07-8982-a6e1b5218e4a" }, + { + "name": "Filter Emails", + "key": "filter_emails", + "type": "exec", + "position_x": 91, + "position_y": 1, + "options": { + "code": "module.exports = async function(data) {\n\n const submissions = data.get_all_participants;\n \n // Filter submissions to only include those where email_opt_in is true\n const filteredSubmissions = submissions.filter(sub => sub.email_opt_in === true);\n\n // Create an array with email, project_id and an email_opt_out token for each submission\n const result = filteredSubmissions.map(sub => ({\n project_name: data.project_data[0].name || '',\n\t\tdefault_conversation_title: data.project_data[0].default_conversation_title || '',\n\t\tconversation_name: sub.conversation_id.participant_name || '',\n email: sub.email,\n project_id: sub.project_id || '',\n token: sub.email_opt_out_token,\n language: data.check_report_language[0].language || 'empty',\n ADMIN_BASE_URL: \"{{ $env.ADMIN_BASE_URL }}\" || \"http://localhost:5173\",\n PARTICIPANT_BASE_URL: \"{{ $env.PARTICIPANT_BASE_URL }}\" || \"http://localhost:5174\", \n }));\n \n return result;\n};" + }, + "resolve": "b8144cee-59f6-40d9-a849-dd0c639e4e31", + "reject": null, + "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", + "_syncId": "ca1ffbc5-cfce-4fb4-8f15-c128ea407d41" + }, + { + "name": "Get All Participants", + "key": "get_all_participants", + "type": "item-read", + "position_x": 55, + "position_y": 1, + "options": { + "query": { + "filter": { + "project_id": { + "id": { + "_eq": "{{$trigger.payload.project_id}}" + } + } + }, + "fields": [ + "*", + "conversation_id.participant_name" + ] + }, + "collection": "project_report_notification_participants" + }, + "resolve": "c5d2aff3-1a1a-4ea2-8e39-5189409b7dc1", + "reject": null, + "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", + "_syncId": "47f1463c-3cb6-4420-a50e-92938fed3197" + }, { "name": "Get All Participants", "key": "get_all_participants", @@ -148,6 +297,20 @@ "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", "_syncId": "107ebc18-7d2d-4299-9d98-b7d1e7322b7c" }, + { + "name": "log environment vars", + "key": "log_environment_vars", + "type": "log", + "position_x": 127, + "position_y": 1, + "options": { + "message": "{{data['$env']}} {{data.$env}} {{process.env}} {{$env}} {{$env.PARTICIPANT_BASE_URL}}" + }, + "resolve": null, + "reject": null, + "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", + "_syncId": "7161e52e-b1cc-4c13-bedd-a9d64527026f" + }, { "name": "log environment vars", "key": "log_environment_vars", @@ -184,6 +347,28 @@ "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", "_syncId": "239c6fea-242d-4b7d-b854-c99d2fe43cf3" }, + { + "name": "PROCEED_ONLY_IF \"published\" in payload", + "key": "proceed_only_if_published_in_payload", + "type": "condition", + "position_x": 19, + "position_y": 1, + "options": { + "filter": { + "$trigger": { + "payload": { + "status": { + "_eq": "published" + } + } + } + } + }, + "resolve": "d8554457-95b6-474f-ba67-dfd5f936d575", + "reject": "e8274ad4-5844-42cd-8a6b-d40d08cf83d3", + "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", + "_syncId": "2784a141-2ae3-48d1-aedf-57c0be3bcc74" + }, { "name": "Project Data", "key": "project_data", @@ -207,6 +392,43 @@ "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", "_syncId": "ea051ae4-776f-490b-8b02-eaf4099243ef" }, + { + "name": "Project Data", + "key": "project_data", + "type": "item-read", + "position_x": 37, + "position_y": 1, + "options": { + "collection": "project", + "query": { + "filter": { + "id": { + "id": { + "_eq": "{{$trigger.payload.project_id}}" + } + } + } + } + }, + "resolve": "47f1463c-3cb6-4420-a50e-92938fed3197", + "reject": null, + "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", + "_syncId": "d8554457-95b6-474f-ba67-dfd5f936d575" + }, + { + "name": "Report Not Published", + "key": "report_not_published", + "type": "log", + "position_x": 19, + "position_y": 19, + "options": { + "message": "The report is not yet published" + }, + "resolve": null, + "reject": null, + "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", + "_syncId": "e8274ad4-5844-42cd-8a6b-d40d08cf83d3" + }, { "name": "Report Not Published", "key": "report_not_published", @@ -221,6 +443,37 @@ "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", "_syncId": "84852456-3f3a-4906-be94-8b750159883b" }, + { + "name": "Send Email Dutch", + "key": "send_email_dutch", + "type": "mail", + "position_x": 39, + "position_y": 20, + "options": { + "to": [ + "{{$trigger.email}}" + ], + "type": "template", + "subject": "Er is een rapport klaar met uw inbreng", + "body": null, + "data": { + "PARTICIPANT_BASE_URL": "{{$trigger.PARTICIPANT_BASE_URL}}", + "project_id": "{{$trigger.project_id}}", + "project_name": "{{$trigger.project_name}}", + "default_conversation_title": "{{$trigger.default_conversation_title}}", + "conversation_name": "{{$trigger.conversation_name}}", + "token": "{{$trigger.token}}" + }, + "template": "report-notification-nl", + "replyTo": [ + "info@dembrane.com" + ] + }, + "resolve": null, + "reject": "615a54cd-a72e-41ad-9403-9577c80280d6", + "flow": "17703446-fef0-49e9-bdc4-385db1311137", + "_syncId": "ea78ec02-364d-4f18-80f8-ea5ac4c787ed" + }, { "name": "Send Email Dutch", "key": "send_email_dutch", @@ -252,6 +505,37 @@ "flow": "17703446-fef0-49e9-bdc4-385db1311137", "_syncId": "34fb6ee5-2813-484a-a1cc-f97de097509b" }, + { + "name": "Send Email English", + "key": "send_email_english", + "type": "mail", + "position_x": 39, + "position_y": 1, + "options": { + "to": [ + "{{$trigger.email}}" + ], + "subject": "A Report Featuring Your Input is Ready", + "body": null, + "type": "template", + "template": "report-notification-en", + "data": { + "PARTICIPANT_BASE_URL": "{{$trigger.PARTICIPANT_BASE_URL}}", + "project_id": "{{$trigger.project_id}}", + "project_name": "{{$trigger.project_name}}", + "default_conversation_title": "{{$trigger.default_conversation_title}}", + "conversation_name": "{{$trigger.conversation_name}}", + "token": "{{$trigger.token}}" + }, + "replyTo": [ + "info@dembrane.com" + ] + }, + "resolve": null, + "reject": "2b24450b-6a2e-4452-aba1-9814d17fef42", + "flow": "17703446-fef0-49e9-bdc4-385db1311137", + "_syncId": "9390ed2f-7dc6-4a6a-83da-2d87d478261d" + }, { "name": "Send Email English", "key": "send_email_english", @@ -283,6 +567,22 @@ "flow": "17703446-fef0-49e9-bdc4-385db1311137", "_syncId": "3dbf2ea1-17f8-4bde-aa89-43278fe9a00f" }, + { + "name": "Trigger Email Flow", + "key": "trigger_email_flow", + "type": "trigger", + "position_x": 109, + "position_y": 1, + "options": { + "flow": "17703446-fef0-49e9-bdc4-385db1311137", + "iterationMode": "serial", + "payload": "{{filter_emails}}" + }, + "resolve": null, + "reject": null, + "flow": "ec4e7ea5-72de-4365-b66f-d8f11b549495", + "_syncId": "b8144cee-59f6-40d9-a849-dd0c639e4e31" + }, { "name": "Trigger Email Flow", "key": "trigger_email_flow", diff --git a/echo/directus/sync/snapshot/fields/aspect/aspect_segment.json b/echo/directus/sync/snapshot/fields/aspect/aspect_segment.json new file mode 100644 index 00000000..9a397f73 --- /dev/null +++ b/echo/directus/sync/snapshot/fields/aspect/aspect_segment.json @@ -0,0 +1,31 @@ +{ + "collection": "aspect", + "field": "aspect_segment", + "type": "alias", + "meta": { + "collection": "aspect", + "conditions": null, + "display": null, + "display_options": null, + "field": "aspect_segment", + "group": null, + "hidden": false, + "interface": "list-o2m", + "note": null, + "options": { + "enableLink": true, + "sort": null, + "template": "{{segment.id}}" + }, + "readonly": false, + "required": false, + "sort": 13, + "special": [ + "o2m" + ], + "translations": null, + "validation": null, + "validation_message": null, + "width": "full" + } +} diff --git a/echo/directus/sync/snapshot/fields/aspect_segment/aspect.json b/echo/directus/sync/snapshot/fields/aspect_segment/aspect.json index 9d779390..44e2bafc 100644 --- a/echo/directus/sync/snapshot/fields/aspect_segment/aspect.json +++ b/echo/directus/sync/snapshot/fields/aspect_segment/aspect.json @@ -16,9 +16,7 @@ "readonly": false, "required": false, "sort": 6, - "special": [ - "m2o" - ], + "special": null, "translations": null, "validation": null, "validation_message": null, diff --git a/echo/directus/sync/snapshot/relations/aspect_segment/aspect.json b/echo/directus/sync/snapshot/relations/aspect_segment/aspect.json index 9cebc7df..743110e4 100644 --- a/echo/directus/sync/snapshot/relations/aspect_segment/aspect.json +++ b/echo/directus/sync/snapshot/relations/aspect_segment/aspect.json @@ -10,7 +10,7 @@ "one_collection": "aspect", "one_collection_field": null, "one_deselect_action": "nullify", - "one_field": null, + "one_field": "aspect_segment", "sort_field": null }, "schema": { @@ -20,6 +20,6 @@ "foreign_key_column": "id", "constraint_name": "aspect_segment_aspect_foreign", "on_update": "NO ACTION", - "on_delete": "SET NULL" + "on_delete": "CASCADE" } } diff --git a/echo/server/dembrane/config.py b/echo/server/dembrane/config.py index ad1ee3d4..040845c4 100644 --- a/echo/server/dembrane/config.py +++ b/echo/server/dembrane/config.py @@ -200,6 +200,17 @@ RUNPOD_WHISPER_MAX_REQUEST_THRESHOLD = int( str(os.environ.get("RUNPOD_WHISPER_MAX_REQUEST_THRESHOLD")) ) + +RUNPOD_TOPIC_MODELER_URL = os.environ.get("RUNPOD_TOPIC_MODELER_URL") +logger.debug(f"RUNPOD_TOPIC_MODELER_URL: {RUNPOD_TOPIC_MODELER_URL}") + +RUNPOD_TOPIC_MODELER_API_KEY = os.environ.get("RUNPOD_TOPIC_MODELER_API_KEY") +if RUNPOD_TOPIC_MODELER_URL: + assert RUNPOD_TOPIC_MODELER_API_KEY, ( + "RUNPOD_TOPIC_MODELER_API_KEY environment variable is not set" + ) + logger.debug("RUNPOD_TOPIC_MODELER_API_KEY: set") + if ENABLE_RUNPOD_WHISPER_TRANSCRIPTION: assert RUNPOD_WHISPER_MAX_REQUEST_THRESHOLD, ( "RUNPOD_WHISPER_MAX_REQUEST_THRESHOLD environment variable is not set" diff --git a/echo/server/dembrane/tasks.py b/echo/server/dembrane/tasks.py index a8f334f6..dc550a3f 100644 --- a/echo/server/dembrane/tasks.py +++ b/echo/server/dembrane/tasks.py @@ -14,9 +14,15 @@ from dramatiq.results.backends.redis import RedisBackend as ResultsRedisBackend from dembrane.utils import get_utc_timestamp -from dembrane.config import REDIS_URL, RUNPOD_WHISPER_API_KEY, ENABLE_AUDIO_LIGHTRAG_INPUT +from dembrane.config import ( + REDIS_URL, + RUNPOD_WHISPER_API_KEY, + RUNPOD_TOPIC_MODELER_URL, + ENABLE_AUDIO_LIGHTRAG_INPUT, + RUNPOD_TOPIC_MODELER_API_KEY, +) from dembrane.sentry import init_sentry -from dembrane.directus import directus +from dembrane.directus import directus, directus_client_context from dembrane.transcribe import transcribe_conversation_chunk from dembrane.conversation_utils import ( collect_unfinished_conversations, @@ -422,9 +428,59 @@ def task_create_view( language: str, ) -> None: logger = getLogger("dembrane.tasks.task_create_view") - logger.warning( - f"NOT IMPLEMENTED: task_create_view (project_analysis_run_id: {project_analysis_run_id}, user_query: {user_query}, user_query_context: {user_query_context}, language: {language})" - ) + try: + with directus_client_context() as client: + response = client.get_items( + "project_analysis_run", + { + "query": { + "filter": { + "id": project_analysis_run_id, + }, + "fields": ["project_id"], + } + }, + ) + project_id = response[0]["project_id"] + # get all segment ids from project_id + segments = client.get_items( + "project", + { + "query": { + "filter": { + "id": project_id, + }, + "fields": ["conversations.conversation_segments.id"], + } + }, + ) + segment_ids = list( + set( + [ + seg["id"] + for conv in segments[0]["conversations"] + for seg in conv["conversation_segments"] + ] + ) + ) + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {RUNPOD_TOPIC_MODELER_API_KEY}", + } + data = { + "input": { + "response_language": language, + "segment_ids": segment_ids, + "user_prompt": "\n\n\n".join([user_query, user_query_context]), + "project_analysis_run_id": project_analysis_run_id, + } + } + url = f"{str(RUNPOD_TOPIC_MODELER_URL).rstrip('/')}/run" + + response = requests.post(url, headers=headers, json=data, timeout=600) + except Exception as e: + logger.error(f"Error in task_create_view: {e}") + raise e from e return