Files
consentos/apps/scanner/tests/test_crawler.py
James Cottrill fbf26453f2 feat: initial public release
ConsentOS — a privacy-first cookie consent management platform.

Self-hosted, source-available alternative to OneTrust, Cookiebot, and
CookieYes. Full standards coverage (IAB TCF v2.2, GPP v1, Google
Consent Mode v2, GPC, Shopify Customer Privacy API), multi-tenant
architecture with role-based access, configuration cascade
(system → org → group → site → region), dark-pattern detection in
the scanner, and a tamper-evident consent record audit trail.

This is the initial public release. Prior development history is
retained internally.

See README.md for the feature list, architecture overview, and
quick-start instructions. Licensed under the Elastic License 2.0 —
self-host freely; do not resell as a managed service.
2026-04-14 09:18:18 +00:00

441 lines
16 KiB
Python

"""Tests for the Playwright cookie crawler — CMP-21.
These tests mock Playwright to avoid requiring an actual browser.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.crawler import (
CookieCrawler,
CrawlResult,
DiscoveredCookie,
SiteCrawlResult,
_build_initiator_chain,
_get_script_initiator,
)
# ── Fixtures ────────────────────────────────────────────────────────────
def _make_mock_page(
*,
cookies: list[dict] | None = None,
ls_items: list[dict] | None = None,
ss_items: list[dict] | None = None,
):
"""Build a mock Playwright Page object."""
page = AsyncMock()
page.goto = AsyncMock()
page.on = MagicMock() # synchronous registration
# page.evaluate returns different results for localStorage vs sessionStorage
eval_results = []
eval_results.append(ls_items or [])
eval_results.append(ss_items or [])
page.evaluate = AsyncMock(side_effect=eval_results)
return page
def _make_mock_context(page, cookies: list[dict] | None = None):
"""Build a mock BrowserContext."""
context = AsyncMock()
context.new_page = AsyncMock(return_value=page)
context.cookies = AsyncMock(return_value=cookies or [])
context.clear_cookies = AsyncMock()
context.close = AsyncMock()
return context
def _make_mock_browser(context):
"""Build a mock Browser."""
browser = AsyncMock()
browser.new_context = AsyncMock(return_value=context)
browser.close = AsyncMock()
return browser
# ── DiscoveredCookie dataclass ──────────────────────────────────────────
class TestDiscoveredCookie:
    """Unit tests for the DiscoveredCookie dataclass defaults and fields."""

    def test_defaults(self):
        cookie = DiscoveredCookie(name="_ga", domain="example.com")
        # All optional metadata defaults to None.
        for attr in ("path", "expires", "http_only", "secure", "same_site", "script_source"):
            assert getattr(cookie, attr) is None
        assert cookie.storage_type == "cookie"
        assert cookie.value_length == 0
        assert cookie.page_url == ""

    def test_initiator_chain_defaults_to_empty(self):
        # The chain must default to a fresh empty list, not None.
        assert DiscoveredCookie(name="_ga", domain="example.com").initiator_chain == []

    def test_with_all_fields(self):
        fields = dict(
            name="_ga",
            domain=".example.com",
            storage_type="cookie",
            path="/",
            expires=1700000000.0,
            http_only=True,
            secure=True,
            same_site="Lax",
            value_length=42,
            script_source="https://cdn.example.com/tracker.js",
            page_url="https://example.com/",
            initiator_chain=["https://example.com/", "https://cdn.example.com/tracker.js"],
        )
        cookie = DiscoveredCookie(**fields)
        assert cookie.http_only is True
        assert cookie.value_length == 42
        assert len(cookie.initiator_chain) == 2
# ── CrawlResult dataclass ──────────────────────────────────────────────
class TestCrawlResult:
    """Unit tests for the CrawlResult dataclass."""

    def test_defaults(self):
        result = CrawlResult(url="https://example.com/")
        # A fresh result carries no cookies and no error.
        assert result.error is None
        assert result.cookies == []

    def test_with_error(self):
        failed = CrawlResult(url="https://example.com/", error="Timeout")
        assert failed.error == "Timeout"
# ── SiteCrawlResult ────────────────────────────────────────────────────
class TestSiteCrawlResult:
    """Tests for SiteCrawlResult.unique_cookies de-duplication semantics."""

    @staticmethod
    def _cookie(name, storage_type="cookie"):
        """Factory for a DiscoveredCookie on example.com."""
        return DiscoveredCookie(name=name, domain="example.com", storage_type=storage_type)

    def test_unique_cookies_deduplicates(self):
        # _ga appears on both pages; only one entry should survive.
        result = SiteCrawlResult(
            domain="example.com",
            pages=[
                CrawlResult(
                    url="https://example.com/",
                    cookies=[self._cookie("_ga"), self._cookie("_gid")],
                ),
                CrawlResult(
                    url="https://example.com/about",
                    cookies=[self._cookie("_ga")],
                ),
            ],
            total_cookies_found=3,
        )
        unique = result.unique_cookies
        assert len(unique) == 2
        assert {c.name for c in unique} == {"_ga", "_gid"}

    def test_unique_cookies_separates_storage_types(self):
        """Same name in cookie vs localStorage should be separate entries."""
        page = CrawlResult(
            url="https://example.com/",
            cookies=[self._cookie("token"), self._cookie("token", "local_storage")],
        )
        result = SiteCrawlResult(domain="example.com", pages=[page], total_cookies_found=2)
        assert len(result.unique_cookies) == 2

    def test_empty_pages(self):
        assert SiteCrawlResult(domain="example.com").unique_cookies == []
# ── _get_script_initiator ──────────────────────────────────────────────
class TestGetScriptInitiator:
    """Tests for _get_script_initiator's redirect-chain walking."""

    @staticmethod
    def _request(url, resource_type, redirected_from=None):
        """Factory for a mock Playwright Request."""
        req = MagicMock()
        req.url = url
        req.resource_type = resource_type
        req.redirected_from = redirected_from
        return req

    def test_identifies_js_url(self):
        req = self._request("https://cdn.example.com/tracker.js", "script")
        assert _get_script_initiator(req) == "https://cdn.example.com/tracker.js"

    def test_follows_redirect_chain(self):
        # The script at the root of the redirect chain should be reported.
        origin = self._request("https://cdn.example.com/analytics.js", "script")
        hop = self._request("https://example.com/track", "fetch", redirected_from=origin)
        assert _get_script_initiator(hop) == "https://cdn.example.com/analytics.js"

    def test_returns_none_for_non_script(self):
        req = self._request("https://example.com/image.png", "image")
        assert _get_script_initiator(req) is None

    def test_handles_javascript_resource_type(self):
        req = self._request("https://example.com/bundle", "javascript")
        assert _get_script_initiator(req) == "https://example.com/bundle"

    def test_handles_circular_redirect(self):
        """Should not loop infinitely on circular redirects."""
        first = self._request("https://example.com/a", "fetch")
        second = self._request("https://example.com/b", "fetch", redirected_from=first)
        first.redirected_from = second
        # Neither hop is a script, so the walk must terminate with None.
        assert _get_script_initiator(first) is None
