Compare commits

...

15 Commits

Author SHA1 Message Date
ajaysi
b894bc0abb fix: GHSA-426f-p74m-73fv — JWT JWKS issuer confusion auth bypass (CVSS 9.4)
Pin issuer and JWKS URL at startup from CLERK_PUBLISHABLE_KEY.
Validate token iss claim before any JWKS fetch.
Add issuer= to jwt.decode() with verify_iss=True.
2026-06-05 12:07:22 +05:30
ajaysi
70542b32fc feat: add linkedin and facebook feature flags, clean up dead code
- Register 'linkedin' FeatureGroup with routers.linkedin and
  api.linkedin_image_generation routers
- Register 'facebook' FeatureGroup with
  api.facebook_writer.routers:facebook_router
- Add 'linkedin' and 'facebook' profiles to PROFILE_GROUP_MAP
- Remove dead imports of linkedin_router, linkedin_image_router,
  and facebook_router from app.py (router manager handles via
  CORE_ROUTER_REGISTRY)
- Add LINKEDIN and FACEBOOK keys to frontend FEATURE_KEYS
- Add route priorities for /linkedin-writer and /facebook-writer
- Change route gates from feature='social' to feature='linkedin'
  and feature='facebook' respectively
2026-06-05 12:07:22 +05:30
ajaysi
9a3d704c5c feat: add backlinking feature flag following blog_writer pattern
- Register 'backlinking' FeatureGroup in feature_registry.py with
  routers=routers.backlink_outreach:router
- Add 'backlinking' profile to PROFILE_GROUP_MAP (core + backlinking)
- Add backlink_outreach to OPTIONAL_ROUTER_REGISTRY with
  features={'all', 'backlinking'}
- Remove direct import/include of backlink_outreach from app.py
  (router manager handles both 'all' and 'backlinking' modes)
- Add BACKLINKING key to FEATURE_KEYS and route priority in
  frontend demoMode.ts
- Change frontend route gate from feature='seo' to feature='backlinking'
  so ALWRITY_ENABLED_FEATURES=backlinking enables the route
2026-06-03 20:19:41 +05:30
ajaysi
8699ffc27d fix: resolve remaining 5 QA audit findings (#3, #8, #10, #11, #12)
#3 — Duplicate prospect handling: add_lead now checks (campaign_id, url)
     before insert; bulk_add_leads skips existing URLs.
#8 — Atomic rate limiting: try_increment_* methods atomically check cap
     and increment in a single session; router uses these before send.
#10 — Reply matching via Message-ID: sender generates Message-ID header,
     stored on OutreachAttempt; reply monitor parses In-Reply-To/References;
     poll_replies matches by message_id first, falls back to from_email.
#11 — Save-to-campaign uses existing store results instead of
      re-running expensive deepDiscover.
#12 — Lead status Literal type: Pydantic models enforce valid status
      values; backend validates via LEAD_VALID_STATUSES frozenset;
      frontend API typed as LeadStatus union.
2026-06-03 20:06:11 +05:30
ajaysi
259194c289 Merge remote-tracking branch 'origin/codex/add-atomic-idempotency-reservation-method'
# Conflicts:
#	backend/routers/backlink_outreach.py
#	backend/services/backlink_outreach_models.py
2026-06-03 18:52:18 +05:30
ajaysi
2f93ae4891 Merge remote-tracking branch 'origin/codex/add-sender-email-validation-and-logging' 2026-06-03 18:50:53 +05:30
ي
bf22a3d318 Handle backlink outreach idempotency reservations 2026-06-03 18:49:14 +05:30
ajaysi
2a879a6e24 Merge remote-tracking branch 'origin/codex/update-compliance-requirements-for-outreach-send' 2026-06-03 18:49:07 +05:30
ajaysi
7749b4db0e Merge remote-tracking branch 'origin/codex/refactor-backlink-outreach-services-for-async-support'
# Conflicts:
#	backend/routers/backlink_outreach.py
2026-06-03 18:49:01 +05:30
ي
cbace3b752 Validate backlink outreach sender aliases 2026-06-03 18:48:17 +05:30
ي
98d4ac6dbd Harden backlink outreach send policy 2026-06-03 18:33:11 +05:30
ي
55b7209554 Refactor backlink discovery HTTP calls 2026-06-03 18:28:40 +05:30
ajaysi
57e46a20f8 Merge remote-tracking branch 'origin/codex/update-backlink-outreach-for-campaign-validation' 2026-06-03 18:22:35 +05:30
ي
ec2f9151b8 Harden backlink lead campaign ownership 2026-06-03 18:19:16 +05:30
ي
40516e5c79 Secure backlink lead status updates 2026-06-03 18:16:10 +05:30
16 changed files with 1168 additions and 280 deletions

View File

@@ -58,6 +58,21 @@ FEATURE_GROUPS: Dict[str, FeatureGroup] = {
"api.blog_writer.seo_analysis:router",
),
),
"backlinking": FeatureGroup(
features=("backlinking",),
routers=("routers.backlink_outreach:router",),
),
"linkedin": FeatureGroup(
features=("linkedin",),
routers=(
"routers.linkedin:router",
"api.linkedin_image_generation:router",
),
),
"facebook": FeatureGroup(
features=("facebook",),
routers=("api.facebook_writer.routers:facebook_router",),
),
}
@@ -67,5 +82,8 @@ PROFILE_GROUP_MAP: Dict[str, Tuple[str, ...]] = {
"podcast": ("core", "podcast"),
"youtube": ("core", "youtube"),
"blog_writer": ("core", "blog_writer"),
"backlinking": ("core", "backlinking"),
"linkedin": ("core", "linkedin"),
"facebook": ("core", "facebook"),
"planning": ("core", "content_planning"),
}

View File

@@ -67,6 +67,7 @@ OPTIONAL_ROUTER_REGISTRY = [
{"name": "oauth_token_monitoring", "module": "api.oauth_token_monitoring_routes", "attr": "router", "features": {"all", "core"}},
{"name": "agents", "module": "api.agents_api", "attr": "router", "features": {"all"}},
{"name": "today_workflow", "module": "api.today_workflow", "attr": "router", "features": {"all"}},
{"name": "backlink_outreach", "module": "routers.backlink_outreach", "attr": "router", "features": {"all", "backlinking"}},
]
OPTIONAL_MODULE_MATRIX = {

View File

@@ -126,19 +126,14 @@ seo_tools_router = None
if _is_full_mode():
from routers.seo_tools import router as seo_tools_router
# Skip Facebook Writer, LinkedIn, and other non-essential routes in feature-only modes
# Also skip other heavy services that trigger PersonaAnalysisService initialization
# Skip heavy services in feature-only modes (PersonaAnalysisService, etc.)
if _is_full_mode():
from api.facebook_writer.routers import facebook_router
from routers.linkedin import router as linkedin_router
from api.linkedin_image_generation import router as linkedin_image_router
from api.brainstorm import router as brainstorm_router
from api.images import router as images_router
from api.assets_serving import router as assets_serving_router
from routers.image_studio import router as image_studio_router
from routers.product_marketing import router as product_marketing_router
from routers.campaign_creator import router as campaign_creator_router
from routers.backlink_outreach import router as backlink_outreach_router
else:
# In feature-only modes, only load essential assets router
from api.assets_serving import router as assets_serving_router
@@ -147,7 +142,6 @@ else:
image_studio_router = None
product_marketing_router = None
campaign_creator_router = None
backlink_outreach_router = None
# Import hallucination detector router
try:
@@ -683,8 +677,6 @@ if _is_full_mode():
app.include_router(product_marketing_router)
if campaign_creator_router:
app.include_router(campaign_creator_router)
if backlink_outreach_router:
app.include_router(backlink_outreach_router)
router_group_status["platform_extensions"] = {
"mounted": True,
@@ -799,6 +791,24 @@ async def startup_event():
else:
logger.info(f"[FEATURE-MODE] Skipping scheduler startup (features: {enabled_features})")
# Recover stale YouTube tasks on startup
if _is_feature_enabled("youtube"):
try:
from api.youtube.task_manager import task_manager
from services.database import get_all_user_ids
user_ids = get_all_user_ids()
recovered = 0
for uid in user_ids:
try:
count = task_manager.recover_stale_tasks(uid)
recovered += count
except Exception:
pass
if recovered > 0:
logger.info(f"[STARTUP] Recovered {recovered} stale YouTube tasks across {len(user_ids)} users")
except Exception as e:
logger.warning(f"[STARTUP] YouTube task recovery skipped: {e}")
# Check Wix configuration (OAuth-based, API key optional)
wix_api_key = os.getenv('WIX_API_KEY')
if wix_api_key:

View File

@@ -50,6 +50,7 @@ class ClerkAuthMiddleware:
# Cache for PyJWKClient to avoid repeated JWKS fetches
self._jwks_client_cache = {}
self._jwks_url_cache = None
self._issuer_cache = None # Pre-configured Clerk issuer for iss validation
if not self.clerk_secret_key and not self.disable_auth:
logger.warning("CLERK_SECRET_KEY not found, authentication may fail")
@@ -58,15 +59,16 @@ class ClerkAuthMiddleware:
if CLERK_AUTH_AVAILABLE and not self.disable_auth:
try:
if self.clerk_secret_key and self.clerk_publishable_key:
# Extract instance from publishable key for JWKS URL
# Extract instance from publishable key for JWKS URL and issuer validation
# Format: pk_test_<instance>.<domain> or pk_live_<instance>.<domain>
parts = self.clerk_publishable_key.replace('pk_test_', '').replace('pk_live_', '').split('.')
if len(parts) >= 1:
# Extract the domain from publishable key or use default
# Clerk URLs are typically: https://<instance>.clerk.accounts.dev
instance = parts[0]
jwks_url = f"https://{instance}.clerk.accounts.dev/.well-known/jwks.json"
issuer_url = f"https://{instance}.clerk.accounts.dev"
jwks_url = f"{issuer_url}/.well-known/jwks.json"
# Create Clerk configuration with JWKS URL
clerk_config = ClerkConfig(
secret_key=self.clerk_secret_key,
@@ -76,6 +78,7 @@ class ClerkAuthMiddleware:
self.clerk_bearer = ClerkHTTPBearer(clerk_config)
logger.info(f"fastapi-clerk-auth initialized successfully with JWKS URL: {jwks_url}")
self._jwks_url_cache = jwks_url
self._issuer_cache = issuer_url # Pin issuer for VULN-001 fix
else:
logger.warning("Could not extract instance from publishable key")
self.clerk_bearer = None
@@ -118,19 +121,29 @@ class ClerkAuthMiddleware:
import jwt
from jwt import PyJWKClient
# Get the JWKS URL from the token header
# Get the unverified header for key ID lookup
unverified_header = jwt.get_unverified_header(token)
# Decode token to get issuer for JWKS URL
# --- SECURITY FIX (VULN-001): Validate issuer before any JWKS fetch ---
# Pre-configured issuer and JWKS URL derived from CLERK_PUBLISHABLE_KEY
# NEVER use the token's 'iss' claim to construct the JWKS URL (GHSA-426f-p74m-73fv)
expected_issuer = self._issuer_cache
jwks_url = self._jwks_url_cache
if not expected_issuer or not jwks_url:
raise Exception("Clerk issuer/JWKS URL not configured at startup")
# Decode token to validate the issuer claim against the pre-configured value
# WARNING: We must first validate 'iss' before trusting anything else
unverified_claims = jwt.decode(token, options={"verify_signature": False})
issuer = unverified_claims.get('iss', '')
# Construct JWKS URL from issuer
jwks_url = f"{issuer}/.well-known/jwks.json" if issuer else self._jwks_url_cache or ""
if not jwks_url:
raise Exception("Unable to resolve JWKS URL for Clerk verification")
# Use cached PyJWKClient to avoid repeated JWKS fetches
token_issuer = unverified_claims.get('iss', '')
if token_issuer != expected_issuer:
logger.error(
f"Issuer mismatch: token claims '{token_issuer}' "
f"but expected '{expected_issuer}'"
)
return None
# Use cached PyJWKClient with pinned jwks_url (never derived from token)
if jwks_url not in self._jwks_client_cache:
logger.info(f"Creating new PyJWKClient for {jwks_url} with caching enabled")
# Create client with caching enabled (cache_keys=True keeps keys in memory)
@@ -139,17 +152,19 @@ class ClerkAuthMiddleware:
cache_keys=True,
max_cached_keys=16
)
jwks_client = self._jwks_client_cache[jwks_url]
signing_key = jwks_client.get_signing_key_from_jwt(token)
# Verify and decode the token with clock skew tolerance
# Add 300 seconds (5 minutes) leeway to handle clock skew and token refresh delays
# SECURITY: Always pass issuer= to verify the token's 'iss' matches expected (VULN-001)
decoded_token = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
options={"verify_signature": True, "verify_exp": True},
issuer=expected_issuer,
options={"verify_signature": True, "verify_exp": True, "verify_iss": True},
leeway=300 # Allow 5 minutes leeway for token refresh during navigation
)

View File

@@ -46,6 +46,7 @@ class OutreachAttempt(Base):
decision_reason = Column(Text, nullable=True)
sent_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, index=True)
message_id = Column(String(255), nullable=True, index=True)
class OutreachReply(Base):

