Secure backlink lead status updates

This commit is contained in:
ي
2026-06-03 18:16:10 +05:30
parent 923fa671fe
commit 40516e5c79
5 changed files with 119 additions and 7 deletions

View File

@@ -95,6 +95,7 @@ class LeadListResponse(BaseModel):
class LeadStatusUpdateRequest(BaseModel):
status: str = Field(..., min_length=1)
notes: Optional[str] = None
campaign_id: Optional[str] = Field(default=None, min_length=1)
class CampaignDetailResponse(BaseModel):
@@ -298,6 +299,7 @@ class BulkStatusUpdateRequest(BaseModel):
lead_ids: List[str] = Field(..., min_length=1)
status: str = Field(..., min_length=1)
notes: Optional[str] = None
campaign_id: Optional[str] = Field(default=None, min_length=1)
class BulkStatusUpdateResponse(BaseModel):

View File

@@ -204,16 +204,38 @@ class BacklinkOutreachStorageService:
db.close()
def update_lead_status(
self, lead_id: str, user_id: str, status: str, notes: Optional[str] = None
self,
lead_id: str,
user_id: str,
status: str,
notes: Optional[str] = None,
campaign_id: Optional[str] = None,
) -> Optional[dict]:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return None
try:
lead = db.query(BacklinkLead).filter(BacklinkLead.id == lead_id).first()
query = (
db.query(BacklinkLead)
.join(BacklinkCampaign, BacklinkLead.campaign_id == BacklinkCampaign.id)
.filter(
BacklinkLead.id == lead_id,
BacklinkCampaign.user_id == user_id,
)
)
if campaign_id:
query = query.filter(BacklinkCampaign.id == campaign_id)
lead = query.first()
if not lead:
access = self._get_lead_access_rows(db, [lead_id]).get(lead_id)
if not access:
return None
if access["user_id"] != user_id:
raise PermissionError("Lead does not belong to the current user")
return None
lead.status = status
if notes is not None:
lead.notes = notes
@@ -222,6 +244,44 @@ class BacklinkOutreachStorageService:
finally:
db.close()
def get_lead_access_issues(
self, lead_ids: List[str], user_id: str, campaign_id: Optional[str] = None
) -> dict:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return {"missing": list(dict.fromkeys(lead_ids)), "unauthorized": []}
try:
unique_lead_ids = list(dict.fromkeys(lead_ids))
access_rows = self._get_lead_access_rows(db, unique_lead_ids)
missing: List[str] = []
unauthorized: List[str] = []
for lid in unique_lead_ids:
access = access_rows.get(lid)
if not access:
missing.append(lid)
elif access["user_id"] != user_id:
unauthorized.append(lid)
elif campaign_id and access["campaign_id"] != campaign_id:
missing.append(lid)
return {"missing": missing, "unauthorized": unauthorized}
finally:
db.close()
def _get_lead_access_rows(self, db, lead_ids: List[str]) -> dict:
if not lead_ids:
return {}
rows = (
db.query(BacklinkLead.id, BacklinkLead.campaign_id, BacklinkCampaign.user_id)
.outerjoin(BacklinkCampaign, BacklinkLead.campaign_id == BacklinkCampaign.id)
.filter(BacklinkLead.id.in_(lead_ids))
.all()
)
return {
row.id: {"campaign_id": row.campaign_id, "user_id": row.user_id}
for row in rows
}
@staticmethod
def _lead_to_dict(lead) -> dict:
return {