Compare commits
1 Commits
main
...
codex/impl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea0a7ec34e |
@@ -4,13 +4,18 @@ AI response quality assessment and strategic analysis.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Any, List
|
from typing import Dict, Any, List, Tuple
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class QualityValidationService:
|
class QualityValidationService:
|
||||||
"""Service for quality validation and strategic analysis."""
|
"""Service for quality validation and strategic analysis."""
|
||||||
|
|
||||||
|
_RECOMMENDATION_FIELDS = ("recommendation", "title", "action", "description")
|
||||||
|
_EVIDENCE_FIELDS = ("evidence", "rationale", "reason", "justification", "supporting_data")
|
||||||
|
_SPECIFICITY_FIELDS = ("owner", "timeline", "kpi", "metric", "target", "channel", "audience")
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -20,6 +25,7 @@ class QualityValidationService:
|
|||||||
Schema format example:
|
Schema format example:
|
||||||
{"type": "object", "required": ["strategy_brief", "channels"], "properties": {"strategy_brief": {"type": "object"}, "channels": {"type": "array"}}}
|
{"type": "object", "required": ["strategy_brief", "channels"], "properties": {"strategy_brief": {"type": "object"}, "channels": {"type": "array"}}}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _check(node, sch, path="$"):
|
def _check(node, sch, path="$"):
|
||||||
t = sch.get("type")
|
t = sch.get("type")
|
||||||
if t == "object":
|
if t == "object":
|
||||||
@@ -51,126 +57,192 @@ class QualityValidationService:
|
|||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
_check(data, schema)
|
_check(data, schema)
|
||||||
|
|
||||||
def calculate_strategic_scores(self, ai_recommendations: Dict[str, Any]) -> Dict[str, float]:
|
def _safe_text(self, value: Any) -> str:
|
||||||
"""Calculate strategic performance scores from AI recommendations."""
|
return value.strip() if isinstance(value, str) else ""
|
||||||
scores = {
|
|
||||||
'overall_score': 0.0,
|
def _normalize_recommendations(self, ai_recommendations: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]:
|
||||||
'content_quality_score': 0.0,
|
"""Flatten heterogeneous AI payload into normalized recommendation entries."""
|
||||||
'engagement_score': 0.0,
|
entries: List[Dict[str, Any]] = []
|
||||||
'conversion_score': 0.0,
|
failures: List[Dict[str, str]] = []
|
||||||
'innovation_score': 0.0
|
|
||||||
|
if not isinstance(ai_recommendations, dict):
|
||||||
|
failures.append({"error": "invalid_root", "detail": "ai_recommendations must be a dictionary"})
|
||||||
|
return entries, failures
|
||||||
|
|
||||||
|
for section, payload in ai_recommendations.items():
|
||||||
|
if not isinstance(payload, dict):
|
||||||
|
failures.append({"section": str(section), "error": "invalid_section", "detail": "section payload must be an object"})
|
||||||
|
continue
|
||||||
|
|
||||||
|
items = payload.get("recommendations")
|
||||||
|
if items is None:
|
||||||
|
candidate = payload.get("recommendation") or payload.get("action") or payload.get("description")
|
||||||
|
if isinstance(candidate, str) and candidate.strip():
|
||||||
|
items = [{"recommendation": candidate}]
|
||||||
|
else:
|
||||||
|
failures.append({"section": str(section), "error": "missing_recommendations", "detail": "section missing recommendations list"})
|
||||||
|
continue
|
||||||
|
|
||||||
|
if isinstance(items, dict):
|
||||||
|
items = [items]
|
||||||
|
if not isinstance(items, list):
|
||||||
|
failures.append({"section": str(section), "error": "invalid_recommendations", "detail": "recommendations must be list or object"})
|
||||||
|
continue
|
||||||
|
|
||||||
|
for idx, item in enumerate(items):
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
failures.append({"section": str(section), "error": "invalid_item", "detail": f"recommendation[{idx}] must be object"})
|
||||||
|
continue
|
||||||
|
|
||||||
|
rec_text = next((self._safe_text(item.get(field)) for field in self._RECOMMENDATION_FIELDS if self._safe_text(item.get(field))), "")
|
||||||
|
if not rec_text:
|
||||||
|
failures.append({"section": str(section), "error": "missing_text", "detail": f"recommendation[{idx}] missing primary recommendation text"})
|
||||||
|
continue
|
||||||
|
|
||||||
|
confidence = item.get("confidence", payload.get("metrics", {}).get("confidence", 0.5))
|
||||||
|
try:
|
||||||
|
confidence = float(confidence)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
confidence = 0.5
|
||||||
|
|
||||||
|
evidence = next((self._safe_text(item.get(field)) for field in self._EVIDENCE_FIELDS if self._safe_text(item.get(field))), "")
|
||||||
|
|
||||||
|
entry = {
|
||||||
|
"section": section,
|
||||||
|
"text": rec_text,
|
||||||
|
"confidence": max(0.0, min(1.0, confidence)),
|
||||||
|
"priority": self._safe_text(item.get("priority")) or "medium",
|
||||||
|
"impact": self._safe_text(item.get("impact")) or "medium",
|
||||||
|
"probability": self._safe_text(item.get("probability")) or "medium",
|
||||||
|
"implementation": self._safe_text(item.get("implementation")) or self._safe_text(item.get("status")) or "unspecified",
|
||||||
|
"evidence": evidence,
|
||||||
|
"metadata": item,
|
||||||
|
}
|
||||||
|
entries.append(entry)
|
||||||
|
|
||||||
|
if failures:
|
||||||
|
logger.warning("quality_validation_normalization_failures", extra={"validation_failures": failures})
|
||||||
|
return entries, failures
|
||||||
|
|
||||||
|
def _compute_recommendation_quality(self, entries: List[Dict[str, Any]]) -> Dict[str, float]:
|
||||||
|
if not entries:
|
||||||
|
return {"evidence_density": 0.0, "specificity": 0.0, "field_coverage": 0.0, "overall_quality": 0.0}
|
||||||
|
|
||||||
|
evidence_count = sum(1 for e in entries if e.get("evidence"))
|
||||||
|
specificity_hits = 0
|
||||||
|
for entry in entries:
|
||||||
|
metadata = entry.get("metadata", {})
|
||||||
|
for field in self._SPECIFICITY_FIELDS:
|
||||||
|
if self._safe_text(metadata.get(field)):
|
||||||
|
specificity_hits += 1
|
||||||
|
if any(ch.isdigit() for ch in entry.get("text", "")):
|
||||||
|
specificity_hits += 1
|
||||||
|
|
||||||
|
coverage_fields = ["text", "priority", "impact", "confidence", "implementation", "section"]
|
||||||
|
present = sum(1 for e in entries for field in coverage_fields if e.get(field) not in (None, ""))
|
||||||
|
max_fields = len(entries) * len(coverage_fields)
|
||||||
|
|
||||||
|
evidence_density = evidence_count / len(entries)
|
||||||
|
specificity = min(1.0, specificity_hits / (len(entries) * 3))
|
||||||
|
field_coverage = present / max_fields if max_fields else 0.0
|
||||||
|
overall = (0.35 * evidence_density) + (0.35 * specificity) + (0.30 * field_coverage)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"evidence_density": evidence_density,
|
||||||
|
"specificity": specificity,
|
||||||
|
"field_coverage": field_coverage,
|
||||||
|
"overall_quality": overall,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Calculate scores based on AI recommendations
|
def calculate_strategic_scores(self, ai_recommendations: Dict[str, Any]) -> Dict[str, float]:
|
||||||
total_confidence = 0
|
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||||
total_score = 0
|
quality = self._compute_recommendation_quality(entries)
|
||||||
|
|
||||||
for analysis_type, recommendations in ai_recommendations.items():
|
if not entries:
|
||||||
if isinstance(recommendations, dict) and 'metrics' in recommendations:
|
return {
|
||||||
metrics = recommendations['metrics']
|
"overall_score": 0.0,
|
||||||
score = metrics.get('score', 50)
|
"content_quality_score": 0.0,
|
||||||
confidence = metrics.get('confidence', 0.5)
|
"engagement_score": 0.0,
|
||||||
|
"conversion_score": 0.0,
|
||||||
|
"innovation_score": 0.0,
|
||||||
|
}
|
||||||
|
|
||||||
total_score += score * confidence
|
weighted_score = 0.0
|
||||||
total_confidence += confidence
|
total_confidence = 0.0
|
||||||
|
for entry in entries:
|
||||||
|
weight = entry["confidence"]
|
||||||
|
priority_boost = {"high": 1.1, "medium": 1.0, "low": 0.9}.get(entry["priority"].lower(), 1.0)
|
||||||
|
impact_boost = {"high": 1.1, "medium": 1.0, "low": 0.9}.get(entry["impact"].lower(), 1.0)
|
||||||
|
entry_score = 100.0 * quality["overall_quality"] * priority_boost * impact_boost
|
||||||
|
weighted_score += entry_score * weight
|
||||||
|
total_confidence += weight
|
||||||
|
|
||||||
if total_confidence > 0:
|
overall = weighted_score / total_confidence if total_confidence else 0.0
|
||||||
scores['overall_score'] = total_score / total_confidence
|
return {
|
||||||
|
"overall_score": round(overall, 2),
|
||||||
# Set other scores based on overall score
|
"content_quality_score": round(min(100.0, overall * (1.0 + quality["field_coverage"] * 0.15)), 2),
|
||||||
scores['content_quality_score'] = scores['overall_score'] * 1.1
|
"engagement_score": round(min(100.0, overall * (0.9 + quality["specificity"] * 0.2)), 2),
|
||||||
scores['engagement_score'] = scores['overall_score'] * 0.9
|
"conversion_score": round(min(100.0, overall * (0.9 + quality["evidence_density"] * 0.2)), 2),
|
||||||
scores['conversion_score'] = scores['overall_score'] * 0.95
|
"innovation_score": round(min(100.0, overall * (0.95 + quality["specificity"] * 0.15)), 2),
|
||||||
scores['innovation_score'] = scores['overall_score'] * 1.05
|
}
|
||||||
|
|
||||||
return scores
|
|
||||||
|
|
||||||
def extract_market_positioning(self, ai_recommendations: Dict[str, Any]) -> Dict[str, Any]:
|
def extract_market_positioning(self, ai_recommendations: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
"""Extract market positioning from AI recommendations."""
|
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||||
|
if not entries:
|
||||||
|
return {"industry_position": "unknown", "competitive_advantage": "insufficient_data", "market_share": "unknown", "positioning_score": 0}
|
||||||
|
|
||||||
|
top = max(entries, key=lambda e: e["confidence"])
|
||||||
|
positioning_score = int(min(5, max(1, round(1 + (top["confidence"] * 4)))))
|
||||||
return {
|
return {
|
||||||
'industry_position': 'emerging',
|
"industry_position": top["priority"],
|
||||||
'competitive_advantage': 'AI-powered content',
|
"competitive_advantage": top["text"],
|
||||||
'market_share': '2.5%',
|
"market_share": "unknown",
|
||||||
'positioning_score': 4
|
"positioning_score": positioning_score,
|
||||||
}
|
}
|
||||||
|
|
||||||
def extract_competitive_advantages(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
def extract_competitive_advantages(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||||
"""Extract competitive advantages from AI recommendations."""
|
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||||
return [
|
return [
|
||||||
{
|
{"advantage": e["text"], "impact": e["impact"].title(), "implementation": e["implementation"]}
|
||||||
'advantage': 'AI-powered content creation',
|
for e in entries[:5]
|
||||||
'impact': 'High',
|
|
||||||
'implementation': 'In Progress'
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'advantage': 'Data-driven strategy',
|
|
||||||
'impact': 'Medium',
|
|
||||||
'implementation': 'Complete'
|
|
||||||
}
|
|
||||||
]
|
]
|
||||||
|
|
||||||
def extract_strategic_risks(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
def extract_strategic_risks(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||||
"""Extract strategic risks from AI recommendations."""
|
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||||
return [
|
risks = [e for e in entries if any(k in e["text"].lower() for k in ["risk", "threat", "decline", "churn"])]
|
||||||
{
|
return [{"risk": e["text"], "probability": e["probability"].title(), "impact": e["impact"].title()} for e in risks[:5]]
|
||||||
'risk': 'Content saturation in market',
|
|
||||||
'probability': 'Medium',
|
|
||||||
'impact': 'High'
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'risk': 'Algorithm changes affecting reach',
|
|
||||||
'probability': 'High',
|
|
||||||
'impact': 'Medium'
|
|
||||||
}
|
|
||||||
]
|
|
||||||
|
|
||||||
def extract_opportunity_analysis(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
def extract_opportunity_analysis(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||||
"""Extract opportunity analysis from AI recommendations."""
|
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||||
|
opportunities = [e for e in entries if any(k in e["text"].lower() for k in ["opportunity", "expand", "growth", "increase"])]
|
||||||
return [
|
return [
|
||||||
{
|
{"opportunity": e["text"], "potential_impact": e["impact"].title(), "implementation_ease": e["implementation"]}
|
||||||
'opportunity': 'Video content expansion',
|
for e in opportunities[:5]
|
||||||
'potential_impact': 'High',
|
|
||||||
'implementation_ease': 'Medium'
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'opportunity': 'Social media engagement',
|
|
||||||
'potential_impact': 'Medium',
|
|
||||||
'implementation_ease': 'High'
|
|
||||||
}
|
|
||||||
]
|
]
|
||||||
|
|
||||||
def validate_ai_response_quality(self, ai_response: Dict[str, Any]) -> Dict[str, Any]:
|
def validate_ai_response_quality(self, ai_response: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
"""Validate the quality of AI response."""
|
entries, failures = self._normalize_recommendations(ai_response)
|
||||||
quality_metrics = {
|
quality = self._compute_recommendation_quality(entries)
|
||||||
'completeness': 0.0,
|
|
||||||
'relevance': 0.0,
|
|
||||||
'actionability': 0.0,
|
|
||||||
'confidence': 0.0,
|
|
||||||
'overall_quality': 0.0
|
|
||||||
}
|
|
||||||
|
|
||||||
# Calculate completeness
|
required_fields = ["recommendations", "insights", "metrics"]
|
||||||
required_fields = ['recommendations', 'insights', 'metrics']
|
|
||||||
present_fields = sum(1 for field in required_fields if field in ai_response)
|
present_fields = sum(1 for field in required_fields if field in ai_response)
|
||||||
quality_metrics['completeness'] = present_fields / len(required_fields)
|
completeness = present_fields / len(required_fields)
|
||||||
|
|
||||||
# Calculate relevance (placeholder logic)
|
confidence = 0.0
|
||||||
quality_metrics['relevance'] = 0.8 if ai_response.get('analysis_type') else 0.5
|
if entries:
|
||||||
|
confidence = sum(e["confidence"] for e in entries) / len(entries)
|
||||||
|
|
||||||
# Calculate actionability (placeholder logic)
|
return {
|
||||||
recommendations = ai_response.get('recommendations', [])
|
"completeness": completeness,
|
||||||
quality_metrics['actionability'] = min(1.0, len(recommendations) / 5.0)
|
"relevance": quality["field_coverage"],
|
||||||
|
"actionability": quality["specificity"],
|
||||||
# Calculate confidence
|
"confidence": confidence,
|
||||||
metrics = ai_response.get('metrics', {})
|
"overall_quality": (completeness + quality["overall_quality"] + confidence) / 3,
|
||||||
quality_metrics['confidence'] = metrics.get('confidence', 0.5)
|
"validation_failures": failures,
|
||||||
|
}
|
||||||
# Calculate overall quality
|
|
||||||
quality_metrics['overall_quality'] = sum(quality_metrics.values()) / len(quality_metrics)
|
|
||||||
|
|
||||||
return quality_metrics
|
|
||||||
|
|
||||||
def assess_strategy_quality(self, strategy_data: Dict[str, Any]) -> Dict[str, Any]:
|
def assess_strategy_quality(self, strategy_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
"""Assess the overall quality of a content strategy."""
|
"""Assess the overall quality of a content strategy."""
|
||||||
@@ -182,24 +254,15 @@ class QualityValidationService:
|
|||||||
'overall_quality': 0.0
|
'overall_quality': 0.0
|
||||||
}
|
}
|
||||||
|
|
||||||
# Assess data completeness
|
|
||||||
required_fields = [
|
required_fields = [
|
||||||
'business_objectives', 'target_metrics', 'content_budget',
|
'business_objectives', 'target_metrics', 'content_budget',
|
||||||
'team_size', 'implementation_timeline'
|
'team_size', 'implementation_timeline'
|
||||||
]
|
]
|
||||||
present_fields = sum(1 for field in required_fields if strategy_data.get(field))
|
present_fields = sum(1 for field in required_fields if strategy_data.get(field))
|
||||||
quality_assessment['data_completeness'] = present_fields / len(required_fields)
|
quality_assessment['data_completeness'] = present_fields / len(required_fields)
|
||||||
|
|
||||||
# Assess strategic clarity (placeholder logic)
|
|
||||||
quality_assessment['strategic_clarity'] = 0.7 if strategy_data.get('business_objectives') else 0.3
|
quality_assessment['strategic_clarity'] = 0.7 if strategy_data.get('business_objectives') else 0.3
|
||||||
|
|
||||||
# Assess implementation readiness (placeholder logic)
|
|
||||||
quality_assessment['implementation_readiness'] = 0.6 if strategy_data.get('team_size') else 0.2
|
quality_assessment['implementation_readiness'] = 0.6 if strategy_data.get('team_size') else 0.2
|
||||||
|
|
||||||
# Assess competitive positioning (placeholder logic)
|
|
||||||
quality_assessment['competitive_positioning'] = 0.5 if strategy_data.get('competitive_position') else 0.2
|
quality_assessment['competitive_positioning'] = 0.5 if strategy_data.get('competitive_position') else 0.2
|
||||||
|
|
||||||
# Calculate overall quality
|
|
||||||
quality_assessment['overall_quality'] = sum(quality_assessment.values()) / len(quality_assessment)
|
quality_assessment['overall_quality'] = sum(quality_assessment.values()) / len(quality_assessment)
|
||||||
|
|
||||||
return quality_assessment
|
return quality_assessment
|
||||||
109
backend/tests/test_quality_validation_service.py
Normal file
109
backend/tests/test_quality_validation_service.py
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
import importlib.util
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def _load_service_class():
|
||||||
|
module_path = Path(__file__).resolve().parents[1] / "api/content_planning/services/content_strategy/ai_analysis/quality_validation.py"
|
||||||
|
spec = importlib.util.spec_from_file_location("quality_validation", module_path)
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
assert spec and spec.loader
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
return module.QualityValidationService
|
||||||
|
|
||||||
|
|
||||||
|
QualityValidationService = _load_service_class()
|
||||||
|
|
||||||
|
|
||||||
|
def _service():
|
||||||
|
return QualityValidationService()
|
||||||
|
|
||||||
|
|
||||||
|
def test_quality_validation_good_payload():
|
||||||
|
payload = {
|
||||||
|
"market_analysis": {
|
||||||
|
"recommendations": [
|
||||||
|
{
|
||||||
|
"recommendation": "Expand webinar content to enterprise segment by Q3 with 15% MQL target",
|
||||||
|
"evidence": "Pipeline attribution shows webinars convert 2.1x vs blog traffic",
|
||||||
|
"priority": "high",
|
||||||
|
"impact": "high",
|
||||||
|
"confidence": 0.9,
|
||||||
|
"timeline": "Q3",
|
||||||
|
"owner": "Demand Gen",
|
||||||
|
"kpi": "MQL"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"recommendation": "Increase LinkedIn video cadence to 3 posts/week",
|
||||||
|
"evidence": "Audience engagement up 28% on short-form clips",
|
||||||
|
"priority": "medium",
|
||||||
|
"impact": "medium",
|
||||||
|
"confidence": 0.8,
|
||||||
|
"channel": "LinkedIn",
|
||||||
|
"metric": "Engagement rate"
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
service = _service()
|
||||||
|
scores = service.calculate_strategic_scores(payload)
|
||||||
|
quality = service.validate_ai_response_quality(payload)
|
||||||
|
advantages = service.extract_competitive_advantages(payload)
|
||||||
|
|
||||||
|
assert scores["overall_score"] > 50
|
||||||
|
assert quality["overall_quality"] > 0.5
|
||||||
|
assert quality["validation_failures"] == []
|
||||||
|
assert len(advantages) == 2
|
||||||
|
assert advantages[0]["advantage"].startswith("Expand webinar")
|
||||||
|
|
||||||
|
|
||||||
|
def test_quality_validation_partial_payload_handles_guardrails():
|
||||||
|
payload = {
|
||||||
|
"channel_strategy": {
|
||||||
|
"recommendation": "Opportunity: expand newsletter personalization for retention"
|
||||||
|
},
|
||||||
|
"invalid_section": ["bad-shape"],
|
||||||
|
}
|
||||||
|
|
||||||
|
service = _service()
|
||||||
|
quality = service.validate_ai_response_quality(payload)
|
||||||
|
opportunities = service.extract_opportunity_analysis(payload)
|
||||||
|
|
||||||
|
assert quality["overall_quality"] >= 0
|
||||||
|
assert len(quality["validation_failures"]) >= 1
|
||||||
|
assert len(opportunities) == 1
|
||||||
|
assert opportunities[0]["opportunity"].startswith("Opportunity")
|
||||||
|
|
||||||
|
|
||||||
|
def test_quality_validation_invalid_payload():
|
||||||
|
service = _service()
|
||||||
|
quality = service.validate_ai_response_quality("not-a-dict")
|
||||||
|
scores = service.calculate_strategic_scores("not-a-dict")
|
||||||
|
|
||||||
|
assert quality["overall_quality"] == 0
|
||||||
|
assert quality["validation_failures"][0]["error"] == "invalid_root"
|
||||||
|
assert scores["overall_score"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_risk_extraction_from_deterministic_input():
|
||||||
|
payload = {
|
||||||
|
"risk_analysis": {
|
||||||
|
"recommendations": [
|
||||||
|
{
|
||||||
|
"title": "Risk: organic traffic decline due to SERP feature expansion",
|
||||||
|
"probability": "high",
|
||||||
|
"impact": "high",
|
||||||
|
"confidence": 0.7,
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
risks = _service().extract_strategic_risks(payload)
|
||||||
|
assert risks == [
|
||||||
|
{
|
||||||
|
"risk": "Risk: organic traffic decline due to SERP feature expansion",
|
||||||
|
"probability": "High",
|
||||||
|
"impact": "High",
|
||||||
|
}
|
||||||
|
]
|
||||||
Reference in New Issue
Block a user