Files
consentos/apps/api/tests/test_geoip.py
James Cottrill fbf26453f2 feat: initial public release
ConsentOS — a privacy-first cookie consent management platform.

Self-hosted, source-available alternative to OneTrust, Cookiebot, and
CookieYes. Full standards coverage (IAB TCF v2.2, GPP v1, Google
Consent Mode v2, GPC, Shopify Customer Privacy API), multi-tenant
architecture with role-based access, configuration cascade
(system → org → group → site → region), dark-pattern detection in
the scanner, and a tamper-evident consent record audit trail.

This is the initial public release. Prior development history is
retained internally.

See README.md for the feature list, architecture overview, and
quick-start instructions. Licensed under the Elastic License 2.0 —
self-host freely; do not resell as a managed service.
2026-04-14 09:18:18 +00:00

574 lines
21 KiB
Python

"""Tests for the GeoIP service.
Covers header-based detection, IP lookup (external HTTP API and local
MaxMind database), country-to-region mapping, client IP extraction, and
the combined detect_region flow.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
import src.services.geoip as geoip_module
from src.services.geoip import (
GeoResult,
_is_private_ip,
country_to_region,
detect_region,
detect_region_from_headers,
get_client_ip,
lookup_ip_maxmind,
lookup_ip_region,
)
# ── country_to_region ────────────────────────────────────────────────
class TestCountryToRegion:
    """Checks for ``country_to_region`` on country and country/state inputs."""

    def test_eu_country_returns_eu(self):
        """Sample EU member-state codes map to the ``EU`` region."""
        for member_state in ("DE", "FR", "IT", "ES"):
            assert country_to_region(member_state) == "EU"

    def test_eu_country_case_insensitive(self):
        """Lower-case country codes are accepted as well."""
        for member_state in ("de", "fr"):
            assert country_to_region(member_state) == "EU"

    def test_gb_returns_gb(self):
        """``GB`` maps to itself."""
        assert country_to_region("GB") == "GB"

    def test_br_returns_br(self):
        """``BR`` maps to itself."""
        assert country_to_region("BR") == "BR"

    def test_us_without_state(self):
        """``US`` with no state yields plain ``US``."""
        assert country_to_region("US") == "US"

    def test_us_with_state(self):
        """``US`` plus a state yields ``US-<STATE>``, upper-cased."""
        expectations = {"CA": "US-CA", "ny": "US-NY"}
        for state, expected_region in expectations.items():
            assert country_to_region("US", state) == expected_region

    def test_non_eu_country_returned_as_is(self):
        """Countries outside the EU sample come back unchanged."""
        for code in ("JP", "AU", "CA"):
            assert country_to_region(code) == code
# ── detect_region_from_headers ───────────────────────────────────────
class TestDetectRegionFromHeaders:
    """Header-based detection via ``detect_region_from_headers``.

    Exercises the built-in country headers as well as the
    operator-configured country/region headers read through
    ``get_settings``.
    """

    def _make_request(self, headers: dict[str, str]) -> MagicMock:
        """Return a mock request whose ``headers`` attribute is *headers*."""
        request = MagicMock()
        request.headers = headers
        return request

    def _patch_settings(self, country_header: str, region_header: str | None = None):
        """Return a patcher for ``get_settings`` with explicit header config.

        Both header settings are always pinned.  Leaving
        ``geoip_region_header`` unset would hand the code under test an
        auto-created (truthy) ``MagicMock`` attribute instead of a
        realistic value, so it defaults to ``None`` here.
        """
        settings = MagicMock()
        settings.geoip_country_header = country_header
        settings.geoip_region_header = region_header
        return patch("src.services.geoip.get_settings", return_value=settings)

    def test_cloudflare_header(self):
        """``cf-ipcountry`` is recognised and resolved."""
        request = self._make_request({"cf-ipcountry": "DE"})
        result = detect_region_from_headers(request)
        assert result.country_code == "DE"
        assert result.region == "EU"
        assert result.is_resolved is True

    def test_vercel_header(self):
        """``x-vercel-ip-country`` is recognised."""
        request = self._make_request({"x-vercel-ip-country": "GB"})
        result = detect_region_from_headers(request)
        assert result.country_code == "GB"
        assert result.region == "GB"

    def test_appengine_header(self):
        """``x-appengine-country`` is recognised."""
        request = self._make_request({"x-appengine-country": "BR"})
        result = detect_region_from_headers(request)
        assert result.country_code == "BR"
        assert result.region == "BR"

    def test_custom_header(self):
        """``x-country-code`` is recognised."""
        request = self._make_request({"x-country-code": "JP"})
        result = detect_region_from_headers(request)
        assert result.country_code == "JP"
        assert result.region == "JP"

    def test_no_geo_headers(self):
        """Without any geo header the result is unresolved."""
        request = self._make_request({})
        result = detect_region_from_headers(request)
        assert result.country_code is None
        assert result.region is None
        assert result.is_resolved is False

    def test_ignores_xx_value(self):
        """A country value of ``XX`` leaves the result unresolved."""
        request = self._make_request({"cf-ipcountry": "XX"})
        result = detect_region_from_headers(request)
        assert result.is_resolved is False

    def test_header_priority_cloudflare_first(self):
        """``cf-ipcountry`` wins over ``x-vercel-ip-country``."""
        request = self._make_request(
            {
                "cf-ipcountry": "FR",
                "x-vercel-ip-country": "DE",
            }
        )
        result = detect_region_from_headers(request)
        assert result.country_code == "FR"

    def test_case_normalisation(self):
        """Lower-case header values are upper-cased in the result."""
        request = self._make_request({"cf-ipcountry": "gb"})
        result = detect_region_from_headers(request)
        assert result.country_code == "GB"
        assert result.region == "GB"

    def test_configured_custom_header(self):
        """An operator-configured header is honoured."""
        request = self._make_request({"x-gclb-country": "JP"})
        with self._patch_settings("x-gclb-country"):
            result = detect_region_from_headers(request)
        assert result.country_code == "JP"
        assert result.region == "JP"

    def test_configured_custom_header_takes_priority(self):
        """When both a custom and a built-in header are present, the
        custom one wins — that's the operator's explicit choice."""
        request = self._make_request(
            {
                "cf-ipcountry": "FR",
                "x-gclb-country": "JP",
            }
        )
        with self._patch_settings("x-gclb-country"):
            result = detect_region_from_headers(request)
        assert result.country_code == "JP"

    def test_configured_header_falls_through_to_builtin(self):
        """If the custom header isn't present, the built-in list still
        applies."""
        request = self._make_request({"cf-ipcountry": "FR"})
        with self._patch_settings("x-gclb-country", None):
            result = detect_region_from_headers(request)
        assert result.country_code == "FR"
        assert result.region == "EU"

    def test_configured_region_header_pairs_with_country(self):
        """A configured region header is paired with the custom country."""
        request = self._make_request(
            {
                "x-gclb-country": "US",
                "x-gclb-region": "CA",
            }
        )
        with self._patch_settings("x-gclb-country", "x-gclb-region"):
            result = detect_region_from_headers(request)
        assert result.country_code == "US"
        assert result.region == "US-CA"

    def test_configured_region_header_strips_country_prefix(self):
        """ISO 3166-2 subdivisions may arrive prefixed (``US-CA``)."""
        request = self._make_request(
            {
                "x-gclb-country": "US",
                "x-gclb-region": "US-NY",
            }
        )
        with self._patch_settings("x-gclb-country", "x-gclb-region"):
            result = detect_region_from_headers(request)
        assert result.region == "US-NY"

    def test_configured_region_header_missing_is_country_only(self):
        """If the configured region header is absent from the request,
        the result is derived from the country alone."""
        request = self._make_request({"x-gclb-country": "US"})
        with self._patch_settings("x-gclb-country", "x-gclb-region"):
            result = detect_region_from_headers(request)
        assert result.country_code == "US"
        assert result.region == "US"

    def test_configured_region_header_xx_ignored(self):
        """Region value of ``XX`` is treated as unknown."""
        request = self._make_request(
            {
                "x-gclb-country": "US",
                "x-gclb-region": "XX",
            }
        )
        with self._patch_settings("x-gclb-country", "x-gclb-region"):
            result = detect_region_from_headers(request)
        assert result.region == "US"
# ── get_client_ip ────────────────────────────────────────────────────
class TestGetClientIp:
    """Checks for ``get_client_ip`` header and socket-peer handling."""

    def _make_request(
        self,
        headers: dict[str, str] | None = None,
        client_host: str | None = None,
    ) -> MagicMock:
        """Build a mock request.

        ``headers`` defaults to an empty dict; ``request.client`` is
        ``None`` unless a non-empty ``client_host`` is supplied.
        """
        request = MagicMock()
        request.headers = headers or {}
        request.client = MagicMock(host=client_host) if client_host else None
        return request

    def test_x_forwarded_for_single(self):
        """A single ``x-forwarded-for`` address is returned as-is."""
        incoming = self._make_request({"x-forwarded-for": "1.2.3.4"})
        assert get_client_ip(incoming) == "1.2.3.4"

    def test_x_forwarded_for_multiple(self):
        """The first address of a comma-separated chain is returned."""
        incoming = self._make_request({"x-forwarded-for": "1.2.3.4, 5.6.7.8, 9.10.11.12"})
        assert get_client_ip(incoming) == "1.2.3.4"

    def test_x_real_ip(self):
        """``x-real-ip`` is used when it is the only header present."""
        incoming = self._make_request({"x-real-ip": "1.2.3.4"})
        assert get_client_ip(incoming) == "1.2.3.4"

    def test_forwarded_for_takes_priority_over_real_ip(self):
        """``x-forwarded-for`` wins when both headers are present."""
        both_headers = {
            "x-forwarded-for": "1.1.1.1",
            "x-real-ip": "2.2.2.2",
        }
        incoming = self._make_request(both_headers)
        assert get_client_ip(incoming) == "1.1.1.1"

    def test_falls_back_to_client_host(self):
        """With no headers, ``request.client.host`` is returned."""
        incoming = self._make_request(client_host="10.0.0.1")
        assert get_client_ip(incoming) == "10.0.0.1"

    def test_returns_none_when_no_ip(self):
        """With no headers and no client, the result is ``None``."""
        incoming = self._make_request()
        assert get_client_ip(incoming) is None
# ── _is_private_ip ───────────────────────────────────────────────────
class TestIsPrivateIp:
    """Checks for the ``_is_private_ip`` helper."""

    def test_loopback(self):
        """Addresses in ``127.0.0.x`` count as private."""
        for address in ("127.0.0.1", "127.0.0.2"):
            assert _is_private_ip(address) is True

    def test_private_ranges(self):
        """One sample address from each private IPv4 range."""
        for address in ("10.0.0.1", "192.168.1.1", "172.16.0.1"):
            assert _is_private_ip(address) is True

    def test_ipv6_loopback(self):
        """The IPv6 loopback address counts as private."""
        assert _is_private_ip("::1") is True

    def test_localhost_string(self):
        """The literal host name ``localhost`` counts as private."""
        assert _is_private_ip("localhost") is True

    def test_public_ip(self):
        """Well-known public resolver addresses are not private."""
        for address in ("8.8.8.8", "1.1.1.1"):
            assert _is_private_ip(address) is False
# ── lookup_ip_region ─────────────────────────────────────────────────
class TestLookupIpRegion:
    """Checks for the async ``lookup_ip_region`` lookup.

    ``httpx.AsyncClient`` is replaced with a mock in every test that
    reaches the HTTP call, so no network traffic takes place.
    """

    @staticmethod
    def _response(status_code: int, payload: dict | None = None) -> MagicMock:
        """Build a mock HTTP response.

        ``json()`` is only configured when *payload* is given; otherwise
        it stays an unconfigured mock method.
        """
        reply = MagicMock()
        reply.status_code = status_code
        if payload is not None:
            reply.json.return_value = payload
        return reply

    @staticmethod
    def _client(**get_behaviour) -> AsyncMock:
        """Build a mock async client usable as ``async with`` target.

        Keyword arguments configure the ``get`` coroutine mock
        (``return_value=...`` or ``side_effect=...``).
        """
        client = AsyncMock()
        client.get = AsyncMock(**get_behaviour)
        client.__aenter__ = AsyncMock(return_value=client)
        client.__aexit__ = AsyncMock(return_value=False)
        return client

    @staticmethod
    async def _lookup_via(client: AsyncMock, ip: str = "8.8.8.8"):
        """Run ``lookup_ip_region`` with ``httpx.AsyncClient`` patched."""
        with patch("src.services.geoip.httpx.AsyncClient", return_value=client):
            return await lookup_ip_region(ip)

    @pytest.mark.asyncio
    async def test_private_ip_returns_unresolved(self):
        """A loopback address is not resolved."""
        outcome = await lookup_ip_region("127.0.0.1")
        assert outcome.is_resolved is False

    @pytest.mark.asyncio
    async def test_private_ip_10_range(self):
        """A ``10.x`` address is not resolved."""
        outcome = await lookup_ip_region("10.0.0.1")
        assert outcome.is_resolved is False

    @pytest.mark.asyncio
    async def test_successful_lookup(self):
        """A successful payload for ``DE`` resolves to the ``EU`` region."""
        payload = {
            "status": "success",
            "countryCode": "DE",
            "region": "BY",
        }
        client = self._client(return_value=self._response(200, payload))
        outcome = await self._lookup_via(client)
        assert outcome.country_code == "DE"
        assert outcome.region == "EU"

    @pytest.mark.asyncio
    async def test_failed_status(self):
        """A payload with ``status: fail`` is unresolved."""
        payload = {"status": "fail", "message": "invalid query"}
        client = self._client(return_value=self._response(200, payload))
        outcome = await self._lookup_via(client)
        assert outcome.is_resolved is False

    @pytest.mark.asyncio
    async def test_http_error(self):
        """An HTTP 500 response is unresolved."""
        client = self._client(return_value=self._response(500))
        outcome = await self._lookup_via(client)
        assert outcome.is_resolved is False

    @pytest.mark.asyncio
    async def test_network_exception(self):
        """A connection error raised by ``get`` is unresolved."""
        client = self._client(side_effect=httpx.ConnectError("Connection refused"))
        outcome = await self._lookup_via(client)
        assert outcome.is_resolved is False

    @pytest.mark.asyncio
    async def test_us_with_state_lookup(self):
        """A US payload with a state resolves to ``US-<STATE>``."""
        payload = {
            "status": "success",
            "countryCode": "US",
            "region": "CA",
        }
        client = self._client(return_value=self._response(200, payload))
        outcome = await self._lookup_via(client)
        assert outcome.country_code == "US"
        assert outcome.region == "US-CA"

    @pytest.mark.asyncio
    async def test_missing_country_code(self):
        """A success payload without ``countryCode`` is unresolved."""
        client = self._client(return_value=self._response(200, {"status": "success"}))
        outcome = await self._lookup_via(client)
        assert outcome.is_resolved is False
# ── detect_region (combined) ─────────────────────────────────────────
class TestDetectRegion:
    """Checks for the combined async ``detect_region`` flow."""

    @staticmethod
    def _request(headers: dict[str, str], client_host: str | None = None) -> MagicMock:
        """Build a mock request; ``client`` is ``None`` without a host."""
        incoming = MagicMock()
        incoming.headers = headers
        incoming.client = MagicMock(host=client_host) if client_host else None
        return incoming

    @pytest.mark.asyncio
    async def test_uses_headers_when_available(self):
        """A geo header alone is enough to resolve the region."""
        incoming = self._request({"cf-ipcountry": "FR"})
        outcome = await detect_region(incoming)
        assert outcome.country_code == "FR"
        assert outcome.region == "EU"

    @pytest.mark.asyncio
    async def test_falls_back_to_ip_lookup(self):
        """Without geo headers, the client address is looked up over HTTP."""
        incoming = self._request({}, client_host="8.8.8.8")

        api_reply = MagicMock()
        api_reply.status_code = 200
        api_reply.json.return_value = {
            "status": "success",
            "countryCode": "US",
            "region": "CA",
        }

        fake_client = AsyncMock()
        fake_client.get = AsyncMock(return_value=api_reply)
        fake_client.__aenter__ = AsyncMock(return_value=fake_client)
        fake_client.__aexit__ = AsyncMock(return_value=False)

        with patch("src.services.geoip.httpx.AsyncClient", return_value=fake_client):
            outcome = await detect_region(incoming)

        assert outcome.country_code == "US"
        assert outcome.region == "US-CA"

    @pytest.mark.asyncio
    async def test_returns_unresolved_when_no_ip(self):
        """No headers and no client address gives an unresolved result."""
        incoming = self._request({})
        outcome = await detect_region(incoming)
        assert outcome.is_resolved is False
# ── GeoResult ────────────────────────────────────────────────────────
class TestGeoResult:
    """Checks for the ``GeoResult`` value object."""

    def test_is_resolved_true(self):
        """A result with country and region set reports as resolved."""
        resolved = GeoResult(country_code="GB", region="GB")
        assert resolved.is_resolved is True

    def test_is_resolved_false(self):
        """A result with both fields ``None`` reports as unresolved."""
        empty = GeoResult(country_code=None, region=None)
        assert empty.is_resolved is False

    def test_frozen_dataclass(self):
        """Assigning to a field raises ``AttributeError``."""
        immutable = GeoResult(country_code="GB", region="GB")
        with pytest.raises(AttributeError):
            immutable.country_code = "US"  # type: ignore[misc]
# ── MaxMind database lookup ──────────────────────────────────────────
class TestLookupIpMaxmind:
    """Checks for ``lookup_ip_maxmind`` using a mocked database reader.

    The tests write to the module-level ``_maxmind_reader`` and
    ``_maxmind_initialised`` attributes of ``src.services.geoip``, so
    that state is reset both before and after every test.
    """

    @staticmethod
    def _reset_cache() -> None:
        """Put the module-level reader cache back to its unset state."""
        geoip_module._maxmind_reader = None
        geoip_module._maxmind_initialised = False

    @staticmethod
    def _install_reader(reader: MagicMock) -> None:
        """Pre-populate the module-level cache with *reader*."""
        geoip_module._maxmind_reader = reader
        geoip_module._maxmind_initialised = True

    def setup_method(self):
        # Reset the module-level cache so each test starts clean.
        self._reset_cache()

    def teardown_method(self):
        # Without this, the mock reader installed by the last test would
        # stay in the module and leak into tests that run afterwards
        # (other classes, other files, or a randomised ordering).
        self._reset_cache()

    def _mock_reader(self, country_iso: str | None, subdivision_iso: str | None):
        """Return a mock reader whose ``city()`` yields the given ISO codes.

        When *subdivision_iso* is ``None`` the response's
        ``subdivisions`` attribute is set to ``None`` outright.
        """
        reader = MagicMock()
        response = MagicMock()
        response.country.iso_code = country_iso
        if subdivision_iso is None:
            response.subdivisions = None
        else:
            response.subdivisions.most_specific.iso_code = subdivision_iso
        reader.city.return_value = response
        return reader

    def test_private_ip_returns_unresolved(self):
        """A private address is unresolved."""
        result = lookup_ip_maxmind("10.0.0.1")
        assert result.is_resolved is False

    def test_no_db_configured_returns_unresolved(self):
        """With no database path configured the lookup is unresolved."""
        with patch("src.services.geoip.get_settings") as mock_settings:
            mock_settings.return_value.geoip_maxmind_db_path = None
            result = lookup_ip_maxmind("8.8.8.8")
        assert result.is_resolved is False

    def test_successful_lookup_with_subdivision(self):
        """Country plus subdivision resolves to ``US-CA``."""
        reader = self._mock_reader("US", "CA")
        self._install_reader(reader)
        result = lookup_ip_maxmind("8.8.8.8")
        assert result.country_code == "US"
        assert result.region == "US-CA"
        reader.city.assert_called_once_with("8.8.8.8")

    def test_successful_lookup_without_subdivision(self):
        """Country without subdivision still resolves (``DE`` -> ``EU``)."""
        reader = self._mock_reader("DE", None)
        self._install_reader(reader)
        result = lookup_ip_maxmind("8.8.8.8")
        assert result.country_code == "DE"
        assert result.region == "EU"

    def test_reader_raises_returns_unresolved(self):
        """An exception from ``reader.city`` yields an unresolved result."""
        reader = MagicMock()
        reader.city.side_effect = RuntimeError("corrupt db")
        self._install_reader(reader)
        result = lookup_ip_maxmind("8.8.8.8")
        assert result.is_resolved is False

    def test_reader_missing_country_returns_unresolved(self):
        """A response with no country ISO code is unresolved."""
        reader = self._mock_reader(None, None)
        self._install_reader(reader)
        result = lookup_ip_maxmind("8.8.8.8")
        assert result.is_resolved is False

    def test_bad_db_path_is_cached_as_failure(self):
        """A missing ``.mmdb`` file should not reopen on every request."""
        with patch("src.services.geoip.get_settings") as mock_settings:
            mock_settings.return_value.geoip_maxmind_db_path = "/nonexistent/geo.mmdb"
            r1 = lookup_ip_maxmind("8.8.8.8")
            r2 = lookup_ip_maxmind("1.1.1.1")
        assert r1.is_resolved is False
        assert r2.is_resolved is False
        # The failed open is remembered: initialised, but with no reader.
        assert geoip_module._maxmind_initialised is True
        assert geoip_module._maxmind_reader is None
class TestDetectRegionMaxmind:
    """Checks how ``detect_region`` behaves when a MaxMind reader is cached.

    The test writes to the module-level ``_maxmind_reader`` and
    ``_maxmind_initialised`` attributes of ``src.services.geoip``, so
    that state is reset both before and after every test.
    """

    @staticmethod
    def _reset_cache() -> None:
        """Put the module-level reader cache back to its unset state."""
        geoip_module._maxmind_reader = None
        geoip_module._maxmind_initialised = False

    def setup_method(self):
        # Start from an empty cache regardless of what ran before.
        self._reset_cache()

    def teardown_method(self):
        # Remove the mock reader so it cannot leak into tests that run
        # afterwards (other classes, other files, or a randomised order).
        self._reset_cache()

    @pytest.mark.asyncio
    async def test_uses_maxmind_before_external_api(self):
        """With MaxMind configured, ip-api.com must not be called."""
        reader = MagicMock()
        response = MagicMock()
        response.country.iso_code = "GB"
        response.subdivisions.most_specific.iso_code = "SCT"
        reader.city.return_value = response
        geoip_module._maxmind_reader = reader
        geoip_module._maxmind_initialised = True

        # No geo header is present, only a forwarded client address.
        request = MagicMock()
        request.headers = {"x-forwarded-for": "8.8.8.8"}
        request.client = None

        with (
            patch("src.services.geoip.get_settings") as mock_settings,
            patch("src.services.geoip.httpx.AsyncClient") as mock_httpx,
        ):
            mock_settings.return_value.geoip_country_header = None
            mock_settings.return_value.geoip_region_header = None
            mock_settings.return_value.geoip_maxmind_db_path = "/data/GeoLite2-City.mmdb"
            result = await detect_region(request)

        assert result.country_code == "GB"
        assert result.region == "GB-SCT"
        # The HTTP client class was never instantiated.
        mock_httpx.assert_not_called()