View File

@@ -22,7 +22,10 @@ from services.backlink_outreach_models import (
SuppressionAddRequest,
)
from services.backlink_outreach_service import backlink_outreach_service
from services.backlink_outreach_storage import BacklinkOutreachStorageService
from services.backlink_outreach_storage import (
BacklinkCampaignNotFoundError,
BacklinkOutreachStorageService,
)
from services.backlink_outreach_sender import backlink_outreach_sender
from services.backlink_outreach_reply_monitor import backlink_outreach_reply_monitor
from services.backlink_outreach_template_generator import (
@@ -68,7 +71,7 @@ async def discover_backlink_opportunities(
payload: BacklinkKeywordInput,
current_user: Dict[str, Any] = Depends(get_current_user),
):
return backlink_outreach_service.discover_opportunities(payload.keyword, payload.max_results)
return await backlink_outreach_service.discover_opportunities_async(payload.keyword, payload.max_results)
@router.get("/migration-coverage")
@@ -84,12 +87,25 @@ async def get_backlink_migration_coverage(
async def discover_deep_backlink_opportunities(
payload: DeepKeywordInput,
current_user: Dict[str, Any] = Depends(get_current_user),
scrape_timeout_seconds: float = Query(15.0, ge=1.0, le=60.0),
scrape_max_concurrency: int = Query(5, ge=1, le=20),
):
"""Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping."""
user_id = _resolve_user_id(current_user)
result = await backlink_outreach_service.deep_discover(payload.keyword, payload.max_results)
storage = None
if payload.campaign_id:
storage = BacklinkOutreachStorageService()
if not storage.get_campaign(payload.campaign_id, user_id):
raise HTTPException(status_code=404, detail="Campaign not found")
result = await backlink_outreach_service.deep_discover(
payload.keyword,
payload.max_results,
user_id=user_id,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
if payload.campaign_id:
saved = 0
save_failed = 0
for opp in result.get("opportunities", []):
@@ -183,7 +199,9 @@ async def add_campaign_lead(
notes=payload.notes,
)
return lead
except Exception as e:
except BacklinkCampaignNotFoundError:
raise HTTPException(status_code=404, detail="Campaign not found")
except Exception:
raise HTTPException(status_code=500, detail="Failed to add lead")
@@ -192,18 +210,48 @@ async def bulk_update_lead_status(
payload: BulkStatusUpdateRequest,
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Bulk update lead statuses."""
"""Bulk update lead statuses for leads owned by the current user."""
user_id = _resolve_user_id(current_user)
storage = BacklinkOutreachStorageService()
access_issues = storage.get_lead_access_issues(
payload.lead_ids, user_id, campaign_id=payload.campaign_id
)
if access_issues["unauthorized"]:
raise HTTPException(
status_code=403,
detail={
"message": "One or more leads do not belong to the current user",
"lead_ids": access_issues["unauthorized"],
},
)
if access_issues["missing"]:
raise HTTPException(
status_code=404,
detail={
"message": "One or more leads were not found",
"lead_ids": access_issues["missing"],
},
)
updated = 0
failed: list[str] = []
for lid in payload.lead_ids:
try:
lead = storage.update_lead_status(lid, user_id, payload.status, payload.notes)
lead = storage.update_lead_status(
lid,
user_id,
payload.status,
payload.notes,
campaign_id=payload.campaign_id,
)
if lead:
updated += 1
else:
failed.append(lid)
except PermissionError:
raise HTTPException(
status_code=403, detail="Lead does not belong to the current user"
)
except Exception:
failed.append(lid)
return BulkStatusUpdateResponse(updated=updated, failed=failed)
@@ -218,7 +266,18 @@ async def update_lead_status(
"""Update lead status (discovered -> contacted -> replied -> placed)."""
user_id = _resolve_user_id(current_user)
storage = BacklinkOutreachStorageService()
lead = storage.update_lead_status(lead_id, user_id, payload.status, payload.notes)
try:
lead = storage.update_lead_status(
lead_id,
user_id,
payload.status,
payload.notes,
campaign_id=payload.campaign_id,
)
except PermissionError:
raise HTTPException(
status_code=403, detail="Lead does not belong to the current user"
)
if not lead:
raise HTTPException(status_code=404, detail="Lead not found")
return lead
@@ -260,42 +319,95 @@ async def send_outreach(
subject = backlink_outreach_sender.personalize(tmpl.get("subject_template", subject), variables)
body = backlink_outreach_sender.personalize(tmpl.get("body_template", body), variables)
result = backlink_outreach_service.send_outreach(
SendOutreachRequest(
lead_id=payload.lead_id,
campaign_id=payload.campaign_id,
user_id=user_id,
workspace_id=payload.workspace_id,
sender_email=payload.sender_email,
subject=subject,
body=body,
idempotency_key=payload.idempotency_key,
sender_validation = backlink_outreach_sender.validate_sender_alias(payload.sender_email)
if not sender_validation.authorized:
return SendOutreachResponse(
attempt_id="",
status="failed",
policy_allowed=False,
policy_reasons=sender_validation.failure_reasons,
effective_sender_email=sender_validation.effective_sender_email or None,
)
)
try:
result = backlink_outreach_service.send_outreach(
SendOutreachRequest(
lead_id=payload.lead_id,
campaign_id=payload.campaign_id,
user_id=user_id,
workspace_id=payload.workspace_id,
sender_email=sender_validation.effective_sender_email,
subject=subject,
body=body,
idempotency_key=payload.idempotency_key,
sender_identity=payload.sender_identity,
legal_basis=payload.legal_basis,
contact_discovery_source=payload.contact_discovery_source,
recipient_region=payload.recipient_region,
recipient_region_source=payload.recipient_region_source,
consent_status=payload.consent_status,
approved_by_human=payload.approved_by_human,
unsubscribe_url=payload.unsubscribe_url,
one_click_unsubscribe=payload.one_click_unsubscribe,
)
)
except Exception:
existing = storage.get_attempt_by_idempotency_key(payload.idempotency_key, user_id=user_id)
if existing:
result = backlink_outreach_service.response_from_attempt(existing, duplicate=True)
if sender_validation.effective_sender_email:
result.effective_sender_email = sender_validation.effective_sender_email
return result
raise HTTPException(status_code=409, detail="Unable to reserve idempotency key")
result.effective_sender_email = sender_validation.effective_sender_email
lead_email = ""
if result.attempt_id:
if result.attempt_id and result.status == "approved" and not result.duplicate:
lead = storage.get_lead(payload.lead_id, user_id=user_id)
lead_email = (lead.get("email") or "") if lead else ""
if result.policy_allowed and lead_email:
sent = await backlink_outreach_sender.send_email(
to_email=lead_email,
subject=subject,
body=body,
)
status = "sent" if sent else "failed"
storage.update_attempt_status(result.attempt_id, status, user_id=user_id)
result.status = status
if sent:
storage.mark_idempotency(payload.idempotency_key, user_id)
storage.increment_user_send_counter(user_id)
domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown"
storage.increment_domain_send_counter(domain, user_id=user_id)
elif result.policy_allowed and not lead_email:
storage.update_attempt_status(result.attempt_id, "failed", user_id=user_id)
if result.status == "approved" and result.policy_allowed and not result.duplicate and lead_email:
domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown"
user_within_cap, _ = storage.try_increment_user_send_counter(user_id)
domain_within_cap, _ = storage.try_increment_domain_send_counter(domain, user_id=user_id)
if not (user_within_cap and domain_within_cap):
reasons = []
if not user_within_cap:
reasons.append("user_daily_cap_exceeded")
if not domain_within_cap:
reasons.append("domain_daily_cap_exceeded")
reason_str = f"rate_limit_hit; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "blocked", decision_reason=reason_str, user_id=user_id)
result.status = "blocked"
result.policy_reasons = reasons
else:
send_result = await backlink_outreach_sender.send_email(
to_email=lead_email,
subject=subject,
body=body,
from_email=payload.sender_email,
)
if send_result.success:
storage.update_attempt_status(result.attempt_id, "sent", user_id=user_id)
result.status = "sent"
result.effective_sender_email = send_result.effective_sender_email or result.effective_sender_email
if send_result.message_id:
storage.update_attempt_message_id(result.attempt_id, send_result.message_id, user_id=user_id)
storage.mark_idempotency(payload.idempotency_key, user_id)
else:
reason = f"smtp_send_failed; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id)
result.status = "failed"
result.policy_reasons = ["smtp_send_failed"]
result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY
elif result.status == "approved" and result.policy_allowed and not result.duplicate and not lead_email:
reason = f"lead_has_no_email; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id)
result.status = "failed"
result.policy_reasons = (result.policy_reasons or []) + ["lead_has_no_email"]
result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY
return result
@@ -350,7 +462,18 @@ async def poll_replies(
if storage.reply_exists(from_email, subject, user_id=user_id):
skipped += 1
continue
attempt_id = storage.find_attempt_by_from_email(from_email, user_id=user_id) or ""
attempt_id = ""
in_reply_to = raw.get("in_reply_to", "")
references = raw.get("references", "")
if in_reply_to:
attempt_id = storage.find_attempt_by_message_id(in_reply_to, user_id=user_id) or ""
if not attempt_id and references:
mid = references.split()[-1]
attempt_id = storage.find_attempt_by_message_id(mid, user_id=user_id) or ""
if not attempt_id:
attempt_id = storage.find_attempt_by_from_email(from_email, user_id=user_id) or ""
reply = storage.add_reply(
attempt_id=attempt_id,
from_email=from_email,

View File

@@ -1,7 +1,8 @@
from __future__ import annotations
from pydantic import BaseModel, Field, HttpUrl, EmailStr
from pydantic import BaseModel, Field, HttpUrl
from typing import Dict, List, Optional
from typing_extensions import Literal
class BacklinkKeywordInput(BaseModel):
@@ -10,7 +11,7 @@ class BacklinkKeywordInput(BaseModel):
class OpportunityContactInfo(BaseModel):
email: Optional[EmailStr] = None
email: Optional[str] = None
contact_page: Optional[HttpUrl] = None
@@ -93,8 +94,9 @@ class LeadListResponse(BaseModel):
class LeadStatusUpdateRequest(BaseModel):
status: str = Field(..., min_length=1)
status: Literal["discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"]
notes: Optional[str] = None
campaign_id: Optional[str] = Field(default=None, min_length=1)
class CampaignDetailResponse(BaseModel):
@@ -148,6 +150,21 @@ class OutreachStatusRecord(BaseModel):
notes: Optional[str] = None
class SenderIdentity(BaseModel):
name: str = Field(default="", description="Human sender name displayed to the recipient")
email: str = Field(default="")
organization: str = Field(default="", description="Organization or brand responsible for the outreach")
physical_mailing_address: str = Field(default="", description="Postal address required for commercial outreach compliance")
reply_to_email: Optional[str] = Field(None, description="Optional reply-to mailbox if different from sender email")
class OneClickUnsubscribe(BaseModel):
enabled: bool = Field(default=False)
mailto: Optional[str] = Field(None, description="Mailbox for one-click unsubscribe requests")
header_value: Optional[str] = Field(None, description="List-Unsubscribe / one-click unsubscribe header value")
class SendOutreachRequest(BaseModel):
lead_id: str = Field(..., min_length=1)
campaign_id: str = Field(..., min_length=1)
@@ -157,6 +174,15 @@ class SendOutreachRequest(BaseModel):
subject: str = Field(..., min_length=1)
body: str = Field(..., min_length=1)
idempotency_key: str = Field(..., min_length=8)
sender_identity: Optional[SenderIdentity] = None
legal_basis: str = Field(default="")
contact_discovery_source: str = Field(default="")
recipient_region: str = Field(default="unknown")
recipient_region_source: str = Field(default="user_attested", min_length=2)
consent_status: str = Field(default="unknown", min_length=2)
approved_by_human: bool = False
unsubscribe_url: Optional[HttpUrl] = None
one_click_unsubscribe: Optional[OneClickUnsubscribe] = None
template_id: Optional[str] = Field(None, description="Optional template ID for personalization")
template_variables: Optional[dict] = Field(None, description="Variable values for template personalization")
@@ -166,6 +192,9 @@ class SendOutreachResponse(BaseModel):
status: str
policy_allowed: bool
policy_reasons: List[str] = Field(default_factory=list)
effective_sender_email: Optional[str] = None
duplicate: bool = False
retry_policy: Optional[str] = None
class OutreachAttemptRecord(BaseModel):
@@ -240,10 +269,15 @@ class PolicyValidationRequest(BaseModel):
recipient_email: str = Field(..., min_length=1)
recipient_domain: str
recipient_region: str = Field(default="unknown")
legal_basis: str = Field(..., min_length=2)
recipient_region_source: str = Field(default="user_attested", min_length=2)
legal_basis: str = Field(default="")
contact_discovery_source: str = Field(default="")
consent_status: str = Field(default="unknown", min_length=2)
approved_by_human: bool = False
unsubscribe_url: Optional[HttpUrl] = None
sender_identity: str = Field(..., min_length=3)
one_click_unsubscribe: Optional[OneClickUnsubscribe] = None
sender_identity: Optional[SenderIdentity] = None
sender_email: Optional[str] = Field(None, description="Transport sender email, if separate from identity")
idempotency_key: str = Field(..., min_length=8)
@@ -296,8 +330,9 @@ class ConversionFunnelResponse(BaseModel):
class BulkStatusUpdateRequest(BaseModel):
lead_ids: List[str] = Field(..., min_length=1)
status: str = Field(..., min_length=1)
status: Literal["discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"]
notes: Optional[str] = None
campaign_id: Optional[str] = Field(default=None, min_length=1)
class BulkStatusUpdateResponse(BaseModel):

View File

@@ -104,6 +104,8 @@ class BacklinkOutreachReplyMonitor:
from_email = parsed_msg.get("From", "")
subject = parsed_msg.get("Subject", "")
received_at = parsed_msg.get("Date", "")
in_reply_to = parsed_msg.get("In-Reply-To", "")
references = parsed_msg.get("References", "")
# Extract body
body = ""
@@ -137,6 +139,8 @@ class BacklinkOutreachReplyMonitor:
"body": body[:5000],
"classification": classification,
"received_at": received_at_iso,
"in_reply_to": in_reply_to,
"references": references,
}
except Exception as e:
logger.error(f"Failed to parse reply: {e}")

View File

@@ -8,11 +8,10 @@ from __future__ import annotations
import asyncio
import re
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
from urllib.parse import quote, urlparse
import requests
import httpx
from bs4 import BeautifulSoup
from loguru import logger
@@ -34,26 +33,47 @@ class BacklinkOutreachScraper:
# -- Public API --
async def deep_discover(
self, keyword: str, max_results: int = 15
self,
keyword: str,
max_results: int = 15,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
"""Discover guest-post opportunities using Exa, falling back to DuckDuckGo."""
if self._is_exa_available():
logger.info(f"[BacklinkScraper] Using Exa for keyword: {keyword}")
return await self._discover_with_exa(keyword, max_results)
logger.info(f"[BacklinkScraper] Exa unavailable, falling back to DuckDuckGo for: {keyword}")
return await self._discover_with_duckduckgo(keyword, max_results)
return await self._discover_with_duckduckgo(
keyword,
max_results,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
"""Fetch full page content for a list of URLs using Exa get_contents."""
async def scrape_urls(
self,
urls: List[str],
timeout_seconds: float = 15.0,
max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
"""Fetch full page content with non-blocking fallbacks and bounded concurrency."""
exa = self._get_exa_sdk()
if not exa:
return self._scrape_urls_fallback(urls)
return await self._scrape_urls_fallback(
urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency
)
loop = asyncio.get_running_loop()
try:
result = exa.get_contents(urls, text={"max_characters": 5000})
result = await loop.run_in_executor(
None, lambda: exa.get_contents(urls, text={"max_characters": 5000})
)
return self._parse_get_contents_result(result)
except Exception as e:
logger.warning(f"[BacklinkScraper] Exa get_contents failed: {e}")
return self._scrape_urls_fallback(urls)
return await self._scrape_urls_fallback(
urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency
)
# -- Availability --
@@ -207,24 +227,35 @@ class BacklinkOutreachScraper:
# -- DuckDuckGo Fallback Discovery --
async def _discover_with_duckduckgo(self, keyword: str, max_results: int) -> Dict[str, Any]:
async def _discover_with_duckduckgo(
self,
keyword: str,
max_results: int,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
queries = self._generate_search_queries(keyword)
dedup: Dict[str, Dict[str, Any]] = {}
for query in queries[:4]:
rows = self._duckduckgo_search(query)
for row in rows:
norm_url = self._normalize_url(row.get("url", ""))
if not norm_url or norm_url in dedup:
continue
dedup[norm_url] = row
if len(dedup) >= max_results:
break
time.sleep(0.4)
async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client:
for query in queries[:4]:
rows = await self._duckduckgo_search(query, client=client)
for row in rows:
norm_url = self._normalize_url(row.get("url", ""))
if not norm_url or norm_url in dedup:
continue
dedup[norm_url] = row
if len(dedup) >= max_results:
break
await asyncio.sleep(0.4)
# Scrape discovered URLs with Exa get_contents (or fallback)
urls_to_scrape = list(dedup.keys())[:max_results]
scraped = self.scrape_urls(urls_to_scrape)
scraped = await self.scrape_urls(
urls_to_scrape,
timeout_seconds=scrape_timeout_seconds,
max_concurrency=scrape_max_concurrency,
)
scraped_map = {self._normalize_url(s.get("url", "")): s for s in scraped}
# Merge DDG results with scraped content
@@ -250,51 +281,76 @@ class BacklinkOutreachScraper:
"opportunities": opportunities,
}
def _duckduckgo_search(self, query: str, retries: int = 2) -> List[Dict[str, Any]]:
encoded = requests.utils.quote(query)
async def _duckduckgo_search(
self,
query: str,
retries: int = 2,
client: Optional[httpx.AsyncClient] = None,
) -> List[Dict[str, Any]]:
encoded = quote(query)
url = f"https://duckduckgo.com/html/?q={encoded}"
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for attempt in range(retries + 1):
try:
resp = requests.get(url, headers=headers, timeout=12)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
for result in soup.select("div.result")[:10]:
anchor = result.select_one("a.result__a")
snippet_el = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
if not anchor or not anchor.get("href"):
continue
results.append({
"url": anchor.get("href"),
"title": anchor.get_text(strip=True),
"snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "",
"highlights": [],
})
return results
except Exception:
if attempt == retries:
return []
time.sleep(0.6 * (attempt + 1))
return []
def _scrape_urls_fallback(self, urls: List[str]) -> List[Dict[str, Any]]:
"""Basic HTTP scrape when Exa is unavailable."""
results = []
async def _request(active_client: httpx.AsyncClient) -> List[Dict[str, Any]]:
for attempt in range(retries + 1):
try:
resp = await active_client.get(url, headers=headers)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
for result in soup.select("div.result")[:10]:
anchor = result.select_one("a.result__a")
snippet_el = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
if not anchor or not anchor.get("href"):
continue
results.append({
"url": anchor.get("href"),
"title": anchor.get_text(strip=True),
"snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "",
"highlights": [],
})
return results
except (httpx.HTTPError, httpx.TimeoutException):
if attempt == retries:
return []
await asyncio.sleep(0.6 * (attempt + 1))
return []
if client is not None:
return await _request(client)
async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as owned_client:
return await _request(owned_client)
async def _scrape_urls_fallback(
self,
urls: List[str],
timeout_seconds: float = 15.0,
max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
"""Basic async HTTP scrape when Exa is unavailable."""
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for url in urls[:5]:
try:
resp = requests.get(url, headers=headers, timeout=15)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator=" ", strip=True)
title = soup.title.get_text(strip=True) if soup.title else ""
results.append({"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""})
except Exception:
continue
return results
semaphore = asyncio.Semaphore(max(1, max_concurrency))
timeout = httpx.Timeout(timeout_seconds)
async def scrape_one(client: httpx.AsyncClient, url: str) -> Optional[Dict[str, Any]]:
async with semaphore:
try:
resp = await client.get(url, headers=headers)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator=" ", strip=True)
title = soup.title.get_text(strip=True) if soup.title else ""
return {"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""}
except (httpx.HTTPError, httpx.TimeoutException):
return None
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
tasks = [scrape_one(client, url) for url in urls]
scraped = await asyncio.gather(*tasks)
return [row for row in scraped if row]
# -- Enrichment Pipeline --

View File

@@ -6,9 +6,11 @@ import os
import ssl
import smtplib
import asyncio
from dataclasses import dataclass, field
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Optional
from typing import List, Optional, Set
from uuid import uuid4
from loguru import logger
@@ -17,11 +19,27 @@ SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USERNAME = os.getenv("SMTP_USERNAME", "")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
SMTP_FROM_EMAIL = os.getenv("SMTP_FROM_EMAIL", SMTP_USERNAME)
SMTP_ALLOWED_FROM_EMAILS = os.getenv("SMTP_ALLOWED_FROM_EMAILS", "")
SMTP_USE_TLS = os.getenv("SMTP_USE_TLS", "true").lower() in ("true", "1", "yes")
SMTP_VERIFY_TLS = os.getenv("SMTP_VERIFY_TLS", "true").lower() in ("true", "1", "yes")
SMTP_SEND_TIMEOUT = int(os.getenv("SMTP_SEND_TIMEOUT", "30"))
@dataclass
class SenderAuthorizationResult:
authorized: bool
effective_sender_email: str = ""
failure_reasons: List[str] = field(default_factory=list)
@dataclass
class SendEmailResult:
success: bool
effective_sender_email: str = ""
message_id: str = ""
failure_reasons: List[str] = field(default_factory=list)
class BacklinkOutreachSender:
def __init__(self):
self._host = SMTP_HOST
@@ -29,6 +47,7 @@ class BacklinkOutreachSender:
self._username = SMTP_USERNAME
self._password = SMTP_PASSWORD
self._from_email = SMTP_FROM_EMAIL or SMTP_USERNAME
self._allowed_from_emails = SMTP_ALLOWED_FROM_EMAILS
self._use_tls = SMTP_USE_TLS
self._verify_tls = SMTP_VERIFY_TLS
self._timeout = SMTP_SEND_TIMEOUT
@@ -36,23 +55,75 @@ class BacklinkOutreachSender:
def is_configured(self) -> bool:
return bool(self._username and self._password)
@staticmethod
def _normalize_email(email: Optional[str]) -> str:
return (email or "").strip().lower()
def _allowed_sender_aliases(self) -> Set[str]:
aliases = {
self._normalize_email(alias)
for alias in self._allowed_from_emails.split(",")
if self._normalize_email(alias)
}
for configured_sender in (self._from_email, self._username):
normalized = self._normalize_email(configured_sender)
if normalized:
aliases.add(normalized)
return aliases
def validate_sender_alias(self, from_email: Optional[str] = None) -> SenderAuthorizationResult:
default_sender = self._normalize_email(self._from_email or self._username)
requested_sender = self._normalize_email(from_email) or default_sender
if not self.is_configured():
return SenderAuthorizationResult(
authorized=False,
effective_sender_email=requested_sender,
failure_reasons=["smtp_not_configured"],
)
if not requested_sender:
return SenderAuthorizationResult(
authorized=False,
failure_reasons=["smtp_sender_missing"],
)
allowed_aliases = self._allowed_sender_aliases()
if requested_sender not in allowed_aliases:
return SenderAuthorizationResult(
authorized=False,
effective_sender_email=requested_sender,
failure_reasons=["sender_alias_not_authorized"],
)
return SenderAuthorizationResult(
authorized=True,
effective_sender_email=requested_sender,
)
async def send_email(
self,
to_email: str,
subject: str,
body: str,
from_email: Optional[str] = None,
) -> bool:
if not self.is_configured():
logger.error("SMTP not configured: set SMTP_USERNAME and SMTP_PASSWORD")
return False
) -> SendEmailResult:
sender_validation = self.validate_sender_alias(from_email)
if not sender_validation.authorized:
logger.error(f"SMTP sender validation failed: {sender_validation.failure_reasons}")
return SendEmailResult(
success=False,
effective_sender_email=sender_validation.effective_sender_email,
failure_reasons=sender_validation.failure_reasons,
)
sender = from_email or self._from_email
sender = sender_validation.effective_sender_email
msg_id = f"<{uuid4().hex}@{sender.split('@')[-1] if '@' in sender else 'outreach.local'}>"
msg = MIMEMultipart("alternative")
msg["From"] = sender
msg["To"] = to_email
msg["Subject"] = subject
msg["Message-ID"] = msg_id
msg.attach(MIMEText(body, "plain"))
loop = asyncio.get_running_loop()
@@ -78,7 +149,13 @@ class BacklinkOutreachSender:
logger.error(f"Unexpected error sending to {to_email}: {e}")
return False
return await loop.run_in_executor(None, _send)
success = await loop.run_in_executor(None, _send)
return SendEmailResult(
success=success,
effective_sender_email=sender,
message_id=msg_id if success else "",
failure_reasons=[] if success else ["smtp_send_failed"],
)
def personalize(self, template: str, variables: dict) -> str:
"""Replace {placeholder} variables in a template string."""

View File

@@ -4,10 +4,11 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from urllib.parse import quote
import asyncio
import re
import time
import requests
import httpx
from bs4 import BeautifulSoup
import csv
@@ -22,9 +23,6 @@ from services.backlink_outreach_models import (
)
from services.backlink_outreach_storage import BacklinkOutreachStorageService
DEFAULT_USER_DAILY_CAP = 100
DEFAULT_DOMAIN_DAILY_CAP = 20
@dataclass
class SearchResult:
url: str
@@ -55,51 +53,67 @@ class BacklinkOutreachService:
f"{normalized} + 'Submit article'",
]
def search_for_urls(self, query: str, timeout_seconds: int = 12, retries: int = 2) -> List[SearchResult]:
encoded_query = requests.utils.quote(query)
async def search_for_urls(
self,
query: str,
timeout_seconds: int = 12,
retries: int = 2,
client: Optional[httpx.AsyncClient] = None,
) -> List[SearchResult]:
"""Search DuckDuckGo HTML using a non-blocking HTTP client."""
encoded_query = quote(query)
url = f"https://duckduckgo.com/html/?q={encoded_query}"
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for attempt in range(retries + 1):
try:
response = requests.get(url, headers=headers, timeout=timeout_seconds)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
rows: List[SearchResult] = []
for result in soup.select("div.result")[:10]:
anchor = result.select_one("a.result__a")
snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
if not anchor or not anchor.get("href"):
continue
rows.append(
SearchResult(
url=anchor.get("href"),
title=anchor.get_text(strip=True),
snippet=snippet.get_text(" ", strip=True) if snippet else "",
async def _request(active_client: httpx.AsyncClient) -> List[SearchResult]:
for attempt in range(retries + 1):
try:
response = await active_client.get(url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
rows: List[SearchResult] = []
for result in soup.select("div.result")[:10]:
anchor = result.select_one("a.result__a")
snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
if not anchor or not anchor.get("href"):
continue
rows.append(
SearchResult(
url=anchor.get("href"),
title=anchor.get_text(strip=True),
snippet=snippet.get_text(" ", strip=True) if snippet else "",
)
)
)
return rows
except Exception:
if attempt == retries:
return []
time.sleep(0.6 * (attempt + 1))
return []
return rows
except (httpx.HTTPError, httpx.TimeoutException):
if attempt == retries:
return []
await asyncio.sleep(0.6 * (attempt + 1))
return []
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
if client is not None:
return await _request(client)
timeout = httpx.Timeout(timeout_seconds)
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as owned_client:
return await _request(owned_client)
async def discover_opportunities_async(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
queries = self.generate_guest_post_queries(keyword)[:4]
dedup: Dict[str, SearchResult] = {}
for query in queries:
for result in self.search_for_urls(query):
normalized_url = self._normalize_url(result.url)
if not normalized_url or normalized_url in dedup:
continue
dedup[normalized_url] = result
async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client:
for query in queries:
for result in await self.search_for_urls(query, client=client):
normalized_url = self._normalize_url(result.url)
if not normalized_url or normalized_url in dedup:
continue
dedup[normalized_url] = result
if len(dedup) >= max_results:
break
if len(dedup) >= max_results:
break
if len(dedup) >= max_results:
break
time.sleep(0.4)
await asyncio.sleep(0.4)
opportunities: List[OpportunityRecord] = []
for normalized_url, row in dedup.items():
@@ -118,6 +132,10 @@ class BacklinkOutreachService:
return {"keyword": keyword, "queries": queries, "opportunities": opportunities}
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
"""Synchronous compatibility wrapper for non-async callers."""
return asyncio.run(self.discover_opportunities_async(keyword, max_results))
def _normalize_url(self, url: str) -> str:
u = (url or "").strip()
if not u:
@@ -144,32 +162,76 @@ class BacklinkOutreachService:
def _get_storage(self) -> BacklinkOutreachStorageService:
return BacklinkOutreachStorageService()
CONSENT_REQUIRED_REGIONS = {"eu", "eea", "uk", "ca"}
MANUAL_REVIEW_REGIONS = {"unknown", "br", "cn", "jp", "kr"}
LOW_CONFIDENCE_REGION_SOURCES = {"tld_inference", "domain_tld", "inferred", "unknown"}
VALID_LEGAL_BASES = {"legitimate_interest", "consent", "contract"}
VALID_CONSENT_STATUSES = {"explicit", "implied", "not_required", "unknown"}
@staticmethod
def _has_one_click_unsubscribe(payload: PolicyValidationRequest) -> bool:
one_click = payload.one_click_unsubscribe
if not one_click or not one_click.enabled:
return False
return bool(one_click.mailto or (one_click.header_value or "").strip())
def validate_send_policy(self, payload: PolicyValidationRequest) -> PolicyValidationResponse:
reasons: List[str] = []
storage = self._get_storage()
legal_basis = payload.legal_basis.strip().lower()
recipient_region = payload.recipient_region.strip().lower()
region_source = payload.recipient_region_source.strip().lower()
consent_status = payload.consent_status.strip().lower()
discovery_source = payload.contact_discovery_source.strip()
sender = payload.sender_identity
if payload.workspace_id.startswith("new-") and not payload.approved_by_human:
reasons.append("human_review_required_for_new_workspace")
if payload.legal_basis.lower() not in {"legitimate_interest", "consent", "contract"}:
reasons.append("invalid_legal_basis")
if payload.recipient_region.lower() in {"eu", "eea"} and payload.legal_basis.lower() != "consent":
reasons.append("region_requires_explicit_consent")
if not legal_basis:
reasons.append("legal_basis_required")
elif legal_basis not in self.VALID_LEGAL_BASES:
reasons.append("invalid_legal_basis_recorded")
if not discovery_source:
reasons.append("contact_discovery_source_required")
if consent_status not in self.VALID_CONSENT_STATUSES:
reasons.append("invalid_consent_status")
if len(payload.sender_identity.strip()) < 3:
reasons.append("sender_identity_required")
has_unsubscribe = bool(payload.unsubscribe_url) or self._has_one_click_unsubscribe(payload)
if not has_unsubscribe:
reasons.append("unsubscribe_url_or_one_click_unsubscribe_required")
if not sender:
reasons.append("complete_sender_identity_required")
else:
sender_email = str(sender.email).strip()
if not sender.name.strip():
reasons.append("sender_name_required")
if not sender_email:
reasons.append("sender_email_required")
elif not re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", sender_email):
reasons.append("sender_email_invalid")
if not sender.organization.strip():
reasons.append("sender_organization_required")
if not sender.physical_mailing_address.strip():
reasons.append("sender_physical_mailing_address_required")
if payload.sender_email and sender_email.lower() != str(payload.sender_email).lower():
reasons.append("sender_identity_email_mismatch")
if recipient_region in self.CONSENT_REQUIRED_REGIONS:
if legal_basis != "consent" or consent_status != "explicit":
reasons.append("region_requires_recorded_explicit_consent")
elif recipient_region in self.MANUAL_REVIEW_REGIONS and not payload.approved_by_human:
reasons.append("manual_review_required_for_recipient_region")
if region_source in self.LOW_CONFIDENCE_REGION_SOURCES and not payload.approved_by_human:
reasons.append("manual_review_required_for_tld_or_unknown_region_source")
if storage.is_suppressed(str(payload.recipient_email), payload.recipient_domain, user_id=payload.user_id):
reasons.append("recipient_suppressed")
if storage.check_idempotency(payload.idempotency_key, user_id=payload.user_id):
reasons.append("duplicate_idempotency_key")
user_count = storage.get_user_send_count(payload.user_id)
domain_count = storage.get_domain_send_count(payload.recipient_domain, user_id=payload.user_id)
if user_count >= DEFAULT_USER_DAILY_CAP:
reasons.append("user_daily_cap_exceeded")
if domain_count >= DEFAULT_DOMAIN_DAILY_CAP:
reasons.append("domain_daily_cap_exceeded")
allowed = len(reasons) == 0
final_status = "approved" if allowed else "blocked"
@@ -199,15 +261,82 @@ class BacklinkOutreachService:
return "au"
return "unknown"
SMTP_RETRY_POLICY = "manual_retry_with_new_idempotency_key"
@staticmethod
def _decision_parts(attempt: Optional[dict]) -> List[str]:
if not attempt:
return []
reason = attempt.get("decision_reason") or ""
return [part.strip() for part in reason.split(";") if part.strip()]
def response_from_attempt(self, attempt: Optional[dict], duplicate: bool = False) -> SendOutreachResponse:
if not attempt:
return SendOutreachResponse(
attempt_id="",
status="duplicate",
policy_allowed=False,
policy_reasons=["duplicate_idempotency_key"],
duplicate=True,
)
status = attempt.get("status", "failed")
parts = self._decision_parts(attempt)
retry_policy = next((part.split("=", 1)[1] for part in parts if part.startswith("retry_policy=")), None)
reasons = [part for part in parts if not part.startswith("retry_policy=")]
if not retry_policy and ("smtp_send_failed" in reasons or "lead_has_no_email" in reasons):
retry_policy = self.SMTP_RETRY_POLICY
policy_allowed = status in {"queued", "approved", "sent", "failed"} and not any(
reason.startswith("human_review_required")
or reason in {
"invalid_legal_basis",
"region_requires_explicit_consent",
"sender_identity_required",
"recipient_suppressed",
"user_daily_cap_exceeded",
"domain_daily_cap_exceeded",
}
for reason in reasons
)
if status == "blocked":
policy_allowed = False
return SendOutreachResponse(
attempt_id=attempt.get("attempt_id", ""),
status=status,
policy_allowed=policy_allowed,
policy_reasons=reasons,
duplicate=duplicate,
retry_policy=retry_policy,
)
def send_outreach(self, request: SendOutreachRequest) -> SendOutreachResponse:
storage = self._get_storage()
lead = storage.get_lead(request.lead_id, user_id=request.user_id)
if not lead:
return SendOutreachResponse(attempt_id="", status="failed", policy_allowed=False, policy_reasons=["lead_not_found"])
reservation = storage.reserve_attempt_idempotency(
lead_id=request.lead_id,
campaign_id=request.campaign_id,
idempotency_key=request.idempotency_key,
sender_email=request.sender_email,
subject=request.subject,
body=request.body,
user_id=request.user_id,
)
if not reservation.get("reserved"):
return self.response_from_attempt(reservation.get("attempt"), duplicate=True)
attempt = reservation.get("attempt") or {}
attempt_id = attempt.get("attempt_id", "")
domain = lead.get("domain", request.sender_email.split("@")[-1] if "@" in request.sender_email else "unknown")
recipient_region = self._infer_region(domain)
legal_basis = "consent" if recipient_region == "eu" else "legitimate_interest"
recipient_region = (request.recipient_region or "unknown").strip().lower()
if recipient_region == "unknown":
recipient_region = self._infer_region(domain)
region_source = "tld_inference" if recipient_region != "unknown" else request.recipient_region_source
else:
region_source = request.recipient_region_source
policy_req = PolicyValidationRequest(
user_id=request.user_id,
@@ -216,31 +345,32 @@ class BacklinkOutreachService:
recipient_email=lead.get("email", ""),
recipient_domain=domain,
recipient_region=recipient_region,
legal_basis=legal_basis,
approved_by_human=False,
unsubscribe_url=None,
sender_identity=request.sender_email,
recipient_region_source=region_source,
legal_basis=request.legal_basis,
contact_discovery_source=request.contact_discovery_source,
consent_status=request.consent_status,
approved_by_human=request.approved_by_human,
unsubscribe_url=request.unsubscribe_url,
one_click_unsubscribe=request.one_click_unsubscribe,
sender_identity=request.sender_identity,
sender_email=request.sender_email,
idempotency_key=request.idempotency_key,
)
policy = self.validate_send_policy(policy_req)
attempt = storage.add_attempt(
lead_id=request.lead_id,
campaign_id=request.campaign_id,
idempotency_key=request.idempotency_key,
sender_email=request.sender_email,
subject=request.subject,
body=request.body,
status="approved" if policy.allowed else "blocked",
updated_attempt = storage.update_attempt_status(
attempt_id,
"approved" if policy.allowed else "blocked",
decision_reason="; ".join(policy.reasons) if policy.reasons else None,
user_id=request.user_id,
)
) or attempt
return SendOutreachResponse(
attempt_id=attempt.get("attempt_id", ""),
status=attempt.get("status", "failed"),
attempt_id=updated_attempt.get("attempt_id", attempt_id),
status=updated_attempt.get("status", "failed"),
policy_allowed=policy.allowed,
policy_reasons=policy.reasons,
effective_sender_email=request.sender_email,
)
def get_reporting_snapshot(self, user_id: str = "default") -> Dict[str, Any]:
@@ -323,11 +453,23 @@ class BacklinkOutreachService:
writer.writerows([{k: self._sanitize_csv_value(v) for k, v in row.items()}])
return output.getvalue()
async def deep_discover(self, keyword: str, max_results: int = 15) -> Dict[str, Any]:
async def deep_discover(
self,
keyword: str,
max_results: int = 15,
user_id: Optional[str] = None,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
"""Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping."""
from services.backlink_outreach_scraper import BacklinkOutreachScraper
scraper = BacklinkOutreachScraper(user_id=self._user_id if hasattr(self, '_user_id') else None)
return await scraper.deep_discover(keyword, max_results)
scraper = BacklinkOutreachScraper(user_id=user_id)
return await scraper.deep_discover(
keyword,
max_results,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
def get_migration_coverage(self) -> Dict[str, Any]:
implemented = [

View File

@@ -6,6 +6,9 @@ from datetime import datetime, date
from uuid import uuid4
from typing import List, Optional
from sqlalchemy import text as sql_text, func as sa_func
from sqlalchemy.exc import IntegrityError
LEAD_VALID_STATUSES = frozenset({"discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"})
from services.database import get_session_for_user
from models.backlink_outreach_models import (
@@ -16,6 +19,14 @@ from models.backlink_outreach_models import (
)
class BacklinkCampaignNotFoundError(RuntimeError):
"""Raised when a backlink campaign is missing or not owned by the user."""
DEFAULT_USER_DAILY_CAP = 100
DEFAULT_DOMAIN_DAILY_CAP = 20
class BacklinkOutreachStorageService:
_NEW_LEAD_COLUMNS = [
"url", "page_title", "snippet", "confidence_score", "discovery_source", "notes"
@@ -120,6 +131,14 @@ class BacklinkOutreachStorageService:
# -- Lead CRUD --
def _campaign_belongs_to_user(self, db, campaign_id: str, user_id: str) -> bool:
return (
db.query(BacklinkCampaign)
.filter(BacklinkCampaign.id == campaign_id, BacklinkCampaign.user_id == user_id)
.first()
is not None
)
def add_lead(
self,
campaign_id: str,
@@ -138,6 +157,17 @@ class BacklinkOutreachStorageService:
if not db:
raise RuntimeError("Database session unavailable")
try:
if not self._campaign_belongs_to_user(db, campaign_id, user_id):
raise BacklinkCampaignNotFoundError("Campaign not found")
existing = (
db.query(BacklinkLead)
.filter(BacklinkLead.campaign_id == campaign_id, BacklinkLead.url == url)
.first()
)
if existing:
return self._lead_to_dict(existing)
lead = BacklinkLead(
id=f"bl_{uuid4().hex[:16]}",
campaign_id=campaign_id,
@@ -164,12 +194,25 @@ class BacklinkOutreachStorageService:
if not db:
raise RuntimeError("Database session unavailable")
try:
if not self._campaign_belongs_to_user(db, campaign_id, user_id):
raise BacklinkCampaignNotFoundError("Campaign not found")
existing_urls = {
row[0]
for row in db.query(BacklinkLead.url)
.filter(BacklinkLead.campaign_id == campaign_id)
.all()
}
added = []
for data in leads_data:
url = data.get("url", "")
if url in existing_urls:
continue
lead = BacklinkLead(
id=f"bl_{uuid4().hex[:16]}",
campaign_id=campaign_id,
url=data.get("url", ""),
url=url,
domain=data.get("domain", ""),
page_title=data.get("page_title", ""),
snippet=data.get("snippet", ""),
@@ -182,6 +225,7 @@ class BacklinkOutreachStorageService:
)
db.add(lead)
added.append(lead)
existing_urls.add(url)
db.commit()
return [self._lead_to_dict(l) for l in added]
finally:
@@ -204,8 +248,16 @@ class BacklinkOutreachStorageService:
db.close()
def update_lead_status(
self, lead_id: str, user_id: str, status: str, notes: Optional[str] = None
self,
lead_id: str,
user_id: str,
status: str,
notes: Optional[str] = None,
campaign_id: Optional[str] = None,
) -> Optional[dict]:
if status not in LEAD_VALID_STATUSES:
raise ValueError(f"Invalid status '{status}'. Valid values: {sorted(LEAD_VALID_STATUSES)}")
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
@@ -214,6 +266,18 @@ class BacklinkOutreachStorageService:
lead = db.query(BacklinkLead).filter(BacklinkLead.id == lead_id).first()
if not lead:
return None
campaign = (
db.query(BacklinkCampaign)
.filter(BacklinkCampaign.id == lead.campaign_id, BacklinkCampaign.user_id == user_id)
.first()
)
if not campaign:
raise PermissionError("Lead does not belong to the current user")
if campaign_id and lead.campaign_id != campaign_id:
return None
lead.status = status
if notes is not None:
lead.notes = notes
@@ -222,6 +286,44 @@ class BacklinkOutreachStorageService:
finally:
db.close()
def get_lead_access_issues(
self, lead_ids: List[str], user_id: str, campaign_id: Optional[str] = None
) -> dict:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return {"missing": list(dict.fromkeys(lead_ids)), "unauthorized": []}
try:
unique_lead_ids = list(dict.fromkeys(lead_ids))
access_rows = self._get_lead_access_rows(db, unique_lead_ids)
missing: List[str] = []
unauthorized: List[str] = []
for lid in unique_lead_ids:
access = access_rows.get(lid)
if not access:
missing.append(lid)
elif access["user_id"] != user_id:
unauthorized.append(lid)
elif campaign_id and access["campaign_id"] != campaign_id:
missing.append(lid)
return {"missing": missing, "unauthorized": unauthorized}
finally:
db.close()
def _get_lead_access_rows(self, db, lead_ids: List[str]) -> dict:
if not lead_ids:
return {}
rows = (
db.query(BacklinkLead.id, BacklinkLead.campaign_id, BacklinkCampaign.user_id)
.outerjoin(BacklinkCampaign, BacklinkLead.campaign_id == BacklinkCampaign.id)
.filter(BacklinkLead.id.in_(lead_ids))
.all()
)
return {
row.id: {"campaign_id": row.campaign_id, "user_id": row.user_id}
for row in rows
}
@staticmethod
def _lead_to_dict(lead) -> dict:
return {
@@ -241,6 +343,79 @@ class BacklinkOutreachStorageService:
# -- Outreach Attempt CRUD --
def get_attempt_by_idempotency_key(self, idempotency_key: str, user_id: str = "default") -> Optional[dict]:
"""Return the existing attempt for an idempotency key visible to the user."""
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return None
try:
attempt = (
db.query(OutreachAttempt)
.join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id)
.filter(
OutreachAttempt.idempotency_key == idempotency_key,
BacklinkCampaign.user_id == user_id,
)
.first()
)
return self._attempt_to_dict(attempt) if attempt else None
finally:
db.close()
def reserve_attempt_idempotency(
self,
lead_id: str,
campaign_id: str,
idempotency_key: str,
sender_email: str = "",
subject: str = "",
body: str = "",
user_id: str = "default",
) -> dict:
"""Atomically reserve an outreach idempotency key by creating the attempt row.
Returns {"reserved": True, "attempt": attempt_dict} for the caller that won
the reservation, or {"reserved": False, "attempt": existing_attempt_or_none}
when the unique key already exists. Duplicate rows are detected by the
database unique constraint so concurrent requests do not both proceed to
policy approval or SMTP delivery.
"""
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
raise RuntimeError("Database session unavailable")
try:
attempt = OutreachAttempt(
id=f"att_{uuid4().hex[:16]}",
lead_id=lead_id,
campaign_id=campaign_id,
idempotency_key=idempotency_key,
sender_email=sender_email,
subject=subject,
body=body,
status="queued",
created_at=datetime.utcnow(),
)
db.add(attempt)
db.commit()
return {"reserved": True, "attempt": self._attempt_to_dict(attempt)}
except IntegrityError:
db.rollback()
existing = (
db.query(OutreachAttempt)
.join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id)
.filter(
OutreachAttempt.idempotency_key == idempotency_key,
BacklinkCampaign.user_id == user_id,
)
.first()
)
return {"reserved": False, "attempt": self._attempt_to_dict(existing) if existing else None}
finally:
db.close()
def add_attempt(
self,
lead_id: str,
@@ -273,6 +448,20 @@ class BacklinkOutreachStorageService:
db.add(attempt)
db.commit()
return self._attempt_to_dict(attempt)
except IntegrityError:
db.rollback()
existing = (
db.query(OutreachAttempt)
.join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id)
.filter(
OutreachAttempt.idempotency_key == idempotency_key,
BacklinkCampaign.user_id == user_id,
)
.first()
)
if existing:
return self._attempt_to_dict(existing)
raise
finally:
db.close()
@@ -325,6 +514,7 @@ class BacklinkOutreachStorageService:
"decision_reason": attempt.decision_reason,
"sent_at": attempt.sent_at.isoformat() if attempt.sent_at else None,
"created_at": attempt.created_at.isoformat() if attempt.created_at else None,
"message_id": attempt.message_id or "",
}
def find_attempt_by_from_email(self, from_email: str, user_id: str = "default") -> Optional[str]:
@@ -346,6 +536,37 @@ class BacklinkOutreachStorageService:
finally:
db.close()
def update_attempt_message_id(self, attempt_id: str, message_id: str, user_id: str = "default") -> Optional[dict]:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return None
try:
attempt = db.query(OutreachAttempt).filter(OutreachAttempt.id == attempt_id).first()
if not attempt:
return None
attempt.message_id = message_id
db.commit()
return self._attempt_to_dict(attempt)
finally:
db.close()
def find_attempt_by_message_id(self, message_id: str, user_id: str = "default") -> Optional[str]:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return None
try:
clean = message_id.strip()
attempt = (
db.query(OutreachAttempt)
.filter(OutreachAttempt.message_id == clean)
.first()
)
return attempt.id if attempt else None
finally:
db.close()
# -- Outreach Reply CRUD --
def reply_exists(self, from_email: str, subject: str, user_id: str = "default") -> bool:
@@ -678,6 +899,9 @@ class BacklinkOutreachStorageService:
db.add(entry)
db.commit()
return {"idempotency_key": idempotency_key}
except IntegrityError:
db.rollback()
return {"idempotency_key": idempotency_key}
finally:
db.close()
@@ -686,27 +910,6 @@ class BacklinkOutreachStorageService:
def _today(self) -> date:
return date.today()
def increment_user_send_counter(self, user_id: str) -> int:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return 0
try:
today = self._today()
row_id = f"scu_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_user (id, user_id, date, count) "
"VALUES (:id, :uid, :dt, 1) "
"ON CONFLICT (user_id, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "uid": user_id, "dt": today})
db.commit()
result = db.query(SendCounterUser.count).filter(
SendCounterUser.user_id == user_id, SendCounterUser.date == today
).first()
return result[0] if result else 0
finally:
db.close()
def get_user_send_count(self, user_id: str) -> int:
db = get_session_for_user(user_id)
if not db:
@@ -722,28 +925,6 @@ class BacklinkOutreachStorageService:
finally:
db.close()
def increment_domain_send_counter(self, domain: str, user_id: str = "default") -> int:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return 0
try:
today = self._today()
domain_lower = domain.lower()
row_id = f"scd_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_domain (id, domain, date, count) "
"VALUES (:id, :dom, :dt, 1) "
"ON CONFLICT (domain, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "dom": domain_lower, "dt": today})
db.commit()
result = db.query(SendCounterDomain.count).filter(
SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today
).first()
return result[0] if result else 0
finally:
db.close()
def get_domain_send_count(self, domain: str, user_id: str = "default") -> int:
db = get_session_for_user(user_id)
if not db:
@@ -759,6 +940,73 @@ class BacklinkOutreachStorageService:
finally:
db.close()
def try_increment_user_send_counter(self, user_id: str) -> tuple:
"""Atomically check cap and increment. Returns (within_cap, new_count)."""
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return True, 0
try:
today = self._today()
current = (
db.query(SendCounterUser.count)
.filter(SendCounterUser.user_id == user_id, SendCounterUser.date == today)
.scalar()
) or 0
if current >= DEFAULT_USER_DAILY_CAP:
db.close()
return False, current
row_id = f"scu_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_user (id, user_id, date, count) "
"VALUES (:id, :uid, :dt, 1) "
"ON CONFLICT (user_id, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "uid": user_id, "dt": today})
db.commit()
result = db.query(SendCounterUser.count).filter(
SendCounterUser.user_id == user_id, SendCounterUser.date == today
).first()
return True, result[0] if result else 0
except Exception:
db.rollback()
return True, 0
finally:
db.close()
def try_increment_domain_send_counter(self, domain: str, user_id: str = "default") -> tuple:
"""Atomically check cap and increment. Returns (within_cap, new_count)."""
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return True, 0
try:
today = self._today()
domain_lower = domain.lower()
current = (
db.query(SendCounterDomain.count)
.filter(SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today)
.scalar()
) or 0
if current >= DEFAULT_DOMAIN_DAILY_CAP:
db.close()
return False, current
row_id = f"scd_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_domain (id, domain, date, count) "
"VALUES (:id, :dom, :dt, 1) "
"ON CONFLICT (domain, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "dom": domain_lower, "dt": today})
db.commit()
result = db.query(SendCounterDomain.count).filter(
SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today
).first()
return True, result[0] if result else 0
except Exception:
db.rollback()
return True, 0
finally:
db.close()
# -- Audit Log --
def add_audit_log(

View File

@@ -193,10 +193,10 @@ const App: React.FC = () => {
<Route path="/dashboard" element={<ProtectedRoute><MainDashboard /></ProtectedRoute>} />
<Route path="/seo" element={<ProtectedRoute><FeatureRoute feature="seo"><SEODashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/seo-dashboard" element={<ProtectedRoute><FeatureRoute feature="seo"><SEODashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/backlink-outreach" element={<ProtectedRoute><FeatureRoute feature="seo"><BacklinkOutreachDashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/backlink-outreach" element={<ProtectedRoute><FeatureRoute feature="backlinking"><BacklinkOutreachDashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/content-planning" element={<ProtectedRoute><FeatureRoute feature="content-planning"><ContentPlanningDashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/facebook-writer" element={<ProtectedRoute><FeatureRoute feature="social"><FacebookWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/linkedin-writer" element={<ProtectedRoute><FeatureRoute feature="social"><LinkedInWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/facebook-writer" element={<ProtectedRoute><FeatureRoute feature="facebook"><FacebookWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/linkedin-writer" element={<ProtectedRoute><FeatureRoute feature="linkedin"><LinkedInWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/blog-writer" element={<ProtectedRoute><FeatureRoute feature="blog_writer"><BlogWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/story-writer" element={<ProtectedRoute><FeatureRoute feature="story"><StoryWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/story-projects" element={<ProtectedRoute><FeatureRoute feature="story"><StoryProjectList /></FeatureRoute></ProtectedRoute>} />

View File

@@ -76,6 +76,20 @@ export interface DeepDiscoveryResponse {
// -- Policy --
export interface SenderIdentity {
name: string;
email: string;
organization: string;
physical_mailing_address: string;
reply_to_email?: string;
}
export interface OneClickUnsubscribe {
enabled: boolean;
mailto?: string;
header_value?: string;
}
export interface BacklinkPolicyValidationRequest {
user_id: string;
workspace_id: string;
@@ -83,10 +97,15 @@ export interface BacklinkPolicyValidationRequest {
recipient_email: string;
recipient_domain: string;
recipient_region: string;
recipient_region_source: string;
legal_basis: string;
contact_discovery_source: string;
consent_status: string;
approved_by_human: boolean;
unsubscribe_url?: string;
sender_identity: string;
one_click_unsubscribe?: OneClickUnsubscribe;
sender_identity: SenderIdentity;
sender_email?: string;
idempotency_key: string;
}
@@ -139,7 +158,7 @@ export interface LeadRecord {
email: string | null;
confidence_score: number;
discovery_source: string;
status: string;
status: LeadStatus;
notes: string | null;
created_at: string | null;
}
@@ -160,9 +179,12 @@ export interface LeadCreateRequest {
notes?: string;
}
export type LeadStatus = 'discovered' | 'contacted' | 'replied' | 'placed' | 'bounced' | 'unsubscribed';
export interface LeadStatusUpdateRequest {
status: string;
status: LeadStatus;
notes?: string;
campaign_id?: string;
}
export interface CampaignDetailResponse {
@@ -183,6 +205,15 @@ export interface SendOutreachRequest {
subject: string;
body: string;
idempotency_key: string;
sender_identity: SenderIdentity;
legal_basis: string;
contact_discovery_source: string;
recipient_region: string;
recipient_region_source: string;
consent_status: string;
approved_by_human: boolean;
unsubscribe_url?: string;
one_click_unsubscribe?: OneClickUnsubscribe;
template_id?: string;
template_variables?: Record<string, string>;
}
@@ -192,6 +223,7 @@ export interface SendOutreachResponse {
status: string;
policy_allowed: boolean;
policy_reasons: string[];
effective_sender_email?: string | null;
}
export interface OutreachAttemptRecord {
@@ -305,8 +337,9 @@ export interface FollowUpRequest {
export interface BulkStatusUpdateRequest {
lead_ids: string[];
status: string;
status: LeadStatus;
notes?: string;
campaign_id?: string;
}
export interface BulkStatusUpdateResponse {

View File

@@ -12,6 +12,7 @@ import {
GenerateEmailRequest,
bulkUpdateLeadStatus,
updateLeadStatus,
addLeadToCampaign,
fetchCampaignAnalyticsVolume,
fetchCampaignAnalyticsFunnel,
CampaignVolumePoint,
@@ -25,7 +26,7 @@ import { LineChart, Line, BarChart, Bar, XAxis, YAxis, CartesianGrid, Tooltip as
type Tab = 'campaigns' | 'discover' | 'leads' | 'composer' | 'analytics';
const STATUS_OPTIONS = ['discovered', 'contacted', 'replied', 'placed', 'bounced', 'unsubscribed'];
const STATUS_OPTIONS = ['discovered', 'contacted', 'replied', 'placed', 'bounced', 'unsubscribed'] as const;
const STATUS_EXPLANATIONS: Record<string, string> = {
discovered: 'Lead found but not yet contacted',
@@ -116,6 +117,19 @@ const BacklinkOutreachDashboard: React.FC = () => {
const [subjectSuggestions, setSubjectSuggestions] = useState<string[]>([]);
const [isGenerating, setIsGenerating] = useState(false);
const [senderName, setSenderName] = useState('');
const [senderEmail, setSenderEmail] = useState('');
const [senderOrganization, setSenderOrganization] = useState('');
const [senderAddress, setSenderAddress] = useState('');
const [unsubscribeUrl, setUnsubscribeUrl] = useState('');
const [oneClickUnsubscribe, setOneClickUnsubscribe] = useState(false);
const [legalBasis, setLegalBasis] = useState('legitimate_interest');
const [contactDiscoverySource, setContactDiscoverySource] = useState('');
const [recipientRegion, setRecipientRegion] = useState('unknown');
const [recipientRegionSource, setRecipientRegionSource] = useState('user_attested');
const [consentStatus, setConsentStatus] = useState('unknown');
const [approvedByHuman, setApprovedByHuman] = useState(false);
const [leadName, setLeadName] = useState('');
const [leadSite, setLeadSite] = useState('');
const [leadContentTopic, setLeadContentTopic] = useState('');
@@ -126,7 +140,7 @@ const BacklinkOutreachDashboard: React.FC = () => {
const [templateName, setTemplateName] = useState('');
const [selectedLeadIds, setSelectedLeadIds] = useState<Set<string>>(new Set());
const [bulkStatus, setBulkStatus] = useState('contacted');
const [bulkStatus, setBulkStatus] = useState<'discovered' | 'contacted' | 'replied' | 'placed' | 'bounced' | 'unsubscribed'>('contacted');
const [volumeData, setVolumeData] = useState<CampaignVolumePoint[]>([]);
const [funnelData, setFunnelData] = useState<FunnelStage[]>([]);
@@ -190,9 +204,24 @@ const BacklinkOutreachDashboard: React.FC = () => {
}, [keyword, deepDiscover]);
const handleDiscoverAndSave = useCallback(async () => {
if (!keyword.trim() || !discoverCampaignId) return;
await deepDiscover(keyword.trim(), 15, discoverCampaignId);
}, [keyword, discoverCampaignId, deepDiscover]);
if (!keyword.trim() || !discoverCampaignId || discoveredOpportunities.length === 0) return;
for (const opp of discoveredOpportunities) {
try {
await addLeadToCampaign(discoverCampaignId, {
campaign_id: discoverCampaignId,
url: opp.url,
domain: opp.domain,
page_title: opp.page_title,
snippet: opp.snippet,
email: opp.email ?? undefined,
confidence_score: opp.confidence_score,
});
} catch (e) {
// skip duplicates
}
}
showToastNotification(`Saved ${discoveredOpportunities.length} leads to campaign`, 'success');
}, [keyword, discoverCampaignId, discoveredOpportunities]);
const handleSelectCampaign = useCallback(async (campaignId: string) => {
await selectCampaign(campaignId);
@@ -311,10 +340,13 @@ const BacklinkOutreachDashboard: React.FC = () => {
);
};
const handleSingleStatusUpdate = async (leadId: string, status: string) => {
const handleSingleStatusUpdate = async (leadId: string, status: 'discovered' | 'contacted' | 'replied' | 'placed' | 'bounced' | 'unsubscribed') => {
setIsStatusUpdating(true);
try {
await updateLeadStatus(leadId, { status });
await updateLeadStatus(leadId, {
status,
campaign_id: selectedCampaign!.campaign_id,
});
showToastNotification(`Status updated to "${status}"`, 'success');
await selectCampaign(selectedCampaign!.campaign_id);
} catch (e) {
@@ -328,7 +360,11 @@ const BacklinkOutreachDashboard: React.FC = () => {
if (selectedLeadIds.size === 0) return;
setIsStatusUpdating(true);
try {
const result = await bulkUpdateLeadStatus({ lead_ids: Array.from(selectedLeadIds), status: bulkStatus });
const result = await bulkUpdateLeadStatus({
lead_ids: Array.from(selectedLeadIds),
status: bulkStatus,
campaign_id: selectedCampaign!.campaign_id,
});
if (result.failed.length > 0) {
showToastNotification(`Updated ${result.updated} leads; ${result.failed.length} failed`, 'warning');
} else {
@@ -391,10 +427,27 @@ const BacklinkOutreachDashboard: React.FC = () => {
{ key: 'campaigns', label: 'Campaigns', desc: 'Create and manage outreach campaigns' },
{ key: 'discover', label: 'Discover', desc: 'AI-powered search for guest post opportunities' },
{ key: 'leads', label: 'Leads', desc: 'Track leads, send outreach, and manage replies' },
{ key: 'composer', label: 'Composer', desc: 'AI email composer with smart suggestions' },
{ key: 'composer', label: 'Composer', desc: 'AI email composer with compliance metadata' },
{ key: 'analytics', label: 'Analytics', desc: 'Campaign performance metrics and exports' },
];
const complianceReasons = [
!unsubscribeUrl.trim() && !oneClickUnsubscribe ? 'Add an unsubscribe URL or enable one-click unsubscribe.' : '',
!senderName.trim() ? 'Add the sender name.' : '',
!senderEmail.trim() ? 'Add the sender email.' : '',
!senderOrganization.trim() ? 'Add the sender organization.' : '',
!senderAddress.trim() ? 'Add a physical mailing address.' : '',
!legalBasis.trim() ? 'Record the legal basis.' : '',
!contactDiscoverySource.trim() ? 'Record where the contact was discovered.' : '',
recipientRegion === 'unknown' && !approvedByHuman ? 'Unknown recipient region requires manual review.' : '',
recipientRegionSource === 'tld_inference' && !approvedByHuman ? 'TLD-only region inference requires manual review.' : '',
['eu', 'eea', 'uk', 'ca'].includes(recipientRegion) && (legalBasis !== 'consent' || consentStatus !== 'explicit')
? 'Selected recipient region requires recorded explicit consent.' : '',
].filter(Boolean);
const complianceReady = complianceReasons.length === 0;
const SectionHeader: React.FC<{ title: string; subtitle: string }> = ({ title, subtitle }) => (
<div style={{ marginBottom: '16px' }}>
<h3 style={{ margin: 0, background: GRADIENT_PRIMARY, WebkitBackgroundClip: 'text', WebkitTextFillColor: 'transparent', fontSize: '18px' }}>{title}</h3>
@@ -644,7 +697,7 @@ const BacklinkOutreachDashboard: React.FC = () => {
{selectedLeadIds.size > 0 && (
<>
<TooltipWrap text="Choose the new status for all selected leads">
<select value={bulkStatus} onChange={(e) => setBulkStatus(e.target.value)}
<select value={bulkStatus} onChange={(e) => setBulkStatus(e.target.value as typeof bulkStatus)}
style={{ ...selectSx, padding: '6px 10px', fontSize: '12px', minWidth: '130px' }}>
{STATUS_OPTIONS.map((s) => <option key={s} value={s}>{s}</option>)}
</select>
@@ -708,6 +761,7 @@ const BacklinkOutreachDashboard: React.FC = () => {
<div key={a.attempt_id} style={{ marginTop: '8px', padding: '8px 12px', background: 'rgba(255,255,255,0.04)', borderRadius: '8px', fontSize: '12px' }}>
<span style={{ color: 'rgba(255,255,255,0.5)' }}>Latest: {a.subject} </span>
{renderStatusBadge(a.status)}
{a.sender_email && <span style={{ color: 'rgba(255,255,255,0.35)', marginLeft: '8px' }}>From: {a.sender_email}</span>}
{a.sent_at && <span style={{ color: 'rgba(255,255,255,0.3)', marginLeft: '8px' }}>{new Date(a.sent_at).toLocaleString()}</span>}
</div>
))}
@@ -724,7 +778,7 @@ const BacklinkOutreachDashboard: React.FC = () => {
<table style={{ width: '100%', borderCollapse: 'collapse', fontSize: '13px' }}>
<thead>
<tr style={{ background: 'rgba(255,255,255,0.04)' }}>
{['Subject', 'Status', 'Sender', 'Sent At'].map(h => (
{['Subject', 'Status', 'Effective Sender', 'Sent At'].map(h => (
<th key={h} style={{ padding: '10px 12px', borderBottom: '1px solid rgba(255,255,255,0.08)', textAlign: 'left', color: 'rgba(255,255,255,0.4)', fontWeight: 500, fontSize: '12px', textTransform: 'uppercase', letterSpacing: '0.5px' }}>{h}</th>
))}
</tr>
@@ -893,6 +947,71 @@ const BacklinkOutreachDashboard: React.FC = () => {
style={{ ...inputSx, fontFamily: 'monospace', fontSize: '13px', resize: 'vertical', lineHeight: 1.6 }} />
</div>
{/* Compliance metadata */}
<div style={{ marginTop: '20px', padding: '16px', borderRadius: '10px', background: complianceReady ? 'rgba(67,233,123,0.08)' : 'rgba(245,87,108,0.08)', border: `1px solid ${complianceReady ? 'rgba(67,233,123,0.22)' : 'rgba(245,87,108,0.22)'}` }}>
<h4 style={{ margin: '0 0 4px', color: '#fff', fontSize: '14px' }}>Send Compliance Metadata</h4>
<p style={{ margin: '0 0 12px', color: 'rgba(255,255,255,0.45)', fontSize: '12px' }}>Policy checks require unsubscribe, sender identity, legal basis, contact source, and region-aware consent/review details before a send can be approved.</p>
<div style={{ display: 'grid', gridTemplateColumns: '1fr 1fr', gap: '8px', marginBottom: '8px' }}>
<input type="text" value={senderName} onChange={(e) => setSenderName(e.target.value)} placeholder="Sender name" style={inputSx} />
<input type="email" value={senderEmail} onChange={(e) => setSenderEmail(e.target.value)} placeholder="Sender email" style={inputSx} />
<input type="text" value={senderOrganization} onChange={(e) => setSenderOrganization(e.target.value)} placeholder="Organization / brand" style={inputSx} />
<input type="text" value={senderAddress} onChange={(e) => setSenderAddress(e.target.value)} placeholder="Physical mailing address" style={inputSx} />
</div>
<div style={{ display: 'grid', gridTemplateColumns: '1fr 1fr', gap: '8px', marginBottom: '8px' }}>
<input type="url" value={unsubscribeUrl} onChange={(e) => setUnsubscribeUrl(e.target.value)} placeholder="Unsubscribe URL" style={inputSx} />
<label style={{ ...inputSx, display: 'flex', alignItems: 'center', gap: '8px', cursor: 'pointer' }}>
<input type="checkbox" checked={oneClickUnsubscribe} onChange={(e) => setOneClickUnsubscribe(e.target.checked)} />
One-click unsubscribe available
</label>
</div>
<div style={{ display: 'grid', gridTemplateColumns: '1fr 1fr', gap: '8px', marginBottom: '8px' }}>
<select value={legalBasis} onChange={(e) => setLegalBasis(e.target.value)} style={selectSx}>
<option value="legitimate_interest">Legitimate interest</option>
<option value="consent">Consent</option>
<option value="contract">Contract</option>
</select>
<input type="text" value={contactDiscoverySource} onChange={(e) => setContactDiscoverySource(e.target.value)} placeholder="Contact discovery source (e.g. contact page URL)" style={inputSx} />
<select value={recipientRegion} onChange={(e) => setRecipientRegion(e.target.value)} style={selectSx}>
<option value="unknown">Recipient region unknown</option>
<option value="us">United States</option>
<option value="eu">EU / EEA</option>
<option value="uk">United Kingdom</option>
<option value="ca">Canada</option>
<option value="au">Australia</option>
<option value="br">Brazil</option>
<option value="other">Other</option>
</select>
<select value={recipientRegionSource} onChange={(e) => setRecipientRegionSource(e.target.value)} style={selectSx}>
<option value="user_attested">Region user-attested</option>
<option value="crm_record">Region from CRM/contact record</option>
<option value="billing_or_profile">Region from profile/billing data</option>
<option value="tld_inference">Region inferred from TLD only</option>
<option value="unknown">Region source unknown</option>
</select>
<select value={consentStatus} onChange={(e) => setConsentStatus(e.target.value)} style={selectSx}>
<option value="unknown">Consent status unknown</option>
<option value="explicit">Explicit consent recorded</option>
<option value="implied">Implied consent / soft opt-in</option>
<option value="not_required">Not required for selected basis</option>
</select>
<label style={{ ...inputSx, display: 'flex', alignItems: 'center', gap: '8px', cursor: 'pointer' }}>
<input type="checkbox" checked={approvedByHuman} onChange={(e) => setApprovedByHuman(e.target.checked)} />
Manual review approved
</label>
</div>
<div style={{ padding: '10px 12px', borderRadius: '8px', background: complianceReady ? 'rgba(67,233,123,0.12)' : 'rgba(245,87,108,0.12)', color: complianceReady ? '#43e97b' : '#f5576c', fontSize: '12px' }}>
{complianceReady ? 'Compliance metadata is complete for policy validation.' : (
<ul style={{ margin: 0, paddingLeft: '18px' }}>
{complianceReasons.map((reason) => <li key={reason}>{reason}</li>)}
</ul>
)}
</div>
</div>
{/* Personalize */}
<div style={{ marginTop: '24px', padding: '16px', borderRadius: '10px', background: 'rgba(255,255,255,0.03)', border: '1px solid rgba(255,255,255,0.08)' }}>
<h4 style={{ margin: '0 0 4px', color: '#fff', fontSize: '14px' }}>Personalize for Lead</h4>
@@ -946,13 +1065,13 @@ const BacklinkOutreachDashboard: React.FC = () => {
</div>
{selectedCampaign && subject.trim() && body.trim() && (
<div style={{ marginTop: '16px', padding: '14px', borderRadius: '10px', background: 'rgba(67,233,123,0.1)', border: '1px solid rgba(67,233,123,0.2)' }}>
<p style={{ margin: '0 0 8px', fontSize: '13px', color: '#43e97b' }}>
Ready to send this email to leads in <strong>{selectedCampaign.name}</strong>?
<div style={{ marginTop: '16px', padding: '14px', borderRadius: '10px', background: complianceReady ? 'rgba(67,233,123,0.1)' : 'rgba(245,87,108,0.1)', border: `1px solid ${complianceReady ? 'rgba(67,233,123,0.2)' : 'rgba(245,87,108,0.2)'}` }}>
<p style={{ margin: '0 0 8px', fontSize: '13px', color: complianceReady ? '#43e97b' : '#f5576c' }}>
{complianceReady ? <>Ready to send this email to leads in <strong>{selectedCampaign.name}</strong>.</> : <>Complete compliance metadata before sending to <strong>{selectedCampaign.name}</strong> leads.</>}
</p>
<TooltipWrap text="Go to the Leads tab to select recipients and send">
<button onClick={() => setActiveTab('leads')}
style={{ ...btnBase, padding: '8px 20px', background: GRADIENT_SUCCESS, color: '#1a1a2e', fontSize: '13px' }}>
<TooltipWrap text={complianceReady ? 'Go to the Leads tab to select recipients and send' : 'Policy validation will block sends until all listed compliance fields are complete'}>
<button onClick={() => setActiveTab('leads')} disabled={!complianceReady}
style={{ ...btnBase, padding: '8px 20px', background: GRADIENT_SUCCESS, color: '#1a1a2e', fontSize: '13px', opacity: complianceReady ? 1 : 0.5 }}>
Go to Campaign Leads
</button>
</TooltipWrap>

View File

@@ -16,6 +16,8 @@ export const FEATURE_KEYS = {
SEO: 'seo',
CONTENT_PLANNING: 'content-planning',
SOCIAL: 'social',
LINKEDIN: 'linkedin',
FACEBOOK: 'facebook',
BLOG_WRITER: 'blog_writer',
STORY: 'story',
YOUTUBE: 'youtube',
@@ -28,6 +30,7 @@ export const FEATURE_KEYS = {
WIX: 'wix',
BING: 'bing',
ASSET_LIBRARY: 'asset-library',
BACKLINKING: 'backlinking',
} as const;
export type FeatureKey = typeof FEATURE_KEYS[keyof typeof FEATURE_KEYS];
@@ -124,6 +127,9 @@ export function getSingleFeature(): string | null {
const FEATURE_ROUTE_PRIORITY: [string, string][] = [
['podcast', '/podcast-maker'],
['blog_writer', '/blog-writer'],
['backlinking', '/backlink-outreach'],
['linkedin', '/linkedin-writer'],
['facebook', '/facebook-writer'],
['story', '/story-writer'],
['image', '/image-studio'],
['video', '/video-studio'],