Add aggregated agent huddle feed endpoint

This commit is contained in:
ي
2026-03-02 21:49:57 +05:30
parent cb6a3a8042
commit 92b0255028
2 changed files with 241 additions and 0 deletions

View File

@@ -462,6 +462,39 @@ async def get_agent_alerts_endpoint(
raise HTTPException(status_code=500, detail=str(e))
@router.get("/huddle/feed")
async def get_agent_huddle_feed_endpoint(
since: Optional[str] = None,
cursor: Optional[str] = None,
runs_limit: int = 20,
events_limit: int = 50,
alerts_limit: int = 20,
approvals_limit: int = 20,
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
) -> Dict[str, Any]:
try:
user_id = str(current_user.get("id"))
service = AgentActivityService(db, user_id)
feed = service.get_huddle_feed(
since=since,
cursor=cursor,
runs_limit=runs_limit,
events_limit=events_limit,
alerts_limit=alerts_limit,
approvals_limit=approvals_limit,
)
return {
"success": True,
"data": feed,
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
}
except Exception as e:
logger.error(f"Error getting huddle feed for user {current_user.get('id')}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/alerts/{alert_id}/mark-read")
async def mark_agent_alert_read_endpoint(
alert_id: int,

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import func
from sqlalchemy.orm import Session
from models.agent_activity_models import AgentAlert, AgentApprovalRequest, AgentEvent, AgentRun
@@ -193,3 +194,210 @@ class AgentActivityService:
self.db.commit()
self.db.refresh(req)
return req
def get_huddle_feed(
self,
since: Optional[str] = None,
cursor: Optional[str] = None,
runs_limit: int = 20,
events_limit: int = 50,
alerts_limit: int = 20,
approvals_limit: int = 20,
) -> Dict[str, Any]:
now = datetime.utcnow()
since_dt = self._parse_datetime(since)
cursor_dt = self._parse_datetime(cursor)
statuses = self._get_active_statuses()
runs = self._list_runs_for_feed(limit=runs_limit, since_dt=since_dt, cursor_dt=cursor_dt)
events = self._list_events_for_feed(limit=events_limit, since_dt=since_dt, cursor_dt=cursor_dt)
alerts = self._list_alerts_for_feed(limit=alerts_limit, since_dt=since_dt, cursor_dt=cursor_dt)
approvals = self._list_approvals_for_feed(limit=approvals_limit, since_dt=since_dt, cursor_dt=cursor_dt)
cursors = {
"runs": self._next_cursor(runs, "started_at"),
"events": self._next_cursor(events, "created_at"),
"alerts": self._next_cursor(alerts, "created_at"),
"approvals": self._next_cursor(approvals, "created_at"),
"feed": now.isoformat(),
}
return {
"statuses": statuses,
"runs": [self._serialize_run(run) for run in runs],
"events": [self._serialize_event(evt) for evt in events],
"alerts": [self._serialize_alert(alert) for alert in alerts],
"approvals": [self._serialize_approval(req) for req in approvals],
"unread_alerts": self._count_unread_alerts(),
"pending_approvals": self._count_pending_approvals(),
"cursors": cursors,
"server_timestamp": now.isoformat(),
}
@staticmethod
def _parse_datetime(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
text = str(value).strip()
if not text:
return None
if text.endswith("Z"):
text = text.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(text)
if parsed.tzinfo is not None:
return parsed.replace(tzinfo=None)
return parsed
except ValueError:
return None
def _get_active_statuses(self) -> List[Dict[str, Any]]:
subquery = (
self.db.query(
AgentRun.agent_type.label("agent_type"),
func.max(AgentRun.started_at).label("max_started_at"),
)
.filter(AgentRun.user_id == self.user_id)
.group_by(AgentRun.agent_type)
.subquery()
)
rows = (
self.db.query(AgentRun)
.join(
subquery,
(AgentRun.agent_type == subquery.c.agent_type)
& (AgentRun.started_at == subquery.c.max_started_at),
)
.filter(AgentRun.user_id == self.user_id)
.all()
)
return [
{
"agent_type": row.agent_type,
"status": row.status,
"success": row.success,
"run_id": row.id,
"updated_at": (row.finished_at or row.started_at).isoformat() if (row.finished_at or row.started_at) else None,
}
for row in rows
]
def _list_runs_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentRun]:
q = self.db.query(AgentRun).filter(AgentRun.user_id == self.user_id)
if since_dt:
q = q.filter(AgentRun.started_at >= since_dt)
if cursor_dt:
q = q.filter(AgentRun.started_at < cursor_dt)
return q.order_by(AgentRun.started_at.desc()).limit(limit).all()
def _list_events_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentEvent]:
q = self.db.query(AgentEvent).filter(AgentEvent.user_id == self.user_id)
if since_dt:
q = q.filter(AgentEvent.created_at >= since_dt)
if cursor_dt:
q = q.filter(AgentEvent.created_at < cursor_dt)
return q.order_by(AgentEvent.created_at.desc()).limit(limit).all()
def _list_alerts_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentAlert]:
q = self.db.query(AgentAlert).filter(AgentAlert.user_id == self.user_id, AgentAlert.read_at.is_(None))
if since_dt:
q = q.filter(AgentAlert.created_at >= since_dt)
if cursor_dt:
q = q.filter(AgentAlert.created_at < cursor_dt)
return q.order_by(AgentAlert.created_at.desc()).limit(limit).all()
def _list_approvals_for_feed(
self,
limit: int,
since_dt: Optional[datetime],
cursor_dt: Optional[datetime],
) -> List[AgentApprovalRequest]:
q = self.db.query(AgentApprovalRequest).filter(
AgentApprovalRequest.user_id == self.user_id,
AgentApprovalRequest.status == "pending",
)
if since_dt:
q = q.filter(AgentApprovalRequest.created_at >= since_dt)
if cursor_dt:
q = q.filter(AgentApprovalRequest.created_at < cursor_dt)
return q.order_by(AgentApprovalRequest.created_at.desc()).limit(limit).all()
def _count_unread_alerts(self) -> int:
return self.db.query(AgentAlert).filter(
AgentAlert.user_id == self.user_id,
AgentAlert.read_at.is_(None),
).count()
def _count_pending_approvals(self) -> int:
return self.db.query(AgentApprovalRequest).filter(
AgentApprovalRequest.user_id == self.user_id,
AgentApprovalRequest.status == "pending",
).count()
@staticmethod
def _next_cursor(items: List[Any], time_attr: str) -> Optional[str]:
if not items:
return None
ts = getattr(items[-1], time_attr, None)
return ts.isoformat() if ts else None
@staticmethod
def _serialize_run(run: AgentRun) -> Dict[str, Any]:
return {
"id": run.id,
"user_id": run.user_id,
"agent_type": run.agent_type,
"status": run.status,
"success": run.success,
"error_message": run.error_message,
"result_summary": run.result_summary,
"mlflow_run_id": run.mlflow_run_id,
"started_at": run.started_at.isoformat() if run.started_at else None,
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
}
@staticmethod
def _serialize_event(evt: AgentEvent) -> Dict[str, Any]:
return {
"id": evt.id,
"run_id": evt.run_id,
"agent_type": evt.agent_type,
"event_type": evt.event_type,
"severity": evt.severity,
"message": evt.message,
"payload": evt.payload,
"created_at": evt.created_at.isoformat() if evt.created_at else None,
}
@staticmethod
def _serialize_alert(alert: AgentAlert) -> Dict[str, Any]:
return {
"id": alert.id,
"source": alert.source,
"type": alert.alert_type,
"severity": alert.severity,
"title": alert.title,
"message": alert.message,
"cta_path": alert.cta_path,
"payload": alert.payload,
"created_at": alert.created_at.isoformat() if alert.created_at else None,
"read_at": alert.read_at.isoformat() if alert.read_at else None,
}
@staticmethod
def _serialize_approval(req: AgentApprovalRequest) -> Dict[str, Any]:
return {
"id": req.id,
"status": req.status,
"decision": req.decision,
"action_id": req.action_id,
"action_type": req.action_type,
"agent_type": req.agent_type,
"target_resource": req.target_resource,
"risk_level": req.risk_level,
"payload": req.payload,
"created_at": req.created_at.isoformat() if req.created_at else None,
"decided_at": req.decided_at.isoformat() if req.decided_at else None,
}