fleet-platform/app/config.py
kianiadee 4632990143 Geocoder circuit breaker + invalid-input regression tests
Geocoder trips open after N consecutive Nominatim failures and skips ticks
for a cooldown, instead of grinding the whole batch 1 req/sec on every tick.
Adds tests for _coerce_payload (malformed JSON degrades to {_raw}) and
parse_raw tolerating a malformed payload.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-29 18:00:39 +03:00

58 lines
2.8 KiB
Python

from functools import lru_cache
from typing import Literal
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
database_url: str = Field(alias="DATABASE_URL")
jwt_secret: str = Field(alias="JWT_SECRET")
jwt_access_ttl_min: int = Field(default=15, alias="JWT_ACCESS_TTL_MIN")
jwt_refresh_ttl_days: int = Field(default=30, alias="JWT_REFRESH_TTL_DAYS")
tracksolid_push_token: str = Field(default="", alias="TRACKSOLID_PUSH_TOKEN")
tracksolid_api_base_url: str = Field(default="", alias="TRACKSOLID_API_BASE_URL")
tracksolid_app_key: str = Field(default="", alias="TRACKSOLID_APP_KEY")
tracksolid_app_secret: str = Field(default="", alias="TRACKSOLID_APP_SECRET")
tracksolid_user_id: str = Field(default="", alias="TRACKSOLID_USER_ID")
tracksolid_pwd_md5: str = Field(default="", alias="TRACKSOLID_PWD_MD5")
tracksolid_target_account: str = Field(default="", alias="TRACKSOLID_TARGET_ACCOUNT")
tracksolid_targets: str = Field(default="", alias="TRACKSOLID_TARGETS") # comma-separated
tracksolid_token_ttl_sec: int = Field(default=7200, alias="TRACKSOLID_TOKEN_TTL_SEC")
tracksolid_poll_interval_sec: int = Field(default=60, alias="TRACKSOLID_POLL_INTERVAL_SEC")
tracksolid_stale_poll_interval_sec: int = Field(
default=600, alias="TRACKSOLID_STALE_POLL_INTERVAL_SEC"
)
tracksolid_stale_after_sec: int = Field(default=1800, alias="TRACKSOLID_STALE_AFTER_SEC")
ntfy_base_url: str = Field(default="", alias="NTFY_BASE_URL")
ntfy_topic: str = Field(default="fleet-slo-breach", alias="NTFY_TOPIC")
ntfy_token: str = Field(default="", alias="NTFY_TOKEN")
nominatim_base_url: str = Field(
default="https://nominatim.openstreetmap.org", alias="NOMINATIM_BASE_URL"
)
nominatim_user_agent: str = Field(
default="fleet-platform/0.1 (admin@rahamafresh.com)", alias="NOMINATIM_USER_AGENT"
)
geocoder_max_per_tick: int = Field(default=8, alias="GEOCODER_MAX_PER_TICK")
geocoder_tick_sec: int = Field(default=30, alias="GEOCODER_TICK_SEC")
geocoder_rate_limit_sec: float = Field(default=1.1, alias="GEOCODER_RATE_LIMIT_SEC")
geocoder_breaker_threshold: int = Field(default=3, alias="GEOCODER_BREAKER_THRESHOLD")
geocoder_breaker_cooldown_sec: float = Field(
default=300.0, alias="GEOCODER_BREAKER_COOLDOWN_SEC"
)
app_mode: Literal["dev", "prod"] = Field(default="prod", alias="APP_MODE")
app_role: Literal["gateway", "worker", "cron"] = Field(default="gateway", alias="APP_ROLE")
app_log_level: str = Field(default="INFO", alias="APP_LOG_LEVEL")
app_git_sha: str = Field(default="unknown", alias="APP_GIT_SHA")
@lru_cache(maxsize=1)
def get_settings() -> Settings:
return Settings()