Files
ALwrity/backend/services/research/intent/intent_aware_analyzer.py

572 lines
23 KiB
Python

"""
Intent-Aware Result Analyzer
Analyzes research results based on user intent.
Extracts exactly what the user needs from raw research data.
This is the key innovation - instead of generic analysis,
we analyze results through the lens of what the user wants to accomplish.
Author: ALwrity Team
Version: 1.0
"""
import json
from typing import Dict, Any, List, Optional
from loguru import logger
from models.research_intent_models import (
ResearchIntent,
IntentDrivenResearchResult,
ExpectedDeliverable,
StatisticWithCitation,
ExpertQuote,
CaseStudySummary,
TrendAnalysis,
ComparisonTable,
ComparisonItem,
ProsCons,
SourceWithRelevance,
)
from models.research_persona_models import ResearchPersona
from .intent_prompt_builder import IntentPromptBuilder
class IntentAwareAnalyzer:
"""
Analyzes research results based on user intent.
Instead of generic summaries, this extracts exactly what the user
needs: statistics, quotes, case studies, trends, etc.
"""
def __init__(self):
"""Initialize the analyzer."""
self.prompt_builder = IntentPromptBuilder()
logger.info("IntentAwareAnalyzer initialized")
async def analyze(
self,
raw_results: Dict[str, Any],
intent: ResearchIntent,
research_persona: Optional[ResearchPersona] = None,
user_id: Optional[str] = None,
) -> IntentDrivenResearchResult:
"""
Analyze raw research results based on user intent.
Args:
raw_results: Raw results from Exa/Tavily/Google
intent: The user's research intent
research_persona: Optional persona for context
Returns:
IntentDrivenResearchResult with extracted deliverables
"""
try:
logger.info(f"Analyzing results for intent: {intent.primary_question[:50]}...")
# Format raw results for analysis
formatted_results = self._format_raw_results(raw_results)
# Build the analysis prompt
prompt = self.prompt_builder.build_intent_aware_analysis_prompt(
raw_results=formatted_results,
intent=intent,
research_persona=research_persona,
)
# Define the expected JSON schema
analysis_schema = self._build_analysis_schema(intent.expected_deliverables)
# Call LLM for analysis
from services.llm_providers.main_text_generation import llm_text_gen
result = llm_text_gen(
prompt=prompt,
json_struct=analysis_schema,
user_id=user_id # Required for subscription checking
)
if isinstance(result, dict) and "error" in result:
logger.error(f"Intent-aware analysis failed: {result.get('error')}")
return self._create_fallback_result(raw_results, intent)
# Parse and validate the result
analyzed_result = self._parse_analysis_result(result, intent, raw_results)
logger.info(
f"Analysis complete: {len(analyzed_result.key_takeaways)} takeaways, "
f"{len(analyzed_result.statistics)} stats, "
f"{len(analyzed_result.sources)} sources"
)
return analyzed_result
except Exception as e:
logger.error(f"Error in intent-aware analysis: {e}")
return self._create_fallback_result(raw_results, intent)
def _format_raw_results(self, raw_results: Dict[str, Any]) -> str:
"""Format raw research results for LLM analysis."""
formatted_parts = []
# Extract content
content = raw_results.get("content", "")
if content:
formatted_parts.append(f"=== MAIN CONTENT ===\n{content[:8000]}")
# Extract sources with their content
sources = raw_results.get("sources", [])
if sources:
formatted_parts.append("\n=== SOURCES ===")
for i, source in enumerate(sources[:15], 1): # Limit to 15 sources
title = source.get("title", "Untitled")
url = source.get("url", "")
excerpt = source.get("excerpt", source.get("text", source.get("content", "")))
formatted_parts.append(f"\nSource {i}: {title}")
formatted_parts.append(f"URL: {url}")
if excerpt:
formatted_parts.append(f"Content: {excerpt[:500]}")
# Extract grounding metadata if available (from Google)
grounding = raw_results.get("grounding_metadata", {})
if grounding:
formatted_parts.append("\n=== GROUNDING DATA ===")
formatted_parts.append(json.dumps(grounding, indent=2)[:2000])
# Extract any AI answers (from Tavily)
answer = raw_results.get("answer", "")
if answer:
formatted_parts.append(f"\n=== AI-GENERATED ANSWER ===\n{answer}")
return "\n".join(formatted_parts)
def _build_analysis_schema(self, expected_deliverables: List[str]) -> Dict[str, Any]:
"""Build JSON schema based on expected deliverables."""
# Base schema
schema = {
"type": "object",
"properties": {
"primary_answer": {"type": "string"},
"secondary_answers": {
"type": "object",
"additionalProperties": {"oneOf": [{"type": "string"}, {"type": "null"}]}
},
"focus_areas_coverage": {
"type": "object",
"additionalProperties": {"oneOf": [{"type": "string"}, {"type": "null"}]},
"description": "Summary of what was found for each focus area, or null if not covered"
},
"also_answering_coverage": {
"type": "object",
"additionalProperties": {"oneOf": [{"type": "string"}, {"type": "null"}]},
"description": "Information found about each 'also answering' topic, or null if not found"
},
"executive_summary": {"type": "string"},
"key_takeaways": {
"type": "array",
"items": {"type": "string"},
"maxItems": 7
},
"confidence": {"type": "number"},
"gaps_identified": {
"type": "array",
"items": {"type": "string"}
},
"follow_up_queries": {
"type": "array",
"items": {"type": "string"}
},
},
"required": ["primary_answer", "executive_summary", "key_takeaways", "confidence"]
}
# Add deliverable-specific properties
if ExpectedDeliverable.KEY_STATISTICS.value in expected_deliverables:
schema["properties"]["statistics"] = {
"type": "array",
"items": {
"type": "object",
"properties": {
"statistic": {"type": "string"},
"value": {"type": "string"},
"context": {"type": "string"},
"source": {"type": "string"},
"url": {"type": "string"},
"credibility": {"type": "number"},
"recency": {"type": "string"}
},
"required": ["statistic", "context", "source", "url"]
}
}
if ExpectedDeliverable.EXPERT_QUOTES.value in expected_deliverables:
schema["properties"]["expert_quotes"] = {
"type": "array",
"items": {
"type": "object",
"properties": {
"quote": {"type": "string"},
"speaker": {"type": "string"},
"title": {"type": "string"},
"organization": {"type": "string"},
"source": {"type": "string"},
"url": {"type": "string"}
},
"required": ["quote", "speaker", "source", "url"]
}
}
if ExpectedDeliverable.CASE_STUDIES.value in expected_deliverables:
schema["properties"]["case_studies"] = {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"organization": {"type": "string"},
"challenge": {"type": "string"},
"solution": {"type": "string"},
"outcome": {"type": "string"},
"key_metrics": {"type": "array", "items": {"type": "string"}},
"source": {"type": "string"},
"url": {"type": "string"}
},
"required": ["title", "organization", "challenge", "solution", "outcome"]
}
}
if ExpectedDeliverable.TRENDS.value in expected_deliverables:
schema["properties"]["trends"] = {
"type": "array",
"items": {
"type": "object",
"properties": {
"trend": {"type": "string"},
"direction": {"type": "string"},
"evidence": {"type": "array", "items": {"type": "string"}},
"impact": {"type": "string"},
"timeline": {"type": "string"},
"sources": {"type": "array", "items": {"type": "string"}}
},
"required": ["trend", "direction", "evidence"]
}
}
if ExpectedDeliverable.COMPARISONS.value in expected_deliverables:
schema["properties"]["comparisons"] = {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"criteria": {"type": "array", "items": {"type": "string"}},
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"pros": {"type": "array", "items": {"type": "string"}},
"cons": {"type": "array", "items": {"type": "string"}},
"features": {"type": "object"}
}
}
},
"verdict": {"type": "string"}
}
}
}
if ExpectedDeliverable.PROS_CONS.value in expected_deliverables:
schema["properties"]["pros_cons"] = {
"type": "object",
"properties": {
"subject": {"type": "string"},
"pros": {"type": "array", "items": {"type": "string"}},
"cons": {"type": "array", "items": {"type": "string"}},
"balanced_verdict": {"type": "string"}
}
}
if ExpectedDeliverable.BEST_PRACTICES.value in expected_deliverables:
schema["properties"]["best_practices"] = {
"type": "array",
"items": {"type": "string"}
}
if ExpectedDeliverable.STEP_BY_STEP.value in expected_deliverables:
schema["properties"]["step_by_step"] = {
"type": "array",
"items": {"type": "string"}
}
if ExpectedDeliverable.DEFINITIONS.value in expected_deliverables:
schema["properties"]["definitions"] = {
"type": "object",
"additionalProperties": {"type": "string"}
}
if ExpectedDeliverable.EXAMPLES.value in expected_deliverables:
schema["properties"]["examples"] = {
"type": "array",
"items": {"type": "string"}
}
if ExpectedDeliverable.PREDICTIONS.value in expected_deliverables:
schema["properties"]["predictions"] = {
"type": "array",
"items": {"type": "string"}
}
# Always include sources and suggested outline
schema["properties"]["sources"] = {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"url": {"type": "string"},
"relevance_score": {"type": "number"},
"relevance_reason": {"type": "string"},
"content_type": {"type": "string"},
"credibility_score": {"type": "number"}
},
"required": ["title", "url"]
}
}
schema["properties"]["suggested_outline"] = {
"type": "array",
"items": {"type": "string"}
}
return schema
def _parse_analysis_result(
self,
result: Dict[str, Any],
intent: ResearchIntent,
raw_results: Dict[str, Any],
) -> IntentDrivenResearchResult:
"""Parse LLM analysis result into structured format."""
# Parse statistics
statistics = []
for stat in result.get("statistics", []):
try:
statistics.append(StatisticWithCitation(
statistic=stat.get("statistic", ""),
value=stat.get("value"),
context=stat.get("context", ""),
source=stat.get("source", ""),
url=stat.get("url", ""),
credibility=float(stat.get("credibility", 0.8)),
recency=stat.get("recency"),
))
except Exception as e:
logger.warning(f"Failed to parse statistic: {e}")
# Parse expert quotes
expert_quotes = []
for quote in result.get("expert_quotes", []):
try:
expert_quotes.append(ExpertQuote(
quote=quote.get("quote", ""),
speaker=quote.get("speaker", ""),
title=quote.get("title"),
organization=quote.get("organization"),
context=quote.get("context"),
source=quote.get("source", ""),
url=quote.get("url", ""),
))
except Exception as e:
logger.warning(f"Failed to parse expert quote: {e}")
# Parse case studies
case_studies = []
for cs in result.get("case_studies", []):
try:
case_studies.append(CaseStudySummary(
title=cs.get("title", ""),
organization=cs.get("organization", ""),
challenge=cs.get("challenge", ""),
solution=cs.get("solution", ""),
outcome=cs.get("outcome", ""),
key_metrics=cs.get("key_metrics", []),
source=cs.get("source", ""),
url=cs.get("url", ""),
))
except Exception as e:
logger.warning(f"Failed to parse case study: {e}")
# Parse trends
trends = []
for trend in result.get("trends", []):
try:
trends.append(TrendAnalysis(
trend=trend.get("trend", ""),
direction=trend.get("direction", "growing"),
evidence=trend.get("evidence", []),
impact=trend.get("impact"),
timeline=trend.get("timeline"),
sources=trend.get("sources", []),
))
except Exception as e:
logger.warning(f"Failed to parse trend: {e}")
# Parse comparisons
comparisons = []
for comp in result.get("comparisons", []):
try:
items = []
for item in comp.get("items", []):
items.append(ComparisonItem(
name=item.get("name", ""),
description=item.get("description"),
pros=item.get("pros", []),
cons=item.get("cons", []),
features=item.get("features", {}),
rating=item.get("rating"),
source=item.get("source"),
))
comparisons.append(ComparisonTable(
title=comp.get("title", ""),
criteria=comp.get("criteria", []),
items=items,
winner=comp.get("winner"),
verdict=comp.get("verdict"),
))
except Exception as e:
logger.warning(f"Failed to parse comparison: {e}")
# Parse pros/cons
pros_cons = None
pc_data = result.get("pros_cons")
if pc_data:
try:
pros_cons = ProsCons(
subject=pc_data.get("subject", intent.original_input),
pros=pc_data.get("pros", []),
cons=pc_data.get("cons", []),
balanced_verdict=pc_data.get("balanced_verdict", ""),
)
except Exception as e:
logger.warning(f"Failed to parse pros/cons: {e}")
# Parse sources
sources = []
for src in result.get("sources", []):
try:
sources.append(SourceWithRelevance(
title=src.get("title", ""),
url=src.get("url", ""),
excerpt=src.get("excerpt"),
relevance_score=float(src.get("relevance_score", 0.8)),
relevance_reason=src.get("relevance_reason"),
content_type=src.get("content_type"),
published_date=src.get("published_date"),
credibility_score=float(src.get("credibility_score", 0.8)),
))
except Exception as e:
logger.warning(f"Failed to parse source: {e}")
# If no sources from analysis, extract from raw results
if not sources:
sources = self._extract_sources_from_raw(raw_results)
# Parse coverage fields (handle null values)
focus_areas_coverage = {}
for area, coverage in result.get("focus_areas_coverage", {}).items():
focus_areas_coverage[area] = coverage if coverage else None
also_answering_coverage = {}
for topic, coverage in result.get("also_answering_coverage", {}).items():
also_answering_coverage[topic] = coverage if coverage else None
return IntentDrivenResearchResult(
success=True,
primary_answer=result.get("primary_answer", ""),
secondary_answers=result.get("secondary_answers", {}),
focus_areas_coverage=focus_areas_coverage,
also_answering_coverage=also_answering_coverage,
statistics=statistics,
expert_quotes=expert_quotes,
case_studies=case_studies,
comparisons=comparisons,
trends=trends,
best_practices=result.get("best_practices", []),
step_by_step=result.get("step_by_step", []),
pros_cons=pros_cons,
definitions=result.get("definitions", {}),
examples=result.get("examples", []),
predictions=result.get("predictions", []),
executive_summary=result.get("executive_summary", ""),
key_takeaways=result.get("key_takeaways", []),
suggested_outline=result.get("suggested_outline", []),
sources=sources,
raw_content=self._format_raw_results(raw_results)[:5000],
confidence=float(result.get("confidence", 0.7)),
gaps_identified=result.get("gaps_identified", []),
follow_up_queries=result.get("follow_up_queries", []),
original_intent=intent,
)
def _extract_sources_from_raw(self, raw_results: Dict[str, Any]) -> List[SourceWithRelevance]:
"""Extract sources from raw results when analysis doesn't provide them."""
sources = []
for src in raw_results.get("sources", [])[:10]:
try:
sources.append(SourceWithRelevance(
title=src.get("title", "Untitled"),
url=src.get("url", ""),
excerpt=src.get("excerpt", src.get("text", ""))[:200],
relevance_score=0.8,
credibility_score=float(src.get("credibility_score", 0.8)),
))
except Exception as e:
logger.warning(f"Failed to extract source: {e}")
return sources
def _create_fallback_result(
self,
raw_results: Dict[str, Any],
intent: ResearchIntent,
) -> IntentDrivenResearchResult:
"""Create a fallback result when AI analysis fails."""
# Extract basic information from raw results
content = raw_results.get("content", "")
sources = self._extract_sources_from_raw(raw_results)
# Create basic takeaways from content
key_takeaways = []
if content:
sentences = content.split(". ")[:5]
key_takeaways = [s.strip() + "." for s in sentences if len(s) > 20]
return IntentDrivenResearchResult(
success=True,
primary_answer=f"Research findings for: {intent.primary_question}",
secondary_answers={},
focus_areas_coverage={area: None for area in intent.focus_areas} if intent.focus_areas else {},
also_answering_coverage={topic: None for topic in intent.also_answering} if intent.also_answering else {},
executive_summary=content[:300] if content else "Research completed",
key_takeaways=key_takeaways,
sources=sources,
raw_content=self._format_raw_results(raw_results)[:5000],
confidence=0.5,
gaps_identified=[
"AI analysis failed - showing raw results",
"Manual review recommended"
],
follow_up_queries=[],
original_intent=intent,
)