Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/BuildingBlocks/Shared/Protos/Classroom/classrooms.proto
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,18 @@ message GetClassroomLearningSnapshotRequest {
google.protobuf.Int32Value days_back = 3;
}

// Per-student rollup of learning progress, carried in the classroom
// learning snapshot response (field 10 of GrpcClassroomLearningSnapshotResponse).
message GrpcStudentProgressSummary {
// Identifier of the student this summary belongs to.
string student_id = 1;

// Assessment progress. NOTE(review): rate is presumably the ratio
// completed_assessments / total_assessments — confirm range (0-1 vs 0-100)
// against the producer before relying on it.
double assessment_completion_rate = 2;
int32 total_assessments = 3;
int32 completed_assessments = 4;

// Content (section) progress; same rate convention as above — confirm.
double content_completion_rate = 5;
int32 total_sections = 6;
int32 completed_sections = 7;
}

message GrpcClassroomLearningSnapshotResponse {
GrpcClassroomBasicInfo classroom = 1;
repeated GrpcStudentLearningData students = 2;
Expand All @@ -265,6 +277,8 @@ message GrpcClassroomLearningSnapshotResponse {
repeated GrpcSectionProgressData section_progress = 7;
repeated GrpcTopicCatalogItem topics_catalog = 8;
GrpcAnalysisPeriod analysis_period = 9;

repeated GrpcStudentProgressSummary student_progress_summaries = 10;
}

message GrpcClassroomBasicInfo {
Expand Down
29 changes: 29 additions & 0 deletions src/Services/AIService/app/api/http/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from app.core.llm.client import LLMClient
from app.core.rag.ingestion_pipeline import IngestionPipeline
from app.core.rag.ingestion_service import IngestionService
from app.core.rag.document_processor import DocumentProcessor
from app.core.embedding.pipeline import EmbeddingPipeline, get_embedding_pipeline
from app.core.graph.builder import GraphBuilder
Expand Down Expand Up @@ -285,11 +286,38 @@ def get_classroom_snapshot_updater() -> ClassroomSnapshotUpdater:
)


@lru_cache(maxsize=1)
def get_ingestion_service() -> Optional[IngestionService]:
    """
    Build the process-wide RAG ingestion service (debounced indexing).

    Returns:
        The configured ``IngestionService``, or ``None`` when the ingestion
        pipeline is unavailable or any dependency fails to initialize, so
        callers can degrade gracefully without RAG ingestion.
    """
    try:
        pipeline = get_ingestion_pipeline()
        # No pipeline means RAG features are disabled/unavailable.
        if not pipeline:
            return None

        # Debounce/TTL knobs are optional settings with safe fallbacks.
        return IngestionService(
            ingestion_pipeline=pipeline,
            classroom_repository=get_classroom_repository(),
            debounce_seconds=getattr(settings, 'RAG_INGESTION_DEBOUNCE_SECONDS', 300),
            ingestion_ttl_hours=getattr(settings, 'RAG_INGESTION_TTL_HOURS', 24),
        )
    except Exception as e:
        # Best-effort: the app must keep running without RAG ingestion.
        logging.getLogger(__name__).warning(
            f"Failed to initialize ingestion service: {e}. Continuing without RAG ingestion."
        )
        return None


@lru_cache(maxsize=1)
def get_classroom_snapshot_event_handler() -> ClassroomSnapshotEventHandler:
    """Singleton handler wiring the snapshot store/updater and the optional
    (possibly ``None``) RAG ingestion service."""
    return ClassroomSnapshotEventHandler(
        snapshot_store=get_classroom_snapshot_store(),
        snapshot_updater=get_classroom_snapshot_updater(),
        ingestion_service=get_ingestion_service(),
    )


Expand Down Expand Up @@ -383,6 +411,7 @@ def get_teacher_service() -> TeacherService:
classroom_snapshot_store=get_classroom_snapshot_store(),
classroom_snapshot_updater=get_classroom_snapshot_updater(),
direct_grading_pipeline=direct_grading_pipeline,
ingestion_service=get_ingestion_service(),
)


Expand Down
143 changes: 143 additions & 0 deletions src/Services/AIService/app/api/http/routers/teacher.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ class BuildGraphRequest(BaseModel):
force_rebuild: bool = False