# ── _build_initiator_chain ────────────────────────────────────────────
class TestBuildInitiatorChain:
    """Tests for _build_initiator_chain: root-first order, depth cap, cycles."""

    def test_single_url_no_parent(self):
        chain = _build_initiator_chain("https://example.com/script.js", {})
        assert chain == ["https://example.com/script.js"]

    def test_two_level_chain(self):
        parents = {"https://cdn.example.com/tracker.js": "https://example.com/"}
        expected = ["https://example.com/", "https://cdn.example.com/tracker.js"]
        assert _build_initiator_chain("https://cdn.example.com/tracker.js", parents) == expected

    def test_three_level_chain(self):
        parents = {
            "https://cdn.example.com/pixel.js": "https://cdn.example.com/gtm.js",
            "https://cdn.example.com/gtm.js": "https://example.com/",
        }
        expected = [
            "https://example.com/",
            "https://cdn.example.com/gtm.js",
            "https://cdn.example.com/pixel.js",
        ]
        assert _build_initiator_chain("https://cdn.example.com/pixel.js", parents) == expected

    def test_respects_max_depth(self):
        # A 25-deep parent chain: s25 <- s24 <- ... <- s0.
        parents = {
            f"https://example.com/s{i + 1}.js": f"https://example.com/s{i}.js"
            for i in range(25)
        }
        chain = _build_initiator_chain("https://example.com/s25.js", parents, max_depth=5)
        # Leaf plus at most max_depth ancestors.
        assert len(chain) <= 6

    def test_handles_circular_reference(self):
        parents = {
            "https://a.com/a.js": "https://b.com/b.js",
            "https://b.com/b.js": "https://a.com/a.js",
        }
        # The seen set must break the cycle after each URL appears once.
        assert len(_build_initiator_chain("https://a.com/a.js", parents)) == 2
