Compare commits

...

15 Commits

Author SHA1 Message Date
ajaysi
b894bc0abb fix: GHSA-426f-p74m-73fv — JWT JWKS issuer confusion auth bypass (CVSS 9.4)
Pin issuer and JWKS URL at startup from CLERK_PUBLISHABLE_KEY.
Validate token iss claim before any JWKS fetch.
Add issuer= to jwt.decode() with verify_iss=True.
2026-06-05 12:07:22 +05:30
ajaysi
70542b32fc feat: add linkedin and facebook feature flags, clean up dead code
- Register 'linkedin' FeatureGroup with routers.linkedin and
  api.linkedin_image_generation routers
- Register 'facebook' FeatureGroup with
  api.facebook_writer.routers:facebook_router
- Add 'linkedin' and 'facebook' profiles to PROFILE_GROUP_MAP
- Remove dead imports of linkedin_router, linkedin_image_router,
  and facebook_router from app.py (router manager handles via
  CORE_ROUTER_REGISTRY)
- Add LINKEDIN and FACEBOOK keys to frontend FEATURE_KEYS
- Add route priorities for /linkedin-writer and /facebook-writer
- Change route gates from feature='social' to feature='linkedin'
  and feature='facebook' respectively
2026-06-05 12:07:22 +05:30
ajaysi
9a3d704c5c feat: add backlinking feature flag following blog_writer pattern
- Register 'backlinking' FeatureGroup in feature_registry.py with
  routers=routers.backlink_outreach:router
- Add 'backlinking' profile to PROFILE_GROUP_MAP (core + backlinking)
- Add backlink_outreach to OPTIONAL_ROUTER_REGISTRY with
  features={'all', 'backlinking'}
- Remove direct import/include of backlink_outreach from app.py
  (router manager handles both 'all' and 'backlinking' modes)
- Add BACKLINKING key to FEATURE_KEYS and route priority in
  frontend demoMode.ts
- Change frontend route gate from feature='seo' to feature='backlinking'
  so ALWRITY_ENABLED_FEATURES=backlinking enables the route
2026-06-03 20:19:41 +05:30
ajaysi
8699ffc27d fix: resolve remaining 5 QA audit findings (#3, #8, #10, #11, #12)
#3 — Duplicate prospect handling: add_lead now checks (campaign_id, url)
     before insert; bulk_add_leads skips existing URLs.
#8 — Atomic rate limiting: try_increment_* methods atomically check cap
     and increment in a single session; router uses these before send.
#10 — Reply matching via Message-ID: sender generates Message-ID header,
     stored on OutreachAttempt; reply monitor parses In-Reply-To/References;
     poll_replies matches by message_id first, falls back to from_email.
#11 — Save-to-campaign uses existing store results instead of
      re-running expensive deepDiscover.
#12 — Lead status Literal type: Pydantic models enforce valid status
      values; backend validates via LEAD_VALID_STATUSES frozenset;
      frontend API typed as LeadStatus union.
2026-06-03 20:06:11 +05:30
ajaysi
259194c289 Merge remote-tracking branch 'origin/codex/add-atomic-idempotency-reservation-method'
# Conflicts:
#	backend/routers/backlink_outreach.py
#	backend/services/backlink_outreach_models.py
2026-06-03 18:52:18 +05:30
ajaysi
2f93ae4891 Merge remote-tracking branch 'origin/codex/add-sender-email-validation-and-logging' 2026-06-03 18:50:53 +05:30
ي
bf22a3d318 Handle backlink outreach idempotency reservations 2026-06-03 18:49:14 +05:30
ajaysi
2a879a6e24 Merge remote-tracking branch 'origin/codex/update-compliance-requirements-for-outreach-send' 2026-06-03 18:49:07 +05:30
ajaysi
7749b4db0e Merge remote-tracking branch 'origin/codex/refactor-backlink-outreach-services-for-async-support'
# Conflicts:
#	backend/routers/backlink_outreach.py
2026-06-03 18:49:01 +05:30
ي
cbace3b752 Validate backlink outreach sender aliases 2026-06-03 18:48:17 +05:30
ي
98d4ac6dbd Harden backlink outreach send policy 2026-06-03 18:33:11 +05:30
ي
55b7209554 Refactor backlink discovery HTTP calls 2026-06-03 18:28:40 +05:30
ajaysi
57e46a20f8 Merge remote-tracking branch 'origin/codex/update-backlink-outreach-for-campaign-validation' 2026-06-03 18:22:35 +05:30
ي
ec2f9151b8 Harden backlink lead campaign ownership 2026-06-03 18:19:16 +05:30
ي
40516e5c79 Secure backlink lead status updates 2026-06-03 18:16:10 +05:30
16 changed files with 1168 additions and 280 deletions

View File

@@ -58,6 +58,21 @@ FEATURE_GROUPS: Dict[str, FeatureGroup] = {
"api.blog_writer.seo_analysis:router", "api.blog_writer.seo_analysis:router",
), ),
), ),
"backlinking": FeatureGroup(
features=("backlinking",),
routers=("routers.backlink_outreach:router",),
),
"linkedin": FeatureGroup(
features=("linkedin",),
routers=(
"routers.linkedin:router",
"api.linkedin_image_generation:router",
),
),
"facebook": FeatureGroup(
features=("facebook",),
routers=("api.facebook_writer.routers:facebook_router",),
),
} }
@@ -67,5 +82,8 @@ PROFILE_GROUP_MAP: Dict[str, Tuple[str, ...]] = {
"podcast": ("core", "podcast"), "podcast": ("core", "podcast"),
"youtube": ("core", "youtube"), "youtube": ("core", "youtube"),
"blog_writer": ("core", "blog_writer"), "blog_writer": ("core", "blog_writer"),
"backlinking": ("core", "backlinking"),
"linkedin": ("core", "linkedin"),
"facebook": ("core", "facebook"),
"planning": ("core", "content_planning"), "planning": ("core", "content_planning"),
} }

View File

