diff --git a/backend/services/task_memory_service.py b/backend/services/task_memory_service.py index 237803da..f379deb4 100644 --- a/backend/services/task_memory_service.py +++ b/backend/services/task_memory_service.py @@ -13,6 +13,10 @@ from sqlalchemy.orm import Session from models.daily_workflow_models import TaskHistory, DailyWorkflowTask from services.intelligence.txtai_service import TxtaiIntelligenceService +EXACT_DUPLICATE_LOOKBACK_DAYS = 7 +SEMANTIC_SUPPRESSION_SCORE_THRESHOLD = 0.85 +SUPPRESSED_STATUSES = {"dismissed", "rejected"} + class TaskMemoryService: """ Manages the long-term memory of user tasks. @@ -96,7 +100,7 @@ class TaskMemoryService: filtered = [] # Get recent history hashes (last 7 days) - cutoff = datetime.utcnow() - timedelta(days=7) + cutoff = datetime.utcnow() - timedelta(days=EXACT_DUPLICATE_LOOKBACK_DAYS) recent_hashes = { row.task_hash for row in self.db.query(TaskHistory.task_hash) @@ -117,23 +121,39 @@ class TaskMemoryService: is_semantic_duplicate = False try: # Check if similar tasks were REJECTED recently - results = self.intelligence.search( + results = await self.intelligence.search( f"{p.title} {p.description}", limit=1 ) if results: top = results[0] - # If very similar (>0.85) and was REJECTED/DISMISSED - # We might need to fetch the metadata from the result if txtai returns it - # For now, this is a heuristic stub. Txtai search returns dict with 'id', 'score', 'text', etc. - # If we stored 'status' in metadata, we check it. - - if top['score'] > 0.85: - # Retrieve status from DB using vector_id if needed, or if metadata is returned - # Assuming we want to avoid repeating REJECTED ideas - # This requires storing 'status' in the index metadata - pass + top_score = float(top.get("score", 0)) + if top_score >= SEMANTIC_SUPPRESSION_SCORE_THRESHOLD: + indexed_status = self._extract_indexed_status(top) + if indexed_status in SUPPRESSED_STATUSES: + logger.info( + f"Filtering redundant task (semantic {top_score:.2f}, indexed status={indexed_status}): {p.title}" + ) + is_semantic_duplicate = True + else: + vector_id = top.get("id") or top.get("vector_id") + if vector_id: + history = ( + self.db.query(TaskHistory.status) + .filter( + TaskHistory.user_id == self.user_id, + TaskHistory.vector_id == str(vector_id), + ) + .order_by(TaskHistory.created_at.desc()) + .first() + ) + history_status = getattr(history, "status", None) + if history_status in SUPPRESSED_STATUSES: + logger.info( + f"Filtering redundant task (semantic {top_score:.2f}, history status={history_status}): {p.title}" + ) + is_semantic_duplicate = True except Exception: pass @@ -141,3 +161,16 @@ class TaskMemoryService: filtered.append(p) return filtered + + def _extract_indexed_status(self, search_result: Dict[str, Any]) -> Optional[str]: + """Extract indexed status from txtai result metadata if available.""" + status = search_result.get("status") + if status: + return str(status).lower() + + obj = search_result.get("object") + if isinstance(obj, dict): + obj_status = obj.get("status") + return str(obj_status).lower() if obj_status else None + + return None diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index d4ef0797..9b4dad18 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -204,8 +204,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> agent_tasks = list(unique_map.values()) # Phase 3: Check memory for rejections (Semantic Filter) - # For now, we rely on exact match logic in memory service if implemented fully - # agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) + agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) except Exception as e: logger.error(f"Committee proposal phase failed: {e}") diff --git a/backend/test/test_task_memory_service.py b/backend/test/test_task_memory_service.py new file mode 100644 index 00000000..58b2f296 --- /dev/null +++ b/backend/test/test_task_memory_service.py @@ -0,0 +1,101 @@ +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from models.daily_workflow_models import TaskHistory +from models.enhanced_strategy_models import Base +from services.task_memory_service import TaskMemoryService + + +@pytest.fixture +def db_session(): + engine = create_engine("sqlite:///:memory:") + Base.metadata.create_all(engine, tables=[TaskHistory.__table__]) + SessionLocal = sessionmaker(bind=engine) + session = SessionLocal() + try: + yield session + finally: + session.close() + + +@pytest.mark.asyncio +async def test_filter_redundant_proposals_suppresses_exact_hash_duplicates(db_session): + service = TaskMemoryService(user_id="user-1", db=db_session) + service.intelligence = SimpleNamespace(search=AsyncMock(return_value=[])) + + title = "Create LinkedIn post" + description = "Draft a post about customer success stories" + + db_session.add( + TaskHistory( + user_id="user-1", + task_hash=service._compute_hash(title, description), + title=title, + description=description, + pillar_id="engage", + status="completed", + created_at=datetime.utcnow(), + vector_id="vec-exact", + ) + ) + db_session.commit() + + proposals = [SimpleNamespace(title=title, description=description)] + filtered = await service.filter_redundant_proposals(proposals) + + assert filtered == [] + + +@pytest.mark.asyncio +async def test_filter_redundant_proposals_suppresses_semantic_dismissed_by_vector_id_lookup(db_session): + service = TaskMemoryService(user_id="user-2", db=db_session) + service.intelligence = SimpleNamespace( + search=AsyncMock(return_value=[{"id": "vec-dismissed", "score": 0.93}]) + ) + + db_session.add( + TaskHistory( + user_id="user-2", + task_hash="hash-1", + title="Old task", + description="Old description", + pillar_id="plan", + status="dismissed", + created_at=datetime.utcnow(), + vector_id="vec-dismissed", + ) + ) + db_session.commit() + + proposals = [ + SimpleNamespace( + title="Plan daily content topics", + description="Choose 3 content ideas for this week", + ) + ] + + filtered = await service.filter_redundant_proposals(proposals) + + assert filtered == [] + + +@pytest.mark.asyncio +async def test_filter_redundant_proposals_keeps_non_duplicates(db_session): + service = TaskMemoryService(user_id="user-3", db=db_session) + service.intelligence = SimpleNamespace( + search=AsyncMock(return_value=[{"id": "vec-completed", "score": 0.40}]) + ) + + proposal = SimpleNamespace( + title="Write newsletter intro", + description="Prepare a short intro for the weekly newsletter", + ) + filtered = await service.filter_redundant_proposals([proposal]) + + assert filtered == [proposal] + service.intelligence.search.assert_awaited_once()