# ── CookieCrawler._crawl_page ──────────────────────────────────────────
class TestCrawlPage:
    """Tests for CookieCrawler._crawl_page against fully mocked Playwright."""

    @staticmethod
    def _stack(page, cookies=None):
        """Wire *page* into a mock context/browser; return (context, browser)."""
        context = _make_mock_context(page, cookies=cookies)
        return context, _make_mock_browser(context)

    @pytest.mark.asyncio(loop_scope="session")
    async def test_discovers_browser_cookies(self):
        cdp_cookies = [
            {
                "name": "_ga",
                "domain": ".example.com",
                "path": "/",
                "expires": 1700000000,
                "httpOnly": False,
                "secure": True,
                "sameSite": "Lax",
                "value": "GA1.2.12345",
            }
        ]
        _, browser = self._stack(_make_mock_page(), cookies=cdp_cookies)
        result = await CookieCrawler()._crawl_page(browser, "https://example.com/")
        assert result.error is None
        assert len(result.cookies) == 1
        found = result.cookies[0]
        assert (found.name, found.domain) == ("_ga", ".example.com")
        assert found.storage_type == "cookie"
        assert found.secure is True
        assert found.value_length == len("GA1.2.12345")

    @pytest.mark.asyncio(loop_scope="session")
    async def test_discovers_local_storage(self):
        page = _make_mock_page(ls_items=[{"name": "theme", "valueLength": 4}])
        _, browser = self._stack(page)
        result = await CookieCrawler()._crawl_page(browser, "https://example.com/")
        ls = [c for c in result.cookies if c.storage_type == "local_storage"]
        assert len(ls) == 1
        assert ls[0].name == "theme"
        assert ls[0].value_length == 4
        assert ls[0].domain == "example.com"

    @pytest.mark.asyncio(loop_scope="session")
    async def test_discovers_session_storage(self):
        page = _make_mock_page(ss_items=[{"name": "session_id", "valueLength": 36}])
        _, browser = self._stack(page)
        result = await CookieCrawler()._crawl_page(browser, "https://example.com/")
        ss = [c for c in result.cookies if c.storage_type == "session_storage"]
        assert len(ss) == 1
        assert ss[0].name == "session_id"

    @pytest.mark.asyncio(loop_scope="session")
    async def test_handles_page_error(self):
        page = _make_mock_page()
        page.goto = AsyncMock(side_effect=Exception("Navigation timeout"))
        _, browser = self._stack(page)
        result = await CookieCrawler()._crawl_page(browser, "https://example.com/")
        assert result.error == "Navigation timeout"

    @pytest.mark.asyncio(loop_scope="session")
    async def test_context_closed_after_crawl(self):
        context, browser = self._stack(_make_mock_page())
        await CookieCrawler()._crawl_page(browser, "https://example.com/")
        context.close.assert_awaited_once()

    @pytest.mark.asyncio(loop_scope="session")
    async def test_context_closed_on_error(self):
        # Cleanup must run even when navigation raises.
        page = _make_mock_page()
        page.goto = AsyncMock(side_effect=Exception("fail"))
        context, browser = self._stack(page)
        await CookieCrawler()._crawl_page(browser, "https://example.com/")
        context.close.assert_awaited_once()

    @pytest.mark.asyncio(loop_scope="session")
    async def test_custom_user_agent(self):
        _, browser = self._stack(_make_mock_page())
        crawler = CookieCrawler(user_agent="CMPBot/1.0")
        await crawler._crawl_page(browser, "https://example.com/")
        browser.new_context.assert_awaited_once()
        assert browser.new_context.call_args[1]["user_agent"] == "CMPBot/1.0"
# ── CookieCrawler.crawl_site ───────────────────────────────────────────
class TestCrawlSite:
    """Tests for CookieCrawler.crawl_site with async_playwright patched out."""

    @staticmethod
    def _wire_playwright(mock_pw, browser):
        """Point the patched async_playwright async-context-manager at *browser*."""
        pw_instance = AsyncMock()
        pw_instance.chromium.launch = AsyncMock(return_value=browser)
        mock_pw.return_value.__aenter__ = AsyncMock(return_value=pw_instance)
        mock_pw.return_value.__aexit__ = AsyncMock(return_value=False)

    @pytest.mark.asyncio(loop_scope="session")
    @patch("src.crawler.async_playwright")
    async def test_crawls_multiple_pages(self, mock_pw):
        cdp_cookies = [{"name": "_ga", "domain": ".example.com", "value": "x"}]
        context = _make_mock_context(_make_mock_page(), cookies=cdp_cookies)
        browser = _make_mock_browser(context)
        self._wire_playwright(mock_pw, browser)
        result = await CookieCrawler().crawl_site(
            ["https://example.com/", "https://example.com/about"]
        )
        assert result.domain == "example.com"
        assert len(result.pages) == 2
        assert result.total_cookies_found >= 2

    @pytest.mark.asyncio(loop_scope="session")
    @patch("src.crawler.async_playwright")
    async def test_respects_max_pages(self, mock_pw):
        browser = _make_mock_browser(_make_mock_context(_make_mock_page()))
        self._wire_playwright(mock_pw, browser)
        urls = [f"https://example.com/page{i}" for i in range(10)]
        result = await CookieCrawler().crawl_site(urls, max_pages=3)
        assert len(result.pages) == 3

    @pytest.mark.asyncio(loop_scope="session")
    async def test_empty_urls(self):
        # No URLs means no browser launch and an empty result.
        result = await CookieCrawler().crawl_site([])
        assert result.domain == ""
        assert result.pages == []

    @pytest.mark.asyncio(loop_scope="session")
    @patch("src.crawler.async_playwright")
    async def test_browser_closed_after_crawl(self, mock_pw):
        browser = _make_mock_browser(_make_mock_context(_make_mock_page()))
        self._wire_playwright(mock_pw, browser)
        await CookieCrawler().crawl_site(["https://example.com/"])
        browser.close.assert_awaited_once()