Add agent huddle SSE feed with frontend live subscriptions
This commit is contained in:
@@ -4,9 +4,11 @@ Provides REST API access to agent orchestration functionality
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
|
||||
from fastapi.responses import StreamingResponse
|
||||
from typing import Dict, List, Any, Optional
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from middleware.auth_middleware import get_current_user
|
||||
from utils.logger_utils import get_service_logger
|
||||
@@ -19,13 +21,119 @@ from services.intelligence.agents.performance_monitor import PerformanceMetric,
|
||||
from services.database import get_db
|
||||
from services.agent_activity_service import AgentActivityService
|
||||
from sqlalchemy.orm import Session
|
||||
from models.agent_activity_models import AgentProfile
|
||||
from models.agent_activity_models import AgentProfile, AgentRun, AgentEvent, AgentAlert, AgentApprovalRequest
|
||||
from services.intelligence.agents.team_catalog import AGENT_TEAM_CATALOG, get_agent_catalog_entry
|
||||
|
||||
logger = get_service_logger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/agents", tags=["Autonomous Agents"])
|
||||
|
||||
|
||||
def _serialize_run(run: AgentRun) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": run.id,
|
||||
"user_id": run.user_id,
|
||||
"agent_type": run.agent_type,
|
||||
"status": run.status,
|
||||
"success": run.success,
|
||||
"error_message": run.error_message,
|
||||
"result_summary": run.result_summary,
|
||||
"mlflow_run_id": run.mlflow_run_id,
|
||||
"started_at": run.started_at.isoformat() if run.started_at else None,
|
||||
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
|
||||
}
|
||||
|
||||
|
||||
def _serialize_event(event: AgentEvent) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": event.id,
|
||||
"run_id": event.run_id,
|
||||
"agent_type": event.agent_type,
|
||||
"event_type": event.event_type,
|
||||
"severity": event.severity,
|
||||
"message": event.message,
|
||||
"payload": event.payload,
|
||||
"created_at": event.created_at.isoformat() if event.created_at else None,
|
||||
}
|
||||
|
||||
|
||||
def _serialize_alert(alert: AgentAlert) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": alert.id,
|
||||
"source": alert.source,
|
||||
"type": alert.alert_type,
|
||||
"severity": alert.severity,
|
||||
"title": alert.title,
|
||||
"message": alert.message,
|
||||
"cta_path": alert.cta_path,
|
||||
"payload": alert.payload,
|
||||
"created_at": alert.created_at.isoformat() if alert.created_at else None,
|
||||
"read_at": alert.read_at.isoformat() if alert.read_at else None,
|
||||
}
|
||||
|
||||
|
||||
def _serialize_approval(approval: AgentApprovalRequest) -> Dict[str, Any]:
|
||||
return {
|
||||
"id": approval.id,
|
||||
"status": approval.status,
|
||||
"decision": approval.decision,
|
||||
"action_id": approval.action_id,
|
||||
"action_type": approval.action_type,
|
||||
"agent_type": approval.agent_type,
|
||||
"target_resource": approval.target_resource,
|
||||
"risk_level": approval.risk_level,
|
||||
"payload": approval.payload,
|
||||
"created_at": approval.created_at.isoformat() if approval.created_at else None,
|
||||
"decided_at": approval.decided_at.isoformat() if approval.decided_at else None,
|
||||
}
|
||||
|
||||
|
||||
def _build_huddle_snapshot(
|
||||
db: Session,
|
||||
user_id: str,
|
||||
since_run_id: int = 0,
|
||||
since_event_id: int = 0,
|
||||
since_alert_id: int = 0,
|
||||
since_approval_id: int = 0,
|
||||
limit: int = 50,
|
||||
) -> Dict[str, Any]:
|
||||
runs_query = db.query(AgentRun).filter(AgentRun.user_id == user_id)
|
||||
events_query = db.query(AgentEvent).filter(AgentEvent.user_id == user_id)
|
||||
alerts_query = db.query(AgentAlert).filter(AgentAlert.user_id == user_id)
|
||||
approvals_query = db.query(AgentApprovalRequest).filter(AgentApprovalRequest.user_id == user_id)
|
||||
|
||||
if since_run_id > 0:
|
||||
runs_query = runs_query.filter(AgentRun.id > since_run_id)
|
||||
if since_event_id > 0:
|
||||
events_query = events_query.filter(AgentEvent.id > since_event_id)
|
||||
if since_alert_id > 0:
|
||||
alerts_query = alerts_query.filter(AgentAlert.id > since_alert_id)
|
||||
if since_approval_id > 0:
|
||||
approvals_query = approvals_query.filter(AgentApprovalRequest.id > since_approval_id)
|
||||
|
||||
runs = runs_query.order_by(AgentRun.id.desc()).limit(limit).all()
|
||||
events = events_query.order_by(AgentEvent.id.desc()).limit(limit * 2).all()
|
||||
alerts = alerts_query.order_by(AgentAlert.id.desc()).limit(limit).all()
|
||||
approvals = approvals_query.order_by(AgentApprovalRequest.id.desc()).limit(limit).all()
|
||||
|
||||
runs_sorted = list(reversed(runs))
|
||||
events_sorted = list(reversed(events))
|
||||
alerts_sorted = list(reversed(alerts))
|
||||
approvals_sorted = list(reversed(approvals))
|
||||
|
||||
return {
|
||||
"runs": [_serialize_run(r) for r in runs_sorted],
|
||||
"events": [_serialize_event(e) for e in events_sorted],
|
||||
"alerts": [_serialize_alert(a) for a in alerts_sorted],
|
||||
"approvals": [_serialize_approval(a) for a in approvals_sorted],
|
||||
"cursor": {
|
||||
"run_id": max([since_run_id] + [r.id for r in runs_sorted]),
|
||||
"event_id": max([since_event_id] + [e.id for e in events_sorted]),
|
||||
"alert_id": max([since_alert_id] + [a.id for a in alerts_sorted]),
|
||||
"approval_id": max([since_approval_id] + [a.id for a in approvals_sorted]),
|
||||
},
|
||||
}
|
||||
|
||||
@router.get("/team")
|
||||
async def get_agent_team_endpoint(
|
||||
current_user: dict = Depends(get_current_user),
|
||||
@@ -594,6 +702,142 @@ async def get_agent_approvals_endpoint(
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.get("/huddle/feed")
|
||||
async def get_agent_huddle_feed_endpoint(
|
||||
since_run_id: int = 0,
|
||||
since_event_id: int = 0,
|
||||
since_alert_id: int = 0,
|
||||
since_approval_id: int = 0,
|
||||
limit: int = 50,
|
||||
current_user: dict = Depends(get_current_user),
|
||||
db: Session = Depends(get_db),
|
||||
) -> Dict[str, Any]:
|
||||
try:
|
||||
user_id = str(current_user.get("id"))
|
||||
payload = _build_huddle_snapshot(
|
||||
db=db,
|
||||
user_id=user_id,
|
||||
since_run_id=max(0, int(since_run_id)),
|
||||
since_event_id=max(0, int(since_event_id)),
|
||||
since_alert_id=max(0, int(since_alert_id)),
|
||||
since_approval_id=max(0, int(since_approval_id)),
|
||||
limit=max(1, min(int(limit), 200)),
|
||||
)
|
||||
return {
|
||||
"success": True,
|
||||
"data": payload,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"user_id": user_id,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting huddle feed for user {current_user.get('id')}: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.get("/huddle/stream")
|
||||
async def stream_agent_huddle_endpoint(
|
||||
current_user: dict = Depends(get_current_user),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
user_id = str(current_user.get("id"))
|
||||
|
||||
async def event_generator():
|
||||
cursor = {"run_id": 0, "event_id": 0, "alert_id": 0, "approval_id": 0}
|
||||
run_signatures: Dict[int, str] = {}
|
||||
|
||||
initial_snapshot = _build_huddle_snapshot(db=db, user_id=user_id, limit=50)
|
||||
cursor.update(initial_snapshot.get("cursor") or {})
|
||||
for run in initial_snapshot.get("runs", []):
|
||||
run_signatures[int(run.get("id") or 0)] = json.dumps(
|
||||
{
|
||||
"status": run.get("status"),
|
||||
"success": run.get("success"),
|
||||
"finished_at": run.get("finished_at"),
|
||||
"error_message": run.get("error_message"),
|
||||
},
|
||||
sort_keys=True,
|
||||
)
|
||||
|
||||
yield f"event: snapshot\ndata: {json.dumps(initial_snapshot)}\n\n"
|
||||
|
||||
while True:
|
||||
try:
|
||||
delta = _build_huddle_snapshot(
|
||||
db=db,
|
||||
user_id=user_id,
|
||||
since_run_id=int(cursor.get("run_id", 0)),
|
||||
since_event_id=int(cursor.get("event_id", 0)),
|
||||
since_alert_id=int(cursor.get("alert_id", 0)),
|
||||
since_approval_id=int(cursor.get("approval_id", 0)),
|
||||
limit=50,
|
||||
)
|
||||
|
||||
recent_runs = (
|
||||
db.query(AgentRun)
|
||||
.filter(AgentRun.user_id == user_id)
|
||||
.order_by(AgentRun.id.desc())
|
||||
.limit(100)
|
||||
.all()
|
||||
)
|
||||
lifecycle_updates: List[Dict[str, Any]] = []
|
||||
for run in recent_runs:
|
||||
signature = json.dumps(
|
||||
{
|
||||
"status": run.status,
|
||||
"success": run.success,
|
||||
"finished_at": run.finished_at.isoformat() if run.finished_at else None,
|
||||
"error_message": run.error_message,
|
||||
},
|
||||
sort_keys=True,
|
||||
)
|
||||
previous = run_signatures.get(run.id)
|
||||
if previous != signature:
|
||||
lifecycle_updates.append(_serialize_run(run))
|
||||
run_signatures[run.id] = signature
|
||||
|
||||
if len(run_signatures) > 300:
|
||||
keep_ids = {r.id for r in recent_runs}
|
||||
run_signatures = {k: v for k, v in run_signatures.items() if k in keep_ids}
|
||||
|
||||
has_changes = bool(
|
||||
delta.get("events")
|
||||
or delta.get("alerts")
|
||||
or delta.get("approvals")
|
||||
or lifecycle_updates
|
||||
)
|
||||
|
||||
if has_changes:
|
||||
if delta.get("cursor"):
|
||||
cursor.update(delta["cursor"])
|
||||
event_payload = {
|
||||
"runs": lifecycle_updates,
|
||||
"events": delta.get("events", []),
|
||||
"alerts": delta.get("alerts", []),
|
||||
"approvals": delta.get("approvals", []),
|
||||
"cursor": cursor,
|
||||
"ts": datetime.utcnow().isoformat(),
|
||||
}
|
||||
yield f"event: delta\ndata: {json.dumps(event_payload)}\n\n"
|
||||
else:
|
||||
yield f"event: heartbeat\ndata: {json.dumps({'ts': datetime.utcnow().isoformat()})}\n\n"
|
||||
|
||||
await asyncio.sleep(2.5)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as stream_error:
|
||||
logger.warning(f"Huddle stream loop error for user {user_id}: {stream_error}")
|
||||
error_payload = {"message": "stream_error", "ts": datetime.utcnow().isoformat()}
|
||||
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n"
|
||||
await asyncio.sleep(3)
|
||||
|
||||
headers = {
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
}
|
||||
return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
|
||||
|
||||
|
||||
@router.post("/approvals/{approval_id}/decision")
|
||||
async def decide_agent_approval_endpoint(
|
||||
approval_id: int,
|
||||
|
||||
Reference in New Issue
Block a user