diff --git a/backend/api/agents_api.py b/backend/api/agents_api.py index d4ff38d9..d5ba6b41 100644 --- a/backend/api/agents_api.py +++ b/backend/api/agents_api.py @@ -4,9 +4,11 @@ Provides REST API access to agent orchestration functionality """ from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks +from fastapi.responses import StreamingResponse from typing import Dict, List, Any, Optional import asyncio from datetime import datetime +import json from middleware.auth_middleware import get_current_user from utils.logger_utils import get_service_logger @@ -19,13 +21,119 @@ from services.intelligence.agents.performance_monitor import PerformanceMetric, from services.database import get_db from services.agent_activity_service import AgentActivityService from sqlalchemy.orm import Session -from models.agent_activity_models import AgentProfile +from models.agent_activity_models import AgentProfile, AgentRun, AgentEvent, AgentAlert, AgentApprovalRequest from services.intelligence.agents.team_catalog import AGENT_TEAM_CATALOG, get_agent_catalog_entry logger = get_service_logger(__name__) router = APIRouter(prefix="/api/agents", tags=["Autonomous Agents"]) + +def _serialize_run(run: AgentRun) -> Dict[str, Any]: + return { + "id": run.id, + "user_id": run.user_id, + "agent_type": run.agent_type, + "status": run.status, + "success": run.success, + "error_message": run.error_message, + "result_summary": run.result_summary, + "mlflow_run_id": run.mlflow_run_id, + "started_at": run.started_at.isoformat() if run.started_at else None, + "finished_at": run.finished_at.isoformat() if run.finished_at else None, + } + + +def _serialize_event(event: AgentEvent) -> Dict[str, Any]: + return { + "id": event.id, + "run_id": event.run_id, + "agent_type": event.agent_type, + "event_type": event.event_type, + "severity": event.severity, + "message": event.message, + "payload": event.payload, + "created_at": event.created_at.isoformat() if event.created_at else None, + } + + +def _serialize_alert(alert: AgentAlert) -> Dict[str, Any]: + return { + "id": alert.id, + "source": alert.source, + "type": alert.alert_type, + "severity": alert.severity, + "title": alert.title, + "message": alert.message, + "cta_path": alert.cta_path, + "payload": alert.payload, + "created_at": alert.created_at.isoformat() if alert.created_at else None, + "read_at": alert.read_at.isoformat() if alert.read_at else None, + } + + +def _serialize_approval(approval: AgentApprovalRequest) -> Dict[str, Any]: + return { + "id": approval.id, + "status": approval.status, + "decision": approval.decision, + "action_id": approval.action_id, + "action_type": approval.action_type, + "agent_type": approval.agent_type, + "target_resource": approval.target_resource, + "risk_level": approval.risk_level, + "payload": approval.payload, + "created_at": approval.created_at.isoformat() if approval.created_at else None, + "decided_at": approval.decided_at.isoformat() if approval.decided_at else None, + } + + +def _build_huddle_snapshot( + db: Session, + user_id: str, + since_run_id: int = 0, + since_event_id: int = 0, + since_alert_id: int = 0, + since_approval_id: int = 0, + limit: int = 50, +) -> Dict[str, Any]: + runs_query = db.query(AgentRun).filter(AgentRun.user_id == user_id) + events_query = db.query(AgentEvent).filter(AgentEvent.user_id == user_id) + alerts_query = db.query(AgentAlert).filter(AgentAlert.user_id == user_id) + approvals_query = db.query(AgentApprovalRequest).filter(AgentApprovalRequest.user_id == user_id) + + if since_run_id > 0: + runs_query = runs_query.filter(AgentRun.id > since_run_id) + if since_event_id > 0: + events_query = events_query.filter(AgentEvent.id > since_event_id) + if since_alert_id > 0: + alerts_query = alerts_query.filter(AgentAlert.id > since_alert_id) + if since_approval_id > 0: + approvals_query = approvals_query.filter(AgentApprovalRequest.id > since_approval_id) + + runs = runs_query.order_by(AgentRun.id.desc()).limit(limit).all() + events = events_query.order_by(AgentEvent.id.desc()).limit(limit * 2).all() + alerts = alerts_query.order_by(AgentAlert.id.desc()).limit(limit).all() + approvals = approvals_query.order_by(AgentApprovalRequest.id.desc()).limit(limit).all() + + runs_sorted = list(reversed(runs)) + events_sorted = list(reversed(events)) + alerts_sorted = list(reversed(alerts)) + approvals_sorted = list(reversed(approvals)) + + return { + "runs": [_serialize_run(r) for r in runs_sorted], + "events": [_serialize_event(e) for e in events_sorted], + "alerts": [_serialize_alert(a) for a in alerts_sorted], + "approvals": [_serialize_approval(a) for a in approvals_sorted], + "cursor": { + "run_id": max([since_run_id] + [r.id for r in runs_sorted]), + "event_id": max([since_event_id] + [e.id for e in events_sorted]), + "alert_id": max([since_alert_id] + [a.id for a in alerts_sorted]), + "approval_id": max([since_approval_id] + [a.id for a in approvals_sorted]), + }, + } + @router.get("/team") async def get_agent_team_endpoint( current_user: dict = Depends(get_current_user), @@ -594,6 +702,142 @@ async def get_agent_approvals_endpoint( raise HTTPException(status_code=500, detail=str(e)) +@router.get("/huddle/feed") +async def get_agent_huddle_feed_endpoint( + since_run_id: int = 0, + since_event_id: int = 0, + since_alert_id: int = 0, + since_approval_id: int = 0, + limit: int = 50, + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +) -> Dict[str, Any]: + try: + user_id = str(current_user.get("id")) + payload = _build_huddle_snapshot( + db=db, + user_id=user_id, + since_run_id=max(0, int(since_run_id)), + since_event_id=max(0, int(since_event_id)), + since_alert_id=max(0, int(since_alert_id)), + since_approval_id=max(0, int(since_approval_id)), + limit=max(1, min(int(limit), 200)), + ) + return { + "success": True, + "data": payload, + "timestamp": datetime.utcnow().isoformat(), + "user_id": user_id, + } + except Exception as e: + logger.error(f"Error getting huddle feed for user {current_user.get('id')}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/huddle/stream") +async def stream_agent_huddle_endpoint( + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +): + user_id = str(current_user.get("id")) + + async def event_generator(): + cursor = {"run_id": 0, "event_id": 0, "alert_id": 0, "approval_id": 0} + run_signatures: Dict[int, str] = {} + + initial_snapshot = _build_huddle_snapshot(db=db, user_id=user_id, limit=50) + cursor.update(initial_snapshot.get("cursor") or {}) + for run in initial_snapshot.get("runs", []): + run_signatures[int(run.get("id") or 0)] = json.dumps( + { + "status": run.get("status"), + "success": run.get("success"), + "finished_at": run.get("finished_at"), + "error_message": run.get("error_message"), + }, + sort_keys=True, + ) + + yield f"event: snapshot\ndata: {json.dumps(initial_snapshot)}\n\n" + + while True: + try: + delta = _build_huddle_snapshot( + db=db, + user_id=user_id, + since_run_id=int(cursor.get("run_id", 0)), + since_event_id=int(cursor.get("event_id", 0)), + since_alert_id=int(cursor.get("alert_id", 0)), + since_approval_id=int(cursor.get("approval_id", 0)), + limit=50, + ) + + recent_runs = ( + db.query(AgentRun) + .filter(AgentRun.user_id == user_id) + .order_by(AgentRun.id.desc()) + .limit(100) + .all() + ) + lifecycle_updates: List[Dict[str, Any]] = [] + for run in recent_runs: + signature = json.dumps( + { + "status": run.status, + "success": run.success, + "finished_at": run.finished_at.isoformat() if run.finished_at else None, + "error_message": run.error_message, + }, + sort_keys=True, + ) + previous = run_signatures.get(run.id) + if previous != signature: + lifecycle_updates.append(_serialize_run(run)) + run_signatures[run.id] = signature + + if len(run_signatures) > 300: + keep_ids = {r.id for r in recent_runs} + run_signatures = {k: v for k, v in run_signatures.items() if k in keep_ids} + + has_changes = bool( + delta.get("events") + or delta.get("alerts") + or delta.get("approvals") + or lifecycle_updates + ) + + if has_changes: + if delta.get("cursor"): + cursor.update(delta["cursor"]) + event_payload = { + "runs": lifecycle_updates, + "events": delta.get("events", []), + "alerts": delta.get("alerts", []), + "approvals": delta.get("approvals", []), + "cursor": cursor, + "ts": datetime.utcnow().isoformat(), + } + yield f"event: delta\ndata: {json.dumps(event_payload)}\n\n" + else: + yield f"event: heartbeat\ndata: {json.dumps({'ts': datetime.utcnow().isoformat()})}\n\n" + + await asyncio.sleep(2.5) + except asyncio.CancelledError: + break + except Exception as stream_error: + logger.warning(f"Huddle stream loop error for user {user_id}: {stream_error}") + error_payload = {"message": "stream_error", "ts": datetime.utcnow().isoformat()} + yield f"event: error\ndata: {json.dumps(error_payload)}\n\n" + await asyncio.sleep(3) + + headers = { + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers) + + @router.post("/approvals/{approval_id}/decision") async def decide_agent_approval_endpoint( approval_id: int, diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index f384143c..fc168638 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -49,6 +49,7 @@ import IntentResearchTest from './pages/IntentResearchTest'; import SchedulerDashboard from './pages/SchedulerDashboard'; import BillingPage from './pages/BillingPage'; import ApprovalsPage from './pages/ApprovalsPage'; +import TeamActivityPage from './pages/TeamActivityPage'; import StripeDisputesDashboard from './pages/StripeDisputesDashboard'; import ProtectedRoute from './components/shared/ProtectedRoute'; import GSCAuthCallback from './components/SEODashboard/components/GSCAuthCallback'; @@ -622,6 +623,7 @@ const App: React.FC = () => { } /> } /> } /> + } /> } /> } /> } /> diff --git a/frontend/src/components/MainDashboard/components/TeamHuddleWidget.tsx b/frontend/src/components/MainDashboard/components/TeamHuddleWidget.tsx index 79bd709d..5ea95b34 100644 --- a/frontend/src/components/MainDashboard/components/TeamHuddleWidget.tsx +++ b/frontend/src/components/MainDashboard/components/TeamHuddleWidget.tsx @@ -4,7 +4,6 @@ import { Paper, Typography, Avatar, - AvatarGroup, Chip, List, ListItem, @@ -20,182 +19,70 @@ import { Search as SeoIcon, Campaign as SocialIcon, CompareArrows as CompetitorIcon, - Refresh as RefreshIcon, - MoreVert as MoreVertIcon + Refresh as RefreshIcon } from '@mui/icons-material'; +import { Link as RouterLink } from 'react-router-dom'; +import { useAgentHuddleFeed } from '../../../hooks/useAgentHuddleFeed'; -interface AgentStatus { - id: string; - name: string; - role: string; - status: 'active' | 'thinking' | 'idle' | 'offline'; - current_activity: string; - icon: React.ElementType; - color: string; -} - -// Mock data - In real implementation, this would come from a backend endpoint -// /api/agents/status or similar -const AGENT_TEAM: AgentStatus[] = [ - { - id: 'strategy_architect', - name: 'Strategy Architect', - role: 'Team Lead', - status: 'active', - current_activity: 'Analyzing content pillar performance', - icon: StrategyIcon, - color: '#6366f1' // Indigo - }, - { - id: 'content_strategist', - name: 'Content Strategist', - role: 'Creative', - status: 'thinking', - current_activity: 'Identifying semantic gaps in "AI Tools"', - icon: ContentIcon, - color: '#10b981' // Emerald - }, - { - id: 'seo_specialist', - name: 'SEO Specialist', - role: 'Technical', - status: 'idle', - current_activity: 'Monitoring SERP rankings', - icon: SeoIcon, - color: '#f59e0b' // Amber - }, - { - id: 'social_manager', - name: 'Social Manager', - role: 'Engagement', - status: 'idle', - current_activity: 'Waiting for new content to schedule', - icon: SocialIcon, - color: '#ec4899' // Pink - }, - { - id: 'competitor_analyst', - name: 'Competitor Analyst', - role: 'Intelligence', - status: 'active', - current_activity: 'Scanning competitor X for new posts', - icon: CompetitorIcon, - color: '#ef4444' // Red - } -]; +const ICON_BY_AGENT: Record = { + strategy: StrategyIcon, + content: ContentIcon, + seo: SeoIcon, + social: SocialIcon, + competitor: CompetitorIcon, +}; const TeamHuddleWidget: React.FC = () => { + const { runs, events, connectionMode, lastHeartbeatAt } = useAgentHuddleFeed(); + + const rows = React.useMemo(() => { + return runs.slice(0, 5).map((run) => { + const event = events.find((e) => e.agent_type === run.agent_type); + const agentType = String(run.agent_type || 'strategy'); + const IconComponent = ICON_BY_AGENT[agentType] || StrategyIcon; + const status = run.status === 'running' ? 'thinking' : run.success === false ? 'offline' : 'active'; + return { + id: run.id, + name: agentType.replace(/_/g, ' ').replace(/\b\w/g, (c) => c.toUpperCase()), + role: run.status || 'active', + status, + current_activity: event?.message || run.result_summary || 'Awaiting next update', + icon: IconComponent, + }; + }); + }, [runs, events]); + return ( - + - - Team Huddle - - - - - - - - - + Team Huddle + + + + - {AGENT_TEAM.map((agent, index) => ( + {rows.map((agent, index) => ( {index > 0 && } - - - - } - > + - - - + - - {agent.name} - - - {agent.role} - - - } - secondary={ - - {agent.current_activity} - - } + primary={{agent.name}{agent.role}} + secondary={{agent.current_activity}} /> ))} - + - + View Full Team Activity diff --git a/frontend/src/hooks/useAgentHuddleFeed.ts b/frontend/src/hooks/useAgentHuddleFeed.ts new file mode 100644 index 00000000..34656733 --- /dev/null +++ b/frontend/src/hooks/useAgentHuddleFeed.ts @@ -0,0 +1,167 @@ +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; +import { apiClient, getApiUrl, getAuthTokenGetter } from '../api/client'; + +export interface AgentRunItem { id: number; agent_type?: string; status?: string; success?: boolean | null; result_summary?: string | null; finished_at?: string | null; } +export interface AgentEventItem { id: number; agent_type?: string; event_type?: string; message?: string | null; created_at?: string | null; } +export interface AgentAlertItem { id: number; title?: string; message?: string; severity?: string; read_at?: string | null; } +export interface AgentApprovalItem { id: number; action_type?: string; status?: string; risk_level?: number; created_at?: string | null; } + +interface Cursor { run_id: number; event_id: number; alert_id: number; approval_id: number; } +interface FeedPayload { + runs: AgentRunItem[]; + events: AgentEventItem[]; + alerts: AgentAlertItem[]; + approvals: AgentApprovalItem[]; + cursor: Cursor; +} + +const DEFAULT_CURSOR: Cursor = { run_id: 0, event_id: 0, alert_id: 0, approval_id: 0 }; +const BASE_BACKOFF_MS = 1500; +const MAX_BACKOFF_MS = 20000; + +const mergeById = (prev: T[], incoming: T[], limit = 100): T[] => { + if (!incoming.length) return prev; + const byId = new Map(); + [...prev, ...incoming].forEach((item) => byId.set(item.id, item)); + return Array.from(byId.values()).sort((a, b) => b.id - a.id).slice(0, limit); +}; + +const parseSseLines = (raw: string): Array<{ event: string; data: string }> => { + return raw + .split('\n\n') + .map((block) => block.trim()) + .filter(Boolean) + .map((block) => { + const lines = block.split('\n'); + const event = (lines.find((line) => line.startsWith('event:')) || 'event: message').replace('event:', '').trim(); + const data = lines + .filter((line) => line.startsWith('data:')) + .map((line) => line.replace('data:', '').trim()) + .join(''); + return { event, data }; + }); +}; + +export const useAgentHuddleFeed = () => { + const [feed, setFeed] = useState({ runs: [], events: [], alerts: [], approvals: [], cursor: DEFAULT_CURSOR }); + const [connectionMode, setConnectionMode] = useState<'connecting' | 'sse' | 'polling'>('connecting'); + const [lastHeartbeatAt, setLastHeartbeatAt] = useState(null); + const stopRef = useRef(false); + const reconnectAttemptRef = useRef(0); + const cursorRef = useRef(DEFAULT_CURSOR); + + const applyPayload = useCallback((payload: Partial, replace = false) => { + setFeed((prev) => ({ + runs: replace ? (payload.runs || []) : mergeById(prev.runs, payload.runs || []), + events: replace ? (payload.events || []) : mergeById(prev.events, payload.events || []), + alerts: replace ? (payload.alerts || []) : mergeById(prev.alerts, payload.alerts || []), + approvals: replace ? (payload.approvals || []) : mergeById(prev.approvals, payload.approvals || []), + cursor: { + run_id: payload.cursor?.run_id ?? prev.cursor.run_id, + event_id: payload.cursor?.event_id ?? prev.cursor.event_id, + alert_id: payload.cursor?.alert_id ?? prev.cursor.alert_id, + approval_id: payload.cursor?.approval_id ?? prev.cursor.approval_id, + }, + })); + if (payload.cursor) { + cursorRef.current = { + run_id: payload.cursor.run_id ?? cursorRef.current.run_id, + event_id: payload.cursor.event_id ?? cursorRef.current.event_id, + alert_id: payload.cursor.alert_id ?? cursorRef.current.alert_id, + approval_id: payload.cursor.approval_id ?? cursorRef.current.approval_id, + }; + } + }, []); + + const loadSnapshot = useCallback(async (cursor?: Cursor) => { + const resp = await apiClient.get('/api/agents/huddle/feed', { params: cursor || {} }); + const data = resp?.data?.data as FeedPayload; + applyPayload(data, !cursor); + return data; + }, [applyPayload]); + + useEffect(() => { + stopRef.current = false; + let pollingTimer: ReturnType | null = null; + + const startPolling = () => { + setConnectionMode('polling'); + if (pollingTimer) clearInterval(pollingTimer); + pollingTimer = setInterval(async () => { + try { + await loadSnapshot(cursorRef.current); + } catch { + // no-op + } + }, 7000); + }; + + const connect = async () => { + try { + setConnectionMode('connecting'); + await loadSnapshot(); + const tokenGetter = getAuthTokenGetter(); + const token = tokenGetter ? await tokenGetter() : null; + if (!token) throw new Error('No auth token available for SSE stream'); + + const response = await fetch(`${getApiUrl()}/api/agents/huddle/stream`, { + headers: { Authorization: `Bearer ${token}`, Accept: 'text/event-stream' }, + }); + + if (!response.ok || !response.body) { + throw new Error(`SSE stream unavailable (${response.status})`); + } + + reconnectAttemptRef.current = 0; + setConnectionMode('sse'); + + const reader = response.body.getReader(); + const decoder = new TextDecoder('utf-8'); + let buffer = ''; + + while (!stopRef.current) { + const { done, value } = await reader.read(); + if (done) { + throw new Error('SSE stream ended'); + } + buffer += decoder.decode(value, { stream: true }); + const chunks = buffer.split('\n\n'); + buffer = chunks.pop() || ''; + + for (const packet of parseSseLines(chunks.join('\n\n'))) { + if (!packet.data) continue; + if (packet.event === 'heartbeat') { + setLastHeartbeatAt(Date.now()); + continue; + } + const payload = JSON.parse(packet.data); + if (packet.event === 'snapshot') { + applyPayload(payload, true); + } + if (packet.event === 'delta') { + applyPayload(payload, false); + } + } + } + } catch { + reconnectAttemptRef.current += 1; + if (reconnectAttemptRef.current >= 3) { + startPolling(); + return; + } + const sleepMs = Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * (2 ** reconnectAttemptRef.current)); + await new Promise((resolve) => setTimeout(resolve, sleepMs)); + if (!stopRef.current) connect(); + } + }; + + connect(); + + return () => { + stopRef.current = true; + if (pollingTimer) clearInterval(pollingTimer); + }; + }, [applyPayload, loadSnapshot]); + + return useMemo(() => ({ ...feed, connectionMode, lastHeartbeatAt }), [feed, connectionMode, lastHeartbeatAt]); +}; diff --git a/frontend/src/pages/TeamActivityPage.tsx b/frontend/src/pages/TeamActivityPage.tsx new file mode 100644 index 00000000..71026c9d --- /dev/null +++ b/frontend/src/pages/TeamActivityPage.tsx @@ -0,0 +1,66 @@ +import React from 'react'; +import { Box, Card, CardContent, Chip, Divider, Grid, List, ListItem, ListItemText, Typography } from '@mui/material'; +import { useAgentHuddleFeed } from '../hooks/useAgentHuddleFeed'; + +const TeamActivityPage: React.FC = () => { + const { runs, events, alerts, approvals, connectionMode } = useAgentHuddleFeed(); + + return ( + + + Team Activity + + + + + + + Run lifecycle updates + + {runs.slice(0, 20).map((run) => ( + + ))} + + + + + + + New events + + {events.slice(0, 20).map((event) => ( + + ))} + + + + + + + + + Alert deltas + + {alerts.slice(0, 20).map((alert) => ( + + ))} + + + + + + + Approval deltas + + {approvals.slice(0, 20).map((approval) => ( + + ))} + + + + + + ); +}; + +export default TeamActivityPage;