diff --git a/src/BuildingBlocks/Shared/Protos/Classroom/classrooms.proto b/src/BuildingBlocks/Shared/Protos/Classroom/classrooms.proto index b1684272..cec8ca5d 100644 --- a/src/BuildingBlocks/Shared/Protos/Classroom/classrooms.proto +++ b/src/BuildingBlocks/Shared/Protos/Classroom/classrooms.proto @@ -255,6 +255,18 @@ message GetClassroomLearningSnapshotRequest { google.protobuf.Int32Value days_back = 3; } +message GrpcStudentProgressSummary { + string student_id = 1; + + double assessment_completion_rate = 2; + int32 total_assessments = 3; + int32 completed_assessments = 4; + + double content_completion_rate = 5; + int32 total_sections = 6; + int32 completed_sections = 7; +} + message GrpcClassroomLearningSnapshotResponse { GrpcClassroomBasicInfo classroom = 1; repeated GrpcStudentLearningData students = 2; @@ -265,6 +277,8 @@ message GrpcClassroomLearningSnapshotResponse { repeated GrpcSectionProgressData section_progress = 7; repeated GrpcTopicCatalogItem topics_catalog = 8; GrpcAnalysisPeriod analysis_period = 9; + + repeated GrpcStudentProgressSummary student_progress_summaries = 10; } message GrpcClassroomBasicInfo { diff --git a/src/Services/AIService/app/api/http/dependencies.py b/src/Services/AIService/app/api/http/dependencies.py index b560f4db..4d005986 100644 --- a/src/Services/AIService/app/api/http/dependencies.py +++ b/src/Services/AIService/app/api/http/dependencies.py @@ -9,6 +9,7 @@ from app.core.llm.client import LLMClient from app.core.rag.ingestion_pipeline import IngestionPipeline +from app.core.rag.ingestion_service import IngestionService from app.core.rag.document_processor import DocumentProcessor from app.core.embedding.pipeline import EmbeddingPipeline, get_embedding_pipeline from app.core.graph.builder import GraphBuilder @@ -285,11 +286,38 @@ def get_classroom_snapshot_updater() -> ClassroomSnapshotUpdater: ) +@lru_cache(maxsize=1) +def get_ingestion_service() -> Optional[IngestionService]: + """ + Provide ingestion service for RAG 
indexing with debouncing. + Returns None if RAG features are disabled or unavailable. + """ + try: + ingestion_pipeline = get_ingestion_pipeline() + if not ingestion_pipeline: + return None + + classroom_repository = get_classroom_repository() + + return IngestionService( + ingestion_pipeline=ingestion_pipeline, + classroom_repository=classroom_repository, + debounce_seconds=getattr(settings, 'RAG_INGESTION_DEBOUNCE_SECONDS', 300), + ingestion_ttl_hours=getattr(settings, 'RAG_INGESTION_TTL_HOURS', 24), + ) + except Exception as e: + logger = logging.getLogger(__name__) + logger.warning(f"Failed to initialize ingestion service: {e}. Continuing without RAG ingestion.") + return None + + @lru_cache(maxsize=1) def get_classroom_snapshot_event_handler() -> ClassroomSnapshotEventHandler: + ingestion_service = get_ingestion_service() return ClassroomSnapshotEventHandler( snapshot_store=get_classroom_snapshot_store(), snapshot_updater=get_classroom_snapshot_updater(), + ingestion_service=ingestion_service, ) @@ -383,6 +411,7 @@ def get_teacher_service() -> TeacherService: classroom_snapshot_store=get_classroom_snapshot_store(), classroom_snapshot_updater=get_classroom_snapshot_updater(), direct_grading_pipeline=direct_grading_pipeline, + ingestion_service=get_ingestion_service(), ) diff --git a/src/Services/AIService/app/api/http/routers/teacher.py b/src/Services/AIService/app/api/http/routers/teacher.py index 6811f2bc..5f20b544 100644 --- a/src/Services/AIService/app/api/http/routers/teacher.py +++ b/src/Services/AIService/app/api/http/routers/teacher.py @@ -44,6 +44,16 @@ class BuildGraphRequest(BaseModel): force_rebuild: bool = False +class TriggerProgressEventRequest(BaseModel): + """Request model for triggering test progress event""" + classroom_id: int + student_id: str + course_enrollment_id: int = 1 + course_id: int = 1 + progress_percentage: int = 50 + status: str = "InProgress" + + @router.post("/student-analysis", response_model=InterventionResponse) async def 
student_analysis( request: StudentAnalysisRequest, @@ -132,3 +142,136 @@ async def build_graph( raise HTTPException(status_code=500, detail=str(e)) +@router.post("/test/trigger-progress-event") +async def trigger_progress_event( + request: TriggerProgressEventRequest, + direct: bool = False, # Query parameter: ?direct=true +): + """ + Test endpoint to trigger ClassroomStudentProgressUpdatedEvent. + + This can work in two modes: + 1. RabbitMQ mode (default): Publishes event to RabbitMQ + 2. Direct mode (direct=true): Directly calls event handler (useful when RabbitMQ unavailable) + + The event will trigger: + 1. Snapshot refresh + 2. RAG ingestion (with debouncing - waits 5 minutes before ingesting) + + Use this to test the event-driven ingestion flow. + """ + from app.core.snapshot.events import ClassroomEvent + from app.api.http.dependencies import get_classroom_snapshot_event_handler + + # Create event payload matching C# event structure + event_data = { + "StudentId": request.student_id, + "ClassroomId": request.classroom_id, + "CourseEnrollmentId": request.course_enrollment_id, + "CourseId": request.course_id, + "ProgressPercentage": request.progress_percentage, + "Status": request.status, + } + + if direct: + # Direct mode: Call event handler directly (bypass RabbitMQ) + try: + event_handler = get_classroom_snapshot_event_handler() + classroom_event = ClassroomEvent( + type="STUDENT_PROGRESS_UPDATED", + classroom_id=request.classroom_id, + student_id=request.student_id, + payload={ + "course_enrollment_id": request.course_enrollment_id, + "course_id": request.course_id, + "progress_percentage": request.progress_percentage, + "status": request.status, + }, + ) + await event_handler.handle_event(classroom_event) + + logger.info( + f"[TeacherRouter] Test progress event handled directly for classroom {request.classroom_id}, " + f"student {request.student_id}" + ) + + return { + "success": True, + "message": "Event handled directly (bypassed RabbitMQ)", + 
"event_data": event_data, + "note": "Event was processed directly. RAG ingestion will be scheduled with 5-minute debounce.", + } + except Exception as e: + logger.error("[TeacherRouter] Error handling event directly: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to handle event: {str(e)}") + else: + # RabbitMQ mode: Publish to RabbitMQ + try: + import aio_pika + import json + from app.infrastructure.config.settings import settings + + exchange_name = "EventBus.Messages:ClassroomStudentProgressUpdatedEvent" + routing_key = "EventBus.Messages:ClassroomStudentProgressUpdatedEvent" + + # Connect and publish + connection = await aio_pika.connect_robust(settings.RABBITMQ_URL) + + try: + async with connection: + channel = await connection.channel() + + # Declare exchange + try: + exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.TOPIC, + durable=True, + ) + except Exception: + exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.FANOUT, + durable=True, + ) + + # Publish message + message_body = json.dumps(event_data).encode("utf-8") + message = aio_pika.Message( + body=message_body, + content_type="application/json", + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, + ) + + await exchange.publish( + message, + routing_key=routing_key, + ) + + logger.info( + f"[TeacherRouter] Test progress event published to RabbitMQ for classroom {request.classroom_id}, " + f"student {request.student_id}" + ) + + return { + "success": True, + "message": "Event published to RabbitMQ successfully", + "event_data": event_data, + "note": "Event will be consumed by ClassroomProgressEventConsumer. " + "RAG ingestion will be scheduled with 5-minute debounce.", + } + finally: + await connection.close() + + except Exception as e: + logger.warning( + f"[TeacherRouter] Failed to publish to RabbitMQ: {e}. 
" + f"Hint: Use ?direct=true to test without RabbitMQ" + ) + raise HTTPException( + status_code=503, + detail=f"Failed to publish event to RabbitMQ: {str(e)}. " + f"Use ?direct=true query parameter to test without RabbitMQ." + ) + + diff --git a/src/Services/AIService/app/core/agent/plan_solve_agent.py b/src/Services/AIService/app/core/agent/plan_solve_agent.py index 4ad57b8b..7f7eb405 100644 --- a/src/Services/AIService/app/core/agent/plan_solve_agent.py +++ b/src/Services/AIService/app/core/agent/plan_solve_agent.py @@ -86,17 +86,9 @@ async def _generate_plan(self, question: str) -> List[str]: """Generate action plan""" prompt = self.PLANNER_PROMPT.format(question=question) messages: List[LLMMessage] = [{"role": "user", "content": prompt}] - - # Log request data before calling LLM (full prompt, no truncate) - logger.info( - f"[Plan-Solve] Calling LLM to generate plan | " - f"question={question}, prompt_length={len(prompt)}, use_remote={self.use_remote} | " - f"full_prompt={prompt}" - ) - response = await self.llm.generate(messages, use_remote=self.use_remote) response_text = response.content if hasattr(response, 'content') else str(response) - + # Parse Python list try: if "```python" in response_text: @@ -107,6 +99,7 @@ async def _generate_plan(self, question: str) -> List[str]: plan_str = response_text.strip() plan = ast.literal_eval(plan_str) + return plan if isinstance(plan, list) else [] except Exception as e: logger.warning(f"[Plan-Solve] Failed to parse plan: {e}") @@ -128,15 +121,6 @@ async def _execute_step( current_step=current_step ) messages: List[LLMMessage] = [{"role": "user", "content": prompt}] - - # Log request data before calling LLM (full prompt, no truncate) - logger.info( - f"[Plan-Solve] Calling LLM to execute step | " - f"current_step={current_step}, question={question}, " - f"prompt_length={len(prompt)}, use_remote={self.use_remote} | " - f"full_prompt={prompt}" - ) - response = await self.llm.generate(messages, use_remote=self.use_remote) 
return response.content if hasattr(response, 'content') else str(response) diff --git a/src/Services/AIService/app/core/context/builder.py b/src/Services/AIService/app/core/context/builder.py index 7f495595..150b7339 100644 --- a/src/Services/AIService/app/core/context/builder.py +++ b/src/Services/AIService/app/core/context/builder.py @@ -59,11 +59,25 @@ async def build( return bundle candidates = await self.gatherer.gather(query=query, user_id=user_id, top_k=top_k) + logger.info(f"[JITContextBuilder] Gathered {len(candidates)} candidates") + selected = self.selector.select(candidates) + logger.info(f"[JITContextBuilder] Selected {len(selected)} items after selection") + structured = self.structurer.structure(selected) + logger.info( + f"[JITContextBuilder] Structured: memory={len(structured.get('memory', []))}, " + f"retrieval={len(structured.get('retrieval', []))}, other={len(structured.get('other', []))}" + ) + compressed = self.compressor.compress(structured) + logger.info( + f"[JITContextBuilder] Compressed: memory={len(compressed.get('memory', []))}, " + f"retrieval={len(compressed.get('retrieval', []))}, other={len(compressed.get('other', []))}" + ) total_tokens = self._estimate_tokens(compressed) + logger.info(f"[JITContextBuilder] Estimated {total_tokens} tokens") notes = None if total_tokens > self.token_budget: diff --git a/src/Services/AIService/app/core/context/gather.py b/src/Services/AIService/app/core/context/gather.py index 462338ca..24e0de70 100644 --- a/src/Services/AIService/app/core/context/gather.py +++ b/src/Services/AIService/app/core/context/gather.py @@ -47,12 +47,16 @@ async def gather( ) for layer, items in mem_results.items(): for item in items: + metadata = item.get("metadata", {}) or {} + if metadata.get("type") == "teacher_student_analysis": + continue + candidates.append( ContextItem( content=item.get("content", ""), score=item.get("relevance_score", 0.5), source=f"memory:{layer}", - metadata=item.get("metadata", {}), + 
metadata=metadata, ) ) except Exception as e: @@ -60,16 +64,167 @@ async def gather( # Retrieval (vector + graph hybrid) try: - retrieval_results = await self.hybrid_retriever.retrieve(query, top_k=top_k) + retrieval_top_k = max(top_k * 3, 30) # Request at least 30 documents + + # Query 1: General retrieval (may include teacher_student_analysis) + general_results = await self.hybrid_retriever.retrieve(query, top_k=retrieval_top_k) + + # Query 2: Try to get documents with document_type (from ingestion) + # Request many more documents to find ingestion documents that may have lower similarity scores + ingestion_results = [] + try: + # Request significantly more documents to increase chance of finding ingestion documents + # that may rank lower due to embedding mismatch but are still relevant + ingestion_results = await self.hybrid_retriever.retrieve_vector_only( + query=query, + top_k=retrieval_top_k * 5 # Request 5x more to find ingestion documents + ) + + logger.info( + f"[ContextGatherer] Vector-only retrieval found {len(ingestion_results)} documents " + f"(requested {retrieval_top_k * 5} to find ingestion documents)" + ) + except Exception as e: + logger.debug(f"[ContextGatherer] Vector-only retrieval failed: {e}") + + + seen_docs = {} # doc_id -> (doc, priority) + + for doc in ingestion_results: + doc_id = doc.get("document_id") or doc.get("id") + if doc_id: + inner_meta = doc.get("metadata", {}).get("metadata", {}) or {} + has_document_type = "document_type" in inner_meta + priority = 1 if has_document_type else 3 # Higher priority for ingestion docs + if doc_id not in seen_docs or seen_docs[doc_id][1] > priority: + seen_docs[doc_id] = (doc, priority) + + # Priority 2: General results that weren't already included + for doc in general_results: + doc_id = doc.get("document_id") or doc.get("id") + if doc_id and doc_id not in seen_docs: + inner_meta = doc.get("metadata", {}).get("metadata", {}) or {} + has_document_type = "document_type" in inner_meta + priority = 
2 if has_document_type else 4 # Lower priority for general docs + seen_docs[doc_id] = (doc, priority) + + # Sort by priority (lower number = higher priority), then by retrieval_score + retrieval_results = sorted( + [doc for doc, _ in seen_docs.values()], + key=lambda d: ( + seen_docs.get(d.get("document_id") or d.get("id"), (None, 999))[1], + -d.get("retrieval_score", 0) # Negative for descending + ) + ) + + + + logger.info( + f"[ContextGatherer] Combined retrieval: {len(retrieval_results)} unique documents " + f"(from {len(general_results)} general + {len(ingestion_results)} ingestion-focused, " + f"prioritized by document_type)" + ) + + # Log document types for debugging + doc_types = {} + filtered_count = 0 + added_count = 0 + + + for doc in retrieval_results: + metadata = {k: v for k, v in doc.items() if k not in ("content", "retrieval_score", "score")} + inner_metadata = metadata.get("metadata", {}) or {} + + doc_type = ( + metadata.get("type") or # Top level (episodic memory) + inner_metadata.get("type") or # Inner metadata + inner_metadata.get("document_type") or # Ingestion documents + "unknown" + ) + doc_types[doc_type] = doc_types.get(doc_type, 0) + 1 + + if doc_type == "teacher_student_analysis": + filtered_count += 1 + continue + + score = doc.get("retrieval_score") or doc.get("score", 0.0) + candidates.append( ContextItem( content=doc.get("content", ""), - score=doc.get("score", 0.0), + score=float(score), source="retrieval", - metadata={k: v for k, v in doc.items() if k != "content"}, + metadata=metadata, ) ) + added_count += 1 + + # Log summary with document IDs for debugging + sample_doc_ids = [doc.get("document_id", "unknown") for doc in retrieval_results[:5]] + # Check if any documents have classroom_id pattern (from ingestion) + ingestion_pattern_count = sum( + 1 for doc in retrieval_results + if doc.get("document_id", "").startswith("classroom_") + or (metadata.get("metadata", {}).get("classroom_id") is not None) + ) + + logger.info( + 
f"[ContextGatherer] Document types retrieved: {doc_types} | " + f"Filtered {filtered_count} teacher_student_analysis | " + f"Added {added_count} classroom data documents | " + f"Documents with ingestion pattern: {ingestion_pattern_count}/{len(retrieval_results)} | " + f"Sample doc_ids: {sample_doc_ids}" + ) + + if ingestion_pattern_count == 0 and len(retrieval_results) > 0: + logger.warning( + f"[ContextGatherer] No documents with ingestion pattern found. " + f"All {len(retrieval_results)} documents appear to be from memory, not ingestion. " + f"This suggests: 1) Ingestion documents not in vector store, " + f"2) Embeddings don't match query, 3) Documents overwritten by memory." + ) + + # Fallback: If no classroom data documents found, use a few teacher_student_analysis + # documents with lower score to provide some context (better than empty context) + if added_count == 0 and filtered_count > 0: + logger.warning( + f"[ContextGatherer] WARNING: All {filtered_count} retrieved documents are teacher_student_analysis. " + f"No classroom data documents found. This may indicate: " + f"1) Classroom not ingested yet, 2) Only memory documents in vector store, " + f"3) Need to ingest classroom data first. " + f"Using {min(3, filtered_count)} teacher_student_analysis documents as fallback." 
+ ) + + # Add a few teacher_student_analysis documents with reduced score as fallback + fallback_count = 0 + for doc in retrieval_results: + if fallback_count >= 3: # Limit to 3 fallback documents + break + + metadata = {k: v for k, v in doc.items() if k not in ("content", "retrieval_score", "score")} + inner_metadata = metadata.get("metadata", {}) or {} + doc_type = inner_metadata.get("type") or inner_metadata.get("document_type") or "unknown" + + if doc_type == "teacher_student_analysis": + # Use lower score for fallback documents + score = (doc.get("retrieval_score") or doc.get("score", 0.0)) * 0.3 # Reduce score by 70% + + candidates.append( + ContextItem( + content=doc.get("content", ""), + score=float(score), + source="retrieval:fallback", + metadata={**metadata, "fallback": True, "original_type": doc_type}, + ) + ) + fallback_count += 1 + + if fallback_count > 0: + logger.info( + f"[ContextGatherer] Added {fallback_count} fallback teacher_student_analysis documents " + f"with reduced scores (score * 0.3)" + ) except Exception as e: logger.warning(f"[ContextGatherer] Retrieval gather failed: {e}") diff --git a/src/Services/AIService/app/core/graph/builder.py b/src/Services/AIService/app/core/graph/builder.py index 2ed1d363..afac9db3 100644 --- a/src/Services/AIService/app/core/graph/builder.py +++ b/src/Services/AIService/app/core/graph/builder.py @@ -646,14 +646,49 @@ async def _create_student_assignment_nodes(self, data: Dict[str, Any]): """Create StudentAssignment nodes""" student_assignments = data.get("assignments", {}).get("student_assignments", []) - for sa in student_assignments: + # Track synthesized IDs to avoid duplicates + seen_synthesized_ids = set() + synthesis_counter = {} # (student_id, assignment_id) -> counter + + for idx, sa in enumerate(student_assignments): node_id = sa.get("id") student_id = sa.get("student_id", "") if not student_id: logger.warning("Skipping student_assignment with missing student_id: %s", sa) continue if not node_id: - 
node_id = f"sa-{student_id}-{sa.get('assignment_id', 'unknown')}" + + assignment_id = sa.get("assignment_id") + if assignment_id: + node_id = f"sa-{student_id}-{assignment_id}" + else: + unique_parts = [] + if sa.get("due_date"): + unique_parts.append(str(hash(sa.get("due_date")) % 10000)) + if sa.get("submitted_at"): + unique_parts.append(str(hash(sa.get("submitted_at")) % 10000)) + if sa.get("assigned_at"): + unique_parts.append(str(hash(sa.get("assigned_at")) % 10000)) + + if not unique_parts: + unique_parts.append(str(hash(str(sa)) % 100000)) + + # Use counter as last resort + key = (student_id, "unknown") + if key not in synthesis_counter: + synthesis_counter[key] = 0 + synthesis_counter[key] += 1 + unique_parts.append(str(synthesis_counter[key])) + + node_id = f"sa-{student_id}-{'-'.join(unique_parts)}" + + original_node_id = node_id + counter = 1 + while node_id in seen_synthesized_ids: + node_id = f"{original_node_id}-{counter}" + counter += 1 + + seen_synthesized_ids.add(node_id) sa["id"] = node_id logger.info("Synthesized student_assignment id=%s for student_id=%s", node_id, student_id) node_id = str(node_id) @@ -941,6 +976,11 @@ async def _create_enrollment_relationships(self, data: Dict[str, Any]): "FOR_COURSE", "Course", course_id ) + else: + logger.debug( + f"CourseEnrollment {ce_id} has no course_id - will be orphaned. " + f"Backend should provide course_id in course_enrollments data." + ) # Also create Student -> Classroom relationship for backward compatibility conflicts = self.monitor.check_relationship( @@ -1236,7 +1276,11 @@ async def _create_quiz_relationships(self, data: Dict[str, Any]): continue if not quiz_id: - logger.warning(f"StudentQuiz for QuizAttempt {attempt_id} has no quiz_id - skipping") + logger.warning( + f"StudentQuiz for QuizAttempt {attempt_id} has no quiz_id - skipping relationship creation. " + f"Backend should provide quiz_id in student_quizzes data. " + f"This will result in orphan QuizAttempt node." 
+ ) continue student_id = str(student_id) @@ -1764,8 +1808,21 @@ async def _create_performance_relationships(self, data: Dict[str, Any]) -> Dict[ if not question_attempts: continue + # Try to get quiz_id for fallback + quiz_id = student_quiz.get("quiz_id") + for qa in question_attempts: topics = qa.get("topics", []) + + # Fallback: If question_attempt has no topics, try to get from quiz level + if not topics and quiz_id: + # Try to find quiz node and get its topics from HAS_TOPIC relationships + # This is a fallback when question-level topics are missing + logger.debug( + f"[LEVEL 5] Question attempt missing topics, quiz_id={quiz_id}. " + f"Note: Backend should provide topics in question_attempts for accurate performance analysis." + ) + if not topics: continue @@ -1788,6 +1845,12 @@ async def _create_performance_relationships(self, data: Dict[str, Any]) -> Dict[ quiz_attempts_with_topics += 1 else: quiz_attempts_without_topics += 1 + if quiz_id: + logger.debug( + f"[LEVEL 5] Quiz attempt {attempt.get('id', 'unknown')} has no topics in question_attempts. " + f"quiz_id={quiz_id}, student_id={student_id}. " + f"Backend should populate question_attempts[].topics[] for Level 5 performance analysis." 
+ ) logger.info( f"[LEVEL 5] Quiz attempts: {quiz_attempts_processed} processed, " diff --git a/src/Services/AIService/app/core/llm/providers/deepseek_provider.py b/src/Services/AIService/app/core/llm/providers/deepseek_provider.py index 3e891fef..f0c3aa26 100644 --- a/src/Services/AIService/app/core/llm/providers/deepseek_provider.py +++ b/src/Services/AIService/app/core/llm/providers/deepseek_provider.py @@ -66,16 +66,13 @@ async def generate( for msg in messages ] - # Log request data before API call temperature = kwargs.get("temperature", self.temperature) max_tokens = kwargs.get("max_tokens", self.max_tokens) - # Prepare messages summary for logging (full content, no truncate) messages_summary = [] for msg in deepseek_messages: role = msg.get("role", "unknown") content = msg.get("content", "") - # Log full content without truncation messages_summary.append(f"{role}: {content} (length={len(content)})") other_params = {k: v for k, v in kwargs.items() if k not in ["temperature", "max_tokens"]} diff --git a/src/Services/AIService/app/core/memory/memory_manager.py b/src/Services/AIService/app/core/memory/memory_manager.py index 6f3b5b51..4a28c16e 100644 --- a/src/Services/AIService/app/core/memory/memory_manager.py +++ b/src/Services/AIService/app/core/memory/memory_manager.py @@ -63,17 +63,7 @@ async def add_memory( memory_type: str, metadata: Dict[str, Any] ) -> str: - """ - Add memory to appropriate layer - - Args: - content: Memory content - memory_type: Type of memory ("working", "episodic", "semantic", "perceptual") - metadata: Memory metadata - - Returns: - Memory ID - """ + if memory_type == "working": return await self.working_memory.add(content, metadata) diff --git a/src/Services/AIService/app/core/rag/ingestion_service.py b/src/Services/AIService/app/core/rag/ingestion_service.py new file mode 100644 index 00000000..3f952eee --- /dev/null +++ b/src/Services/AIService/app/core/rag/ingestion_service.py @@ -0,0 +1,170 @@ +import asyncio +from typing import 
Dict, Any, Optional, Set +from datetime import datetime, timedelta +import logging + +from app.core.rag.ingestion_pipeline import IngestionPipeline +from app.core.data.classroom_repository import ClassroomRepository + +logger = logging.getLogger(__name__) + + +class IngestionService: + def __init__( + self, + ingestion_pipeline: IngestionPipeline, + classroom_repository: ClassroomRepository, + debounce_seconds: int = 300, # 5 minutes default + ingestion_ttl_hours: int = 24, # Consider ingestion stale after 24h + ): + self.ingestion_pipeline = ingestion_pipeline + self.classroom_repository = classroom_repository + self.debounce_seconds = debounce_seconds + self.ingestion_ttl_hours = ingestion_ttl_hours + + self._pending_tasks: Dict[int, asyncio.Task] = {} + self._ingestion_status: Dict[int, datetime] = {} + + self._ingesting: Set[int] = set() + + logger.info( + f"[IngestionService] Initialized with debounce={debounce_seconds}s, " + f"ttl={ingestion_ttl_hours}h" + ) + + async def schedule_ingestion( + self, + classroom_id: int, + force: bool = False, + ) -> bool: + if not force and self._is_recently_ingested(classroom_id): + logger.debug( + f"[IngestionService] Classroom {classroom_id} already ingested recently, skipping" + ) + return False + + if classroom_id in self._pending_tasks: + task = self._pending_tasks[classroom_id] + if not task.done(): + task.cancel() + logger.debug( + f"[IngestionService] Cancelled pending ingestion for classroom {classroom_id}" + ) + + if force: + asyncio.create_task(self._ingest_classroom(classroom_id)) + return True + else: + async def debounced_ingest(): + try: + await asyncio.sleep(self.debounce_seconds) + await self._ingest_classroom(classroom_id) + except asyncio.CancelledError: + logger.debug( + f"[IngestionService] Debounced ingestion cancelled for classroom {classroom_id}" + ) + raise + + self._pending_tasks[classroom_id] = asyncio.create_task(debounced_ingest()) + logger.info( + f"[IngestionService] Scheduled ingestion for 
classroom {classroom_id} " + f"(debounce={self.debounce_seconds}s)" + ) + return True + + async def ensure_ingested( + self, + classroom_id: int, + max_wait_seconds: int = 0, + ) -> bool: + + if self._is_recently_ingested(classroom_id): + return True + + if classroom_id in self._ingesting: + if max_wait_seconds > 0: + wait_interval = 1 + waited = 0 + while classroom_id in self._ingesting and waited < max_wait_seconds: + await asyncio.sleep(wait_interval) + waited += wait_interval + return self._is_recently_ingested(classroom_id) + return True + + logger.info( + f"[IngestionService] Triggering immediate ingestion for classroom {classroom_id} " + f"(lazy loading)" + ) + asyncio.create_task(self._ingest_classroom(classroom_id)) + return True + + def _is_recently_ingested(self, classroom_id: int) -> bool: + if classroom_id not in self._ingestion_status: + return False + + ingestion_time = self._ingestion_status[classroom_id] + age = datetime.utcnow() - ingestion_time + return age < timedelta(hours=self.ingestion_ttl_hours) + + async def _ingest_classroom(self, classroom_id: int) -> None: + + # Prevent concurrent ingestion of same classroom + if classroom_id in self._ingesting: + logger.warning( + f"[IngestionService] Classroom {classroom_id} is already being ingested, skipping" + ) + return + + self._ingesting.add(classroom_id) + + try: + logger.info(f"[IngestionService] Starting ingestion for classroom {classroom_id}") + + # Fetch classroom data + classroom_data = await self.classroom_repository.get_classroom_data( + classroom_id=classroom_id, + student_id=None, + analysis_period_days=None, + ) + + if not classroom_data: + logger.warning( + f"[IngestionService] No data found for classroom {classroom_id}, skipping ingestion" + ) + return + + # Perform ingestion + result = await self.ingestion_pipeline.ingest(classroom_data) + + if result.get("errors"): + logger.error( + f"[IngestionService] Ingestion completed with errors for classroom {classroom_id}: " + 
f"{result['errors']}" + ) + else: + # Mark as ingested + self._ingestion_status[classroom_id] = datetime.utcnow() + logger.info( + f"[IngestionService] Successfully ingested classroom {classroom_id}: " + f"{result.get('documents_stored', 0)} documents, " + f"{result.get('graph_nodes', 0)} graph nodes" + ) + + # Clean up pending task + if classroom_id in self._pending_tasks: + del self._pending_tasks[classroom_id] + + except Exception as e: + logger.error( + f"[IngestionService] Error ingesting classroom {classroom_id}: {e}", + exc_info=True + ) + finally: + self._ingesting.discard(classroom_id) + + def get_ingestion_status(self, classroom_id: int) -> Optional[datetime]: + return self._ingestion_status.get(classroom_id) + + def is_ingesting(self, classroom_id: int) -> bool: + return classroom_id in self._ingesting + diff --git a/src/Services/AIService/app/core/snapshot/classroom_snapshot_store.py b/src/Services/AIService/app/core/snapshot/classroom_snapshot_store.py index 51607f2a..905d29fb 100644 --- a/src/Services/AIService/app/core/snapshot/classroom_snapshot_store.py +++ b/src/Services/AIService/app/core/snapshot/classroom_snapshot_store.py @@ -99,7 +99,7 @@ async def get_or_refresh_snapshot( snapshot = self._snapshot_store.get_snapshot(classroom_id) if not force_full_refresh and snapshot and not self._snapshot_store.needs_full_refresh(classroom_id): - logger.debug( + logger.info( "[ClassroomSnapshotUpdater] Using cached snapshot within cooldown window", extra={"classroom_id": classroom_id}, ) diff --git a/src/Services/AIService/app/core/snapshot/events.py b/src/Services/AIService/app/core/snapshot/events.py index 39ecf94e..a0569b18 100644 --- a/src/Services/AIService/app/core/snapshot/events.py +++ b/src/Services/AIService/app/core/snapshot/events.py @@ -9,6 +9,7 @@ ClassroomSnapshotStore, ClassroomSnapshotUpdater, ) +from app.core.rag.ingestion_service import IngestionService logger = logging.getLogger(__name__) @@ -27,9 +28,11 @@ def __init__( self, 
snapshot_store: ClassroomSnapshotStore, snapshot_updater: ClassroomSnapshotUpdater, + ingestion_service: Optional[IngestionService] = None, ) -> None: self._snapshot_store = snapshot_store self._snapshot_updater = snapshot_updater + self._ingestion_service = ingestion_service async def handle_event(self, event: ClassroomEvent) -> None: try: @@ -76,6 +79,18 @@ async def handle_event(self, event: ClassroomEvent) -> None: analysis_period_days=analysis_period_days, force_full_refresh=force_full_refresh, ) + if self._ingestion_service: + try: + await self._ingestion_service.schedule_ingestion( + classroom_id=classroom_id, + force=False, + ) + except Exception as ingestion_error: + logger.warning( + f"[ClassroomSnapshotEventHandler] Failed to schedule ingestion for " + f"classroom {classroom_id}: {ingestion_error}", + exc_info=True + ) except Exception as exc: logger.exception( "[ClassroomSnapshotEventHandler] Failed to handle event", diff --git a/src/Services/AIService/app/core/tools/student_data_tool.py b/src/Services/AIService/app/core/tools/student_data_tool.py index 4d67bb83..037924a6 100644 --- a/src/Services/AIService/app/core/tools/student_data_tool.py +++ b/src/Services/AIService/app/core/tools/student_data_tool.py @@ -76,6 +76,19 @@ async def _student_overview(self, parameters: Dict[str, Any]) -> str: if not student: return json.dumps({"error": "Student not found"}) + progress_summaries = data.get("student_progress_summaries", []) or data.get( + "studentProgressSummaries", [] + ) + progress_summary = next( + ( + s + for s in progress_summaries + if s.get("student_id") == student_id + or s.get("studentId") == student_id + ), + None, + ) + # Normalize key metrics for analysis overview = { "student_id": student_id, @@ -87,6 +100,28 @@ async def _student_overview(self, parameters: Dict[str, Any]) -> str: "engagement_score": student.get("engagement_score"), "last_active_at": student.get("last_active_at"), } + + if progress_summary: + 
overview["assessment_completion_rate"] = ( + progress_summary.get("assessment_completion_rate") + or progress_summary.get("assessmentCompletionRate") + ) + overview["content_completion_rate"] = ( + progress_summary.get("content_completion_rate") + or progress_summary.get("contentCompletionRate") + ) + overview["total_assessments"] = progress_summary.get( + "total_assessments" + ) or progress_summary.get("totalAssessments") + overview["completed_assessments"] = progress_summary.get( + "completed_assessments" + ) or progress_summary.get("completedAssessments") + overview["total_sections"] = progress_summary.get("total_sections") or progress_summary.get( + "totalSections" + ) + overview["completed_sections"] = progress_summary.get( + "completed_sections" + ) or progress_summary.get("completedSections") return json.dumps(overview) async def _class_overview(self, parameters: Dict[str, Any]) -> str: @@ -108,16 +143,52 @@ async def _class_overview(self, parameters: Dict[str, Any]) -> str: if count == 0: return json.dumps({"classroom_id": classroom_id, "student_count": 0}) - # Aggregate simple stats - avg_completion = sum(s.get("completion_rate", 0.0) for s in students) / count - avg_score = sum(s.get("average_score", 0.0) for s in students) / count + progress_summaries = data.get("student_progress_summaries", []) or data.get( + "studentProgressSummaries", [] + ) - overview = { - "classroom_id": classroom_id, - "student_count": count, - "average_completion_rate": round(avg_completion, 2), - "average_score": round(avg_score, 2), - } + if progress_summaries: + assessment_rates = [] + content_rates = [] + for s in progress_summaries: + ar = s.get("assessment_completion_rate") or s.get( + "assessmentCompletionRate" + ) + cr = s.get("content_completion_rate") or s.get( + "contentCompletionRate" + ) + if ar is not None: + assessment_rates.append(float(ar)) + if cr is not None: + content_rates.append(float(cr)) + + avg_assessment = ( + sum(assessment_rates) / 
len(assessment_rates) if assessment_rates else 0.0 + ) + avg_content = ( + sum(content_rates) / len(content_rates) if content_rates else 0.0 + ) + avg_score = sum(s.get("average_score", 0.0) for s in students) / count + + overview = { + "classroom_id": classroom_id, + "student_count": count, + "average_assessment_completion_rate": round(avg_assessment, 3), + "average_content_completion_rate": round(avg_content, 3), + "average_score": round(avg_score, 2), + } + else: + avg_completion = sum( + s.get("completion_rate", 0.0) for s in students + ) / count + avg_score = sum(s.get("average_score", 0.0) for s in students) / count + + overview = { + "classroom_id": classroom_id, + "student_count": count, + "average_completion_rate": round(avg_completion, 2), + "average_score": round(avg_score, 2), + } return json.dumps(overview) async def _detailed_data(self, parameters: Dict[str, Any]) -> str: @@ -164,8 +235,15 @@ async def _detailed_data(self, parameters: Dict[str, Any]) -> str: if not student_quizzes: student_quizzes = data.get("studentQuizzes", []) or data.get("student_quizzes", []) + progress_summaries = ( + data.get("student_progress_summaries") + or data.get("studentProgressSummaries") + or [] + ) + detailed = { "classroom_id": classroom_id, + "student_progress_summaries": progress_summaries, "quizAttempts": quiz_attempts, "studentAssignments": student_assignments, "sectionProgress": section_progress, diff --git a/src/Services/AIService/app/features/content_generation/models.py b/src/Services/AIService/app/features/content_generation/models.py index a0d23b24..82a8ca6d 100644 --- a/src/Services/AIService/app/features/content_generation/models.py +++ b/src/Services/AIService/app/features/content_generation/models.py @@ -15,11 +15,13 @@ class LessonSectionRequest(BaseModel): lesson_id: Optional[str] = Field( default=None, - description="Identifier of the lesson to extend. 
None falls back to mock repository.", - ) + ) force_mock: bool = Field( default=False, - description="Force using mock repository regardless of lesson_id (useful for testing).", + ) + lang: Optional[str] = Field( + default="vi", + description="Language code for the response (e.g., 'vi' for Vietnamese, 'en' for English).", ) diff --git a/src/Services/AIService/app/features/content_generation/prompts.py b/src/Services/AIService/app/features/content_generation/prompts.py index 62a19aec..5beca5fa 100644 --- a/src/Services/AIService/app/features/content_generation/prompts.py +++ b/src/Services/AIService/app/features/content_generation/prompts.py @@ -59,7 +59,7 @@ def _get_default_section_prompt_template() -> str: """ -def build_section_prompt(context_text: str) -> str: +def build_section_prompt(context_text: str, lang: str = "vi") -> str: """ Return LLM prompt for generating a new lesson section. @@ -74,5 +74,16 @@ def build_section_prompt(context_text: str) -> str: template_str = None template = template_str or _get_default_section_prompt_template() + # Add language instruction to the prompt + lang_instruction = "" + if lang == "vi": + lang_instruction = "\n\nLANGUAGE REQUIREMENT:\nYou MUST respond in Vietnamese (Tiếng Việt). All text fields including title, description, and any other content MUST be in Vietnamese." + elif lang == "en": + lang_instruction = "\n\nLANGUAGE REQUIREMENT:\nYou MUST respond in English. All text fields including title, description, and any other content MUST be in English." + else: + lang_instruction = f"\n\nLANGUAGE REQUIREMENT:\nYou MUST respond in {lang}. All text fields including title, description, and any other content MUST be in {lang}." 
+ + template_with_lang = template + lang_instruction + # Format template with context - return template.format(context_text=context_text) + return template_with_lang.format(context_text=context_text) diff --git a/src/Services/AIService/app/features/content_generation/service.py b/src/Services/AIService/app/features/content_generation/service.py index c7e0b9fd..8e50eaa4 100644 --- a/src/Services/AIService/app/features/content_generation/service.py +++ b/src/Services/AIService/app/features/content_generation/service.py @@ -49,7 +49,8 @@ async def generate_lesson_section( is_fallback = self.lesson_repository.was_fallback_used() context_text = build_lesson_context(lesson) - prompt = build_section_prompt(context_text) + lang = request.lang or "vi" + prompt = build_section_prompt(context_text, lang=lang) logger.info("Generating lesson section via LLM client", extra={"lesson_id": lesson.id}) diff --git a/src/Services/AIService/app/features/teacher/service.py b/src/Services/AIService/app/features/teacher/service.py index 277ed643..bbc03d8c 100644 --- a/src/Services/AIService/app/features/teacher/service.py +++ b/src/Services/AIService/app/features/teacher/service.py @@ -20,6 +20,7 @@ ClassroomSnapshotStore, ClassroomSnapshotUpdater, ) +from app.core.rag.ingestion_service import IngestionService from app.features.recommendations.models import ( InterventionResponse, StudentOverview, @@ -59,6 +60,7 @@ def __init__( classroom_snapshot_store: Optional[ClassroomSnapshotStore] = None, classroom_snapshot_updater: Optional[ClassroomSnapshotUpdater] = None, direct_grading_pipeline: Optional[DirectGradingPipeline] = None, + ingestion_service: Optional[IngestionService] = None, ): self.llm = llm self.context_builder = context_builder @@ -73,6 +75,7 @@ def __init__( self.classroom_snapshot_store = classroom_snapshot_store self.classroom_snapshot_updater = classroom_snapshot_updater self.direct_grading_pipeline = direct_grading_pipeline + self.ingestion_service = ingestion_service 
logger.info("TeacherService initialized") @@ -177,7 +180,7 @@ async def analyze_student( classroom_snapshot_updater=self.classroom_snapshot_updater, ) - # Build context bundle (for budgeting + future reuse) + # Build context bundle context_bundle = await self.context_builder.build( query=question, user_id=teacher_id, @@ -185,21 +188,34 @@ async def analyze_student( session_id=session_id, ) + logger.info( + "[TeacherService] Context bundle for teacher %s: %s", + teacher_id, + context_bundle, + ) + + if classroom_id and self.ingestion_service: + try: + await self.ingestion_service.ensure_ingested( + classroom_id=classroom_id, + max_wait_seconds=0, + ) + except Exception as e: + logger.warning( + f"[TeacherService] Failed to ensure ingestion for classroom {classroom_id}: {e}" + ) + classroom_data = None if classroom_id: try: if self.classroom_snapshot_updater and self.classroom_snapshot_store: snapshot = await self.classroom_snapshot_updater.get_or_refresh_snapshot( classroom_id=classroom_id, - student_id=None, # Get all students for mapping + student_id=None, analysis_period_days=analysis_period_days, - force_full_refresh=False, # Use cache if available + force_full_refresh=False, ) classroom_data = snapshot.data if snapshot else None - logger.debug( - "[TeacherService] Pre-loaded classroom data from snapshot for classroom %s", - classroom_id, - ) except Exception as e: logger.warning( "[TeacherService] Failed to pre-load classroom data: %s", e @@ -212,9 +228,10 @@ async def analyze_student( force_mock=force_mock, analysis_period_days=analysis_period_days, lang=lang, + context_bundle=context_bundle, + classroom_data=classroom_data, ) - # Add teacher_id to result for post-processing result["teacher_id"] = teacher_id if not classroom_data and classroom_id: @@ -223,10 +240,7 @@ async def analyze_student( snapshot = self.classroom_snapshot_store.get_snapshot(classroom_id) if snapshot: classroom_data = snapshot.data - logger.debug( - "[TeacherService] Retrieved classroom 
data from cache after agent run for classroom %s", - classroom_id, - ) + except Exception as e: logger.warning( "[TeacherService] Failed to get classroom data from cache: %s", e @@ -559,6 +573,8 @@ def _create_student_overview( student_data: Dict[str, Any], use_overview_text: bool = False ) -> StudentOverview: + from datetime import datetime, timezone + completion_rate = _safe_float(student_data.get("completion_rate", 0.0)) average_score = _safe_float(student_data.get("average_score", 0.0)) engagement_score = _safe_float( @@ -570,15 +586,44 @@ def _create_student_overview( completion_rate * 100.0 if completion_rate > 0.0 else average_score ) - # Simple status classification aligned with recommendations behavior - if progress_percent < 50.0 or engagement_score < 0.3: - current_status = "AtRisk" - elif progress_percent < 70.0: - current_status = "NeedsSupport" - elif progress_percent < 90.0: - current_status = "Good" + # Calculate days enrolled if joined_at is available + days_enrolled = None + joined_at_str = student_data.get("joined_at") + if joined_at_str: + try: + if isinstance(joined_at_str, str): + if joined_at_str.endswith("Z"): + joined_at = datetime.fromisoformat(joined_at_str.replace("Z", "+00:00")) + else: + joined_at = datetime.fromisoformat(joined_at_str) + if joined_at.tzinfo is None: + joined_at = joined_at.replace(tzinfo=timezone.utc) + days_enrolled = (datetime.now(timezone.utc) - joined_at).days + except Exception: + pass + + is_new_student = days_enrolled is not None and days_enrolled < 7 + + if is_new_student: + if (engagement_score < 0.1 and progress_percent < 5.0) or \ + (average_score > 0 and average_score < 30.0 and progress_percent < 10.0): + current_status = "AtRisk" + elif progress_percent < 30.0: + current_status = "NeedsSupport" + elif progress_percent < 60.0: + current_status = "Good" + else: + current_status = "Excellent" else: - current_status = "Excellent" + # For established students, use original thresholds + if progress_percent < 
50.0 or engagement_score < 0.3: + current_status = "AtRisk" + elif progress_percent < 70.0: + current_status = "NeedsSupport" + elif progress_percent < 90.0: + current_status = "Good" + else: + current_status = "Excellent" status_text = ( f"Học sinh hiện đạt khoảng {progress_percent:.1f}% tiến độ, " @@ -623,11 +668,22 @@ def _create_student_overview( if student_info: student_name = student_info.get("student_name") or student_info.get("studentName") + joined_at = None + if classroom_data: + all_students = classroom_data.get("students", []) + student_info = next( + (s for s in all_students if str(s.get("student_id", "") or s.get("studentId", "")) == sid), + None + ) + if student_info: + joined_at = student_info.get("joined_at") or student_info.get("joinedAt") + student_data = { "completion_rate": student_overview.get("completion_rate", 0.0), "average_score": student_overview.get("average_score", 0.0), "engagement_score": student_overview.get("engagement_score"), "student_name": student_name, + "joined_at": joined_at, # Include enrollment date } students.append(_create_student_overview(sid, student_data, use_overview_text=True)) elif classroom_data: @@ -684,6 +740,7 @@ def _create_student_overview( "completion_rate": completion_rate, "average_score": average_score, "engagement_score": engagement_score, + "joined_at": student.get("joined_at") or student.get("joinedAt"), # Include enrollment date } students.append(_create_student_overview(sid, student_data, use_overview_text=False)) diff --git a/src/Services/AIService/app/features/teacher/student_analysis_agent.py b/src/Services/AIService/app/features/teacher/student_analysis_agent.py index 1fb00157..058e3be4 100644 --- a/src/Services/AIService/app/features/teacher/student_analysis_agent.py +++ b/src/Services/AIService/app/features/teacher/student_analysis_agent.py @@ -1,4 +1,5 @@ from typing import Dict, Any, Optional, List +from collections import defaultdict import logging import json @@ -117,6 +118,8 @@ async def 
analyze_student( force_mock: bool = False, analysis_period_days: Optional[int] = 7, lang: Optional[str] = "vi", + context_bundle: Optional[Any] = None, + classroom_data: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ High-level entrypoint for teacher to analyze a student or a classroom. @@ -137,10 +140,10 @@ async def analyze_student( if focus: question += f" Focus on: {focus}." - # Phase 1 – Planning (single LLM call) + # Phase 1 – Planning (LLM) plan: List[str] = await self._generate_plan(question) - # Phase 2 – Execute concrete tool pipeline (no extra LLM calls) + # Phase 2 – Execute concrete tool pipeline tool_results = await self._execute_analysis_tools( classroom_id=classroom_id, student_id=student_id, @@ -148,12 +151,14 @@ async def analyze_student( analysis_period_days=analysis_period_days, ) - # Phase 3 – Summarize for teacher (single LLM call) + # Phase 3 – Summarize for teacher (LLM) summary = await self._summarize_for_teacher( question=question, plan=plan, tool_results=tool_results, lang=lang, + context_bundle=context_bundle, + classroom_data=classroom_data, ) return { @@ -184,8 +189,17 @@ async def _execute_analysis_tools( # 1) Student & class snapshot-backed data student_data_tool = self.tool_registry.get_tool("student_data") + + logger.info( + "[StudentAnalysisAgent] Executing analysis tools | " + "classroom_id=%s, student_id=%s, force_mock=%s, analysis_period_days=%s", + classroom_id, + student_id, + force_mock, + analysis_period_days, + ) + if student_data_tool and classroom_id is not None: - # Student-level overview (if student_id is provided) if student_id: try: student_overview_raw = await student_data_tool.run( @@ -214,6 +228,10 @@ async def _execute_analysis_tools( } ) results["class_overview"] = json.loads(class_overview_raw) + logger.info( + "[StudentAnalysisAgent] Class overview: %s", + results["class_overview"], + ) except Exception as exc: logger.warning( "[StudentAnalysisAgent] class_overview failed: %s", exc @@ -229,83 
+247,22 @@ async def _execute_analysis_tools( } ) results["detailed_classroom_data"] = json.loads(detailed_data_raw) + logger.info( + "[StudentAnalysisAgent] Detailed classroom data (summarized): %s", + results["detailed_classroom_data"], + ) except Exception as exc: - # If detailed_data action doesn't exist, try to get raw snapshot data - logger.debug( - "[StudentAnalysisAgent] detailed_data action not available, attempting fallback: %s", exc + logger.warning( + "[StudentAnalysisAgent] detailed_data failed: %s", exc ) - # Fallback: Try to get raw data directly - try: - if hasattr(student_data_tool, '_get_classroom_data'): - raw_data = await student_data_tool._get_classroom_data( - classroom_id=classroom_id, - student_id=None, - force_mock=force_mock, - analysis_period_days=analysis_period_days, - ) - # Extract key detailed fields (handle both camelCase and snake_case) - quiz_attempts = raw_data.get("quizAttempts", []) or raw_data.get("quiz_attempts", []) - assignments = raw_data.get("studentAssignments", []) or raw_data.get("student_assignments", []) - section_progress = raw_data.get("sectionProgress", []) or raw_data.get("section_progress", []) - engagement = raw_data.get("engagementMetrics", []) or raw_data.get("engagement_metrics", []) - student_quizzes = raw_data.get("studentQuizzes", []) or raw_data.get("student_quizzes", []) - - detailed_summary = { - "quizAttempts": quiz_attempts, - "studentAssignments": assignments, - "sectionProgress": section_progress, - "engagementMetrics": engagement, - "studentQuizzes": student_quizzes, - } - # Count and summarize for AI - if quiz_attempts: - quiz_scores = [ - q.get("totalScore") or q.get("total_score", 0) - for q in quiz_attempts - if q.get("totalScore") or q.get("total_score") - ] - detailed_summary["quiz_summary"] = { - "total_attempts": len(quiz_attempts), - "completed_attempts": len([q for q in quiz_attempts if q.get("status") in ["Passed", "Completed"]]), - "average_score": round(sum(quiz_scores) / 
len(quiz_scores), 2) if quiz_scores else 0, - "scores": quiz_scores, - } - if assignments: - assignment_scores = [ - a.get("finalScore") or a.get("final_score", 0) - for a in assignments - if a.get("finalScore") or a.get("final_score") - ] - detailed_summary["assignment_summary"] = { - "total_assignments": len(assignments), - "submitted_count": len([a for a in assignments if a.get("finalScore") or a.get("final_score")]), - "average_score": round(sum(assignment_scores) / len(assignment_scores), 2) if assignment_scores else 0, - "scores": assignment_scores, - } - if section_progress: - detailed_summary["section_summary"] = { - "total_sections": len(section_progress), - "completed_count": len([s for s in section_progress if s.get("status") == "Completed"]), - "in_progress_count": len([s for s in section_progress if s.get("status") == "InProgress"]), - } - if engagement: - completion_rates = [ - e.get("completionRate") or e.get("completion_rate", 0) - for e in engagement - if e.get("completionRate") or e.get("completion_rate") - ] - detailed_summary["engagement_summary"] = { - "students_with_data": len(engagement), - "average_completion_rate": round(sum(completion_rates) / len(completion_rates), 3) if completion_rates else 0, - } - results["detailed_classroom_data"] = detailed_summary - except Exception as fallback_exc: - logger.warning( - "[StudentAnalysisAgent] Failed to get detailed data (fallback): %s", fallback_exc - ) # 2) Performance analysis (Neo4j patterns per student) performance_tool = self.tool_registry.get_tool("performance_analysis") + logger.info( + "[StudentAnalysisAgent] Executing performance analysis tools | " + "student_id=%s", + student_id, + ) if performance_tool and student_id: try: performance_raw = await performance_tool.run( @@ -322,6 +279,11 @@ async def _execute_analysis_tools( # 3) Pattern recognition (class-level and/or student-level) pattern_tool = self.tool_registry.get_tool("pattern_recognition") + logger.info( + "[StudentAnalysisAgent] 
Executing pattern recognition tools | " + "student_id=%s", + student_id, + ) if pattern_tool: # Student struggles/excels if student_id: @@ -365,6 +327,14 @@ async def _execute_analysis_tools( "classroom_id": classroom_id, } ) + logger.info( + "[StudentAnalysisAgent] Pattern recognition (class) struggles: %s", + class_struggles_raw, + ) + logger.info( + "[StudentAnalysisAgent] Pattern recognition (class) excels: %s", + class_excels_raw, + ) results["pattern_class_struggles"] = json.loads( class_struggles_raw ) @@ -383,14 +353,205 @@ async def _summarize_for_teacher( plan: List[str], tool_results: Dict[str, Any], lang: Optional[str], + context_bundle: Optional[Any] = None, + classroom_data: Optional[Dict[str, Any]] = None, ) -> str: """ Final LLM summarization step that turns structured data into a teacher-friendly narrative. """ + from datetime import datetime, timezone + now = datetime.now(timezone.utc) + current_date_str = now.strftime("%Y-%m-%d") + current_datetime_str = now.strftime("%Y-%m-%d %H:%M:%S UTC") + + from datetime import datetime, timezone + # Get current date/time for AI to understand timeline context + now = datetime.now(timezone.utc) + current_date_str = now.strftime("%Y-%m-%d") + current_datetime_str = now.strftime("%Y-%m-%d %H:%M:%S UTC") + + enrollment_context = {} + if classroom_data: + + students = classroom_data.get("students", []) + for student in students: + student_id = student.get("student_id") + joined_at_str = student.get("joined_at") + if student_id and joined_at_str: + try: + if isinstance(joined_at_str, str): + if joined_at_str.endswith("Z"): + joined_at = datetime.fromisoformat(joined_at_str.replace("Z", "+00:00")) + else: + joined_at = datetime.fromisoformat(joined_at_str) + if joined_at.tzinfo is None: + joined_at = joined_at.replace(tzinfo=timezone.utc) + + days_enrolled = (now - joined_at).days + enrollment_context[student_id] = { + "days_enrolled": days_enrolled, + "joined_at": joined_at_str, + } + except Exception as e: + 
logger.warning( + f"[StudentAnalysisAgent] Failed to parse joined_at for student {student_id}: {e}" + ) + + # Extract RAG context from context_bundle + rag_context = "" + if context_bundle and hasattr(context_bundle, "items"): + rag_items = [] + for item in context_bundle.items[:5]: + rag_items.append(f"- {item.content[:200]}...") # Truncate long content + if rag_items: + rag_context = "\n\nRELEVANT CONTEXT FROM KNOWLEDGE BASE:\n" + "\n".join(rag_items) + + slimmed_results: Dict[str, Any] = {} + + student_id_to_name: Dict[str, str] = {} + if classroom_data: + students = classroom_data.get("students", []) + for student in students: + student_id = str(student.get("student_id", "") or student.get("studentId", "")) + student_name = student.get("student_name") or student.get("studentName") + if student_id and student_name: + student_id_to_name[student_id] = student_name + + if student_id_to_name: + slimmed_results["student_names"] = student_id_to_name + try: - # Build a compact JSON context for the LLM - context_json = json.dumps(tool_results, ensure_ascii=False) + # Keep student_overview and class_overview as-is (already compact) + if tool_results.get("student_overview"): + slimmed_results["student_overview"] = tool_results["student_overview"] + if tool_results.get("class_overview"): + slimmed_results["class_overview"] = tool_results["class_overview"] + + detailed = tool_results.get("detailed_classroom_data") or {} + if detailed: + progress_summaries = None + if "student_progress_summaries" in detailed: + progress_summaries = detailed["student_progress_summaries"] + elif "studentProgressSummaries" in detailed: + progress_summaries = detailed["studentProgressSummaries"] + + quiz_attempts = detailed.get("quizAttempts", []) or detailed.get("quiz_attempts", []) + wrong_answers = [] + for attempt in quiz_attempts[:10]: # Limit to recent 10 attempts + student_quiz_id = attempt.get("student_quiz_id") + total_score = attempt.get("total_score", 0) + status = 
attempt.get("status") + started_at = attempt.get("started_at") + question_attempts = attempt.get("question_attempts", []) + + # Collect wrong answers with question details + for qa in question_attempts: + if not qa.get("is_correct", True): + wrong_answers.append({ + "student_quiz_id": student_quiz_id, + "question_content": qa.get("question_content", ""), + "answer_content": qa.get("answer_content", ""), + "question_type": qa.get("question_type", ""), + "topics": qa.get("topics", []), + "quiz_score": total_score, + "quiz_status": status, + "started_at": started_at, + }) + + # Extract quiz titles from studentQuizzes + student_quizzes = detailed.get("studentQuizzes", []) or detailed.get("student_quizzes", []) + quiz_info = [] + for sq in student_quizzes[:20]: # Limit to 20 quizzes + if sq.get("final_score", 0) > 0 or sq.get("attempt_count", 0) > 0: + quiz_info.append({ + "quiz_id": sq.get("id"), + "quiz_title": sq.get("quiz_title", ""), + "final_score": sq.get("final_score", 0), + "attempt_count": sq.get("attempt_count", 0), + "student_id": sq.get("student_id"), + }) + + # Extract in-progress sections with names + section_progress = detailed.get("sectionProgress", []) or detailed.get("section_progress", []) + in_progress_sections = [] + completed_sections_recent = [] + for sp in section_progress: + status_val = sp.get("status", "") + if status_val == "InProgress": + in_progress_sections.append({ + "student_id": sp.get("student_id"), + "section_id": sp.get("section_id"), + "section_name": sp.get("section_name", ""), + "last_activity_at": sp.get("last_activity_at"), + }) + elif status_val == "Completed": + # Keep recent completed sections (last 10 per student) + completed_sections_recent.append({ + "student_id": sp.get("student_id"), + "section_id": sp.get("section_id"), + "section_name": sp.get("section_name", ""), + "last_activity_at": sp.get("last_activity_at"), + }) + + # Group completed sections by student and keep only recent ones + student_completed = 
defaultdict(list) + for cs in completed_sections_recent: + student_completed[cs["student_id"]].append(cs) + + recent_completed = [] + for student_id, sections in student_completed.items(): + # Sort by last_activity_at descending and take top 5 + sorted_sections = sorted( + sections, + key=lambda x: x.get("last_activity_at", ""), + reverse=True + )[:5] + recent_completed.extend(sorted_sections) + + student_assignments = detailed.get("studentAssignments", []) or detailed.get("student_assignments", []) + submitted_assignments = [] + for sa in student_assignments: + if sa.get("submission_count", 0) > 0: + submitted_assignments.append({ + "student_id": sa.get("student_id"), + "final_score": sa.get("final_score", 0), + "submitted_at": sa.get("submitted_at"), + "due_date": sa.get("due_date"), + "submission_count": sa.get("submission_count", 0), + }) + + slimmed_detailed: Dict[str, Any] = { + "classroom_id": detailed.get("classroom_id"), + "student_progress_summaries": progress_summaries, + "quiz_summary": detailed.get("quiz_summary"), + "assignment_summary": detailed.get("assignment_summary"), + "section_summary": detailed.get("section_summary"), + "engagement_summary": detailed.get("engagement_summary"), + "wrong_answers": wrong_answers[:20], + "quiz_info": quiz_info, + "in_progress_sections": in_progress_sections, + "recent_completed_sections": recent_completed[:30], + "submitted_assignments": submitted_assignments, + } + slimmed_results["detailed_classroom_data"] = slimmed_detailed + + # Performance and pattern tools usually return already-aggregated JSON. 
+ for key in [ + "performance_patterns", + "pattern_student_struggles", + "pattern_student_excels", + "pattern_class_struggles", + "pattern_class_excels", + ]: + if key in tool_results: + slimmed_results[key] = tool_results[key] + + # Add enrollment context to slimmed_results + if enrollment_context: + slimmed_results["enrollment_context"] = enrollment_context + + context_json = json.dumps(slimmed_results, ensure_ascii=False) except Exception: context_json = "{}" @@ -400,39 +561,41 @@ async def _summarize_for_teacher( f"{self.get_system_prompt()}\n\n" f"Teacher question/context:\n{question}\n\n" f"Planned steps:\n- " + "\n- ".join(plan or []) + "\n\n" + f"CURRENT DATE: {current_date_str} ({current_datetime_str}). Use to calculate days since events and identify overdue assignments.\n\n" "You also have structured data from tools (JSON below). " "Use it as the primary source of truth for metrics and patterns:\n\n" f"{context_json}\n\n" - "IMPORTANT: Student names are available in the 'students' array within the classroom data. " - "ALWAYS use student names (e.g., 'Nhan Thanh', 'Man Trieu') instead of student IDs (UUIDs) when referring to students in your response.\n\n" + "IMPORTANT: Use 'student_names' mapping (student_id -> student_name) to convert all student IDs to names. " + "ALWAYS use student names (e.g., 'Nhan Thanh', 'Man Trieu') instead of student IDs (UUIDs) when referring to students.\n" + "When you see student_id in wrong_answers, quiz_info, in_progress_sections, submitted_assignments, etc., look up the name in 'student_names'.\n\n" "CRITICAL DATA INTERPRETATION RULES:\n" - "- If 'class_overview' shows average_completion_rate=0.0 or average_score=0.0, " - "ALWAYS check 'detailed_classroom_data' for actual quizAttempts, studentAssignments, " - "sectionProgress, and engagementMetrics.\n" - "- The aggregated metrics in 'class_overview' may be incomplete or incorrectly calculated. 
" - "The detailed data (quizAttempts, assignments, sectionProgress) is the TRUE source of truth.\n" - "- If detailed_classroom_data shows quizAttempts with scores > 0, assignments with finalScore > 0, " - "or sectionProgress with Completed status, then students HAVE been learning and submitting work.\n" - "- Do NOT conclude 'no data' or 'students haven't submitted' if detailed_classroom_data shows " - "actual quiz attempts, assignment submissions, or completed sections.\n" - "- When analyzing mistakes or weaknesses, examine ALL quiz attempts for each student across different quizzes. " - "If a student answered correctly in one quiz but incorrectly in another, mention this nuance. " - "Do NOT say 'all students made the same mistake' if even one student got it right in any quiz.\n" - "- When evaluating student status (AtRisk, Good, etc.), consider:\n" - " * Quiz scores and assignment scores (quality of work)\n" - " * Number of completed sections (progress)\n" - " * Engagement metrics (completionRate, activeDays)\n" - " * Do NOT rely solely on aggregated average_completion_rate if it contradicts detailed data.\n\n" + "- Use the summary fields (quiz_summary, assignment_summary, section_summary) for overall statistics.\n" + "- Use detailed fields (wrong_answers, quiz_info, in_progress_sections) for specific, actionable insights.\n" + "- assessment_completion_rate = quizzes/assignments; content_completion_rate = reading/sections.\n" + "- If content_completion_rate is high but assessment_completion_rate is low, describe as 'engaged with content but needing more assessments', not 'AtRisk'.\n" + "- Only consider a student truly 'AtRisk' when there is evidence such as:\n" + " * consistently low quiz/assignment scores despite sufficient assessment attempts, and/or\n" + " * long inactivity (high days_since_last_activity) combined with low content and assessment progress.\n" + "- If summaries show activity (scores > 0, completed sections), do NOT say 'no data' or 'students 
haven't submitted'.\n" + "- When analyzing mistakes or weaknesses, use 'wrong_answers' array to identify SPECIFIC questions students got wrong:\n" + " * Mention the actual question content and the wrong answer they selected.\n" + " * Group similar mistakes to identify learning gaps (e.g., 'students struggle with questions about servo motor').\n" + " * Use 'quiz_info' to identify which quizzes students have attempted and their scores.\n" + "- When analyzing progress over time:\n" + " * Use 'in_progress_sections', 'recent_completed_sections', 'submitted_assignments' with dates.\n" + f" * Compare dates with today ({current_date_str}) to calculate days since events and identify overdue assignments.\n" + " * Use relative terms like '2 ngày trước' when mentioning dates.\n" + "- When evaluating student status, consider all metrics together, not just one percentage.\n" + "- For new students (<7 days enrolled), lower completion rates are normal - focus on engagement quality, not quantity.\n" + f"{rag_context}\n\n" + "WRITING STYLE:\n" + "- Use natural Vietnamese, avoid technical terms. Use 'bài kiểm tra', 'bài tập', 'phần học' instead of 'quiz', 'assignment', 'section'.\n" + "- Write as a helpful colleague, not a technical report. 
Use student names, not IDs.\n\n" "TASK:\n" - f"- Write a concise, well-structured analysis for the teacher in language='{language}'.\n" - "- Use student names (from classroom.students) instead of student IDs when referring to students.\n" - "- Do NOT include teacher IDs or technical IDs in the response - use natural language only.\n" - "- Include: current status, key strengths, key weaknesses, and 1–3 practical interventions.\n" - "- If a specific student is provided, focus on that student but reference class context when helpful.\n" - "- Base your analysis on detailed_classroom_data when available, not just aggregated metrics.\n" - "- When identifying patterns, be precise: if a student made a mistake in one quiz but got it right in another, " - "mention this as 'inconsistent understanding' rather than saying they always get it wrong.\n" - "- Do NOT repeat the raw JSON; summarize it into natural language.\n" + f"- Write analysis in {language}: status, strengths, weaknesses (with specific examples from wrong_answers), and 1-3 interventions.\n" + "- Use wrong_answers, quiz_info, in_progress_sections for specific insights. 
Analyze patterns over time using dates.\n" + "- Be precise: if a student got it wrong in one quiz but right in another, say 'hiểu chưa vững' not 'always wrong'.\n" + "- Do NOT repeat JSON; summarize naturally.\n" ) messages = [{"role": "user", "content": prompt}] @@ -451,8 +614,6 @@ async def _summarize_for_teacher( f"tool_results={tool_results_json}" ) - # Use higher max_tokens for summarization to avoid truncation - # Prompt is ~3800 chars, need enough tokens for full response response = await self.llm.generate( messages, use_remote=self.use_remote, diff --git a/src/Services/AIService/app/infrastructure/config/settings.py b/src/Services/AIService/app/infrastructure/config/settings.py index f4574a93..9ee3afba 100644 --- a/src/Services/AIService/app/infrastructure/config/settings.py +++ b/src/Services/AIService/app/infrastructure/config/settings.py @@ -239,7 +239,13 @@ class Settings(BaseSettings): CLASSROOM_SNAPSHOT_REFRESH_COOLDOWN_SECONDS: int = 60 # RabbitMQ / Event Bus - RABBITMQ_URL: str = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/") + # Aspire exposes connection strings via ConnectionStrings__{resource_name} + # Fallback to RABBITMQ_URL or default localhost + RABBITMQ_URL: str = ( + os.getenv("ConnectionStrings__messaging") or + os.getenv("RABBITMQ_URL") or + "amqp://guest:guest@localhost:5672/" + ) RABBITMQ_QUEUE_CLASSROOM_PROGRESS: str = "classroom-student-progress-updated" ENABLE_EVENT_CONSUMER: bool = os.getenv("ENABLE_EVENT_CONSUMER", "true").lower() == "true" diff --git a/src/Services/AIService/app/infrastructure/data/grpc_classroom_repository.py b/src/Services/AIService/app/infrastructure/data/grpc_classroom_repository.py index 82aad082..0b41e14c 100644 --- a/src/Services/AIService/app/infrastructure/data/grpc_classroom_repository.py +++ b/src/Services/AIService/app/infrastructure/data/grpc_classroom_repository.py @@ -234,6 +234,12 @@ def _map_snapshot_to_classroom_data( enrollment_type = None if 
enrollment_proto.HasField("enrollment_type") and enrollment_proto.enrollment_type.value: enrollment_type = enrollment_proto.enrollment_type.value + try: + if enrollment_proto.HasField("course_id") and enrollment_proto.course_id.value: + enrollment_data["course_id"] = enrollment_proto.course_id.value + except ValueError: + if hasattr(enrollment_proto, "course_id") and enrollment_proto.course_id: + enrollment_data["course_id"] = enrollment_proto.course_id if enrollment_proto.HasField("curriculum_name") and enrollment_proto.curriculum_name.value: enrollment_data["curriculum_name"] = enrollment_proto.curriculum_name.value @@ -248,6 +254,12 @@ def _map_snapshot_to_classroom_data( "student_id": quiz_proto.student_id, "final_score": quiz_proto.final_score, } + try: + if quiz_proto.HasField("quiz_id") and quiz_proto.quiz_id.value: + quiz_data["quiz_id"] = quiz_proto.quiz_id.value + except ValueError: + if hasattr(quiz_proto, "quiz_id") and quiz_proto.quiz_id: + quiz_data["quiz_id"] = quiz_proto.quiz_id if quiz_proto.quiz_title: quiz_data["quiz_title"] = quiz_proto.quiz_title if quiz_proto.HasField("quiz_description") and quiz_proto.quiz_description.value: @@ -394,12 +406,104 @@ def _map_snapshot_to_classroom_data( classroom_data["topics_catalog"].append(topic_data) + classroom_data["student_progress_summaries"] = [] + + if hasattr(snapshot_proto, "student_progress_summaries") and snapshot_proto.student_progress_summaries: + for summary_proto in snapshot_proto.student_progress_summaries: + summary_data = { + "student_id": summary_proto.student_id, + "assessment_completion_rate": summary_proto.assessment_completion_rate, + "total_assessments": summary_proto.total_assessments, + "completed_assessments": summary_proto.completed_assessments, + "content_completion_rate": summary_proto.content_completion_rate, + "total_sections": summary_proto.total_sections, + "completed_sections": summary_proto.completed_sections, + } + 
classroom_data["student_progress_summaries"].append(summary_data) + else: + student_ids = set() + + for student_data in classroom_data["students"]: + student_ids.add(student_data["student_id"]) + + for quiz_data in classroom_data["quizzes"]["student_quizzes"]: + if quiz_data.get("student_id"): + student_ids.add(quiz_data["student_id"]) + + for assignment_data in classroom_data["assignments"]["student_assignments"]: + if assignment_data.get("student_id"): + student_ids.add(assignment_data["student_id"]) + + for progress_data in classroom_data["progress"]["section_progress"]: + if progress_data.get("student_id"): + student_ids.add(progress_data["student_id"]) + + for engagement_data in classroom_data["time_metrics"]["engagement_metrics"]: + if engagement_data.get("student_id"): + student_ids.add(engagement_data["student_id"]) + + for student_id in student_ids: + student_quizzes_map = { + quiz_data["id"]: quiz_data + for quiz_data in classroom_data["quizzes"]["student_quizzes"] + if quiz_data["student_id"] == student_id + } + total_quizzes = set(student_quizzes_map.keys()) + + completed_quizzes = set() + for attempt in classroom_data["quizzes"]["quiz_attempts"]: + student_quiz_id = attempt.get("student_quiz_id") + if student_quiz_id in student_quizzes_map: + if attempt.get("status") == "Passed" or attempt.get("completed_at"): + completed_quizzes.add(student_quiz_id) + + completed_assignments = 0 + total_assignments = 0 + for assignment_data in classroom_data["assignments"]["student_assignments"]: + if assignment_data["student_id"] == student_id: + total_assignments += 1 + if assignment_data.get("submission_count", 0) > 0: + completed_assignments += 1 + + total_assessments = len(total_quizzes) + total_assignments + completed_assessments = len(completed_quizzes) + completed_assignments + assessment_completion_rate = ( + completed_assessments / total_assessments + if total_assessments > 0 else 0.0 + ) + + # Count sections progress + total_sections = 0 + 
completed_sections = 0 + for progress_data in classroom_data["progress"]["section_progress"]: + if progress_data["student_id"] == student_id: + total_sections += 1 + if progress_data.get("status") == "Completed": + completed_sections += 1 + + content_completion_rate = ( + completed_sections / total_sections + if total_sections > 0 else 0.0 + ) + + summary_data = { + "student_id": student_id, + "assessment_completion_rate": assessment_completion_rate, + "total_assessments": total_assessments, + "completed_assessments": completed_assessments, + "content_completion_rate": content_completion_rate, + "total_sections": total_sections, + "completed_sections": completed_sections, + } + classroom_data["student_progress_summaries"].append(summary_data) + logger.info( "Mapped learning snapshot to classroom data format", extra={ "students_count": len(classroom_data["students"]), "quizzes_count": len(classroom_data["quizzes"]["student_quizzes"]), "assignments_count": len(classroom_data["assignments"]["student_assignments"]), + "progress_summaries_count": len(classroom_data["student_progress_summaries"]), } ) diff --git a/src/Services/AIService/app/infrastructure/events/rabbitmq_consumer.py b/src/Services/AIService/app/infrastructure/events/rabbitmq_consumer.py index 4e1870e9..9f067c3b 100644 --- a/src/Services/AIService/app/infrastructure/events/rabbitmq_consumer.py +++ b/src/Services/AIService/app/infrastructure/events/rabbitmq_consumer.py @@ -33,12 +33,11 @@ async def connect(self) -> None: extra={"url": settings.RABBITMQ_URL.split("@")[-1] if "@" in settings.RABBITMQ_URL else "***"}, ) except Exception as e: - logger.error( - "[ClassroomProgressEventConsumer] Failed to connect to RabbitMQ: %s. Event consumer will be disabled.", + logger.warning( + "[ClassroomProgressEventConsumer] Failed to connect to RabbitMQ: %s. Event consumer will be disabled. 
" + "This is OK if RabbitMQ is not available - the service will continue without event processing.", e, - exc_info=True, ) - # Don't raise - let the caller handle gracefully self._connection = None raise diff --git a/src/Services/AIService/scripts/test_trigger_progress_event.py b/src/Services/AIService/scripts/test_trigger_progress_event.py new file mode 100644 index 00000000..6b2acbbf --- /dev/null +++ b/src/Services/AIService/scripts/test_trigger_progress_event.py @@ -0,0 +1,179 @@ +""" +Test script to trigger ClassroomStudentProgressUpdatedEvent +This script publishes a test event to RabbitMQ to test the ingestion pipeline. +""" + +import asyncio +import json +import sys +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import aio_pika +from app.infrastructure.config.settings import settings + + +async def publish_test_event( + classroom_id: int, + student_id: str, + course_enrollment_id: int = 1, + course_id: int = 1, + progress_percentage: int = 50, + status: str = "InProgress", +): + """ + Publish a test ClassroomStudentProgressUpdatedEvent to RabbitMQ + + Args: + classroom_id: Classroom ID + student_id: Student ID (UUID string) + course_enrollment_id: Course enrollment ID + course_id: Course ID + progress_percentage: Progress percentage (0-100) + status: Status string (e.g., "InProgress", "Completed") + """ + exchange_name = "EventBus.Messages:ClassroomStudentProgressUpdatedEvent" + routing_key = "EventBus.Messages:ClassroomStudentProgressUpdatedEvent" + + # Create event payload matching C# event structure + event_data = { + "StudentId": student_id, + "ClassroomId": classroom_id, + "CourseEnrollmentId": course_enrollment_id, + "CourseId": course_id, + "ProgressPercentage": progress_percentage, + "Status": status, + } + + print(f"Connecting to RabbitMQ at {settings.RABBITMQ_URL.split('@')[-1] if '@' in settings.RABBITMQ_URL else '***'}") + + try: + # Connect to RabbitMQ + connection = await 
aio_pika.connect_robust(settings.RABBITMQ_URL) + print("✓ Connected to RabbitMQ") + + async with connection: + channel = await connection.channel() + + # Declare exchange + try: + exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.TOPIC, + durable=True, + ) + except Exception: + exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.FANOUT, + durable=True, + ) + + print(f"✓ Exchange '{exchange_name}' declared") + + # Publish message + message_body = json.dumps(event_data).encode("utf-8") + message = aio_pika.Message( + body=message_body, + content_type="application/json", + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, + ) + + await exchange.publish( + message, + routing_key=routing_key, + ) + + print(f"✓ Event published successfully!") + print(f"\nEvent details:") + print(f" Classroom ID: {classroom_id}") + print(f" Student ID: {student_id}") + print(f" Course Enrollment ID: {course_enrollment_id}") + print(f" Course ID: {course_id}") + print(f" Progress: {progress_percentage}%") + print(f" Status: {status}") + print(f"\nThe event should be consumed by ClassroomProgressEventConsumer") + print(f"and trigger RAG ingestion (with debouncing) for classroom {classroom_id}") + + except Exception as e: + print(f"✗ Error publishing event: {e}") + import traceback + traceback.print_exc() + return False + + return True + + +async def main(): + """Main function with example usage""" + import argparse + + parser = argparse.ArgumentParser( + description="Trigger ClassroomStudentProgressUpdatedEvent for testing" + ) + parser.add_argument( + "--classroom-id", + type=int, + required=True, + help="Classroom ID" + ) + parser.add_argument( + "--student-id", + type=str, + required=True, + help="Student ID (UUID string)" + ) + parser.add_argument( + "--course-enrollment-id", + type=int, + default=1, + help="Course enrollment ID (default: 1)" + ) + parser.add_argument( + "--course-id", + type=int, + default=1, + 
help="Course ID (default: 1)" + ) + parser.add_argument( + "--progress", + type=int, + default=50, + help="Progress percentage 0-100 (default: 50)" + ) + parser.add_argument( + "--status", + type=str, + default="InProgress", + help="Status (default: InProgress)" + ) + + args = parser.parse_args() + + success = await publish_test_event( + classroom_id=args.classroom_id, + student_id=args.student_id, + course_enrollment_id=args.course_enrollment_id, + course_id=args.course_id, + progress_percentage=args.progress, + status=args.status, + ) + + if success: + print("\n✓ Test event published successfully!") + print("\nTo verify:") + print("1. Check logs for '[ClassroomProgressEventConsumer] Received event'") + print("2. Check logs for '[IngestionService] Scheduled ingestion' (after debounce)") + print("3. Check logs for '[IngestionService] Starting ingestion' (after 5 minutes)") + else: + print("\n✗ Failed to publish test event") + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) + + + diff --git a/src/Services/ClassroomService/Classroom.Application/Features/Classrooms/Queries/GetClassroomLearningSnapshot/GetClassroomLearningSnapshotQueryHandler.cs b/src/Services/ClassroomService/Classroom.Application/Features/Classrooms/Queries/GetClassroomLearningSnapshot/GetClassroomLearningSnapshotQueryHandler.cs index 9ce45ada..cf4ae578 100644 --- a/src/Services/ClassroomService/Classroom.Application/Features/Classrooms/Queries/GetClassroomLearningSnapshot/GetClassroomLearningSnapshotQueryHandler.cs +++ b/src/Services/ClassroomService/Classroom.Application/Features/Classrooms/Queries/GetClassroomLearningSnapshot/GetClassroomLearningSnapshotQueryHandler.cs @@ -401,19 +401,27 @@ public async Task Handle( response.StudentAssignments.Add(assignmentData); } - // Map engagement metrics (simplified - calculate from activity) + // Map engagement metrics & per-student progress summaries foreach (var student in students) { var studentGuid = Guid.Parse(student.StudentId); - 
var studentQuizCount = filteredStudentQuizzes.Count(sq => sq.StudentId == studentGuid.ToString()); - var studentAssignmentCount = filteredStudentAssignments.Count(sa => sa.StudentId == studentGuid.ToString()); - // Calculate completion rate (simplified) - var totalActivities = studentQuizCount + studentAssignmentCount; - var completedActivities = filteredStudentQuizzes.Count(sq => sq.StudentId == studentGuid.ToString() && sq.Status.ToString() == "Passed") - + filteredStudentAssignments.Count(sa => sa.StudentId == studentGuid.ToString() && sa.Status.ToString() == "Passed"); - - var completionRate = totalActivities > 0 ? (double)completedActivities / totalActivities : 0; + // --- Assessment-based metrics (quizzes + assignments) --- + var studentQuizCount = filteredStudentQuizzes + .Count(sq => sq.StudentId == studentGuid.ToString()); + var completedStudentQuizzes = filteredStudentQuizzes + .Count(sq => sq.StudentId == studentGuid.ToString() && sq.Status.ToString() == "Passed"); + + var studentAssignmentCount = filteredStudentAssignments + .Count(sa => sa.StudentId == studentGuid.ToString()); + var completedStudentAssignments = filteredStudentAssignments + .Count(sa => sa.StudentId == studentGuid.ToString() && sa.Status.ToString() == "Passed"); + + var totalAssessments = studentQuizCount + studentAssignmentCount; + var completedAssessments = completedStudentQuizzes + completedStudentAssignments; + var assessmentCompletionRate = totalAssessments > 0 + ? (double)completedAssessments / totalAssessments + : 0d; // Calculate days since last activity var lastQuizActivity = filteredStudentQuizzes @@ -467,16 +475,48 @@ public async Task Handle( ? 
attemptDurations.Average() : 0d; + // Content / section progress (reading + activities) + var studentSectionProgress = courseEnrollments + .Where(ce => ce.StudentId == studentGuid) + .SelectMany(ce => ce.LessonProgress) + .SelectMany(lp => lp.SectionProgress) + .ToList(); + + var totalSections = studentSectionProgress.Count; + var completedSections = studentSectionProgress + .Count(sp => sp.Status.ToString() == "Completed"); + + var contentCompletionRate = totalSections > 0 + ? (double)completedSections / totalSections + : 0d; + var engagementMetric = new GrpcEngagementMetricData { StudentId = student.StudentId, - CompletionRate = completionRate, + // Keep engagement completion_rate aligned with assessment-based view, + // as UI and downstream consumers already rely on this meaning. + CompletionRate = assessmentCompletionRate, DaysSinceLastActivity = daysSinceLastActivity, ActiveDaysLast7Days = activeDays, AvgSessionDurationMinutes = avgSessionDuration }; response.EngagementMetrics.Add(engagementMetric); + + // Compact per-student progress summary to avoid scanning + // deeply nested event-level records on the AI side. 
+ var progressSummary = new GrpcStudentProgressSummary + { + StudentId = student.StudentId, + AssessmentCompletionRate = assessmentCompletionRate, + TotalAssessments = totalAssessments, + CompletedAssessments = completedAssessments, + ContentCompletionRate = contentCompletionRate, + TotalSections = totalSections, + CompletedSections = completedSections + }; + + response.StudentProgressSummaries.Add(progressSummary); } // Map section progress diff --git a/src/Services/STEMify-Backend/STEMify-Backend.AppHost/Extensions/ServiceExtensions.cs b/src/Services/STEMify-Backend/STEMify-Backend.AppHost/Extensions/ServiceExtensions.cs index 1c8bed50..2e349e76 100644 --- a/src/Services/STEMify-Backend/STEMify-Backend.AppHost/Extensions/ServiceExtensions.cs +++ b/src/Services/STEMify-Backend/STEMify-Backend.AppHost/Extensions/ServiceExtensions.cs @@ -319,7 +319,9 @@ public static IDistributedApplicationBuilder AddApplicationServices(this IDistri .WithExternalHttpEndpoints() .WithHttpHealthCheck("/health") .WithReference(aiMemoryDb) + .WithReference(rabbitmqReference) .WithEnvironment("AI_MEMORY_DB_CONNECTION", aiMemoryDb.Resource.ConnectionStringExpression) + .WithEnvironment("RABBITMQ_URL", rabbitmqReference.Resource.ConnectionStringExpression) .WithEnvironment("RESOURCE_GRPC_ENDPOINT", resourceGrpcEndpoint) .WithEnvironment("RESOURCE_GRPC_USE_TLS", "false") .WithEnvironment("PYTHONUNBUFFERED", "1") @@ -334,7 +336,8 @@ public static IDistributedApplicationBuilder AddApplicationServices(this IDistri ?? Environment.GetEnvironmentVariable("DEEPSEEK_API_KEY") ?? "") .WithReference(resourceApiService) - .WaitFor(resourceApiService); + .WaitFor(resourceApiService) + .WaitFor(rabbitmqReference); ConfigureObservability(aiService); var apiGateway = builder