@@ -67,6 +67,7 @@ OPTIONAL_ROUTER_REGISTRY = [
{"name": "oauth_token_monitoring", "module": "api.oauth_token_monitoring_routes", "attr": "router", "features": {"all", "core"}}, {"name": "oauth_token_monitoring", "module": "api.oauth_token_monitoring_routes", "attr": "router", "features": {"all", "core"}},
{"name": "agents", "module": "api.agents_api", "attr": "router", "features": {"all"}}, {"name": "agents", "module": "api.agents_api", "attr": "router", "features": {"all"}},
{"name": "today_workflow", "module": "api.today_workflow", "attr": "router", "features": {"all"}}, {"name": "today_workflow", "module": "api.today_workflow", "attr": "router", "features": {"all"}},
{"name": "backlink_outreach", "module": "routers.backlink_outreach", "attr": "router", "features": {"all", "backlinking"}},
] ]
OPTIONAL_MODULE_MATRIX = { OPTIONAL_MODULE_MATRIX = {

View File

@@ -126,19 +126,14 @@ seo_tools_router = None
if _is_full_mode(): if _is_full_mode():
from routers.seo_tools import router as seo_tools_router from routers.seo_tools import router as seo_tools_router
# Skip Facebook Writer, LinkedIn, and other non-essential routes in feature-only modes # Skip heavy services in feature-only modes (PersonaAnalysisService, etc.)
# Also skip other heavy services that trigger PersonaAnalysisService initialization
if _is_full_mode(): if _is_full_mode():
from api.facebook_writer.routers import facebook_router
from routers.linkedin import router as linkedin_router
from api.linkedin_image_generation import router as linkedin_image_router
from api.brainstorm import router as brainstorm_router from api.brainstorm import router as brainstorm_router
from api.images import router as images_router from api.images import router as images_router
from api.assets_serving import router as assets_serving_router from api.assets_serving import router as assets_serving_router
from routers.image_studio import router as image_studio_router from routers.image_studio import router as image_studio_router
from routers.product_marketing import router as product_marketing_router from routers.product_marketing import router as product_marketing_router
from routers.campaign_creator import router as campaign_creator_router from routers.campaign_creator import router as campaign_creator_router
from routers.backlink_outreach import router as backlink_outreach_router
else: else:
# In feature-only modes, only load essential assets router # In feature-only modes, only load essential assets router
from api.assets_serving import router as assets_serving_router from api.assets_serving import router as assets_serving_router
@@ -147,7 +142,6 @@ else:
image_studio_router = None image_studio_router = None
product_marketing_router = None product_marketing_router = None
campaign_creator_router = None campaign_creator_router = None
backlink_outreach_router = None
# Import hallucination detector router # Import hallucination detector router
try: try:
@@ -683,8 +677,6 @@ if _is_full_mode():
app.include_router(product_marketing_router) app.include_router(product_marketing_router)
if campaign_creator_router: if campaign_creator_router:
app.include_router(campaign_creator_router) app.include_router(campaign_creator_router)
if backlink_outreach_router:
app.include_router(backlink_outreach_router)
router_group_status["platform_extensions"] = { router_group_status["platform_extensions"] = {
"mounted": True, "mounted": True,
@@ -799,6 +791,24 @@ async def startup_event():
else: else:
logger.info(f"[FEATURE-MODE] Skipping scheduler startup (features: {enabled_features})") logger.info(f"[FEATURE-MODE] Skipping scheduler startup (features: {enabled_features})")
# Recover stale YouTube tasks on startup
if _is_feature_enabled("youtube"):
try:
from api.youtube.task_manager import task_manager
from services.database import get_all_user_ids
user_ids = get_all_user_ids()
recovered = 0
for uid in user_ids:
try:
count = task_manager.recover_stale_tasks(uid)
recovered += count
except Exception:
pass
if recovered > 0:
logger.info(f"[STARTUP] Recovered {recovered} stale YouTube tasks across {len(user_ids)} users")
except Exception as e:
logger.warning(f"[STARTUP] YouTube task recovery skipped: {e}")
# Check Wix configuration (OAuth-based, API key optional) # Check Wix configuration (OAuth-based, API key optional)
wix_api_key = os.getenv('WIX_API_KEY') wix_api_key = os.getenv('WIX_API_KEY')
if wix_api_key: if wix_api_key:

View File

@@ -50,6 +50,7 @@ class ClerkAuthMiddleware:
# Cache for PyJWKClient to avoid repeated JWKS fetches # Cache for PyJWKClient to avoid repeated JWKS fetches
self._jwks_client_cache = {} self._jwks_client_cache = {}
self._jwks_url_cache = None self._jwks_url_cache = None
self._issuer_cache = None # Pre-configured Clerk issuer for iss validation
if not self.clerk_secret_key and not self.disable_auth: if not self.clerk_secret_key and not self.disable_auth:
logger.warning("CLERK_SECRET_KEY not found, authentication may fail") logger.warning("CLERK_SECRET_KEY not found, authentication may fail")
@@ -58,15 +59,16 @@ class ClerkAuthMiddleware:
if CLERK_AUTH_AVAILABLE and not self.disable_auth: if CLERK_AUTH_AVAILABLE and not self.disable_auth:
try: try:
if self.clerk_secret_key and self.clerk_publishable_key: if self.clerk_secret_key and self.clerk_publishable_key:
# Extract instance from publishable key for JWKS URL # Extract instance from publishable key for JWKS URL and issuer validation
# Format: pk_test_<instance>.<domain> or pk_live_<instance>.<domain> # Format: pk_test_<instance>.<domain> or pk_live_<instance>.<domain>
parts = self.clerk_publishable_key.replace('pk_test_', '').replace('pk_live_', '').split('.') parts = self.clerk_publishable_key.replace('pk_test_', '').replace('pk_live_', '').split('.')
if len(parts) >= 1: if len(parts) >= 1:
# Extract the domain from publishable key or use default # Extract the domain from publishable key or use default
# Clerk URLs are typically: https://<instance>.clerk.accounts.dev # Clerk URLs are typically: https://<instance>.clerk.accounts.dev
instance = parts[0] instance = parts[0]
jwks_url = f"https://{instance}.clerk.accounts.dev/.well-known/jwks.json" issuer_url = f"https://{instance}.clerk.accounts.dev"
jwks_url = f"{issuer_url}/.well-known/jwks.json"
# Create Clerk configuration with JWKS URL # Create Clerk configuration with JWKS URL
clerk_config = ClerkConfig( clerk_config = ClerkConfig(
secret_key=self.clerk_secret_key, secret_key=self.clerk_secret_key,
@@ -76,6 +78,7 @@ class ClerkAuthMiddleware:
self.clerk_bearer = ClerkHTTPBearer(clerk_config) self.clerk_bearer = ClerkHTTPBearer(clerk_config)
logger.info(f"fastapi-clerk-auth initialized successfully with JWKS URL: {jwks_url}") logger.info(f"fastapi-clerk-auth initialized successfully with JWKS URL: {jwks_url}")
self._jwks_url_cache = jwks_url self._jwks_url_cache = jwks_url
self._issuer_cache = issuer_url # Pin issuer for VULN-001 fix
else: else:
logger.warning("Could not extract instance from publishable key") logger.warning("Could not extract instance from publishable key")
self.clerk_bearer = None self.clerk_bearer = None
@@ -118,19 +121,29 @@ class ClerkAuthMiddleware:
import jwt import jwt
from jwt import PyJWKClient from jwt import PyJWKClient
# Get the JWKS URL from the token header # Get the unverified header for key ID lookup
unverified_header = jwt.get_unverified_header(token) unverified_header = jwt.get_unverified_header(token)
# Decode token to get issuer for JWKS URL # --- SECURITY FIX (VULN-001): Validate issuer before any JWKS fetch ---
# Pre-configured issuer and JWKS URL derived from CLERK_PUBLISHABLE_KEY
# NEVER use the token's 'iss' claim to construct the JWKS URL (GHSA-426f-p74m-73fv)
expected_issuer = self._issuer_cache
jwks_url = self._jwks_url_cache
if not expected_issuer or not jwks_url:
raise Exception("Clerk issuer/JWKS URL not configured at startup")
# Decode token to validate the issuer claim against the pre-configured value
# WARNING: We must first validate 'iss' before trusting anything else
unverified_claims = jwt.decode(token, options={"verify_signature": False}) unverified_claims = jwt.decode(token, options={"verify_signature": False})
issuer = unverified_claims.get('iss', '') token_issuer = unverified_claims.get('iss', '')
if token_issuer != expected_issuer:
# Construct JWKS URL from issuer logger.error(
jwks_url = f"{issuer}/.well-known/jwks.json" if issuer else self._jwks_url_cache or "" f"Issuer mismatch: token claims '{token_issuer}' "
if not jwks_url: f"but expected '{expected_issuer}'"
raise Exception("Unable to resolve JWKS URL for Clerk verification") )
return None
# Use cached PyJWKClient to avoid repeated JWKS fetches
# Use cached PyJWKClient with pinned jwks_url (never derived from token)
if jwks_url not in self._jwks_client_cache: if jwks_url not in self._jwks_client_cache:
logger.info(f"Creating new PyJWKClient for {jwks_url} with caching enabled") logger.info(f"Creating new PyJWKClient for {jwks_url} with caching enabled")
# Create client with caching enabled (cache_keys=True keeps keys in memory) # Create client with caching enabled (cache_keys=True keeps keys in memory)
@@ -139,17 +152,19 @@ class ClerkAuthMiddleware:
cache_keys=True, cache_keys=True,
max_cached_keys=16 max_cached_keys=16
) )
jwks_client = self._jwks_client_cache[jwks_url] jwks_client = self._jwks_client_cache[jwks_url]
signing_key = jwks_client.get_signing_key_from_jwt(token) signing_key = jwks_client.get_signing_key_from_jwt(token)
# Verify and decode the token with clock skew tolerance # Verify and decode the token with clock skew tolerance
# Add 300 seconds (5 minutes) leeway to handle clock skew and token refresh delays # Add 300 seconds (5 minutes) leeway to handle clock skew and token refresh delays
# SECURITY: Always pass issuer= to verify the token's 'iss' matches expected (VULN-001)
decoded_token = jwt.decode( decoded_token = jwt.decode(
token, token,
signing_key.key, signing_key.key,
algorithms=["RS256"], algorithms=["RS256"],
options={"verify_signature": True, "verify_exp": True}, issuer=expected_issuer,
options={"verify_signature": True, "verify_exp": True, "verify_iss": True},
leeway=300 # Allow 5 minutes leeway for token refresh during navigation leeway=300 # Allow 5 minutes leeway for token refresh during navigation
) )

View File

@@ -46,6 +46,7 @@ class OutreachAttempt(Base):
decision_reason = Column(Text, nullable=True) decision_reason = Column(Text, nullable=True)
sent_at = Column(DateTime, nullable=True) sent_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, index=True) created_at = Column(DateTime, default=datetime.utcnow, index=True)
message_id = Column(String(255), nullable=True, index=True)
class OutreachReply(Base): class OutreachReply(Base):

View File

@@ -22,7 +22,10 @@ from services.backlink_outreach_models import (
SuppressionAddRequest, SuppressionAddRequest,
) )
from services.backlink_outreach_service import backlink_outreach_service from services.backlink_outreach_service import backlink_outreach_service
from services.backlink_outreach_storage import BacklinkOutreachStorageService from services.backlink_outreach_storage import (
BacklinkCampaignNotFoundError,
BacklinkOutreachStorageService,
)
from services.backlink_outreach_sender import backlink_outreach_sender from services.backlink_outreach_sender import backlink_outreach_sender
from services.backlink_outreach_reply_monitor import backlink_outreach_reply_monitor from services.backlink_outreach_reply_monitor import backlink_outreach_reply_monitor
from services.backlink_outreach_template_generator import ( from services.backlink_outreach_template_generator import (
@@ -68,7 +71,7 @@ async def discover_backlink_opportunities(
payload: BacklinkKeywordInput, payload: BacklinkKeywordInput,
current_user: Dict[str, Any] = Depends(get_current_user), current_user: Dict[str, Any] = Depends(get_current_user),
): ):
return backlink_outreach_service.discover_opportunities(payload.keyword, payload.max_results) return await backlink_outreach_service.discover_opportunities_async(payload.keyword, payload.max_results)
@router.get("/migration-coverage") @router.get("/migration-coverage")
@@ -84,12 +87,25 @@ async def get_backlink_migration_coverage(
async def discover_deep_backlink_opportunities( async def discover_deep_backlink_opportunities(
payload: DeepKeywordInput, payload: DeepKeywordInput,
current_user: Dict[str, Any] = Depends(get_current_user), current_user: Dict[str, Any] = Depends(get_current_user),
scrape_timeout_seconds: float = Query(15.0, ge=1.0, le=60.0),
scrape_max_concurrency: int = Query(5, ge=1, le=20),
): ):
"""Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping.""" """Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping."""
user_id = _resolve_user_id(current_user) user_id = _resolve_user_id(current_user)
result = await backlink_outreach_service.deep_discover(payload.keyword, payload.max_results) storage = None
if payload.campaign_id: if payload.campaign_id:
storage = BacklinkOutreachStorageService() storage = BacklinkOutreachStorageService()
if not storage.get_campaign(payload.campaign_id, user_id):
raise HTTPException(status_code=404, detail="Campaign not found")
result = await backlink_outreach_service.deep_discover(
payload.keyword,
payload.max_results,
user_id=user_id,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
if payload.campaign_id:
saved = 0 saved = 0
save_failed = 0 save_failed = 0
for opp in result.get("opportunities", []): for opp in result.get("opportunities", []):
@@ -183,7 +199,9 @@ async def add_campaign_lead(
notes=payload.notes, notes=payload.notes,
) )
return lead return lead
except Exception as e: except BacklinkCampaignNotFoundError:
raise HTTPException(status_code=404, detail="Campaign not found")
except Exception:
raise HTTPException(status_code=500, detail="Failed to add lead") raise HTTPException(status_code=500, detail="Failed to add lead")
@@ -192,18 +210,48 @@ async def bulk_update_lead_status(
payload: BulkStatusUpdateRequest, payload: BulkStatusUpdateRequest,
current_user: Dict[str, Any] = Depends(get_current_user), current_user: Dict[str, Any] = Depends(get_current_user),
): ):
"""Bulk update lead statuses.""" """Bulk update lead statuses for leads owned by the current user."""
user_id = _resolve_user_id(current_user) user_id = _resolve_user_id(current_user)
storage = BacklinkOutreachStorageService() storage = BacklinkOutreachStorageService()
access_issues = storage.get_lead_access_issues(
payload.lead_ids, user_id, campaign_id=payload.campaign_id
)
if access_issues["unauthorized"]:
raise HTTPException(
status_code=403,
detail={
"message": "One or more leads do not belong to the current user",
"lead_ids": access_issues["unauthorized"],
},
)
if access_issues["missing"]:
raise HTTPException(
status_code=404,
detail={
"message": "One or more leads were not found",
"lead_ids": access_issues["missing"],
},
)
updated = 0 updated = 0
failed: list[str] = [] failed: list[str] = []
for lid in payload.lead_ids: for lid in payload.lead_ids:
try: try:
lead = storage.update_lead_status(lid, user_id, payload.status, payload.notes) lead = storage.update_lead_status(
lid,
user_id,
payload.status,
payload.notes,
campaign_id=payload.campaign_id,
)
if lead: if lead:
updated += 1 updated += 1
else: else:
failed.append(lid) failed.append(lid)
except PermissionError:
raise HTTPException(
status_code=403, detail="Lead does not belong to the current user"
)
except Exception: except Exception:
failed.append(lid) failed.append(lid)
return BulkStatusUpdateResponse(updated=updated, failed=failed) return BulkStatusUpdateResponse(updated=updated, failed=failed)
@@ -218,7 +266,18 @@ async def update_lead_status(
"""Update lead status (discovered -> contacted -> replied -> placed).""" """Update lead status (discovered -> contacted -> replied -> placed)."""
user_id = _resolve_user_id(current_user) user_id = _resolve_user_id(current_user)
storage = BacklinkOutreachStorageService() storage = BacklinkOutreachStorageService()
lead = storage.update_lead_status(lead_id, user_id, payload.status, payload.notes) try:
lead = storage.update_lead_status(
lead_id,
user_id,
payload.status,
payload.notes,
campaign_id=payload.campaign_id,
)
except PermissionError:
raise HTTPException(
status_code=403, detail="Lead does not belong to the current user"
)
if not lead: if not lead:
raise HTTPException(status_code=404, detail="Lead not found") raise HTTPException(status_code=404, detail="Lead not found")
return lead return lead
@@ -260,42 +319,95 @@ async def send_outreach(
subject = backlink_outreach_sender.personalize(tmpl.get("subject_template", subject), variables) subject = backlink_outreach_sender.personalize(tmpl.get("subject_template", subject), variables)
body = backlink_outreach_sender.personalize(tmpl.get("body_template", body), variables) body = backlink_outreach_sender.personalize(tmpl.get("body_template", body), variables)
result = backlink_outreach_service.send_outreach( sender_validation = backlink_outreach_sender.validate_sender_alias(payload.sender_email)
SendOutreachRequest( if not sender_validation.authorized:
lead_id=payload.lead_id, return SendOutreachResponse(
campaign_id=payload.campaign_id, attempt_id="",
user_id=user_id, status="failed",
workspace_id=payload.workspace_id, policy_allowed=False,
sender_email=payload.sender_email, policy_reasons=sender_validation.failure_reasons,
subject=subject, effective_sender_email=sender_validation.effective_sender_email or None,
body=body,
idempotency_key=payload.idempotency_key,
) )
)
try:
result = backlink_outreach_service.send_outreach(
SendOutreachRequest(
lead_id=payload.lead_id,
campaign_id=payload.campaign_id,
user_id=user_id,
workspace_id=payload.workspace_id,
sender_email=sender_validation.effective_sender_email,
subject=subject,
body=body,
idempotency_key=payload.idempotency_key,
sender_identity=payload.sender_identity,
legal_basis=payload.legal_basis,
contact_discovery_source=payload.contact_discovery_source,
recipient_region=payload.recipient_region,
recipient_region_source=payload.recipient_region_source,
consent_status=payload.consent_status,
approved_by_human=payload.approved_by_human,
unsubscribe_url=payload.unsubscribe_url,
one_click_unsubscribe=payload.one_click_unsubscribe,
)
)
except Exception:
existing = storage.get_attempt_by_idempotency_key(payload.idempotency_key, user_id=user_id)
if existing:
result = backlink_outreach_service.response_from_attempt(existing, duplicate=True)
if sender_validation.effective_sender_email:
result.effective_sender_email = sender_validation.effective_sender_email
return result
raise HTTPException(status_code=409, detail="Unable to reserve idempotency key")
result.effective_sender_email = sender_validation.effective_sender_email
lead_email = "" lead_email = ""
if result.attempt_id: if result.attempt_id and result.status == "approved" and not result.duplicate:
lead = storage.get_lead(payload.lead_id, user_id=user_id) lead = storage.get_lead(payload.lead_id, user_id=user_id)
lead_email = (lead.get("email") or "") if lead else "" lead_email = (lead.get("email") or "") if lead else ""
if result.policy_allowed and lead_email: if result.status == "approved" and result.policy_allowed and not result.duplicate and lead_email:
sent = await backlink_outreach_sender.send_email( domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown"
to_email=lead_email,
subject=subject, user_within_cap, _ = storage.try_increment_user_send_counter(user_id)
body=body, domain_within_cap, _ = storage.try_increment_domain_send_counter(domain, user_id=user_id)
) if not (user_within_cap and domain_within_cap):
status = "sent" if sent else "failed" reasons = []
storage.update_attempt_status(result.attempt_id, status, user_id=user_id) if not user_within_cap:
result.status = status reasons.append("user_daily_cap_exceeded")
if sent: if not domain_within_cap:
storage.mark_idempotency(payload.idempotency_key, user_id) reasons.append("domain_daily_cap_exceeded")
storage.increment_user_send_counter(user_id) reason_str = f"rate_limit_hit; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown" storage.update_attempt_status(result.attempt_id, "blocked", decision_reason=reason_str, user_id=user_id)
storage.increment_domain_send_counter(domain, user_id=user_id) result.status = "blocked"
elif result.policy_allowed and not lead_email: result.policy_reasons = reasons
storage.update_attempt_status(result.attempt_id, "failed", user_id=user_id) else:
send_result = await backlink_outreach_sender.send_email(
to_email=lead_email,
subject=subject,
body=body,
from_email=payload.sender_email,
)
if send_result.success:
storage.update_attempt_status(result.attempt_id, "sent", user_id=user_id)
result.status = "sent"
result.effective_sender_email = send_result.effective_sender_email or result.effective_sender_email
if send_result.message_id:
storage.update_attempt_message_id(result.attempt_id, send_result.message_id, user_id=user_id)
storage.mark_idempotency(payload.idempotency_key, user_id)
else:
reason = f"smtp_send_failed; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id)
result.status = "failed"
result.policy_reasons = ["smtp_send_failed"]
result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY
elif result.status == "approved" and result.policy_allowed and not result.duplicate and not lead_email:
reason = f"lead_has_no_email; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id)
result.status = "failed" result.status = "failed"
result.policy_reasons = (result.policy_reasons or []) + ["lead_has_no_email"] result.policy_reasons = (result.policy_reasons or []) + ["lead_has_no_email"]
result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY
return result return result
@@ -350,7 +462,18 @@ async def poll_replies(
if storage.reply_exists(from_email, subject, user_id=user_id): if storage.reply_exists(from_email, subject, user_id=user_id):
skipped += 1 skipped += 1
continue continue
attempt_id = storage.find_attempt_by_from_email(from_email, user_id=user_id) or ""
attempt_id = ""
in_reply_to = raw.get("in_reply_to", "")
references = raw.get("references", "")
if in_reply_to:
attempt_id = storage.find_attempt_by_message_id(in_reply_to, user_id=user_id) or ""
if not attempt_id and references:
mid = references.split()[-1]
attempt_id = storage.find_attempt_by_message_id(mid, user_id=user_id) or ""
if not attempt_id:
attempt_id = storage.find_attempt_by_from_email(from_email, user_id=user_id) or ""
reply = storage.add_reply( reply = storage.add_reply(
attempt_id=attempt_id, attempt_id=attempt_id,
from_email=from_email, from_email=from_email,

View File

@@ -1,7 +1,8 @@
from __future__ import annotations from __future__ import annotations
from pydantic import BaseModel, Field, HttpUrl, EmailStr from pydantic import BaseModel, Field, HttpUrl
from typing import Dict, List, Optional from typing import Dict, List, Optional
from typing_extensions import Literal
class BacklinkKeywordInput(BaseModel): class BacklinkKeywordInput(BaseModel):
@@ -10,7 +11,7 @@ class BacklinkKeywordInput(BaseModel):
class OpportunityContactInfo(BaseModel): class OpportunityContactInfo(BaseModel):
email: Optional[EmailStr] = None email: Optional[str] = None
contact_page: Optional[HttpUrl] = None contact_page: Optional[HttpUrl] = None
@@ -93,8 +94,9 @@ class LeadListResponse(BaseModel):
class LeadStatusUpdateRequest(BaseModel): class LeadStatusUpdateRequest(BaseModel):
status: str = Field(..., min_length=1) status: Literal["discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"]
notes: Optional[str] = None notes: Optional[str] = None
campaign_id: Optional[str] = Field(default=None, min_length=1)
class CampaignDetailResponse(BaseModel): class CampaignDetailResponse(BaseModel):
@@ -148,6 +150,21 @@ class OutreachStatusRecord(BaseModel):
notes: Optional[str] = None notes: Optional[str] = None
class SenderIdentity(BaseModel):
name: str = Field(default="", description="Human sender name displayed to the recipient")
email: str = Field(default="")
organization: str = Field(default="", description="Organization or brand responsible for the outreach")
physical_mailing_address: str = Field(default="", description="Postal address required for commercial outreach compliance")
reply_to_email: Optional[str] = Field(None, description="Optional reply-to mailbox if different from sender email")
class OneClickUnsubscribe(BaseModel):
enabled: bool = Field(default=False)
mailto: Optional[str] = Field(None, description="Mailbox for one-click unsubscribe requests")
header_value: Optional[str] = Field(None, description="List-Unsubscribe / one-click unsubscribe header value")
class SendOutreachRequest(BaseModel): class SendOutreachRequest(BaseModel):
lead_id: str = Field(..., min_length=1) lead_id: str = Field(..., min_length=1)
campaign_id: str = Field(..., min_length=1) campaign_id: str = Field(..., min_length=1)
@@ -157,6 +174,15 @@ class SendOutreachRequest(BaseModel):
subject: str = Field(..., min_length=1) subject: str = Field(..., min_length=1)
body: str = Field(..., min_length=1) body: str = Field(..., min_length=1)
idempotency_key: str = Field(..., min_length=8) idempotency_key: str = Field(..., min_length=8)
sender_identity: Optional[SenderIdentity] = None
legal_basis: str = Field(default="")
contact_discovery_source: str = Field(default="")
recipient_region: str = Field(default="unknown")
recipient_region_source: str = Field(default="user_attested", min_length=2)
consent_status: str = Field(default="unknown", min_length=2)
approved_by_human: bool = False
unsubscribe_url: Optional[HttpUrl] = None
one_click_unsubscribe: Optional[OneClickUnsubscribe] = None
template_id: Optional[str] = Field(None, description="Optional template ID for personalization") template_id: Optional[str] = Field(None, description="Optional template ID for personalization")
template_variables: Optional[dict] = Field(None, description="Variable values for template personalization") template_variables: Optional[dict] = Field(None, description="Variable values for template personalization")
@@ -166,6 +192,9 @@ class SendOutreachResponse(BaseModel):
status: str status: str
policy_allowed: bool policy_allowed: bool
policy_reasons: List[str] = Field(default_factory=list) policy_reasons: List[str] = Field(default_factory=list)
effective_sender_email: Optional[str] = None
duplicate: bool = False
retry_policy: Optional[str] = None
class OutreachAttemptRecord(BaseModel): class OutreachAttemptRecord(BaseModel):
@@ -240,10 +269,15 @@ class PolicyValidationRequest(BaseModel):
recipient_email: str = Field(..., min_length=1) recipient_email: str = Field(..., min_length=1)
recipient_domain: str recipient_domain: str
recipient_region: str = Field(default="unknown") recipient_region: str = Field(default="unknown")
legal_basis: str = Field(..., min_length=2) recipient_region_source: str = Field(default="user_attested", min_length=2)
legal_basis: str = Field(default="")
contact_discovery_source: str = Field(default="")
consent_status: str = Field(default="unknown", min_length=2)
approved_by_human: bool = False approved_by_human: bool = False
unsubscribe_url: Optional[HttpUrl] = None unsubscribe_url: Optional[HttpUrl] = None
sender_identity: str = Field(..., min_length=3) one_click_unsubscribe: Optional[OneClickUnsubscribe] = None
sender_identity: Optional[SenderIdentity] = None
sender_email: Optional[str] = Field(None, description="Transport sender email, if separate from identity")
idempotency_key: str = Field(..., min_length=8) idempotency_key: str = Field(..., min_length=8)
@@ -296,8 +330,9 @@ class ConversionFunnelResponse(BaseModel):
class BulkStatusUpdateRequest(BaseModel): class BulkStatusUpdateRequest(BaseModel):
lead_ids: List[str] = Field(..., min_length=1) lead_ids: List[str] = Field(..., min_length=1)
status: str = Field(..., min_length=1) status: Literal["discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"]
notes: Optional[str] = None notes: Optional[str] = None
campaign_id: Optional[str] = Field(default=None, min_length=1)
class BulkStatusUpdateResponse(BaseModel): class BulkStatusUpdateResponse(BaseModel):

View File

@@ -104,6 +104,8 @@ class BacklinkOutreachReplyMonitor:
from_email = parsed_msg.get("From", "") from_email = parsed_msg.get("From", "")
subject = parsed_msg.get("Subject", "") subject = parsed_msg.get("Subject", "")
received_at = parsed_msg.get("Date", "") received_at = parsed_msg.get("Date", "")
in_reply_to = parsed_msg.get("In-Reply-To", "")
references = parsed_msg.get("References", "")
# Extract body # Extract body
body = "" body = ""
@@ -137,6 +139,8 @@ class BacklinkOutreachReplyMonitor:
"body": body[:5000], "body": body[:5000],
"classification": classification, "classification": classification,
"received_at": received_at_iso, "received_at": received_at_iso,
"in_reply_to": in_reply_to,
"references": references,
} }
except Exception as e: except Exception as e:
logger.error(f"Failed to parse reply: {e}") logger.error(f"Failed to parse reply: {e}")

View File

@@ -8,11 +8,10 @@ from __future__ import annotations
import asyncio import asyncio
import re import re
import time
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from urllib.parse import urlparse from urllib.parse import quote, urlparse
import requests import httpx
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from loguru import logger from loguru import logger
@@ -34,26 +33,47 @@ class BacklinkOutreachScraper:
# -- Public API -- # -- Public API --
async def deep_discover( async def deep_discover(
self, keyword: str, max_results: int = 15 self,
keyword: str,
max_results: int = 15,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Discover guest-post opportunities using Exa, falling back to DuckDuckGo.""" """Discover guest-post opportunities using Exa, falling back to DuckDuckGo."""
if self._is_exa_available(): if self._is_exa_available():
logger.info(f"[BacklinkScraper] Using Exa for keyword: {keyword}") logger.info(f"[BacklinkScraper] Using Exa for keyword: {keyword}")
return await self._discover_with_exa(keyword, max_results) return await self._discover_with_exa(keyword, max_results)
logger.info(f"[BacklinkScraper] Exa unavailable, falling back to DuckDuckGo for: {keyword}") logger.info(f"[BacklinkScraper] Exa unavailable, falling back to DuckDuckGo for: {keyword}")
return await self._discover_with_duckduckgo(keyword, max_results) return await self._discover_with_duckduckgo(
keyword,
max_results,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]: async def scrape_urls(
"""Fetch full page content for a list of URLs using Exa get_contents.""" self,
urls: List[str],
timeout_seconds: float = 15.0,
max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
"""Fetch full page content with non-blocking fallbacks and bounded concurrency."""
exa = self._get_exa_sdk() exa = self._get_exa_sdk()
if not exa: if not exa:
return self._scrape_urls_fallback(urls) return await self._scrape_urls_fallback(
urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency
)
loop = asyncio.get_running_loop()
try: try:
result = exa.get_contents(urls, text={"max_characters": 5000}) result = await loop.run_in_executor(
None, lambda: exa.get_contents(urls, text={"max_characters": 5000})
)
return self._parse_get_contents_result(result) return self._parse_get_contents_result(result)
except Exception as e: except Exception as e:
logger.warning(f"[BacklinkScraper] Exa get_contents failed: {e}") logger.warning(f"[BacklinkScraper] Exa get_contents failed: {e}")
return self._scrape_urls_fallback(urls) return await self._scrape_urls_fallback(
urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency
)
# -- Availability -- # -- Availability --
@@ -207,24 +227,35 @@ class BacklinkOutreachScraper:
# -- DuckDuckGo Fallback Discovery -- # -- DuckDuckGo Fallback Discovery --
async def _discover_with_duckduckgo(self, keyword: str, max_results: int) -> Dict[str, Any]: async def _discover_with_duckduckgo(
self,
keyword: str,
max_results: int,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
queries = self._generate_search_queries(keyword) queries = self._generate_search_queries(keyword)
dedup: Dict[str, Dict[str, Any]] = {} dedup: Dict[str, Dict[str, Any]] = {}
for query in queries[:4]: async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client:
rows = self._duckduckgo_search(query) for query in queries[:4]:
for row in rows: rows = await self._duckduckgo_search(query, client=client)
norm_url = self._normalize_url(row.get("url", "")) for row in rows:
if not norm_url or norm_url in dedup: norm_url = self._normalize_url(row.get("url", ""))
continue if not norm_url or norm_url in dedup:
dedup[norm_url] = row continue
if len(dedup) >= max_results: dedup[norm_url] = row
break if len(dedup) >= max_results:
time.sleep(0.4) break
await asyncio.sleep(0.4)
# Scrape discovered URLs with Exa get_contents (or fallback) # Scrape discovered URLs with Exa get_contents (or fallback)
urls_to_scrape = list(dedup.keys())[:max_results] urls_to_scrape = list(dedup.keys())[:max_results]
scraped = self.scrape_urls(urls_to_scrape) scraped = await self.scrape_urls(
urls_to_scrape,
timeout_seconds=scrape_timeout_seconds,
max_concurrency=scrape_max_concurrency,
)
scraped_map = {self._normalize_url(s.get("url", "")): s for s in scraped} scraped_map = {self._normalize_url(s.get("url", "")): s for s in scraped}
# Merge DDG results with scraped content # Merge DDG results with scraped content
@@ -250,51 +281,76 @@ class BacklinkOutreachScraper:
"opportunities": opportunities, "opportunities": opportunities,
} }
def _duckduckgo_search(self, query: str, retries: int = 2) -> List[Dict[str, Any]]: async def _duckduckgo_search(
encoded = requests.utils.quote(query) self,
query: str,
retries: int = 2,
client: Optional[httpx.AsyncClient] = None,
) -> List[Dict[str, Any]]:
encoded = quote(query)
url = f"https://duckduckgo.com/html/?q={encoded}" url = f"https://duckduckgo.com/html/?q={encoded}"
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for attempt in range(retries + 1):
try:
resp = requests.get(url, headers=headers, timeout=12)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
for result in soup.select("div.result")[:10]:
anchor = result.select_one("a.result__a")
snippet_el = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
if not anchor or not anchor.get("href"):
continue
results.append({
"url": anchor.get("href"),
"title": anchor.get_text(strip=True),
"snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "",
"highlights": [],
})
return results
except Exception:
if attempt == retries:
return []
time.sleep(0.6 * (attempt + 1))
return []
def _scrape_urls_fallback(self, urls: List[str]) -> List[Dict[str, Any]]: async def _request(active_client: httpx.AsyncClient) -> List[Dict[str, Any]]:
"""Basic HTTP scrape when Exa is unavailable.""" for attempt in range(retries + 1):
results = [] try:
resp = await active_client.get(url, headers=headers)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
for result in soup.select("div.result")[:10]:
anchor = result.select_one("a.result__a")
snippet_el = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
if not anchor or not anchor.get("href"):
continue
results.append({
"url": anchor.get("href"),
"title": anchor.get_text(strip=True),
"snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "",
"highlights": [],
})
return results
except (httpx.HTTPError, httpx.TimeoutException):
if attempt == retries:
return []
await asyncio.sleep(0.6 * (attempt + 1))
return []
if client is not None:
return await _request(client)
async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as owned_client:
return await _request(owned_client)
async def _scrape_urls_fallback(
self,
urls: List[str],
timeout_seconds: float = 15.0,
max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
"""Basic async HTTP scrape when Exa is unavailable."""
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for url in urls[:5]: semaphore = asyncio.Semaphore(max(1, max_concurrency))
try: timeout = httpx.Timeout(timeout_seconds)
resp = requests.get(url, headers=headers, timeout=15)
resp.raise_for_status() async def scrape_one(client: httpx.AsyncClient, url: str) -> Optional[Dict[str, Any]]:
soup = BeautifulSoup(resp.text, "html.parser") async with semaphore:
for tag in soup(["script", "style", "nav", "footer", "header"]): try:
tag.decompose() resp = await client.get(url, headers=headers)
text = soup.get_text(separator=" ", strip=True) resp.raise_for_status()
title = soup.title.get_text(strip=True) if soup.title else "" soup = BeautifulSoup(resp.text, "html.parser")
results.append({"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""}) for tag in soup(["script", "style", "nav", "footer", "header"]):
except Exception: tag.decompose()
continue text = soup.get_text(separator=" ", strip=True)
return results title = soup.title.get_text(strip=True) if soup.title else ""
return {"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""}
except (httpx.HTTPError, httpx.TimeoutException):
return None
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
tasks = [scrape_one(client, url) for url in urls]
scraped = await asyncio.gather(*tasks)
return [row for row in scraped if row]
# -- Enrichment Pipeline -- # -- Enrichment Pipeline --

View File

@@ -6,9 +6,11 @@ import os
import ssl import ssl
import smtplib import smtplib
import asyncio import asyncio
from dataclasses import dataclass, field
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from typing import Optional from typing import List, Optional, Set
from uuid import uuid4
from loguru import logger from loguru import logger
@@ -17,11 +19,27 @@ SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USERNAME = os.getenv("SMTP_USERNAME", "") SMTP_USERNAME = os.getenv("SMTP_USERNAME", "")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
SMTP_FROM_EMAIL = os.getenv("SMTP_FROM_EMAIL", SMTP_USERNAME) SMTP_FROM_EMAIL = os.getenv("SMTP_FROM_EMAIL", SMTP_USERNAME)
SMTP_ALLOWED_FROM_EMAILS = os.getenv("SMTP_ALLOWED_FROM_EMAILS", "")
SMTP_USE_TLS = os.getenv("SMTP_USE_TLS", "true").lower() in ("true", "1", "yes") SMTP_USE_TLS = os.getenv("SMTP_USE_TLS", "true").lower() in ("true", "1", "yes")
SMTP_VERIFY_TLS = os.getenv("SMTP_VERIFY_TLS", "true").lower() in ("true", "1", "yes") SMTP_VERIFY_TLS = os.getenv("SMTP_VERIFY_TLS", "true").lower() in ("true", "1", "yes")
SMTP_SEND_TIMEOUT = int(os.getenv("SMTP_SEND_TIMEOUT", "30")) SMTP_SEND_TIMEOUT = int(os.getenv("SMTP_SEND_TIMEOUT", "30"))
@dataclass
class SenderAuthorizationResult:
authorized: bool
effective_sender_email: str = ""
failure_reasons: List[str] = field(default_factory=list)
@dataclass
class SendEmailResult:
success: bool
effective_sender_email: str = ""
message_id: str = ""
failure_reasons: List[str] = field(default_factory=list)
class BacklinkOutreachSender: class BacklinkOutreachSender:
def __init__(self): def __init__(self):
self._host = SMTP_HOST self._host = SMTP_HOST
@@ -29,6 +47,7 @@ class BacklinkOutreachSender:
self._username = SMTP_USERNAME self._username = SMTP_USERNAME
self._password = SMTP_PASSWORD self._password = SMTP_PASSWORD
self._from_email = SMTP_FROM_EMAIL or SMTP_USERNAME self._from_email = SMTP_FROM_EMAIL or SMTP_USERNAME
self._allowed_from_emails = SMTP_ALLOWED_FROM_EMAILS
self._use_tls = SMTP_USE_TLS self._use_tls = SMTP_USE_TLS
self._verify_tls = SMTP_VERIFY_TLS self._verify_tls = SMTP_VERIFY_TLS
self._timeout = SMTP_SEND_TIMEOUT self._timeout = SMTP_SEND_TIMEOUT
@@ -36,23 +55,75 @@ class BacklinkOutreachSender:
def is_configured(self) -> bool: def is_configured(self) -> bool:
return bool(self._username and self._password) return bool(self._username and self._password)
@staticmethod
def _normalize_email(email: Optional[str]) -> str:
return (email or "").strip().lower()
def _allowed_sender_aliases(self) -> Set[str]:
aliases = {
self._normalize_email(alias)
for alias in self._allowed_from_emails.split(",")
if self._normalize_email(alias)
}
for configured_sender in (self._from_email, self._username):
normalized = self._normalize_email(configured_sender)
if normalized:
aliases.add(normalized)
return aliases
def validate_sender_alias(self, from_email: Optional[str] = None) -> SenderAuthorizationResult:
default_sender = self._normalize_email(self._from_email or self._username)
requested_sender = self._normalize_email(from_email) or default_sender
if not self.is_configured():
return SenderAuthorizationResult(
authorized=False,
effective_sender_email=requested_sender,
failure_reasons=["smtp_not_configured"],
)
if not requested_sender:
return SenderAuthorizationResult(
authorized=False,
failure_reasons=["smtp_sender_missing"],
)
allowed_aliases = self._allowed_sender_aliases()
if requested_sender not in allowed_aliases:
return SenderAuthorizationResult(
authorized=False,
effective_sender_email=requested_sender,
failure_reasons=["sender_alias_not_authorized"],
)
return SenderAuthorizationResult(
authorized=True,
effective_sender_email=requested_sender,
)
async def send_email( async def send_email(
self, self,
to_email: str, to_email: str,
subject: str, subject: str,
body: str, body: str,
from_email: Optional[str] = None, from_email: Optional[str] = None,
) -> bool: ) -> SendEmailResult:
if not self.is_configured(): sender_validation = self.validate_sender_alias(from_email)
logger.error("SMTP not configured: set SMTP_USERNAME and SMTP_PASSWORD") if not sender_validation.authorized:
return False logger.error(f"SMTP sender validation failed: {sender_validation.failure_reasons}")
return SendEmailResult(
success=False,
effective_sender_email=sender_validation.effective_sender_email,
failure_reasons=sender_validation.failure_reasons,
)
sender = from_email or self._from_email sender = sender_validation.effective_sender_email
msg_id = f"<{uuid4().hex}@{sender.split('@')[-1] if '@' in sender else 'outreach.local'}>"
msg = MIMEMultipart("alternative") msg = MIMEMultipart("alternative")
msg["From"] = sender msg["From"] = sender
msg["To"] = to_email msg["To"] = to_email
msg["Subject"] = subject msg["Subject"] = subject
msg["Message-ID"] = msg_id
msg.attach(MIMEText(body, "plain")) msg.attach(MIMEText(body, "plain"))
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
@@ -78,7 +149,13 @@ class BacklinkOutreachSender:
logger.error(f"Unexpected error sending to {to_email}: {e}") logger.error(f"Unexpected error sending to {to_email}: {e}")
return False return False
return await loop.run_in_executor(None, _send) success = await loop.run_in_executor(None, _send)
return SendEmailResult(
success=success,
effective_sender_email=sender,
message_id=msg_id if success else "",
failure_reasons=[] if success else ["smtp_send_failed"],
)
def personalize(self, template: str, variables: dict) -> str: def personalize(self, template: str, variables: dict) -> str:
"""Replace {placeholder} variables in a template string.""" """Replace {placeholder} variables in a template string."""

View File

@@ -4,10 +4,11 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from urllib.parse import quote
import asyncio
import re import re
import time
import requests import httpx
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import csv import csv
@@ -22,9 +23,6 @@ from services.backlink_outreach_models import (
) )
from services.backlink_outreach_storage import BacklinkOutreachStorageService from services.backlink_outreach_storage import BacklinkOutreachStorageService
DEFAULT_USER_DAILY_CAP = 100
DEFAULT_DOMAIN_DAILY_CAP = 20
@dataclass @dataclass
class SearchResult: class SearchResult:
url: str url: str
@@ -55,51 +53,67 @@ class BacklinkOutreachService:
f"{normalized} + 'Submit article'", f"{normalized} + 'Submit article'",
] ]
def search_for_urls(self, query: str, timeout_seconds: int = 12, retries: int = 2) -> List[SearchResult]: async def search_for_urls(
encoded_query = requests.utils.quote(query) self,
query: str,
timeout_seconds: int = 12,
retries: int = 2,
client: Optional[httpx.AsyncClient] = None,
) -> List[SearchResult]:
"""Search DuckDuckGo HTML using a non-blocking HTTP client."""
encoded_query = quote(query)
url = f"https://duckduckgo.com/html/?q={encoded_query}" url = f"https://duckduckgo.com/html/?q={encoded_query}"
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for attempt in range(retries + 1): async def _request(active_client: httpx.AsyncClient) -> List[SearchResult]:
try: for attempt in range(retries + 1):
response = requests.get(url, headers=headers, timeout=timeout_seconds) try:
response.raise_for_status() response = await active_client.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser") response.raise_for_status()
rows: List[SearchResult] = [] soup = BeautifulSoup(response.text, "html.parser")
for result in soup.select("div.result")[:10]: rows: List[SearchResult] = []
anchor = result.select_one("a.result__a") for result in soup.select("div.result")[:10]:
snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet") anchor = result.select_one("a.result__a")
if not anchor or not anchor.get("href"): snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
continue if not anchor or not anchor.get("href"):
rows.append( continue
SearchResult( rows.append(
url=anchor.get("href"), SearchResult(
title=anchor.get_text(strip=True), url=anchor.get("href"),
snippet=snippet.get_text(" ", strip=True) if snippet else "", title=anchor.get_text(strip=True),
snippet=snippet.get_text(" ", strip=True) if snippet else "",
)
) )
) return rows
return rows except (httpx.HTTPError, httpx.TimeoutException):
except Exception: if attempt == retries:
if attempt == retries: return []
return [] await asyncio.sleep(0.6 * (attempt + 1))
time.sleep(0.6 * (attempt + 1)) return []
return []
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]: if client is not None:
return await _request(client)
timeout = httpx.Timeout(timeout_seconds)
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as owned_client:
return await _request(owned_client)
async def discover_opportunities_async(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
queries = self.generate_guest_post_queries(keyword)[:4] queries = self.generate_guest_post_queries(keyword)[:4]
dedup: Dict[str, SearchResult] = {} dedup: Dict[str, SearchResult] = {}
for query in queries: async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client:
for result in self.search_for_urls(query): for query in queries:
normalized_url = self._normalize_url(result.url) for result in await self.search_for_urls(query, client=client):
if not normalized_url or normalized_url in dedup: normalized_url = self._normalize_url(result.url)
continue if not normalized_url or normalized_url in dedup:
dedup[normalized_url] = result continue
dedup[normalized_url] = result
if len(dedup) >= max_results:
break
if len(dedup) >= max_results: if len(dedup) >= max_results:
break break
if len(dedup) >= max_results: await asyncio.sleep(0.4)
break
time.sleep(0.4)
opportunities: List[OpportunityRecord] = [] opportunities: List[OpportunityRecord] = []
for normalized_url, row in dedup.items(): for normalized_url, row in dedup.items():
@@ -118,6 +132,10 @@ class BacklinkOutreachService:
return {"keyword": keyword, "queries": queries, "opportunities": opportunities} return {"keyword": keyword, "queries": queries, "opportunities": opportunities}
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
"""Synchronous compatibility wrapper for non-async callers."""
return asyncio.run(self.discover_opportunities_async(keyword, max_results))
def _normalize_url(self, url: str) -> str: def _normalize_url(self, url: str) -> str:
u = (url or "").strip() u = (url or "").strip()
if not u: if not u:
@@ -144,32 +162,76 @@ class BacklinkOutreachService:
def _get_storage(self) -> BacklinkOutreachStorageService: def _get_storage(self) -> BacklinkOutreachStorageService:
return BacklinkOutreachStorageService() return BacklinkOutreachStorageService()
CONSENT_REQUIRED_REGIONS = {"eu", "eea", "uk", "ca"}
MANUAL_REVIEW_REGIONS = {"unknown", "br", "cn", "jp", "kr"}
LOW_CONFIDENCE_REGION_SOURCES = {"tld_inference", "domain_tld", "inferred", "unknown"}
VALID_LEGAL_BASES = {"legitimate_interest", "consent", "contract"}
VALID_CONSENT_STATUSES = {"explicit", "implied", "not_required", "unknown"}
@staticmethod
def _has_one_click_unsubscribe(payload: PolicyValidationRequest) -> bool:
one_click = payload.one_click_unsubscribe
if not one_click or not one_click.enabled:
return False
return bool(one_click.mailto or (one_click.header_value or "").strip())
def validate_send_policy(self, payload: PolicyValidationRequest) -> PolicyValidationResponse: def validate_send_policy(self, payload: PolicyValidationRequest) -> PolicyValidationResponse:
reasons: List[str] = [] reasons: List[str] = []
storage = self._get_storage() storage = self._get_storage()
legal_basis = payload.legal_basis.strip().lower()
recipient_region = payload.recipient_region.strip().lower()
region_source = payload.recipient_region_source.strip().lower()
consent_status = payload.consent_status.strip().lower()
discovery_source = payload.contact_discovery_source.strip()
sender = payload.sender_identity
if payload.workspace_id.startswith("new-") and not payload.approved_by_human: if payload.workspace_id.startswith("new-") and not payload.approved_by_human:
reasons.append("human_review_required_for_new_workspace") reasons.append("human_review_required_for_new_workspace")
if payload.legal_basis.lower() not in {"legitimate_interest", "consent", "contract"}: if not legal_basis:
reasons.append("invalid_legal_basis") reasons.append("legal_basis_required")
if payload.recipient_region.lower() in {"eu", "eea"} and payload.legal_basis.lower() != "consent": elif legal_basis not in self.VALID_LEGAL_BASES:
reasons.append("region_requires_explicit_consent") reasons.append("invalid_legal_basis_recorded")
if not discovery_source:
reasons.append("contact_discovery_source_required")
if consent_status not in self.VALID_CONSENT_STATUSES:
reasons.append("invalid_consent_status")
if len(payload.sender_identity.strip()) < 3: has_unsubscribe = bool(payload.unsubscribe_url) or self._has_one_click_unsubscribe(payload)
reasons.append("sender_identity_required") if not has_unsubscribe:
reasons.append("unsubscribe_url_or_one_click_unsubscribe_required")
if not sender:
reasons.append("complete_sender_identity_required")
else:
sender_email = str(sender.email).strip()
if not sender.name.strip():
reasons.append("sender_name_required")
if not sender_email:
reasons.append("sender_email_required")
elif not re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", sender_email):
reasons.append("sender_email_invalid")
if not sender.organization.strip():
reasons.append("sender_organization_required")
if not sender.physical_mailing_address.strip():
reasons.append("sender_physical_mailing_address_required")
if payload.sender_email and sender_email.lower() != str(payload.sender_email).lower():
reasons.append("sender_identity_email_mismatch")
if recipient_region in self.CONSENT_REQUIRED_REGIONS:
if legal_basis != "consent" or consent_status != "explicit":
reasons.append("region_requires_recorded_explicit_consent")
elif recipient_region in self.MANUAL_REVIEW_REGIONS and not payload.approved_by_human:
reasons.append("manual_review_required_for_recipient_region")
if region_source in self.LOW_CONFIDENCE_REGION_SOURCES and not payload.approved_by_human:
reasons.append("manual_review_required_for_tld_or_unknown_region_source")
if storage.is_suppressed(str(payload.recipient_email), payload.recipient_domain, user_id=payload.user_id): if storage.is_suppressed(str(payload.recipient_email), payload.recipient_domain, user_id=payload.user_id):
reasons.append("recipient_suppressed") reasons.append("recipient_suppressed")
if storage.check_idempotency(payload.idempotency_key, user_id=payload.user_id): if storage.check_idempotency(payload.idempotency_key, user_id=payload.user_id):
reasons.append("duplicate_idempotency_key") reasons.append("duplicate_idempotency_key")
user_count = storage.get_user_send_count(payload.user_id)
domain_count = storage.get_domain_send_count(payload.recipient_domain, user_id=payload.user_id)
if user_count >= DEFAULT_USER_DAILY_CAP:
reasons.append("user_daily_cap_exceeded")
if domain_count >= DEFAULT_DOMAIN_DAILY_CAP:
reasons.append("domain_daily_cap_exceeded")
allowed = len(reasons) == 0 allowed = len(reasons) == 0
final_status = "approved" if allowed else "blocked" final_status = "approved" if allowed else "blocked"
@@ -199,15 +261,82 @@ class BacklinkOutreachService:
return "au" return "au"
return "unknown" return "unknown"
SMTP_RETRY_POLICY = "manual_retry_with_new_idempotency_key"
@staticmethod
def _decision_parts(attempt: Optional[dict]) -> List[str]:
if not attempt:
return []
reason = attempt.get("decision_reason") or ""
return [part.strip() for part in reason.split(";") if part.strip()]
def response_from_attempt(self, attempt: Optional[dict], duplicate: bool = False) -> SendOutreachResponse:
if not attempt:
return SendOutreachResponse(
attempt_id="",
status="duplicate",
policy_allowed=False,
policy_reasons=["duplicate_idempotency_key"],
duplicate=True,
)
status = attempt.get("status", "failed")
parts = self._decision_parts(attempt)
retry_policy = next((part.split("=", 1)[1] for part in parts if part.startswith("retry_policy=")), None)
reasons = [part for part in parts if not part.startswith("retry_policy=")]
if not retry_policy and ("smtp_send_failed" in reasons or "lead_has_no_email" in reasons):
retry_policy = self.SMTP_RETRY_POLICY
policy_allowed = status in {"queued", "approved", "sent", "failed"} and not any(
reason.startswith("human_review_required")
or reason in {
"invalid_legal_basis",
"region_requires_explicit_consent",
"sender_identity_required",
"recipient_suppressed",
"user_daily_cap_exceeded",
"domain_daily_cap_exceeded",
}
for reason in reasons
)
if status == "blocked":
policy_allowed = False
return SendOutreachResponse(
attempt_id=attempt.get("attempt_id", ""),
status=status,
policy_allowed=policy_allowed,
policy_reasons=reasons,
duplicate=duplicate,
retry_policy=retry_policy,
)
def send_outreach(self, request: SendOutreachRequest) -> SendOutreachResponse: def send_outreach(self, request: SendOutreachRequest) -> SendOutreachResponse:
storage = self._get_storage() storage = self._get_storage()
lead = storage.get_lead(request.lead_id, user_id=request.user_id) lead = storage.get_lead(request.lead_id, user_id=request.user_id)
if not lead: if not lead:
return SendOutreachResponse(attempt_id="", status="failed", policy_allowed=False, policy_reasons=["lead_not_found"]) return SendOutreachResponse(attempt_id="", status="failed", policy_allowed=False, policy_reasons=["lead_not_found"])
reservation = storage.reserve_attempt_idempotency(
lead_id=request.lead_id,
campaign_id=request.campaign_id,
idempotency_key=request.idempotency_key,
sender_email=request.sender_email,
subject=request.subject,
body=request.body,
user_id=request.user_id,
)
if not reservation.get("reserved"):
return self.response_from_attempt(reservation.get("attempt"), duplicate=True)
attempt = reservation.get("attempt") or {}
attempt_id = attempt.get("attempt_id", "")
domain = lead.get("domain", request.sender_email.split("@")[-1] if "@" in request.sender_email else "unknown") domain = lead.get("domain", request.sender_email.split("@")[-1] if "@" in request.sender_email else "unknown")
recipient_region = self._infer_region(domain) recipient_region = (request.recipient_region or "unknown").strip().lower()
legal_basis = "consent" if recipient_region == "eu" else "legitimate_interest" if recipient_region == "unknown":
recipient_region = self._infer_region(domain)
region_source = "tld_inference" if recipient_region != "unknown" else request.recipient_region_source
else:
region_source = request.recipient_region_source
policy_req = PolicyValidationRequest( policy_req = PolicyValidationRequest(
user_id=request.user_id, user_id=request.user_id,
@@ -216,31 +345,32 @@ class BacklinkOutreachService:
recipient_email=lead.get("email", ""), recipient_email=lead.get("email", ""),
recipient_domain=domain, recipient_domain=domain,
recipient_region=recipient_region, recipient_region=recipient_region,
legal_basis=legal_basis, recipient_region_source=region_source,
approved_by_human=False, legal_basis=request.legal_basis,
unsubscribe_url=None, contact_discovery_source=request.contact_discovery_source,
sender_identity=request.sender_email, consent_status=request.consent_status,
approved_by_human=request.approved_by_human,
unsubscribe_url=request.unsubscribe_url,
one_click_unsubscribe=request.one_click_unsubscribe,
sender_identity=request.sender_identity,
sender_email=request.sender_email,
idempotency_key=request.idempotency_key, idempotency_key=request.idempotency_key,
) )
policy = self.validate_send_policy(policy_req) policy = self.validate_send_policy(policy_req)
attempt = storage.add_attempt( updated_attempt = storage.update_attempt_status(
lead_id=request.lead_id, attempt_id,
campaign_id=request.campaign_id, "approved" if policy.allowed else "blocked",
idempotency_key=request.idempotency_key,
sender_email=request.sender_email,
subject=request.subject,
body=request.body,
status="approved" if policy.allowed else "blocked",
decision_reason="; ".join(policy.reasons) if policy.reasons else None, decision_reason="; ".join(policy.reasons) if policy.reasons else None,
user_id=request.user_id, user_id=request.user_id,
) ) or attempt
return SendOutreachResponse( return SendOutreachResponse(
attempt_id=attempt.get("attempt_id", ""), attempt_id=updated_attempt.get("attempt_id", attempt_id),
status=attempt.get("status", "failed"), status=updated_attempt.get("status", "failed"),
policy_allowed=policy.allowed, policy_allowed=policy.allowed,
policy_reasons=policy.reasons, policy_reasons=policy.reasons,
effective_sender_email=request.sender_email,
) )
def get_reporting_snapshot(self, user_id: str = "default") -> Dict[str, Any]: def get_reporting_snapshot(self, user_id: str = "default") -> Dict[str, Any]:
@@ -323,11 +453,23 @@ class BacklinkOutreachService:
writer.writerows([{k: self._sanitize_csv_value(v) for k, v in row.items()}]) writer.writerows([{k: self._sanitize_csv_value(v) for k, v in row.items()}])
return output.getvalue() return output.getvalue()
async def deep_discover(
    self,
    keyword: str,
    max_results: int = 15,
    user_id: Optional[str] = None,
    scrape_timeout_seconds: float = 15.0,
    scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
    """Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping.

    Delegates to ``BacklinkOutreachScraper.deep_discover``; this method only
    builds the scraper for the given user and forwards the arguments.

    Args:
        keyword: Search keyword forwarded to the scraper.
        max_results: Result limit forwarded to the scraper.
        user_id: User the scraper is constructed for; ``None`` is passed through.
        scrape_timeout_seconds: Forwarded as the scraper's ``scrape_timeout_seconds``.
        scrape_max_concurrency: Forwarded as the scraper's ``scrape_max_concurrency``.

    Returns:
        Whatever the scraper's ``deep_discover`` coroutine returns.
    """
    # Imported lazily, as in the original, so the scraper module is only loaded on use.
    from services.backlink_outreach_scraper import BacklinkOutreachScraper

    scrape_options = {
        "scrape_timeout_seconds": scrape_timeout_seconds,
        "scrape_max_concurrency": scrape_max_concurrency,
    }
    discovery_scraper = BacklinkOutreachScraper(user_id=user_id)
    return await discovery_scraper.deep_discover(keyword, max_results, **scrape_options)
def get_migration_coverage(self) -> Dict[str, Any]: def get_migration_coverage(self) -> Dict[str, Any]:
implemented = [ implemented = [

View File

@@ -6,6 +6,9 @@ from datetime import datetime, date
from uuid import uuid4 from uuid import uuid4
from typing import List, Optional from typing import List, Optional
from sqlalchemy import text as sql_text, func as sa_func from sqlalchemy import text as sql_text, func as sa_func
from sqlalchemy.exc import IntegrityError
LEAD_VALID_STATUSES = frozenset({"discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"})
from services.database import get_session_for_user from services.database import get_session_for_user
from models.backlink_outreach_models import ( from models.backlink_outreach_models import (
@@ -16,6 +19,14 @@ from models.backlink_outreach_models import (
) )
class BacklinkCampaignNotFoundError(RuntimeError):
    """Signals that a backlink campaign does not exist or is not owned by the requesting user."""
DEFAULT_USER_DAILY_CAP = 100
DEFAULT_DOMAIN_DAILY_CAP = 20
class BacklinkOutreachStorageService: class BacklinkOutreachStorageService:
_NEW_LEAD_COLUMNS = [ _NEW_LEAD_COLUMNS = [
"url", "page_title", "snippet", "confidence_score", "discovery_source", "notes" "url", "page_title", "snippet", "confidence_score", "discovery_source", "notes"
@@ -120,6 +131,14 @@ class BacklinkOutreachStorageService:
# -- Lead CRUD -- # -- Lead CRUD --
def _campaign_belongs_to_user(self, db, campaign_id: str, user_id: str) -> bool:
    """Report whether a campaign with this id exists and is owned by ``user_id``.

    Args:
        db: Open database session to run the lookup on.
        campaign_id: Campaign identifier to look up.
        user_id: Expected owner of the campaign.

    Returns:
        True when a ``BacklinkCampaign`` row matches both the id and the owner,
        False otherwise.
    """
    owned_campaign = (
        db.query(BacklinkCampaign)
        .filter(BacklinkCampaign.id == campaign_id)
        .filter(BacklinkCampaign.user_id == user_id)
        .first()
    )
    return owned_campaign is not None
def add_lead( def add_lead(
self, self,
campaign_id: str, campaign_id: str,
@@ -138,6 +157,17 @@ class BacklinkOutreachStorageService:
if not db: if not db:
raise RuntimeError("Database session unavailable") raise RuntimeError("Database session unavailable")
try: try:
if not self._campaign_belongs_to_user(db, campaign_id, user_id):
raise BacklinkCampaignNotFoundError("Campaign not found")
existing = (
db.query(BacklinkLead)
.filter(BacklinkLead.campaign_id == campaign_id, BacklinkLead.url == url)
.first()
)
if existing:
return self._lead_to_dict(existing)
lead = BacklinkLead( lead = BacklinkLead(
id=f"bl_{uuid4().hex[:16]}", id=f"bl_{uuid4().hex[:16]}",
campaign_id=campaign_id, campaign_id=campaign_id,
@@ -164,12 +194,25 @@ class BacklinkOutreachStorageService:
if not db: if not db:
raise RuntimeError("Database session unavailable") raise RuntimeError("Database session unavailable")
try: try:
if not self._campaign_belongs_to_user(db, campaign_id, user_id):
raise BacklinkCampaignNotFoundError("Campaign not found")
existing_urls = {
row[0]
for row in db.query(BacklinkLead.url)
.filter(BacklinkLead.campaign_id == campaign_id)
.all()
}
added = [] added = []
for data in leads_data: for data in leads_data:
url = data.get("url", "")
if url in existing_urls:
continue
lead = BacklinkLead( lead = BacklinkLead(
id=f"bl_{uuid4().hex[:16]}", id=f"bl_{uuid4().hex[:16]}",
campaign_id=campaign_id, campaign_id=campaign_id,
url=data.get("url", ""), url=url,
domain=data.get("domain", ""), domain=data.get("domain", ""),
page_title=data.get("page_title", ""), page_title=data.get("page_title", ""),
snippet=data.get("snippet", ""), snippet=data.get("snippet", ""),
@@ -182,6 +225,7 @@ class BacklinkOutreachStorageService:
) )
db.add(lead) db.add(lead)
added.append(lead) added.append(lead)
existing_urls.add(url)
db.commit() db.commit()
return [self._lead_to_dict(l) for l in added] return [self._lead_to_dict(l) for l in added]
finally: finally:
@@ -204,8 +248,16 @@ class BacklinkOutreachStorageService:
db.close() db.close()
def update_lead_status( def update_lead_status(
self, lead_id: str, user_id: str, status: str, notes: Optional[str] = None self,
lead_id: str,
user_id: str,
status: str,
notes: Optional[str] = None,
campaign_id: Optional[str] = None,
) -> Optional[dict]: ) -> Optional[dict]:
if status not in LEAD_VALID_STATUSES:
raise ValueError(f"Invalid status '{status}'. Valid values: {sorted(LEAD_VALID_STATUSES)}")
self._ensure_tables(user_id) self._ensure_tables(user_id)
db = get_session_for_user(user_id) db = get_session_for_user(user_id)
if not db: if not db:
@@ -214,6 +266,18 @@ class BacklinkOutreachStorageService:
lead = db.query(BacklinkLead).filter(BacklinkLead.id == lead_id).first() lead = db.query(BacklinkLead).filter(BacklinkLead.id == lead_id).first()
if not lead: if not lead:
return None return None
campaign = (
db.query(BacklinkCampaign)
.filter(BacklinkCampaign.id == lead.campaign_id, BacklinkCampaign.user_id == user_id)
.first()
)
if not campaign:
raise PermissionError("Lead does not belong to the current user")
if campaign_id and lead.campaign_id != campaign_id:
return None
lead.status = status lead.status = status
if notes is not None: if notes is not None:
lead.notes = notes lead.notes = notes
@@ -222,6 +286,44 @@ class BacklinkOutreachStorageService:
finally: finally:
db.close() db.close()
def get_lead_access_issues(
    self, lead_ids: List[str], user_id: str, campaign_id: Optional[str] = None
) -> dict:
    """Sort the given lead ids into those that are missing and those owned by someone else.

    Args:
        lead_ids: Lead ids to check; duplicates are collapsed, first-seen order kept.
        user_id: User the leads must belong to (via their campaign).
        campaign_id: When given, a lead owned by the user but attached to a
            different campaign is reported as missing.

    Returns:
        ``{"missing": [...], "unauthorized": [...]}``. Ids that pass every
        check appear in neither list. When no database session is available,
        every id is reported as missing.
    """
    self._ensure_tables(user_id)
    deduped_ids = list(dict.fromkeys(lead_ids))
    session = get_session_for_user(user_id)
    if not session:
        return {"missing": deduped_ids, "unauthorized": []}
    try:
        rows_by_id = self._get_lead_access_rows(session, deduped_ids)
        missing: List[str] = []
        unauthorized: List[str] = []
        for lead_id in deduped_ids:
            row = rows_by_id.get(lead_id)
            if not row:
                missing.append(lead_id)
                continue
            if row["user_id"] != user_id:
                unauthorized.append(lead_id)
                continue
            # Owned by the user but outside the requested campaign: treated as not found.
            if campaign_id and row["campaign_id"] != campaign_id:
                missing.append(lead_id)
        return {"missing": missing, "unauthorized": unauthorized}
    finally:
        session.close()
def _get_lead_access_rows(self, db, lead_ids: List[str]) -> dict:
if not lead_ids:
return {}
rows = (
db.query(BacklinkLead.id, BacklinkLead.campaign_id, BacklinkCampaign.user_id)
.outerjoin(BacklinkCampaign, BacklinkLead.campaign_id == BacklinkCampaign.id)
.filter(BacklinkLead.id.in_(lead_ids))
.all()
)
return {
row.id: {"campaign_id": row.campaign_id, "user_id": row.user_id}
for row in rows
}
@staticmethod @staticmethod
def _lead_to_dict(lead) -> dict: def _lead_to_dict(lead) -> dict:
return { return {
@@ -241,6 +343,79 @@ class BacklinkOutreachStorageService:
# -- Outreach Attempt CRUD -- # -- Outreach Attempt CRUD --
def get_attempt_by_idempotency_key(self, idempotency_key: str, user_id: str = "default") -> Optional[dict]:
    """Look up the outreach attempt recorded under an idempotency key for this user.

    Args:
        idempotency_key: Key the attempt was stored with.
        user_id: Owner filter; only attempts whose campaign belongs to this
            user are considered.

    Returns:
        The attempt as a dict, or ``None`` when no session is available or no
        matching attempt exists.
    """
    self._ensure_tables(user_id)
    session = get_session_for_user(user_id)
    if not session:
        return None
    try:
        scoped_query = (
            session.query(OutreachAttempt)
            .join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id)
            .filter(OutreachAttempt.idempotency_key == idempotency_key)
            .filter(BacklinkCampaign.user_id == user_id)
        )
        found = scoped_query.first()
        if not found:
            return None
        return self._attempt_to_dict(found)
    finally:
        session.close()
def reserve_attempt_idempotency(
    self,
    lead_id: str,
    campaign_id: str,
    idempotency_key: str,
    sender_email: str = "",
    subject: str = "",
    body: str = "",
    user_id: str = "default",
) -> dict:
    """Claim an outreach idempotency key by inserting the attempt row in ``queued`` state.

    The insert itself is the reservation: if the commit raises
    ``IntegrityError`` the key is treated as already taken and the existing
    attempt visible to ``user_id`` is looked up instead.

    Args:
        lead_id: Lead the attempt targets.
        campaign_id: Campaign the attempt belongs to.
        idempotency_key: Key to reserve.
        sender_email: Sender address stored on the attempt.
        subject: Email subject stored on the attempt.
        body: Email body stored on the attempt.
        user_id: User whose session is used and whose campaigns scope the
            fallback lookup.

    Returns:
        ``{"reserved": True, "attempt": attempt_dict}`` when this call created
        the row, otherwise ``{"reserved": False, "attempt": existing_or_None}``.

    Raises:
        RuntimeError: If no database session is available.
    """
    self._ensure_tables(user_id)
    session = get_session_for_user(user_id)
    if not session:
        raise RuntimeError("Database session unavailable")
    try:
        new_attempt = OutreachAttempt(
            id=f"att_{uuid4().hex[:16]}",
            lead_id=lead_id,
            campaign_id=campaign_id,
            idempotency_key=idempotency_key,
            sender_email=sender_email,
            subject=subject,
            body=body,
            status="queued",
            created_at=datetime.utcnow(),
        )
        session.add(new_attempt)
        session.commit()
        return {"reserved": True, "attempt": self._attempt_to_dict(new_attempt)}
    except IntegrityError:
        # The insert was rejected; discard it and report whichever attempt
        # already holds the key for this user (if any is visible).
        session.rollback()
        prior = (
            session.query(OutreachAttempt)
            .join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id)
            .filter(
                OutreachAttempt.idempotency_key == idempotency_key,
                BacklinkCampaign.user_id == user_id,
            )
            .first()
        )
        prior_dict = self._attempt_to_dict(prior) if prior else None
        return {"reserved": False, "attempt": prior_dict}
    finally:
        session.close()
def add_attempt( def add_attempt(
self, self,
lead_id: str, lead_id: str,
@@ -273,6 +448,20 @@ class BacklinkOutreachStorageService:
db.add(attempt) db.add(attempt)
db.commit() db.commit()
return self._attempt_to_dict(attempt) return self._attempt_to_dict(attempt)
except IntegrityError:
db.rollback()
existing = (
db.query(OutreachAttempt)
.join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id)
.filter(
OutreachAttempt.idempotency_key == idempotency_key,
BacklinkCampaign.user_id == user_id,
)
.first()
)
if existing:
return self._attempt_to_dict(existing)
raise
finally: finally:
db.close() db.close()
@@ -325,6 +514,7 @@ class BacklinkOutreachStorageService:
"decision_reason": attempt.decision_reason, "decision_reason": attempt.decision_reason,
"sent_at": attempt.sent_at.isoformat() if attempt.sent_at else None, "sent_at": attempt.sent_at.isoformat() if attempt.sent_at else None,
"created_at": attempt.created_at.isoformat() if attempt.created_at else None, "created_at": attempt.created_at.isoformat() if attempt.created_at else None,
"message_id": attempt.message_id or "",
} }
def find_attempt_by_from_email(self, from_email: str, user_id: str = "default") -> Optional[str]: def find_attempt_by_from_email(self, from_email: str, user_id: str = "default") -> Optional[str]:
@@ -346,6 +536,37 @@ class BacklinkOutreachStorageService:
finally: finally:
db.close() db.close()
def update_attempt_message_id(self, attempt_id: str, message_id: str, user_id: str = "default") -> Optional[dict]:
    """Store the message id on an outreach attempt owned by ``user_id``.

    The attempt is resolved through its campaign so that only attempts whose
    campaign belongs to ``user_id`` can be modified, matching how
    ``get_attempt_by_idempotency_key`` scopes its lookup. Previously any
    attempt id was writable regardless of owner.

    Args:
        attempt_id: Identifier of the attempt to update.
        message_id: Message id to record on the attempt.
        user_id: User whose session is used and who must own the attempt's campaign.

    Returns:
        The updated attempt as a dict, or ``None`` when no session is
        available or no attempt with that id is owned by the user.
    """
    self._ensure_tables(user_id)
    db = get_session_for_user(user_id)
    if not db:
        return None
    try:
        attempt = (
            db.query(OutreachAttempt)
            .join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id)
            .filter(
                OutreachAttempt.id == attempt_id,
                BacklinkCampaign.user_id == user_id,
            )
            .first()
        )
        if not attempt:
            return None
        attempt.message_id = message_id
        db.commit()
        return self._attempt_to_dict(attempt)
    finally:
        db.close()
def find_attempt_by_message_id(self, message_id: str, user_id: str = "default") -> Optional[str]:
    """Return the id of the attempt recorded with this message id, if any.

    Args:
        message_id: Message id to search for; surrounding whitespace is ignored.
        user_id: User whose database session is used for the lookup.

    Returns:
        The matching attempt's id, or ``None`` when the message id is blank,
        no session is available, or nothing matches.
    """
    # A blank (or None) message id identifies nothing. Without this guard,
    # None crashed on .strip() and "" could match an attempt stored with an
    # empty message id, attributing a reply to an unrelated attempt.
    clean = (message_id or "").strip()
    if not clean:
        return None
    self._ensure_tables(user_id)
    db = get_session_for_user(user_id)
    if not db:
        return None
    try:
        # NOTE(review): unlike get_attempt_by_idempotency_key this lookup is not
        # filtered by campaign owner — confirm whether that is intended.
        attempt = (
            db.query(OutreachAttempt)
            .filter(OutreachAttempt.message_id == clean)
            .first()
        )
        return attempt.id if attempt else None
    finally:
        db.close()
# -- Outreach Reply CRUD -- # -- Outreach Reply CRUD --
def reply_exists(self, from_email: str, subject: str, user_id: str = "default") -> bool: def reply_exists(self, from_email: str, subject: str, user_id: str = "default") -> bool:
@@ -678,6 +899,9 @@ class BacklinkOutreachStorageService:
db.add(entry) db.add(entry)
db.commit() db.commit()
return {"idempotency_key": idempotency_key} return {"idempotency_key": idempotency_key}
except IntegrityError:
db.rollback()
return {"idempotency_key": idempotency_key}
finally: finally:
db.close() db.close()
@@ -686,27 +910,6 @@ class BacklinkOutreachStorageService:
def _today(self) -> date: def _today(self) -> date:
return date.today() return date.today()
def increment_user_send_counter(self, user_id: str) -> int:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return 0
try:
today = self._today()
row_id = f"scu_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_user (id, user_id, date, count) "
"VALUES (:id, :uid, :dt, 1) "
"ON CONFLICT (user_id, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "uid": user_id, "dt": today})
db.commit()
result = db.query(SendCounterUser.count).filter(
SendCounterUser.user_id == user_id, SendCounterUser.date == today
).first()
return result[0] if result else 0
finally:
db.close()
def get_user_send_count(self, user_id: str) -> int: def get_user_send_count(self, user_id: str) -> int:
db = get_session_for_user(user_id) db = get_session_for_user(user_id)
if not db: if not db:
@@ -722,28 +925,6 @@ class BacklinkOutreachStorageService:
finally: finally:
db.close() db.close()
def increment_domain_send_counter(self, domain: str, user_id: str = "default") -> int:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return 0
try:
today = self._today()
domain_lower = domain.lower()
row_id = f"scd_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_domain (id, domain, date, count) "
"VALUES (:id, :dom, :dt, 1) "
"ON CONFLICT (domain, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "dom": domain_lower, "dt": today})
db.commit()
result = db.query(SendCounterDomain.count).filter(
SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today
).first()
return result[0] if result else 0
finally:
db.close()
def get_domain_send_count(self, domain: str, user_id: str = "default") -> int: def get_domain_send_count(self, domain: str, user_id: str = "default") -> int:
db = get_session_for_user(user_id) db = get_session_for_user(user_id)
if not db: if not db:
@@ -759,6 +940,73 @@ class BacklinkOutreachStorageService:
finally: finally:
db.close() db.close()
def try_increment_user_send_counter(self, user_id: str) -> tuple:
    """Consume one send from the user's daily allowance if the cap permits.

    The counter for (user_id, today) is incremented first and the cap is
    evaluated on the post-increment value inside the same transaction; an
    increment that would exceed the cap is rolled back. The previous version
    read the count, then incremented in a separate step, so two concurrent
    callers at cap-1 could both be admitted. This version relies on the
    database serialising writers to the same counter row.

    Args:
        user_id: User whose counter is checked and incremented.

    Returns:
        ``(within_cap, count)``. On success ``count`` is the new stored
        value. When the cap is already reached the result is
        ``(False, stored_count)`` and nothing is persisted. When no session is
        available or the database raises, the result is ``(True, 0)``.
    """
    import logging

    self._ensure_tables(user_id)
    db = get_session_for_user(user_id)
    if not db:
        return True, 0
    try:
        today = self._today()
        db.execute(sql_text(
            "INSERT INTO backlink_send_counters_user (id, user_id, date, count) "
            "VALUES (:id, :uid, :dt, 1) "
            "ON CONFLICT (user_id, date) DO UPDATE SET count = count + 1"
        ), {"id": f"scu_{uuid4().hex[:16]}", "uid": user_id, "dt": today})
        # Read back inside the still-open transaction so the value reflects our own increment.
        new_count = (
            db.query(SendCounterUser.count)
            .filter(SendCounterUser.user_id == user_id, SendCounterUser.date == today)
            .scalar()
        ) or 0
        if new_count > DEFAULT_USER_DAILY_CAP:
            # Over the cap: undo the increment so the stored count is unchanged.
            db.rollback()
            return False, new_count - 1
        db.commit()
        return True, new_count
    except Exception:
        db.rollback()
        # NOTE(review): existing behaviour is to allow the send when the counter
        # cannot be updated; the failure is now logged instead of being swallowed.
        logging.getLogger(__name__).exception(
            "Failed to update user send counter for user %s", user_id
        )
        return True, 0
    finally:
        # Single close point; the early db.close() on the over-cap path was redundant.
        db.close()
def try_increment_domain_send_counter(self, domain: str, user_id: str = "default") -> tuple:
    """Consume one send from a recipient domain's daily allowance if the cap permits.

    The counter for (lower-cased domain, today) is incremented first and the
    cap is evaluated on the post-increment value inside the same transaction;
    an increment that would exceed the cap is rolled back. The previous
    version read the count, then incremented in a separate step, so two
    concurrent callers at cap-1 could both be admitted. This version relies on
    the database serialising writers to the same counter row.

    Args:
        domain: Recipient domain; compared case-insensitively.
        user_id: User whose database session is used.

    Returns:
        ``(within_cap, count)``. On success ``count`` is the new stored
        value. When the cap is already reached the result is
        ``(False, stored_count)`` and nothing is persisted. When no session is
        available or the database raises, the result is ``(True, 0)``.
    """
    import logging

    self._ensure_tables(user_id)
    db = get_session_for_user(user_id)
    if not db:
        return True, 0
    try:
        today = self._today()
        domain_lower = domain.lower()
        db.execute(sql_text(
            "INSERT INTO backlink_send_counters_domain (id, domain, date, count) "
            "VALUES (:id, :dom, :dt, 1) "
            "ON CONFLICT (domain, date) DO UPDATE SET count = count + 1"
        ), {"id": f"scd_{uuid4().hex[:16]}", "dom": domain_lower, "dt": today})
        # Read back inside the still-open transaction so the value reflects our own increment.
        new_count = (
            db.query(SendCounterDomain.count)
            .filter(SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today)
            .scalar()
        ) or 0
        if new_count > DEFAULT_DOMAIN_DAILY_CAP:
            # Over the cap: undo the increment so the stored count is unchanged.
            db.rollback()
            return False, new_count - 1
        db.commit()
        return True, new_count
    except Exception:
        db.rollback()
        # NOTE(review): existing behaviour is to allow the send when the counter
        # cannot be updated; the failure is now logged instead of being swallowed.
        logging.getLogger(__name__).exception(
            "Failed to update domain send counter for domain %s", domain
        )
        return True, 0
    finally:
        # Single close point; the early db.close() on the over-cap path was redundant.
        db.close()
# -- Audit Log -- # -- Audit Log --
def add_audit_log( def add_audit_log(

View File

@@ -193,10 +193,10 @@ const App: React.FC = () => {
<Route path="/dashboard" element={<ProtectedRoute><MainDashboard /></ProtectedRoute>} /> <Route path="/dashboard" element={<ProtectedRoute><MainDashboard /></ProtectedRoute>} />
<Route path="/seo" element={<ProtectedRoute><FeatureRoute feature="seo"><SEODashboard /></FeatureRoute></ProtectedRoute>} /> <Route path="/seo" element={<ProtectedRoute><FeatureRoute feature="seo"><SEODashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/seo-dashboard" element={<ProtectedRoute><FeatureRoute feature="seo"><SEODashboard /></FeatureRoute></ProtectedRoute>} /> <Route path="/seo-dashboard" element={<ProtectedRoute><FeatureRoute feature="seo"><SEODashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/backlink-outreach" element={<ProtectedRoute><FeatureRoute feature="seo"><BacklinkOutreachDashboard /></FeatureRoute></ProtectedRoute>} /> <Route path="/backlink-outreach" element={<ProtectedRoute><FeatureRoute feature="backlinking"><BacklinkOutreachDashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/content-planning" element={<ProtectedRoute><FeatureRoute feature="content-planning"><ContentPlanningDashboard /></FeatureRoute></ProtectedRoute>} /> <Route path="/content-planning" element={<ProtectedRoute><FeatureRoute feature="content-planning"><ContentPlanningDashboard /></FeatureRoute></ProtectedRoute>} />
<Route path="/facebook-writer" element={<ProtectedRoute><FeatureRoute feature="social"><FacebookWriter /></FeatureRoute></ProtectedRoute>} /> <Route path="/facebook-writer" element={<ProtectedRoute><FeatureRoute feature="facebook"><FacebookWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/linkedin-writer" element={<ProtectedRoute><FeatureRoute feature="social"><LinkedInWriter /></FeatureRoute></ProtectedRoute>} /> <Route path="/linkedin-writer" element={<ProtectedRoute><FeatureRoute feature="linkedin"><LinkedInWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/blog-writer" element={<ProtectedRoute><FeatureRoute feature="blog_writer"><BlogWriter /></FeatureRoute></ProtectedRoute>} /> <Route path="/blog-writer" element={<ProtectedRoute><FeatureRoute feature="blog_writer"><BlogWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/story-writer" element={<ProtectedRoute><FeatureRoute feature="story"><StoryWriter /></FeatureRoute></ProtectedRoute>} /> <Route path="/story-writer" element={<ProtectedRoute><FeatureRoute feature="story"><StoryWriter /></FeatureRoute></ProtectedRoute>} />
<Route path="/story-projects" element={<ProtectedRoute><FeatureRoute feature="story"><StoryProjectList /></FeatureRoute></ProtectedRoute>} /> <Route path="/story-projects" element={<ProtectedRoute><FeatureRoute feature="story"><StoryProjectList /></FeatureRoute></ProtectedRoute>} />

View File

@@ -76,6 +76,20 @@ export interface DeepDiscoveryResponse {
// -- Policy -- // -- Policy --
/**
 * Identity details of the party an outreach email is sent on behalf of.
 * Every field except `reply_to_email` is required.
 */
export interface SenderIdentity {
  /** Sender's name. */
  name: string;
  /** Sender's email address. */
  email: string;
  /** Organization the sender represents. */
  organization: string;
  /** Physical mailing address of the sender. */
  physical_mailing_address: string;
  /** Optional reply-to address. */
  reply_to_email?: string;
}
/**
 * One-click unsubscribe settings attached to an outreach request.
 * Only `enabled` is required.
 */
export interface OneClickUnsubscribe {
  /** Whether one-click unsubscribe is turned on. */
  enabled: boolean;
  /** Optional mailto target. */
  mailto?: string;
  /** Optional header value. */
  header_value?: string;
}
export interface BacklinkPolicyValidationRequest { export interface BacklinkPolicyValidationRequest {
user_id: string; user_id: string;
workspace_id: string; workspace_id: string;
@@ -83,10 +97,15 @@ export interface BacklinkPolicyValidationRequest {
recipient_email: string; recipient_email: string;
recipient_domain: string; recipient_domain: string;
recipient_region: string; recipient_region: string;
recipient_region_source: string;
legal_basis: string; legal_basis: string;
contact_discovery_source: string;
consent_status: string;
approved_by_human: boolean; approved_by_human: boolean;
unsubscribe_url?: string; unsubscribe_url?: string;
sender_identity: string; one_click_unsubscribe?: OneClickUnsubscribe;
sender_identity: SenderIdentity;
sender_email?: string;
idempotency_key: string; idempotency_key: string;
} }
@@ -139,7 +158,7 @@ export interface LeadRecord {
email: string | null; email: string | null;
confidence_score: number; confidence_score: number;
discovery_source: string; discovery_source: string;
status: string; status: LeadStatus;
notes: string | null; notes: string | null;
created_at: string | null; created_at: string | null;
} }
@@ -160,9 +179,12 @@ export interface LeadCreateRequest {
notes?: string; notes?: string;
} }
/** The closed set of status values a lead may carry. */
export type LeadStatus =
  | 'discovered'
  | 'contacted'
  | 'replied'
  | 'placed'
  | 'bounced'
  | 'unsubscribed';
export interface LeadStatusUpdateRequest { export interface LeadStatusUpdateRequest {
status: string; status: LeadStatus;
notes?: string; notes?: string;
campaign_id?: string;
} }
export interface CampaignDetailResponse { export interface CampaignDetailResponse {
@@ -183,6 +205,15 @@ export interface SendOutreachRequest {
subject: string; subject: string;
body: string; body: string;
idempotency_key: string; idempotency_key: string;
sender_identity: SenderIdentity;
legal_basis: string;
contact_discovery_source: string;
recipient_region: string;
recipient_region_source: string;
consent_status: string;
approved_by_human: boolean;
unsubscribe_url?: string;
one_click_unsubscribe?: OneClickUnsubscribe;
template_id?: string; template_id?: string;
template_variables?: Record<string, string>; template_variables?: Record<string, string>;
} }
@@ -192,6 +223,7 @@ export interface SendOutreachResponse {
status: string; status: string;
policy_allowed: boolean; policy_allowed: boolean;
policy_reasons: string[]; policy_reasons: string[];
effective_sender_email?: string | null;
} }
export interface OutreachAttemptRecord { export interface OutreachAttemptRecord {
@@ -305,8 +337,9 @@ export interface FollowUpRequest {
export interface BulkStatusUpdateRequest { export interface BulkStatusUpdateRequest {
lead_ids: string[]; lead_ids: string[];
status: string; status: LeadStatus;
notes?: string; notes?: string;
campaign_id?: string;
} }
export interface BulkStatusUpdateResponse { export interface BulkStatusUpdateResponse {

View File

@@ -12,6 +12,7 @@ import {
GenerateEmailRequest, GenerateEmailRequest,
bulkUpdateLeadStatus, bulkUpdateLeadStatus,
updateLeadStatus, updateLeadStatus,
addLeadToCampaign,
fetchCampaignAnalyticsVolume, fetchCampaignAnalyticsVolume,
fetchCampaignAnalyticsFunnel, fetchCampaignAnalyticsFunnel,
CampaignVolumePoint, CampaignVolumePoint,
@@ -25,7 +26,7 @@ import { LineChart, Line, BarChart, Bar, XAxis, YAxis, CartesianGrid, Tooltip as
type Tab = 'campaigns' | 'discover' | 'leads' | 'composer' | 'analytics'; type Tab = 'campaigns' | 'discover' | 'leads' | 'composer' | 'analytics';
const STATUS_OPTIONS = ['discovered', 'contacted', 'replied', 'placed', 'bounced', 'unsubscribed']; const STATUS_OPTIONS = ['discovered', 'contacted', 'replied', 'placed', 'bounced', 'unsubscribed'] as const;
const STATUS_EXPLANATIONS: Record<string, string> = { const STATUS_EXPLANATIONS: Record<string, string> = {
discovered: 'Lead found but not yet contacted', discovered: 'Lead found but not yet contacted',
@@ -116,6 +117,19 @@ const BacklinkOutreachDashboard: React.FC = () => {
const [subjectSuggestions, setSubjectSuggestions] = useState<string[]>([]); const [subjectSuggestions, setSubjectSuggestions] = useState<string[]>([]);
const [isGenerating, setIsGenerating] = useState(false); const [isGenerating, setIsGenerating] = useState(false);
const [senderName, setSenderName] = useState('');
const [senderEmail, setSenderEmail] = useState('');
const [senderOrganization, setSenderOrganization] = useState('');
const [senderAddress, setSenderAddress] = useState('');
const [unsubscribeUrl, setUnsubscribeUrl] = useState('');
const [oneClickUnsubscribe, setOneClickUnsubscribe] = useState(false);
const [legalBasis, setLegalBasis] = useState('legitimate_interest');
const [contactDiscoverySource, setContactDiscoverySource] = useState('');
const [recipientRegion, setRecipientRegion] = useState('unknown');
const [recipientRegionSource, setRecipientRegionSource] = useState('user_attested');
const [consentStatus, setConsentStatus] = useState('unknown');
const [approvedByHuman, setApprovedByHuman] = useState(false);
const [leadName, setLeadName] = useState(''); const [leadName, setLeadName] = useState('');
const [leadSite, setLeadSite] = useState(''); const [leadSite, setLeadSite] = useState('');
const [leadContentTopic, setLeadContentTopic] = useState(''); const [leadContentTopic, setLeadContentTopic] = useState('');
@@ -126,7 +140,7 @@ const BacklinkOutreachDashboard: React.FC = () => {
const [templateName, setTemplateName] = useState(''); const [templateName, setTemplateName] = useState('');
const [selectedLeadIds, setSelectedLeadIds] = useState<Set<string>>(new Set()); const [selectedLeadIds, setSelectedLeadIds] = useState<Set<string>>(new Set());
const [bulkStatus, setBulkStatus] = useState('contacted'); const [bulkStatus, setBulkStatus] = useState<'discovered' | 'contacted' | 'replied' | 'placed' | 'bounced' | 'unsubscribed'>('contacted');
const [volumeData, setVolumeData] = useState<CampaignVolumePoint[]>([]); const [volumeData, setVolumeData] = useState<CampaignVolumePoint[]>([]);
const [funnelData, setFunnelData] = useState<FunnelStage[]>([]); const [funnelData, setFunnelData] = useState<FunnelStage[]>([]);
@@ -190,9 +204,24 @@ const BacklinkOutreachDashboard: React.FC = () => {
}, [keyword, deepDiscover]); }, [keyword, deepDiscover]);
/**
 * Saves every currently discovered opportunity as a lead on the selected
 * discovery campaign, one request at a time, then reports the outcome.
 *
 * Does nothing when the keyword is blank, no campaign is selected, or there
 * are no discovered opportunities. Failed requests are counted and logged
 * rather than silently assumed to be duplicates, and the toast reports how
 * many leads were actually saved.
 */
const handleDiscoverAndSave = useCallback(async () => {
  if (!keyword.trim() || !discoverCampaignId || discoveredOpportunities.length === 0) return;
  let savedCount = 0;
  let failedCount = 0;
  for (const opp of discoveredOpportunities) {
    try {
      await addLeadToCampaign(discoverCampaignId, {
        campaign_id: discoverCampaignId,
        url: opp.url,
        domain: opp.domain,
        page_title: opp.page_title,
        snippet: opp.snippet,
        email: opp.email ?? undefined,
        confidence_score: opp.confidence_score,
      });
      savedCount += 1;
    } catch (e: unknown) {
      // Keep going so one bad lead does not abort the batch, but do not count it as saved.
      failedCount += 1;
      console.warn(`Failed to save lead ${opp.url}`, e);
    }
  }
  if (failedCount > 0) {
    showToastNotification(`Saved ${savedCount} leads to campaign; ${failedCount} failed`, 'warning');
  } else {
    showToastNotification(`Saved ${savedCount} leads to campaign`, 'success');
  }
}, [keyword, discoverCampaignId, discoveredOpportunities]);
const handleSelectCampaign = useCallback(async (campaignId: string) => { const handleSelectCampaign = useCallback(async (campaignId: string) => {
await selectCampaign(campaignId); await selectCampaign(campaignId);
@@ -311,10 +340,13 @@ const BacklinkOutreachDashboard: React.FC = () => {
); );
}; };
const handleSingleStatusUpdate = async (leadId: string, status: string) => { const handleSingleStatusUpdate = async (leadId: string, status: 'discovered' | 'contacted' | 'replied' | 'placed' | 'bounced' | 'unsubscribed') => {
setIsStatusUpdating(true); setIsStatusUpdating(true);
try { try {
await updateLeadStatus(leadId, { status }); await updateLeadStatus(leadId, {
status,
campaign_id: selectedCampaign!.campaign_id,
});
showToastNotification(`Status updated to "${status}"`, 'success'); showToastNotification(`Status updated to "${status}"`, 'success');
await selectCampaign(selectedCampaign!.campaign_id); await selectCampaign(selectedCampaign!.campaign_id);
} catch (e) { } catch (e) {
@@ -328,7 +360,11 @@ const BacklinkOutreachDashboard: React.FC = () => {
if (selectedLeadIds.size === 0) return; if (selectedLeadIds.size === 0) return;
setIsStatusUpdating(true); setIsStatusUpdating(true);
try { try {
const result = await bulkUpdateLeadStatus({ lead_ids: Array.from(selectedLeadIds), status: bulkStatus }); const result = await bulkUpdateLeadStatus({
lead_ids: Array.from(selectedLeadIds),
status: bulkStatus,
campaign_id: selectedCampaign!.campaign_id,
});
if (result.failed.length > 0) { if (result.failed.length > 0) {
showToastNotification(`Updated ${result.updated} leads; ${result.failed.length} failed`, 'warning'); showToastNotification(`Updated ${result.updated} leads; ${result.failed.length} failed`, 'warning');
} else { } else {
@@ -391,10 +427,27 @@ const BacklinkOutreachDashboard: React.FC = () => {
{ key: 'campaigns', label: 'Campaigns', desc: 'Create and manage outreach campaigns' }, { key: 'campaigns', label: 'Campaigns', desc: 'Create and manage outreach campaigns' },
{ key: 'discover', label: 'Discover', desc: 'AI-powered search for guest post opportunities' }, { key: 'discover', label: 'Discover', desc: 'AI-powered search for guest post opportunities' },
{ key: 'leads', label: 'Leads', desc: 'Track leads, send outreach, and manage replies' }, { key: 'leads', label: 'Leads', desc: 'Track leads, send outreach, and manage replies' },
{ key: 'composer', label: 'Composer', desc: 'AI email composer with smart suggestions' }, { key: 'composer', label: 'Composer', desc: 'AI email composer with compliance metadata' },
{ key: 'analytics', label: 'Analytics', desc: 'Campaign performance metrics and exports' }, { key: 'analytics', label: 'Analytics', desc: 'Campaign performance metrics and exports' },
]; ];
// Collect one message per unmet compliance requirement, in a fixed order.
const complianceReasons: string[] = [];
if (!unsubscribeUrl.trim() && !oneClickUnsubscribe) {
  complianceReasons.push('Add an unsubscribe URL or enable one-click unsubscribe.');
}
if (!senderName.trim()) {
  complianceReasons.push('Add the sender name.');
}
if (!senderEmail.trim()) {
  complianceReasons.push('Add the sender email.');
}
if (!senderOrganization.trim()) {
  complianceReasons.push('Add the sender organization.');
}
if (!senderAddress.trim()) {
  complianceReasons.push('Add a physical mailing address.');
}
if (!legalBasis.trim()) {
  complianceReasons.push('Record the legal basis.');
}
if (!contactDiscoverySource.trim()) {
  complianceReasons.push('Record where the contact was discovered.');
}
// Unknown or TLD-inferred regions are only acceptable once a human has approved.
if (recipientRegion === 'unknown' && !approvedByHuman) {
  complianceReasons.push('Unknown recipient region requires manual review.');
}
if (recipientRegionSource === 'tld_inference' && !approvedByHuman) {
  complianceReasons.push('TLD-only region inference requires manual review.');
}
// For these regions, both the legal basis and the consent status must record explicit consent.
if (
  ['eu', 'eea', 'uk', 'ca'].includes(recipientRegion) &&
  !(legalBasis === 'consent' && consentStatus === 'explicit')
) {
  complianceReasons.push('Selected recipient region requires recorded explicit consent.');
}

// Ready only when no requirement is outstanding.
const complianceReady = complianceReasons.length === 0;
const SectionHeader: React.FC<{ title: string; subtitle: string }> = ({ title, subtitle }) => ( const SectionHeader: React.FC<{ title: string; subtitle: string }> = ({ title, subtitle }) => (
<div style={{ marginBottom: '16px' }}> <div style={{ marginBottom: '16px' }}>
<h3 style={{ margin: 0, background: GRADIENT_PRIMARY, WebkitBackgroundClip: 'text', WebkitTextFillColor: 'transparent', fontSize: '18px' }}>{title}</h3> <h3 style={{ margin: 0, background: GRADIENT_PRIMARY, WebkitBackgroundClip: 'text', WebkitTextFillColor: 'transparent', fontSize: '18px' }}>{title}</h3>
@@ -644,7 +697,7 @@ const BacklinkOutreachDashboard: React.FC = () => {
{selectedLeadIds.size > 0 && ( {selectedLeadIds.size > 0 && (
<> <>
<TooltipWrap text="Choose the new status for all selected leads"> <TooltipWrap text="Choose the new status for all selected leads">
<select value={bulkStatus} onChange={(e) => setBulkStatus(e.target.value)} <select value={bulkStatus} onChange={(e) => setBulkStatus(e.target.value as typeof bulkStatus)}
style={{ ...selectSx, padding: '6px 10px', fontSize: '12px', minWidth: '130px' }}> style={{ ...selectSx, padding: '6px 10px', fontSize: '12px', minWidth: '130px' }}>
{STATUS_OPTIONS.map((s) => <option key={s} value={s}>{s}</option>)} {STATUS_OPTIONS.map((s) => <option key={s} value={s}>{s}</option>)}
</select> </select>
@@ -708,6 +761,7 @@ const BacklinkOutreachDashboard: React.FC = () => {
<div key={a.attempt_id} style={{ marginTop: '8px', padding: '8px 12px', background: 'rgba(255,255,255,0.04)', borderRadius: '8px', fontSize: '12px' }}> <div key={a.attempt_id} style={{ marginTop: '8px', padding: '8px 12px', background: 'rgba(255,255,255,0.04)', borderRadius: '8px', fontSize: '12px' }}>
<span style={{ color: 'rgba(255,255,255,0.5)' }}>Latest: {a.subject} </span> <span style={{ color: 'rgba(255,255,255,0.5)' }}>Latest: {a.subject} </span>
{renderStatusBadge(a.status)} {renderStatusBadge(a.status)}
{a.sender_email && <span style={{ color: 'rgba(255,255,255,0.35)', marginLeft: '8px' }}>From: {a.sender_email}</span>}
{a.sent_at && <span style={{ color: 'rgba(255,255,255,0.3)', marginLeft: '8px' }}>{new Date(a.sent_at).toLocaleString()}</span>} {a.sent_at && <span style={{ color: 'rgba(255,255,255,0.3)', marginLeft: '8px' }}>{new Date(a.sent_at).toLocaleString()}</span>}
</div> </div>
))} ))}
@@ -724,7 +778,7 @@ const BacklinkOutreachDashboard: React.FC = () => {
<table style={{ width: '100%', borderCollapse: 'collapse', fontSize: '13px' }}> <table style={{ width: '100%', borderCollapse: 'collapse', fontSize: '13px' }}>
<thead> <thead>
<tr style={{ background: 'rgba(255,255,255,0.04)' }}> <tr style={{ background: 'rgba(255,255,255,0.04)' }}>
{['Subject', 'Status', 'Sender', 'Sent At'].map(h => ( {['Subject', 'Status', 'Effective Sender', 'Sent At'].map(h => (
<th key={h} style={{ padding: '10px 12px', borderBottom: '1px solid rgba(255,255,255,0.08)', textAlign: 'left', color: 'rgba(255,255,255,0.4)', fontWeight: 500, fontSize: '12px', textTransform: 'uppercase', letterSpacing: '0.5px' }}>{h}</th> <th key={h} style={{ padding: '10px 12px', borderBottom: '1px solid rgba(255,255,255,0.08)', textAlign: 'left', color: 'rgba(255,255,255,0.4)', fontWeight: 500, fontSize: '12px', textTransform: 'uppercase', letterSpacing: '0.5px' }}>{h}</th>
))} ))}
</tr> </tr>
@@ -893,6 +947,71 @@ const BacklinkOutreachDashboard: React.FC = () => {
style={{ ...inputSx, fontFamily: 'monospace', fontSize: '13px', resize: 'vertical', lineHeight: 1.6 }} /> style={{ ...inputSx, fontFamily: 'monospace', fontSize: '13px', resize: 'vertical', lineHeight: 1.6 }} />
</div> </div>
{/* Compliance metadata */}
<div style={{ marginTop: '20px', padding: '16px', borderRadius: '10px', background: complianceReady ? 'rgba(67,233,123,0.08)' : 'rgba(245,87,108,0.08)', border: `1px solid ${complianceReady ? 'rgba(67,233,123,0.22)' : 'rgba(245,87,108,0.22)'}` }}>
<h4 style={{ margin: '0 0 4px', color: '#fff', fontSize: '14px' }}>Send Compliance Metadata</h4>
<p style={{ margin: '0 0 12px', color: 'rgba(255,255,255,0.45)', fontSize: '12px' }}>Policy checks require unsubscribe, sender identity, legal basis, contact source, and region-aware consent/review details before a send can be approved.</p>
<div style={{ display: 'grid', gridTemplateColumns: '1fr 1fr', gap: '8px', marginBottom: '8px' }}>
<input type="text" value={senderName} onChange={(e) => setSenderName(e.target.value)} placeholder="Sender name" style={inputSx} />
<input type="email" value={senderEmail} onChange={(e) => setSenderEmail(e.target.value)} placeholder="Sender email" style={inputSx} />
<input type="text" value={senderOrganization} onChange={(e) => setSenderOrganization(e.target.value)} placeholder="Organization / brand" style={inputSx} />
<input type="text" value={senderAddress} onChange={(e) => setSenderAddress(e.target.value)} placeholder="Physical mailing address" style={inputSx} />
</div>
<div style={{ display: 'grid', gridTemplateColumns: '1fr 1fr', gap: '8px', marginBottom: '8px' }}>
<input type="url" value={unsubscribeUrl} onChange={(e) => setUnsubscribeUrl(e.target.value)} placeholder="Unsubscribe URL" style={inputSx} />
<label style={{ ...inputSx, display: 'flex', alignItems: 'center', gap: '8px', cursor: 'pointer' }}>
<input type="checkbox" checked={oneClickUnsubscribe} onChange={(e) => setOneClickUnsubscribe(e.target.checked)} />
One-click unsubscribe available
</label>
</div>
<div style={{ display: 'grid', gridTemplateColumns: '1fr 1fr', gap: '8px', marginBottom: '8px' }}>
<select value={legalBasis} onChange={(e) => setLegalBasis(e.target.value)} style={selectSx}>
<option value="legitimate_interest">Legitimate interest</option>
<option value="consent">Consent</option>
<option value="contract">Contract</option>
</select>
<input type="text" value={contactDiscoverySource} onChange={(e) => setContactDiscoverySource(e.target.value)} placeholder="Contact discovery source (e.g. contact page URL)" style={inputSx} />
<select value={recipientRegion} onChange={(e) => setRecipientRegion(e.target.value)} style={selectSx}>
<option value="unknown">Recipient region unknown</option>
<option value="us">United States</option>
<option value="eu">EU / EEA</option>
<option value="uk">United Kingdom</option>
<option value="ca">Canada</option>
<option value="au">Australia</option>
<option value="br">Brazil</option>
<option value="other">Other</option>
</select>
<select value={recipientRegionSource} onChange={(e) => setRecipientRegionSource(e.target.value)} style={selectSx}>
<option value="user_attested">Region user-attested</option>
<option value="crm_record">Region from CRM/contact record</option>
<option value="billing_or_profile">Region from profile/billing data</option>
<option value="tld_inference">Region inferred from TLD only</option>
<option value="unknown">Region source unknown</option>
</select>
<select value={consentStatus} onChange={(e) => setConsentStatus(e.target.value)} style={selectSx}>
<option value="unknown">Consent status unknown</option>
<option value="explicit">Explicit consent recorded</option>
<option value="implied">Implied consent / soft opt-in</option>
<option value="not_required">Not required for selected basis</option>
</select>
<label style={{ ...inputSx, display: 'flex', alignItems: 'center', gap: '8px', cursor: 'pointer' }}>
<input type="checkbox" checked={approvedByHuman} onChange={(e) => setApprovedByHuman(e.target.checked)} />
Manual review approved
</label>
</div>
<div style={{ padding: '10px 12px', borderRadius: '8px', background: complianceReady ? 'rgba(67,233,123,0.12)' : 'rgba(245,87,108,0.12)', color: complianceReady ? '#43e97b' : '#f5576c', fontSize: '12px' }}>
{complianceReady ? 'Compliance metadata is complete for policy validation.' : (
<ul style={{ margin: 0, paddingLeft: '18px' }}>
{complianceReasons.map((reason) => <li key={reason}>{reason}</li>)}
</ul>
)}
</div>
</div>
{/* Personalize */} {/* Personalize */}
<div style={{ marginTop: '24px', padding: '16px', borderRadius: '10px', background: 'rgba(255,255,255,0.03)', border: '1px solid rgba(255,255,255,0.08)' }}> <div style={{ marginTop: '24px', padding: '16px', borderRadius: '10px', background: 'rgba(255,255,255,0.03)', border: '1px solid rgba(255,255,255,0.08)' }}>
<h4 style={{ margin: '0 0 4px', color: '#fff', fontSize: '14px' }}>Personalize for Lead</h4> <h4 style={{ margin: '0 0 4px', color: '#fff', fontSize: '14px' }}>Personalize for Lead</h4>
@@ -946,13 +1065,13 @@ const BacklinkOutreachDashboard: React.FC = () => {
</div> </div>
{selectedCampaign && subject.trim() && body.trim() && ( {selectedCampaign && subject.trim() && body.trim() && (
<div style={{ marginTop: '16px', padding: '14px', borderRadius: '10px', background: 'rgba(67,233,123,0.1)', border: '1px solid rgba(67,233,123,0.2)' }}> <div style={{ marginTop: '16px', padding: '14px', borderRadius: '10px', background: complianceReady ? 'rgba(67,233,123,0.1)' : 'rgba(245,87,108,0.1)', border: `1px solid ${complianceReady ? 'rgba(67,233,123,0.2)' : 'rgba(245,87,108,0.2)'}` }}>
<p style={{ margin: '0 0 8px', fontSize: '13px', color: '#43e97b' }}> <p style={{ margin: '0 0 8px', fontSize: '13px', color: complianceReady ? '#43e97b' : '#f5576c' }}>
Ready to send this email to leads in <strong>{selectedCampaign.name}</strong>? {complianceReady ? <>Ready to send this email to leads in <strong>{selectedCampaign.name}</strong>.</> : <>Complete compliance metadata before sending to <strong>{selectedCampaign.name}</strong> leads.</>}
</p> </p>
<TooltipWrap text="Go to the Leads tab to select recipients and send"> <TooltipWrap text={complianceReady ? 'Go to the Leads tab to select recipients and send' : 'Policy validation will block sends until all listed compliance fields are complete'}>
<button onClick={() => setActiveTab('leads')} <button onClick={() => setActiveTab('leads')} disabled={!complianceReady}
style={{ ...btnBase, padding: '8px 20px', background: GRADIENT_SUCCESS, color: '#1a1a2e', fontSize: '13px' }}> style={{ ...btnBase, padding: '8px 20px', background: GRADIENT_SUCCESS, color: '#1a1a2e', fontSize: '13px', opacity: complianceReady ? 1 : 0.5 }}>
Go to Campaign Leads Go to Campaign Leads
</button> </button>
</TooltipWrap> </TooltipWrap>

View File

@@ -16,6 +16,8 @@ export const FEATURE_KEYS = {
SEO: 'seo', SEO: 'seo',
CONTENT_PLANNING: 'content-planning', CONTENT_PLANNING: 'content-planning',
SOCIAL: 'social', SOCIAL: 'social',
LINKEDIN: 'linkedin',
FACEBOOK: 'facebook',
BLOG_WRITER: 'blog_writer', BLOG_WRITER: 'blog_writer',
STORY: 'story', STORY: 'story',
YOUTUBE: 'youtube', YOUTUBE: 'youtube',
@@ -28,6 +30,7 @@ export const FEATURE_KEYS = {
WIX: 'wix', WIX: 'wix',
BING: 'bing', BING: 'bing',
ASSET_LIBRARY: 'asset-library', ASSET_LIBRARY: 'asset-library',
BACKLINKING: 'backlinking',
} as const; } as const;
export type FeatureKey = typeof FEATURE_KEYS[keyof typeof FEATURE_KEYS]; export type FeatureKey = typeof FEATURE_KEYS[keyof typeof FEATURE_KEYS];
@@ -124,6 +127,9 @@ export function getSingleFeature(): string | null {
const FEATURE_ROUTE_PRIORITY: [string, string][] = [ const FEATURE_ROUTE_PRIORITY: [string, string][] = [
['podcast', '/podcast-maker'], ['podcast', '/podcast-maker'],
['blog_writer', '/blog-writer'], ['blog_writer', '/blog-writer'],
['backlinking', '/backlink-outreach'],
['linkedin', '/linkedin-writer'],
['facebook', '/facebook-writer'],
['story', '/story-writer'], ['story', '/story-writer'],
['image', '/image-studio'], ['image', '/image-studio'],
['video', '/video-studio'], ['video', '/video-studio'],