class TriggerProgressEventRequest(BaseModel):
    """Request model for triggering test progress event"""
    classroom_id: int  # target classroom for the synthetic event
    student_id: str  # student identifier, passed through as-is
    course_enrollment_id: int = 1  # defaults below are convenient test fixtures
    course_id: int = 1
    progress_percentage: int = 50  # NOTE(review): no range validation — presumably 0-100
    status: str = "InProgress"  # mirrors the C# event's Status field — confirm valid values


@router.post("/student-analysis", response_model=InterventionResponse)
async def student_analysis(
request: StudentAnalysisRequest,
Expand Down Expand Up @@ -132,3 +142,136 @@ async def build_graph(
raise HTTPException(status_code=500, detail=str(e))


@router.post("/test/trigger-progress-event")
async def trigger_progress_event(
    request: TriggerProgressEventRequest,
    direct: bool = False,  # Query parameter: ?direct=true
):
    """
    Test endpoint to trigger ClassroomStudentProgressUpdatedEvent.

    This can work in two modes:
    1. RabbitMQ mode (default): Publishes event to RabbitMQ
    2. Direct mode (direct=true): Directly calls event handler (useful when RabbitMQ unavailable)

    The event will trigger:
    1. Snapshot refresh
    2. RAG ingestion (with debouncing - waits 5 minutes before ingesting)

    Use this to test the event-driven ingestion flow.

    Raises:
        HTTPException: 500 when direct handling fails, 503 when RabbitMQ
        publishing fails (hint in the detail suggests ?direct=true).
    """
    from app.core.snapshot.events import ClassroomEvent
    from app.api.http.dependencies import get_classroom_snapshot_event_handler

    # Create event payload matching C# event structure (PascalCase keys).
    event_data = {
        "StudentId": request.student_id,
        "ClassroomId": request.classroom_id,
        "CourseEnrollmentId": request.course_enrollment_id,
        "CourseId": request.course_id,
        "ProgressPercentage": request.progress_percentage,
        "Status": request.status,
    }

    if direct:
        # Direct mode: Call event handler directly (bypass RabbitMQ)
        try:
            event_handler = get_classroom_snapshot_event_handler()
            classroom_event = ClassroomEvent(
                type="STUDENT_PROGRESS_UPDATED",
                classroom_id=request.classroom_id,
                student_id=request.student_id,
                # Internal events use snake_case keys, unlike the wire payload.
                payload={
                    "course_enrollment_id": request.course_enrollment_id,
                    "course_id": request.course_id,
                    "progress_percentage": request.progress_percentage,
                    "status": request.status,
                },
            )
            await event_handler.handle_event(classroom_event)

            logger.info(
                f"[TeacherRouter] Test progress event handled directly for classroom {request.classroom_id}, "
                f"student {request.student_id}"
            )

            return {
                "success": True,
                "message": "Event handled directly (bypassed RabbitMQ)",
                "event_data": event_data,
                "note": "Event was processed directly. RAG ingestion will be scheduled with 5-minute debounce.",
            }
        except Exception as e:
            logger.error("[TeacherRouter] Error handling event directly: %s", e, exc_info=True)
            raise HTTPException(status_code=500, detail=f"Failed to handle event: {str(e)}")
    else:
        # RabbitMQ mode: Publish to RabbitMQ
        try:
            import aio_pika
            import json
            from app.infrastructure.config.settings import settings

            exchange_name = "EventBus.Messages:ClassroomStudentProgressUpdatedEvent"
            routing_key = "EventBus.Messages:ClassroomStudentProgressUpdatedEvent"

            # Connect and publish; `async with` closes the connection on exit
            # (the previous extra `finally: connection.close()` double-closed it).
            connection = await aio_pika.connect_robust(settings.RABBITMQ_URL)
            async with connection:
                channel = await connection.channel()

                # Declare exchange, preferring TOPIC, falling back to FANOUT.
                try:
                    exchange = await channel.declare_exchange(
                        exchange_name,
                        aio_pika.ExchangeType.TOPIC,
                        durable=True,
                    )
                except Exception:
                    # BUGFIX: a failed declare (e.g. type mismatch with an
                    # existing exchange) closes the AMQP channel, so retrying
                    # on the same channel would fail with "channel closed".
                    # Open a fresh channel before the FANOUT fallback.
                    channel = await connection.channel()
                    exchange = await channel.declare_exchange(
                        exchange_name,
                        aio_pika.ExchangeType.FANOUT,
                        durable=True,
                    )

                # Publish a persistent JSON message.
                message = aio_pika.Message(
                    body=json.dumps(event_data).encode("utf-8"),
                    content_type="application/json",
                    delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
                )

                await exchange.publish(
                    message,
                    routing_key=routing_key,
                )

                logger.info(
                    f"[TeacherRouter] Test progress event published to RabbitMQ for classroom {request.classroom_id}, "
                    f"student {request.student_id}"
                )

                return {
                    "success": True,
                    "message": "Event published to RabbitMQ successfully",
                    "event_data": event_data,
                    "note": "Event will be consumed by ClassroomProgressEventConsumer. "
                    "RAG ingestion will be scheduled with 5-minute debounce.",
                }

        except Exception as e:
            logger.warning(
                f"[TeacherRouter] Failed to publish to RabbitMQ: {e}. "
                f"Hint: Use ?direct=true to test without RabbitMQ"
            )
            raise HTTPException(
                status_code=503,
                detail=f"Failed to publish event to RabbitMQ: {str(e)}. "
                f"Use ?direct=true query parameter to test without RabbitMQ."
            )


20 changes: 2 additions & 18 deletions src/Services/AIService/app/core/agent/plan_solve_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,9 @@ async def _generate_plan(self, question: str) -> List[str]:
"""Generate action plan"""
prompt = self.PLANNER_PROMPT.format(question=question)
messages: List[LLMMessage] = [{"role": "user", "content": prompt}]

# Log request data before calling LLM (full prompt, no truncate)
logger.info(
f"[Plan-Solve] Calling LLM to generate plan | "
f"question={question}, prompt_length={len(prompt)}, use_remote={self.use_remote} | "
f"full_prompt={prompt}"
)

response = await self.llm.generate(messages, use_remote=self.use_remote)
response_text = response.content if hasattr(response, 'content') else str(response)

# Parse Python list
try:
if "```python" in response_text:
Expand All @@ -107,6 +99,7 @@ async def _generate_plan(self, question: str) -> List[str]:
plan_str = response_text.strip()

plan = ast.literal_eval(plan_str)

return plan if isinstance(plan, list) else []
except Exception as e:
logger.warning(f"[Plan-Solve] Failed to parse plan: {e}")
Expand All @@ -128,15 +121,6 @@ async def _execute_step(
current_step=current_step
)
messages: List[LLMMessage] = [{"role": "user", "content": prompt}]

# Log request data before calling LLM (full prompt, no truncate)
logger.info(
f"[Plan-Solve] Calling LLM to execute step | "
f"current_step={current_step}, question={question}, "
f"prompt_length={len(prompt)}, use_remote={self.use_remote} | "
f"full_prompt={prompt}"
)

response = await self.llm.generate(messages, use_remote=self.use_remote)
return response.content if hasattr(response, 'content') else str(response)

Expand Down
14 changes: 14 additions & 0 deletions src/Services/AIService/app/core/context/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,25 @@ async def build(
return bundle

candidates = await self.gatherer.gather(query=query, user_id=user_id, top_k=top_k)
logger.info(f"[JITContextBuilder] Gathered {len(candidates)} candidates")

selected = self.selector.select(candidates)
logger.info(f"[JITContextBuilder] Selected {len(selected)} items after selection")

structured = self.structurer.structure(selected)
logger.info(
f"[JITContextBuilder] Structured: memory={len(structured.get('memory', []))}, "
f"retrieval={len(structured.get('retrieval', []))}, other={len(structured.get('other', []))}"
)

compressed = self.compressor.compress(structured)
logger.info(
f"[JITContextBuilder] Compressed: memory={len(compressed.get('memory', []))}, "
f"retrieval={len(compressed.get('retrieval', []))}, other={len(compressed.get('other', []))}"
)

total_tokens = self._estimate_tokens(compressed)
logger.info(f"[JITContextBuilder] Estimated {total_tokens} tokens")

notes = None
if total_tokens > self.token_budget:
Expand Down
Loading