ConsentOS — a privacy-first cookie consent management platform. Self-hosted, source-available alternative to OneTrust, Cookiebot, and CookieYes. Full standards coverage (IAB TCF v2.2, GPP v1, Google Consent Mode v2, GPC, Shopify Customer Privacy API), multi-tenant architecture with role-based access, configuration cascade (system → org → group → site → region), dark-pattern detection in the scanner, and a tamper-evident consent record audit trail. This is the initial public release. Prior development history is retained internally. See README.md for the feature list, architecture overview, and quick-start instructions. Licensed under the Elastic Licence 2.0 — self-host freely; do not resell as a managed service.
574 lines
21 KiB
Python
574 lines
21 KiB
Python
"""Tests for the GeoIP service.
|
|
|
|
Covers header-based detection, IP lookup, country-to-region mapping,
|
|
client IP extraction, and the combined detect_region flow.
|
|
"""
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
import src.services.geoip as geoip_module
|
|
from src.services.geoip import (
|
|
GeoResult,
|
|
_is_private_ip,
|
|
country_to_region,
|
|
detect_region,
|
|
detect_region_from_headers,
|
|
get_client_ip,
|
|
lookup_ip_maxmind,
|
|
lookup_ip_region,
|
|
)
|
|
|
|
# ── country_to_region ────────────────────────────────────────────────
|
|
|
|
|
|
class TestCountryToRegion:
|
|
def test_eu_country_returns_eu(self):
|
|
assert country_to_region("DE") == "EU"
|
|
assert country_to_region("FR") == "EU"
|
|
assert country_to_region("IT") == "EU"
|
|
assert country_to_region("ES") == "EU"
|
|
|
|
def test_eu_country_case_insensitive(self):
|
|
assert country_to_region("de") == "EU"
|
|
assert country_to_region("fr") == "EU"
|
|
|
|
def test_gb_returns_gb(self):
|
|
assert country_to_region("GB") == "GB"
|
|
|
|
def test_br_returns_br(self):
|
|
assert country_to_region("BR") == "BR"
|
|
|
|
def test_us_without_state(self):
|
|
assert country_to_region("US") == "US"
|
|
|
|
def test_us_with_state(self):
|
|
assert country_to_region("US", "CA") == "US-CA"
|
|
assert country_to_region("US", "ny") == "US-NY"
|
|
|
|
def test_non_eu_country_returned_as_is(self):
|
|
assert country_to_region("JP") == "JP"
|
|
assert country_to_region("AU") == "AU"
|
|
assert country_to_region("CA") == "CA"
|
|
|
|
|
|
# ── detect_region_from_headers ───────────────────────────────────────
|
|
|
|
|
|
class TestDetectRegionFromHeaders:
|
|
def _make_request(self, headers: dict[str, str]) -> MagicMock:
|
|
request = MagicMock()
|
|
request.headers = headers
|
|
return request
|
|
|
|
def test_cloudflare_header(self):
|
|
request = self._make_request({"cf-ipcountry": "DE"})
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "DE"
|
|
assert result.region == "EU"
|
|
assert result.is_resolved is True
|
|
|
|
def test_vercel_header(self):
|
|
request = self._make_request({"x-vercel-ip-country": "GB"})
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "GB"
|
|
assert result.region == "GB"
|
|
|
|
def test_appengine_header(self):
|
|
request = self._make_request({"x-appengine-country": "BR"})
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "BR"
|
|
assert result.region == "BR"
|
|
|
|
def test_custom_header(self):
|
|
request = self._make_request({"x-country-code": "JP"})
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "JP"
|
|
assert result.region == "JP"
|
|
|
|
def test_no_geo_headers(self):
|
|
request = self._make_request({})
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code is None
|
|
assert result.region is None
|
|
assert result.is_resolved is False
|
|
|
|
def test_ignores_xx_value(self):
|
|
request = self._make_request({"cf-ipcountry": "XX"})
|
|
result = detect_region_from_headers(request)
|
|
assert result.is_resolved is False
|
|
|
|
def test_header_priority_cloudflare_first(self):
|
|
request = self._make_request(
|
|
{
|
|
"cf-ipcountry": "FR",
|
|
"x-vercel-ip-country": "DE",
|
|
}
|
|
)
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "FR"
|
|
|
|
def test_case_normalisation(self):
|
|
request = self._make_request({"cf-ipcountry": "gb"})
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "GB"
|
|
assert result.region == "GB"
|
|
|
|
def test_configured_custom_header(self):
|
|
"""An operator-configured header is honoured."""
|
|
request = self._make_request({"x-gclb-country": "JP"})
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_country_header = "x-gclb-country"
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "JP"
|
|
assert result.region == "JP"
|
|
|
|
def test_configured_custom_header_takes_priority(self):
|
|
"""When both a custom and a built-in header are present, the
|
|
custom one wins — that's the operator's explicit choice."""
|
|
request = self._make_request(
|
|
{
|
|
"cf-ipcountry": "FR",
|
|
"x-gclb-country": "JP",
|
|
}
|
|
)
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_country_header = "x-gclb-country"
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "JP"
|
|
|
|
def test_configured_header_falls_through_to_builtin(self):
|
|
"""If the custom header isn't present, the built-in list still
|
|
applies."""
|
|
request = self._make_request({"cf-ipcountry": "FR"})
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_country_header = "x-gclb-country"
|
|
mock_settings.return_value.geoip_region_header = None
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "FR"
|
|
assert result.region == "EU"
|
|
|
|
def test_configured_region_header_pairs_with_country(self):
|
|
"""A configured region header is paired with the custom country."""
|
|
request = self._make_request(
|
|
{
|
|
"x-gclb-country": "US",
|
|
"x-gclb-region": "CA",
|
|
}
|
|
)
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_country_header = "x-gclb-country"
|
|
mock_settings.return_value.geoip_region_header = "x-gclb-region"
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "US"
|
|
assert result.region == "US-CA"
|
|
|
|
def test_configured_region_header_strips_country_prefix(self):
|
|
"""ISO 3166-2 subdivisions may arrive prefixed (``US-CA``)."""
|
|
request = self._make_request(
|
|
{
|
|
"x-gclb-country": "US",
|
|
"x-gclb-region": "US-NY",
|
|
}
|
|
)
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_country_header = "x-gclb-country"
|
|
mock_settings.return_value.geoip_region_header = "x-gclb-region"
|
|
result = detect_region_from_headers(request)
|
|
assert result.region == "US-NY"
|
|
|
|
def test_configured_region_header_missing_is_country_only(self):
|
|
"""Only country hits region-aware path if the region header is absent."""
|
|
request = self._make_request({"x-gclb-country": "US"})
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_country_header = "x-gclb-country"
|
|
mock_settings.return_value.geoip_region_header = "x-gclb-region"
|
|
result = detect_region_from_headers(request)
|
|
assert result.country_code == "US"
|
|
assert result.region == "US"
|
|
|
|
def test_configured_region_header_xx_ignored(self):
|
|
"""Region value of ``XX`` is treated as unknown."""
|
|
request = self._make_request(
|
|
{
|
|
"x-gclb-country": "US",
|
|
"x-gclb-region": "XX",
|
|
}
|
|
)
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_country_header = "x-gclb-country"
|
|
mock_settings.return_value.geoip_region_header = "x-gclb-region"
|
|
result = detect_region_from_headers(request)
|
|
assert result.region == "US"
|
|
|
|
|
|
# ── get_client_ip ────────────────────────────────────────────────────
|
|
|
|
|
|
class TestGetClientIp:
|
|
def _make_request(
|
|
self,
|
|
headers: dict[str, str] | None = None,
|
|
client_host: str | None = None,
|
|
) -> MagicMock:
|
|
request = MagicMock()
|
|
request.headers = headers or {}
|
|
if client_host:
|
|
request.client = MagicMock()
|
|
request.client.host = client_host
|
|
else:
|
|
request.client = None
|
|
return request
|
|
|
|
def test_x_forwarded_for_single(self):
|
|
request = self._make_request({"x-forwarded-for": "1.2.3.4"})
|
|
assert get_client_ip(request) == "1.2.3.4"
|
|
|
|
def test_x_forwarded_for_multiple(self):
|
|
request = self._make_request({"x-forwarded-for": "1.2.3.4, 5.6.7.8, 9.10.11.12"})
|
|
assert get_client_ip(request) == "1.2.3.4"
|
|
|
|
def test_x_real_ip(self):
|
|
request = self._make_request({"x-real-ip": "1.2.3.4"})
|
|
assert get_client_ip(request) == "1.2.3.4"
|
|
|
|
def test_forwarded_for_takes_priority_over_real_ip(self):
|
|
request = self._make_request(
|
|
{
|
|
"x-forwarded-for": "1.1.1.1",
|
|
"x-real-ip": "2.2.2.2",
|
|
}
|
|
)
|
|
assert get_client_ip(request) == "1.1.1.1"
|
|
|
|
def test_falls_back_to_client_host(self):
|
|
request = self._make_request(client_host="10.0.0.1")
|
|
assert get_client_ip(request) == "10.0.0.1"
|
|
|
|
def test_returns_none_when_no_ip(self):
|
|
request = self._make_request()
|
|
assert get_client_ip(request) is None
|
|
|
|
|
|
# ── _is_private_ip ───────────────────────────────────────────────────
|
|
|
|
|
|
class TestIsPrivateIp:
|
|
def test_loopback(self):
|
|
assert _is_private_ip("127.0.0.1") is True
|
|
assert _is_private_ip("127.0.0.2") is True
|
|
|
|
def test_private_ranges(self):
|
|
assert _is_private_ip("10.0.0.1") is True
|
|
assert _is_private_ip("192.168.1.1") is True
|
|
assert _is_private_ip("172.16.0.1") is True
|
|
|
|
def test_ipv6_loopback(self):
|
|
assert _is_private_ip("::1") is True
|
|
|
|
def test_localhost_string(self):
|
|
assert _is_private_ip("localhost") is True
|
|
|
|
def test_public_ip(self):
|
|
assert _is_private_ip("8.8.8.8") is False
|
|
assert _is_private_ip("1.1.1.1") is False
|
|
|
|
|
|
# ── lookup_ip_region ─────────────────────────────────────────────────
|
|
|
|
|
|
class TestLookupIpRegion:
|
|
@pytest.mark.asyncio
|
|
async def test_private_ip_returns_unresolved(self):
|
|
result = await lookup_ip_region("127.0.0.1")
|
|
assert result.is_resolved is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_private_ip_10_range(self):
|
|
result = await lookup_ip_region("10.0.0.1")
|
|
assert result.is_resolved is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_successful_lookup(self):
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {
|
|
"status": "success",
|
|
"countryCode": "DE",
|
|
"region": "BY",
|
|
}
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
with patch("src.services.geoip.httpx.AsyncClient", return_value=mock_client):
|
|
result = await lookup_ip_region("8.8.8.8")
|
|
|
|
assert result.country_code == "DE"
|
|
assert result.region == "EU"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_failed_status(self):
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {"status": "fail", "message": "invalid query"}
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
with patch("src.services.geoip.httpx.AsyncClient", return_value=mock_client):
|
|
result = await lookup_ip_region("8.8.8.8")
|
|
|
|
assert result.is_resolved is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_http_error(self):
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 500
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
with patch("src.services.geoip.httpx.AsyncClient", return_value=mock_client):
|
|
result = await lookup_ip_region("8.8.8.8")
|
|
|
|
assert result.is_resolved is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_network_exception(self):
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(side_effect=httpx.ConnectError("Connection refused"))
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
with patch("src.services.geoip.httpx.AsyncClient", return_value=mock_client):
|
|
result = await lookup_ip_region("8.8.8.8")
|
|
|
|
assert result.is_resolved is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_us_with_state_lookup(self):
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {
|
|
"status": "success",
|
|
"countryCode": "US",
|
|
"region": "CA",
|
|
}
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
with patch("src.services.geoip.httpx.AsyncClient", return_value=mock_client):
|
|
result = await lookup_ip_region("8.8.8.8")
|
|
|
|
assert result.country_code == "US"
|
|
assert result.region == "US-CA"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_country_code(self):
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {"status": "success"}
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
with patch("src.services.geoip.httpx.AsyncClient", return_value=mock_client):
|
|
result = await lookup_ip_region("8.8.8.8")
|
|
|
|
assert result.is_resolved is False
|
|
|
|
|
|
# ── detect_region (combined) ─────────────────────────────────────────
|
|
|
|
|
|
class TestDetectRegion:
|
|
@pytest.mark.asyncio
|
|
async def test_uses_headers_when_available(self):
|
|
request = MagicMock()
|
|
request.headers = {"cf-ipcountry": "FR"}
|
|
request.client = None
|
|
|
|
result = await detect_region(request)
|
|
assert result.country_code == "FR"
|
|
assert result.region == "EU"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_falls_back_to_ip_lookup(self):
|
|
request = MagicMock()
|
|
request.headers = {}
|
|
request.client = MagicMock()
|
|
request.client.host = "8.8.8.8"
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {
|
|
"status": "success",
|
|
"countryCode": "US",
|
|
"region": "CA",
|
|
}
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.get = AsyncMock(return_value=mock_response)
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
with patch("src.services.geoip.httpx.AsyncClient", return_value=mock_client):
|
|
result = await detect_region(request)
|
|
|
|
assert result.country_code == "US"
|
|
assert result.region == "US-CA"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_unresolved_when_no_ip(self):
|
|
request = MagicMock()
|
|
request.headers = {}
|
|
request.client = None
|
|
|
|
result = await detect_region(request)
|
|
assert result.is_resolved is False
|
|
|
|
|
|
# ── GeoResult ────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestGeoResult:
|
|
def test_is_resolved_true(self):
|
|
result = GeoResult(country_code="GB", region="GB")
|
|
assert result.is_resolved is True
|
|
|
|
def test_is_resolved_false(self):
|
|
result = GeoResult(country_code=None, region=None)
|
|
assert result.is_resolved is False
|
|
|
|
def test_frozen_dataclass(self):
|
|
result = GeoResult(country_code="GB", region="GB")
|
|
with pytest.raises(AttributeError):
|
|
result.country_code = "US" # type: ignore[misc]
|
|
|
|
|
|
# ── MaxMind database lookup ──────────────────────────────────────────
|
|
|
|
|
|
class TestLookupIpMaxmind:
|
|
def setup_method(self):
|
|
# Reset the module-level cache so each test starts clean.
|
|
geoip_module._maxmind_reader = None
|
|
geoip_module._maxmind_initialised = False
|
|
|
|
def _mock_reader(self, country_iso: str | None, subdivision_iso: str | None):
|
|
reader = MagicMock()
|
|
response = MagicMock()
|
|
response.country.iso_code = country_iso
|
|
if subdivision_iso is None:
|
|
response.subdivisions = None
|
|
else:
|
|
response.subdivisions.most_specific.iso_code = subdivision_iso
|
|
reader.city.return_value = response
|
|
return reader
|
|
|
|
def test_private_ip_returns_unresolved(self):
|
|
result = lookup_ip_maxmind("10.0.0.1")
|
|
assert result.is_resolved is False
|
|
|
|
def test_no_db_configured_returns_unresolved(self):
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_maxmind_db_path = None
|
|
result = lookup_ip_maxmind("8.8.8.8")
|
|
assert result.is_resolved is False
|
|
|
|
def test_successful_lookup_with_subdivision(self):
|
|
reader = self._mock_reader("US", "CA")
|
|
geoip_module._maxmind_reader = reader
|
|
geoip_module._maxmind_initialised = True
|
|
|
|
result = lookup_ip_maxmind("8.8.8.8")
|
|
assert result.country_code == "US"
|
|
assert result.region == "US-CA"
|
|
reader.city.assert_called_once_with("8.8.8.8")
|
|
|
|
def test_successful_lookup_without_subdivision(self):
|
|
reader = self._mock_reader("DE", None)
|
|
geoip_module._maxmind_reader = reader
|
|
geoip_module._maxmind_initialised = True
|
|
|
|
result = lookup_ip_maxmind("8.8.8.8")
|
|
assert result.country_code == "DE"
|
|
assert result.region == "EU"
|
|
|
|
def test_reader_raises_returns_unresolved(self):
|
|
reader = MagicMock()
|
|
reader.city.side_effect = RuntimeError("corrupt db")
|
|
geoip_module._maxmind_reader = reader
|
|
geoip_module._maxmind_initialised = True
|
|
|
|
result = lookup_ip_maxmind("8.8.8.8")
|
|
assert result.is_resolved is False
|
|
|
|
def test_reader_missing_country_returns_unresolved(self):
|
|
reader = self._mock_reader(None, None)
|
|
geoip_module._maxmind_reader = reader
|
|
geoip_module._maxmind_initialised = True
|
|
|
|
result = lookup_ip_maxmind("8.8.8.8")
|
|
assert result.is_resolved is False
|
|
|
|
def test_bad_db_path_is_cached_as_failure(self):
|
|
"""A missing ``.mmdb`` file should not reopen on every request."""
|
|
with patch("src.services.geoip.get_settings") as mock_settings:
|
|
mock_settings.return_value.geoip_maxmind_db_path = "/nonexistent/geo.mmdb"
|
|
r1 = lookup_ip_maxmind("8.8.8.8")
|
|
r2 = lookup_ip_maxmind("1.1.1.1")
|
|
assert r1.is_resolved is False
|
|
assert r2.is_resolved is False
|
|
assert geoip_module._maxmind_initialised is True
|
|
assert geoip_module._maxmind_reader is None
|
|
|
|
|
|
class TestDetectRegionMaxmind:
|
|
def setup_method(self):
|
|
geoip_module._maxmind_reader = None
|
|
geoip_module._maxmind_initialised = False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_uses_maxmind_before_external_api(self):
|
|
"""With MaxMind configured, ip-api.com must not be called."""
|
|
reader = MagicMock()
|
|
response = MagicMock()
|
|
response.country.iso_code = "GB"
|
|
response.subdivisions.most_specific.iso_code = "SCT"
|
|
reader.city.return_value = response
|
|
geoip_module._maxmind_reader = reader
|
|
geoip_module._maxmind_initialised = True
|
|
|
|
request = MagicMock()
|
|
request.headers = {"x-forwarded-for": "8.8.8.8"}
|
|
request.client = None
|
|
|
|
with (
|
|
patch("src.services.geoip.get_settings") as mock_settings,
|
|
patch("src.services.geoip.httpx.AsyncClient") as mock_httpx,
|
|
):
|
|
mock_settings.return_value.geoip_country_header = None
|
|
mock_settings.return_value.geoip_region_header = None
|
|
mock_settings.return_value.geoip_maxmind_db_path = "/data/GeoLite2-City.mmdb"
|
|
|
|
result = await detect_region(request)
|
|
|
|
assert result.country_code == "GB"
|
|
assert result.region == "GB-SCT"
|
|
mock_httpx.assert_not_called()
|