From 92b025502860988b3ac21d2bab773534ed89b5b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Mon, 2 Mar 2026 21:49:57 +0530 Subject: [PATCH] Add aggregated agent huddle feed endpoint --- backend/api/agents_api.py | 33 ++++ backend/services/agent_activity_service.py | 208 +++++++++++++++++++++ 2 files changed, 241 insertions(+) diff --git a/backend/api/agents_api.py b/backend/api/agents_api.py index d4ff38d9..541fd361 100644 --- a/backend/api/agents_api.py +++ b/backend/api/agents_api.py @@ -462,6 +462,39 @@ async def get_agent_alerts_endpoint( raise HTTPException(status_code=500, detail=str(e)) +@router.get("/huddle/feed") +async def get_agent_huddle_feed_endpoint( + since: Optional[str] = None, + cursor: Optional[str] = None, + runs_limit: int = 20, + events_limit: int = 50, + alerts_limit: int = 20, + approvals_limit: int = 20, + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +) -> Dict[str, Any]: + try: + user_id = str(current_user.get("id")) + service = AgentActivityService(db, user_id) + feed = service.get_huddle_feed( + since=since, + cursor=cursor, + runs_limit=runs_limit, + events_limit=events_limit, + alerts_limit=alerts_limit, + approvals_limit=approvals_limit, + ) + return { + "success": True, + "data": feed, + "timestamp": datetime.utcnow().isoformat(), + "user_id": user_id, + } + except Exception as e: + logger.error(f"Error getting huddle feed for user {current_user.get('id')}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.post("/alerts/{alert_id}/mark-read") async def mark_agent_alert_read_endpoint( alert_id: int, diff --git a/backend/services/agent_activity_service.py b/backend/services/agent_activity_service.py index 70cbdcb0..4f67c9e1 100644 --- a/backend/services/agent_activity_service.py +++ b/backend/services/agent_activity_service.py @@ -3,6 +3,7 @@ from __future__ import annotations from datetime import datetime from typing import Any, Dict, List, Optional +from sqlalchemy import func from sqlalchemy.orm import Session from models.agent_activity_models import AgentAlert, AgentApprovalRequest, AgentEvent, AgentRun @@ -193,3 +194,210 @@ class AgentActivityService: self.db.commit() self.db.refresh(req) return req + + def get_huddle_feed( + self, + since: Optional[str] = None, + cursor: Optional[str] = None, + runs_limit: int = 20, + events_limit: int = 50, + alerts_limit: int = 20, + approvals_limit: int = 20, + ) -> Dict[str, Any]: + now = datetime.utcnow() + since_dt = self._parse_datetime(since) + cursor_dt = self._parse_datetime(cursor) + + statuses = self._get_active_statuses() + runs = self._list_runs_for_feed(limit=runs_limit, since_dt=since_dt, cursor_dt=cursor_dt) + events = self._list_events_for_feed(limit=events_limit, since_dt=since_dt, cursor_dt=cursor_dt) + alerts = self._list_alerts_for_feed(limit=alerts_limit, since_dt=since_dt, cursor_dt=cursor_dt) + approvals = self._list_approvals_for_feed(limit=approvals_limit, since_dt=since_dt, cursor_dt=cursor_dt) + + cursors = { + "runs": self._next_cursor(runs, "started_at"), + "events": self._next_cursor(events, "created_at"), + "alerts": self._next_cursor(alerts, "created_at"), + "approvals": self._next_cursor(approvals, "created_at"), + "feed": now.isoformat(), + } + + return { + "statuses": statuses, + "runs": [self._serialize_run(run) for run in runs], + "events": [self._serialize_event(evt) for evt in events], + "alerts": [self._serialize_alert(alert) for alert in alerts], + "approvals": [self._serialize_approval(req) for req in approvals], + "unread_alerts": self._count_unread_alerts(), + "pending_approvals": self._count_pending_approvals(), + "cursors": cursors, + "server_timestamp": now.isoformat(), + } + + @staticmethod + def _parse_datetime(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + text = str(value).strip() + if not text: + return None + if text.endswith("Z"): + text = text.replace("Z", "+00:00") + try: + parsed = datetime.fromisoformat(text) + if parsed.tzinfo is not None: + return parsed.replace(tzinfo=None) + return parsed + except ValueError: + return None + + def _get_active_statuses(self) -> List[Dict[str, Any]]: + subquery = ( + self.db.query( + AgentRun.agent_type.label("agent_type"), + func.max(AgentRun.started_at).label("max_started_at"), + ) + .filter(AgentRun.user_id == self.user_id) + .group_by(AgentRun.agent_type) + .subquery() + ) + + rows = ( + self.db.query(AgentRun) + .join( + subquery, + (AgentRun.agent_type == subquery.c.agent_type) + & (AgentRun.started_at == subquery.c.max_started_at), + ) + .filter(AgentRun.user_id == self.user_id) + .all() + ) + return [ + { + "agent_type": row.agent_type, + "status": row.status, + "success": row.success, + "run_id": row.id, + "updated_at": (row.finished_at or row.started_at).isoformat() if (row.finished_at or row.started_at) else None, + } + for row in rows + ] + + def _list_runs_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentRun]: + q = self.db.query(AgentRun).filter(AgentRun.user_id == self.user_id) + if since_dt: + q = q.filter(AgentRun.started_at >= since_dt) + if cursor_dt: + q = q.filter(AgentRun.started_at < cursor_dt) + return q.order_by(AgentRun.started_at.desc()).limit(limit).all() + + def _list_events_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentEvent]: + q = self.db.query(AgentEvent).filter(AgentEvent.user_id == self.user_id) + if since_dt: + q = q.filter(AgentEvent.created_at >= since_dt) + if cursor_dt: + q = q.filter(AgentEvent.created_at < cursor_dt) + return q.order_by(AgentEvent.created_at.desc()).limit(limit).all() + + def _list_alerts_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentAlert]: + q = self.db.query(AgentAlert).filter(AgentAlert.user_id == self.user_id, AgentAlert.read_at.is_(None)) + if since_dt: + q = q.filter(AgentAlert.created_at >= since_dt) + if cursor_dt: + q = q.filter(AgentAlert.created_at < cursor_dt) + return q.order_by(AgentAlert.created_at.desc()).limit(limit).all() + + def _list_approvals_for_feed( + self, + limit: int, + since_dt: Optional[datetime], + cursor_dt: Optional[datetime], + ) -> List[AgentApprovalRequest]: + q = self.db.query(AgentApprovalRequest).filter( + AgentApprovalRequest.user_id == self.user_id, + AgentApprovalRequest.status == "pending", + ) + if since_dt: + q = q.filter(AgentApprovalRequest.created_at >= since_dt) + if cursor_dt: + q = q.filter(AgentApprovalRequest.created_at < cursor_dt) + return q.order_by(AgentApprovalRequest.created_at.desc()).limit(limit).all() + + + def _count_unread_alerts(self) -> int: + return self.db.query(AgentAlert).filter( + AgentAlert.user_id == self.user_id, + AgentAlert.read_at.is_(None), + ).count() + + def _count_pending_approvals(self) -> int: + return self.db.query(AgentApprovalRequest).filter( + AgentApprovalRequest.user_id == self.user_id, + AgentApprovalRequest.status == "pending", + ).count() + + @staticmethod + def _next_cursor(items: List[Any], time_attr: str) -> Optional[str]: + if not items: + return None + ts = getattr(items[-1], time_attr, None) + return ts.isoformat() if ts else None + + @staticmethod + def _serialize_run(run: AgentRun) -> Dict[str, Any]: + return { + "id": run.id, + "user_id": run.user_id, + "agent_type": run.agent_type, + "status": run.status, + "success": run.success, + "error_message": run.error_message, + "result_summary": run.result_summary, + "mlflow_run_id": run.mlflow_run_id, + "started_at": run.started_at.isoformat() if run.started_at else None, + "finished_at": run.finished_at.isoformat() if run.finished_at else None, + } + + @staticmethod + def _serialize_event(evt: AgentEvent) -> Dict[str, Any]: + return { + "id": evt.id, + "run_id": evt.run_id, + "agent_type": evt.agent_type, + "event_type": evt.event_type, + "severity": evt.severity, + "message": evt.message, + "payload": evt.payload, + "created_at": evt.created_at.isoformat() if evt.created_at else None, + } + + @staticmethod + def _serialize_alert(alert: AgentAlert) -> Dict[str, Any]: + return { + "id": alert.id, + "source": alert.source, + "type": alert.alert_type, + "severity": alert.severity, + "title": alert.title, + "message": alert.message, + "cta_path": alert.cta_path, + "payload": alert.payload, + "created_at": alert.created_at.isoformat() if alert.created_at else None, + "read_at": alert.read_at.isoformat() if alert.read_at else None, + } + + @staticmethod + def _serialize_approval(req: AgentApprovalRequest) -> Dict[str, Any]: + return { + "id": req.id, + "status": req.status, + "decision": req.decision, + "action_id": req.action_id, + "action_type": req.action_type, + "agent_type": req.agent_type, + "target_resource": req.target_resource, + "risk_level": req.risk_level, + "payload": req.payload, + "created_at": req.created_at.isoformat() if req.created_at else None, + "decided_at": req.decided_at.isoformat() if req.decided_at else None, + }