Feat: Add SSE-powered Team Huddle feed and Activity page

This commit is contained in:
ajaysi
2026-03-03 17:40:40 +05:30
5 changed files with 529 additions and 303 deletions

View File

@@ -4,9 +4,11 @@ Provides REST API access to agent orchestration functionality
"""
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from typing import Dict, List, Any, Optional
import asyncio
from datetime import datetime
import json
from middleware.auth_middleware import get_current_user
from utils.logger_utils import get_service_logger
@@ -19,13 +21,119 @@ from services.intelligence.agents.performance_monitor import PerformanceMetric,
from services.database import get_db
from services.agent_activity_service import AgentActivityService
from sqlalchemy.orm import Session
from models.agent_activity_models import AgentProfile
from models.agent_activity_models import AgentProfile, AgentRun, AgentEvent, AgentAlert, AgentApprovalRequest
from services.intelligence.agents.team_catalog import AGENT_TEAM_CATALOG, get_agent_catalog_entry
logger = get_service_logger(__name__)
router = APIRouter(prefix="/api/agents", tags=["Autonomous Agents"])
def _serialize_run(run: AgentRun) -> Dict[str, Any]:
return {
"id": run.id,
"user_id": run.user_id,
"agent_type": run.agent_type,
"status": run.status,
"success": run.success,
"error_message": run.error_message,
"result_summary": run.result_summary,
"mlflow_run_id": run.mlflow_run_id,
"started_at": run.started_at.isoformat() if run.started_at else None,
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
}
def _serialize_event(event: AgentEvent) -> Dict[str, Any]:
return {
"id": event.id,
"run_id": event.run_id,
"agent_type": event.agent_type,
"event_type": event.event_type,
"severity": event.severity,
"message": event.message,
"payload": event.payload,
"created_at": event.created_at.isoformat() if event.created_at else None,
}
def _serialize_alert(alert: AgentAlert) -> Dict[str, Any]:
return {
"id": alert.id,
"source": alert.source,
"type": alert.alert_type,
"severity": alert.severity,
"title": alert.title,
"message": alert.message,
"cta_path": alert.cta_path,
"payload": alert.payload,
"created_at": alert.created_at.isoformat() if alert.created_at else None,
"read_at": alert.read_at.isoformat() if alert.read_at else None,
}
def _serialize_approval(approval: AgentApprovalRequest) -> Dict[str, Any]:
return {
"id": approval.id,
"status": approval.status,
"decision": approval.decision,
"action_id": approval.action_id,
"action_type": approval.action_type,
"agent_type": approval.agent_type,
"target_resource": approval.target_resource,
"risk_level": approval.risk_level,
"payload": approval.payload,
"created_at": approval.created_at.isoformat() if approval.created_at else None,
"decided_at": approval.decided_at.isoformat() if approval.decided_at else None,
}
def _build_huddle_snapshot(
db: Session,
user_id: str,
since_run_id: int = 0,
since_event_id: int = 0,
since_alert_id: int = 0,
since_approval_id: int = 0,
limit: int = 50,
) -> Dict[str, Any]:
runs_query = db.query(AgentRun).filter(AgentRun.user_id == user_id)
events_query = db.query(AgentEvent).filter(AgentEvent.user_id == user_id)
alerts_query = db.query(AgentAlert).filter(AgentAlert.user_id == user_id)
approvals_query = db.query(AgentApprovalRequest).filter(AgentApprovalRequest.user_id == user_id)
if since_run_id > 0:
runs_query = runs_query.filter(AgentRun.id > since_run_id)
if since_event_id > 0:
events_query = events_query.filter(AgentEvent.id > since_event_id)
if since_alert_id > 0:
alerts_query = alerts_query.filter(AgentAlert.id > since_alert_id)
if since_approval_id > 0:
approvals_query = approvals_query.filter(AgentApprovalRequest.id > since_approval_id)
runs = runs_query.order_by(AgentRun.id.desc()).limit(limit).all()
events = events_query.order_by(AgentEvent.id.desc()).limit(limit * 2).all()
alerts = alerts_query.order_by(AgentAlert.id.desc()).limit(limit).all()
approvals = approvals_query.order_by(AgentApprovalRequest.id.desc()).limit(limit).all()
runs_sorted = list(reversed(runs))
events_sorted = list(reversed(events))
alerts_sorted = list(reversed(alerts))
approvals_sorted = list(reversed(approvals))
return {
"runs": [_serialize_run(r) for r in runs_sorted],
"events": [_serialize_event(e) for e in events_sorted],
"alerts": [_serialize_alert(a) for a in alerts_sorted],
"approvals": [_serialize_approval(a) for a in approvals_sorted],
"cursor": {
"run_id": max([since_run_id] + [r.id for r in runs_sorted]),
"event_id": max([since_event_id] + [e.id for e in events_sorted]),
"alert_id": max([since_alert_id] + [a.id for a in alerts_sorted]),
"approval_id": max([since_approval_id] + [a.id for a in approvals_sorted]),
},
}
@router.get("/team")
async def get_agent_team_endpoint(
current_user: dict = Depends(get_current_user),
@@ -627,6 +735,142 @@ async def get_agent_approvals_endpoint(
raise HTTPException(status_code=500, detail=str(e))
@router.get("/huddle/feed")
async def get_agent_huddle_feed_endpoint(
since_run_id: int = 0,
since_event_id: int = 0,
since_alert_id: int = 0,
since_approval_id: int = 0,
limit: int = 50,
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
) -> Dict[str, Any]:
try:
user_id = str(current_user.get("id"))
payload = _build_huddle_snapshot(
db=db,
user_id=user_id,
since_run_id=max(0, int(since_run_id)),
since_event_id=max(0, int(since_event_id)),
since_alert_id=max(0, int(since_alert_id)),
since_approval_id=max(0, int(since_approval_id)),
limit=max(1, min(int(limit), 200)),
)
return {
"success": True,
"data": payload,
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
}
except Exception as e:
logger.error(f"Error getting huddle feed for user {current_user.get('id')}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/huddle/stream")
async def stream_agent_huddle_endpoint(
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
):
user_id = str(current_user.get("id"))
async def event_generator():
cursor = {"run_id": 0, "event_id": 0, "alert_id": 0, "approval_id": 0}
run_signatures: Dict[int, str] = {}
initial_snapshot = _build_huddle_snapshot(db=db, user_id=user_id, limit=50)
cursor.update(initial_snapshot.get("cursor") or {})
for run in initial_snapshot.get("runs", []):
run_signatures[int(run.get("id") or 0)] = json.dumps(
{
"status": run.get("status"),
"success": run.get("success"),
"finished_at": run.get("finished_at"),
"error_message": run.get("error_message"),
},
sort_keys=True,
)
yield f"event: snapshot\ndata: {json.dumps(initial_snapshot)}\n\n"
while True:
try:
delta = _build_huddle_snapshot(
db=db,
user_id=user_id,
since_run_id=int(cursor.get("run_id", 0)),
since_event_id=int(cursor.get("event_id", 0)),
since_alert_id=int(cursor.get("alert_id", 0)),
since_approval_id=int(cursor.get("approval_id", 0)),
limit=50,
)
recent_runs = (
db.query(AgentRun)
.filter(AgentRun.user_id == user_id)
.order_by(AgentRun.id.desc())
.limit(100)
.all()
)
lifecycle_updates: List[Dict[str, Any]] = []
for run in recent_runs:
signature = json.dumps(
{
"status": run.status,
"success": run.success,
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
"error_message": run.error_message,
},
sort_keys=True,
)
previous = run_signatures.get(run.id)
if previous != signature:
lifecycle_updates.append(_serialize_run(run))
run_signatures[run.id] = signature
if len(run_signatures) > 300:
keep_ids = {r.id for r in recent_runs}
run_signatures = {k: v for k, v in run_signatures.items() if k in keep_ids}
has_changes = bool(
delta.get("events")
or delta.get("alerts")
or delta.get("approvals")
or lifecycle_updates
)
if has_changes:
if delta.get("cursor"):
cursor.update(delta["cursor"])
event_payload = {
"runs": lifecycle_updates,
"events": delta.get("events", []),
"alerts": delta.get("alerts", []),
"approvals": delta.get("approvals", []),
"cursor": cursor,
"ts": datetime.utcnow().isoformat(),
}
yield f"event: delta\ndata: {json.dumps(event_payload)}\n\n"
else:
yield f"event: heartbeat\ndata: {json.dumps({'ts': datetime.utcnow().isoformat()})}\n\n"
await asyncio.sleep(2.5)
except asyncio.CancelledError:
break
except Exception as stream_error:
logger.warning(f"Huddle stream loop error for user {user_id}: {stream_error}")
error_payload = {"message": "stream_error", "ts": datetime.utcnow().isoformat()}
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n"
await asyncio.sleep(3)
headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
@router.post("/approvals/{approval_id}/decision")
async def decide_agent_approval_endpoint(
approval_id: int,