Merge branch 'pr-367'
This commit is contained in:
@@ -462,6 +462,39 @@ async def get_agent_alerts_endpoint(
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.get("/huddle/feed")
|
||||
async def get_agent_huddle_feed_endpoint(
|
||||
since: Optional[str] = None,
|
||||
cursor: Optional[str] = None,
|
||||
runs_limit: int = 20,
|
||||
events_limit: int = 50,
|
||||
alerts_limit: int = 20,
|
||||
approvals_limit: int = 20,
|
||||
current_user: dict = Depends(get_current_user),
|
||||
db: Session = Depends(get_db),
|
||||
) -> Dict[str, Any]:
|
||||
try:
|
||||
user_id = str(current_user.get("id"))
|
||||
service = AgentActivityService(db, user_id)
|
||||
feed = service.get_huddle_feed(
|
||||
since=since,
|
||||
cursor=cursor,
|
||||
runs_limit=runs_limit,
|
||||
events_limit=events_limit,
|
||||
alerts_limit=alerts_limit,
|
||||
approvals_limit=approvals_limit,
|
||||
)
|
||||
return {
|
||||
"success": True,
|
||||
"data": feed,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"user_id": user_id,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting huddle feed for user {current_user.get('id')}: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.post("/alerts/{alert_id}/mark-read")
|
||||
async def mark_agent_alert_read_endpoint(
|
||||
alert_id: int,
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from models.agent_activity_models import AgentAlert, AgentApprovalRequest, AgentEvent, AgentRun
|
||||
@@ -193,3 +194,210 @@ class AgentActivityService:
|
||||
self.db.commit()
|
||||
self.db.refresh(req)
|
||||
return req
|
||||
|
||||
def get_huddle_feed(
|
||||
self,
|
||||
since: Optional[str] = None,
|
||||
cursor: Optional[str] = None,
|
||||
runs_limit: int = 20,
|
||||
events_limit: int = 50,
|
||||
alerts_limit: int = 20,
|
||||
approvals_limit: int = 20,
|
||||
) -> Dict[str, Any]:
|
||||
now = datetime.utcnow()
|
||||
since_dt = self._parse_datetime(since)
|
||||
cursor_dt = self._parse_datetime(cursor)
|
||||
|
||||
statuses = self._get_active_statuses()
|
||||
runs = self._list_runs_for_feed(limit=runs_limit, since_dt=since_dt, cursor_dt=cursor_dt)
|
||||
events = self._list_events_for_feed(limit=events_limit, since_dt=since_dt, cursor_dt=cursor_dt)
|
||||
alerts = self._list_alerts_for_feed(limit=alerts_limit, since_dt=since_dt, cursor_dt=cursor_dt)
|
||||
approvals = self._list_approvals_for_feed(limit=approvals_limit, since_dt=since_dt, cursor_dt=cursor_dt)
|
||||
|
||||
cursors = {
|
||||
"runs": self._next_cursor(runs, "started_at"),
|
||||
"events": self._next_cursor(events, "created_at"),
|
||||
"alerts": self._next_cursor(alerts, "created_at"),
|
||||
"approvals": self._next_cursor(approvals, "created_at"),
|
||||
"feed": now.isoformat(),
|
||||
}
|
||||
|
||||
return {
|
||||
"statuses": statuses,
|
||||
"runs": [self._serialize_run(run) for run in runs],
|
||||
"events": [self._serialize_event(evt) for evt in events],
|
||||
"alerts": [self._serialize_alert(alert) for alert in alerts],
|
||||
"approvals": [self._serialize_approval(req) for req in approvals],
|
||||
"unread_alerts": self._count_unread_alerts(),
|
||||
"pending_approvals": self._count_pending_approvals(),
|
||||
"cursors": cursors,
|
||||
"server_timestamp": now.isoformat(),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _parse_datetime(value: Optional[str]) -> Optional[datetime]:
|
||||
if not value:
|
||||
return None
|
||||
text = str(value).strip()
|
||||
if not text:
|
||||
return None
|
||||
if text.endswith("Z"):
|
||||
text = text.replace("Z", "+00:00")
|
||||
try:
|
||||
parsed = datetime.fromisoformat(text)
|
||||
if parsed.tzinfo is not None:
|
||||
return parsed.replace(tzinfo=None)
|
||||
return parsed
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _get_active_statuses(self) -> List[Dict[str, Any]]:
|
||||
subquery = (
|
||||
self.db.query(
|
||||
AgentRun.agent_type.label("agent_type"),
|
||||
func.max(AgentRun.started_at).label("max_started_at"),
|
||||
)
|
||||
.filter(AgentRun.user_id == self.user_id)
|
||||
.group_by(AgentRun.agent_type)
|
||||
.subquery()
|
||||
)
|
||||
|
||||
rows = (
|
||||
self.db.query(AgentRun)
|
||||
.join(
|
||||
subquery,
|
||||
(AgentRun.agent_type == subquery.c.agent_type)
|
||||
& (AgentRun.started_at == subquery.c.max_started_at),
|
||||
)
|
||||
.filter(AgentRun.user_id == self.user_id)
|
||||
.all()
|
||||
)
|
||||
return [
|
||||
{
|
||||
"agent_type": row.agent_type,
|
||||
"status": row.status,
|
||||
"success": row.success,
|
||||
"run_id": row.id,
|
||||
"updated_at": (row.finished_at or row.started_at).isoformat() if (row.finished_at or row.started_at) else None,
|
||||
}
|
||||
for row in rows
|
||||
]
|
||||
|
||||
def _list_runs_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentRun]:
|
||||
q = self.db.query(AgentRun).filter(AgentRun.user_id == self.user_id)
|
||||
if since_dt:
|
||||
q = q.filter(AgentRun.started_at >= since_dt)
|
||||
if cursor_dt:
|
||||
q = q.filter(AgentRun.started_at < cursor_dt)
|
||||
return q.order_by(AgentRun.started_at.desc()).limit(limit).all()
|
||||
|
||||
def _list_events_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentEvent]:
|
||||
q = self.db.query(AgentEvent).filter(AgentEvent.user_id == self.user_id)
|
||||
if since_dt:
|
||||
q = q.filter(AgentEvent.created_at >= since_dt)
|
||||
if cursor_dt:
|
||||
q = q.filter(AgentEvent.created_at < cursor_dt)
|
||||
return q.order_by(AgentEvent.created_at.desc()).limit(limit).all()
|
||||
|
||||
def _list_alerts_for_feed(self, limit: int, since_dt: Optional[datetime], cursor_dt: Optional[datetime]) -> List[AgentAlert]:
|
||||
q = self.db.query(AgentAlert).filter(AgentAlert.user_id == self.user_id, AgentAlert.read_at.is_(None))
|
||||
if since_dt:
|
||||
q = q.filter(AgentAlert.created_at >= since_dt)
|
||||
if cursor_dt:
|
||||
q = q.filter(AgentAlert.created_at < cursor_dt)
|
||||
return q.order_by(AgentAlert.created_at.desc()).limit(limit).all()
|
||||
|
||||
def _list_approvals_for_feed(
|
||||
self,
|
||||
limit: int,
|
||||
since_dt: Optional[datetime],
|
||||
cursor_dt: Optional[datetime],
|
||||
) -> List[AgentApprovalRequest]:
|
||||
q = self.db.query(AgentApprovalRequest).filter(
|
||||
AgentApprovalRequest.user_id == self.user_id,
|
||||
AgentApprovalRequest.status == "pending",
|
||||
)
|
||||
if since_dt:
|
||||
q = q.filter(AgentApprovalRequest.created_at >= since_dt)
|
||||
if cursor_dt:
|
||||
q = q.filter(AgentApprovalRequest.created_at < cursor_dt)
|
||||
return q.order_by(AgentApprovalRequest.created_at.desc()).limit(limit).all()
|
||||
|
||||
|
||||
def _count_unread_alerts(self) -> int:
|
||||
return self.db.query(AgentAlert).filter(
|
||||
AgentAlert.user_id == self.user_id,
|
||||
AgentAlert.read_at.is_(None),
|
||||
).count()
|
||||
|
||||
def _count_pending_approvals(self) -> int:
|
||||
return self.db.query(AgentApprovalRequest).filter(
|
||||
AgentApprovalRequest.user_id == self.user_id,
|
||||
AgentApprovalRequest.status == "pending",
|
||||
).count()
|
||||
|
||||
@staticmethod
|
||||
def _next_cursor(items: List[Any], time_attr: str) -> Optional[str]:
|
||||
if not items:
|
||||
return None
|
||||
ts = getattr(items[-1], time_attr, None)
|
||||
return ts.isoformat() if ts else None
|
||||
|
||||
@staticmethod
|
||||
def _serialize_run(run: AgentRun) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": run.id,
|
||||
"user_id": run.user_id,
|
||||
"agent_type": run.agent_type,
|
||||
"status": run.status,
|
||||
"success": run.success,
|
||||
"error_message": run.error_message,
|
||||
"result_summary": run.result_summary,
|
||||
"mlflow_run_id": run.mlflow_run_id,
|
||||
"started_at": run.started_at.isoformat() if run.started_at else None,
|
||||
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _serialize_event(evt: AgentEvent) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": evt.id,
|
||||
"run_id": evt.run_id,
|
||||
"agent_type": evt.agent_type,
|
||||
"event_type": evt.event_type,
|
||||
"severity": evt.severity,
|
||||
"message": evt.message,
|
||||
"payload": evt.payload,
|
||||
"created_at": evt.created_at.isoformat() if evt.created_at else None,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _serialize_alert(alert: AgentAlert) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": alert.id,
|
||||
"source": alert.source,
|
||||
"type": alert.alert_type,
|
||||
"severity": alert.severity,
|
||||
"title": alert.title,
|
||||
"message": alert.message,
|
||||
"cta_path": alert.cta_path,
|
||||
"payload": alert.payload,
|
||||
"created_at": alert.created_at.isoformat() if alert.created_at else None,
|
||||
"read_at": alert.read_at.isoformat() if alert.read_at else None,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _serialize_approval(req: AgentApprovalRequest) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": req.id,
|
||||
"status": req.status,
|
||||
"decision": req.decision,
|
||||
"action_id": req.action_id,
|
||||
"action_type": req.action_type,
|
||||
"agent_type": req.agent_type,
|
||||
"target_resource": req.target_resource,
|
||||
"risk_level": req.risk_level,
|
||||
"payload": req.payload,
|
||||
"created_at": req.created_at.isoformat() if req.created_at else None,
|
||||
"decided_at": req.decided_at.isoformat() if req.decided_at else None,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user