"""Professional FastAPI application for delivery route optimization.""" import logging import os import sys import time import threading from contextlib import asynccontextmanager from fastapi import FastAPI, Request, status from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.exceptions import RequestValidationError from starlette.exceptions import HTTPException as StarletteHTTPException from app.routes import optimization_router, health_router, cache_router, ml_router, ml_web_router from app.middleware.request_id import RequestIDMiddleware from app.core.exceptions import APIException from app.core.exception_handlers import ( api_exception_handler, http_exception_handler, validation_exception_handler, general_exception_handler ) # Configure professional logging with env control _log_level_name = os.getenv("LOG_LEVEL", "INFO").upper() _log_level = getattr(logging, _log_level_name, logging.INFO) logging.basicConfig( level=_log_level, format="%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) # Ensure root and key libraries honor desired level logging.getLogger().setLevel(_log_level) logging.getLogger("httpx").setLevel(_log_level) logging.getLogger("uvicorn").setLevel(_log_level) logging.getLogger("uvicorn.error").setLevel(_log_level) logging.getLogger("uvicorn.access").setLevel(_log_level) # --- Smart Post-Call ML Trainer ---------------------------------------------------------- # # Trains in a BACKGROUND THREAD after every N /riderassign calls. # - The API response is NEVER blocked - training is fully async. # - Cooldown prevents overlapping runs (won't train if one is already running). # - MIN_RECORDS guard: won't attempt if DB doesn't have enough data yet. 
# Config (all overridable via environment variables):
#   TRAIN_EVERY_N_CALLS : retrain after this many calls (default: 10)
#   MIN_RECORDS_TO_TRAIN: minimum DB rows before first train (default: 30)
#   COOLDOWN_SECONDS    : min gap between two training runs (default: 120s)
# -------------------------------------------------------------------

# max(1, ...) guards against ML_TRAIN_EVERY_N=0 (or negative), which would
# crash the modulo check in trigger_training_if_due() with ZeroDivisionError.
TRAIN_EVERY_N_CALLS = max(1, int(os.getenv("ML_TRAIN_EVERY_N", "10")))
MIN_RECORDS_TO_TRAIN = int(os.getenv("ML_MIN_RECORDS", "30"))
COOLDOWN_SECONDS = int(os.getenv("ML_COOLDOWN_SEC", "120"))

_call_counter = 0                  # total /riderassign calls seen (never reset)
_counter_lock = threading.Lock()   # protects _call_counter
_training_lock = threading.Lock()  # held for the duration of a training run
# time.monotonic() timestamp of the last successful training run.
# FIX: was `0.0` epoch seconds compared against time.time(); the cooldown is
# now measured on the monotonic clock so wall-clock jumps (NTP steps, DST,
# manual changes) cannot stretch or collapse the window.  -inf guarantees the
# very first run is never blocked regardless of the monotonic clock's origin.
_last_trained_at = float("-inf")


def _run_training_background():
    """Run one hypertuning job; intended to execute in a daemon thread.

    Fully safe to call while the API is serving requests.  At most one run is
    active at a time: if the training lock is already held, this logs and
    returns immediately instead of queueing behind the current run.
    """
    global _last_trained_at

    # Non-blocking acquire: skip (rather than wait) when a run is in flight.
    if not _training_lock.acquire(blocking=False):
        logger.info("[MLTrigger] Training already running - skipping this trigger.")
        return
    try:
        # Imported lazily so module import never pays the ML stack's cost.
        from app.services.ml.ml_hypertuner import get_hypertuner
        from app.services.ml.ml_data_collector import get_collector

        count = get_collector().count_records()
        if count < MIN_RECORDS_TO_TRAIN:
            logger.info(f"[MLTrigger] Only {count} records - need >={MIN_RECORDS_TO_TRAIN}. Skipping.")
            return

        logger.info(f"[MLTrigger] [ML] Background hypertuning started ({count} records)...")
        result = get_hypertuner().run(n_trials=100)

        if result.get("status") == "ok":
            # Monotonic (not wall-clock) so the cooldown math in
            # trigger_training_if_due() is immune to clock adjustments.
            _last_trained_at = time.monotonic()
            logger.info(
                f"[MLTrigger] [OK] Hypertuning done - "
                f"quality={result.get('best_predicted_quality', '?')}/100 "
                f"| {result.get('training_rows', '?')} rows "
                f"| {result.get('trials_run', '?')} trials"
            )
        else:
            logger.info(f"[MLTrigger] Hypertuning skipped: {result.get('message', '')}")
    except Exception as e:
        # Background thread: never let an exception escape (it would only dump
        # a traceback and kill the thread); log with traceback and move on.
        logger.error(f"[MLTrigger] Background training error: {e}", exc_info=True)
    finally:
        _training_lock.release()


def trigger_training_if_due():
    """Count one /riderassign call and launch background training when due.

    Increments the call counter and fires a daemon training thread every
    TRAIN_EVERY_N_CALLS calls, unless the cooldown window is still open.
    Non-blocking: returns immediately regardless.
    """
    # FIX: dropped `_last_trained_at` from the global statement — it is only
    # read here, never assigned, so the declaration was misleading.
    global _call_counter

    with _counter_lock:
        _call_counter += 1
        should_train = (_call_counter % TRAIN_EVERY_N_CALLS == 0)

    if not should_train:
        return

    # Cooldown check - don't train again if we just trained recently.
    elapsed = time.monotonic() - _last_trained_at
    if elapsed < COOLDOWN_SECONDS:
        logger.info(
            f"[MLTrigger] Cooldown active - "
            f"{int(COOLDOWN_SECONDS - elapsed)}s remaining. Skipping."
        )
        return

    # Fire-and-forget daemon thread - does NOT block the API response.
    t = threading.Thread(target=_run_training_background, daemon=True, name="ml-hypertuner")
    t.start()
    logger.info(f"[MLTrigger] [START] Background training thread launched (call #{_call_counter})")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan events.

    On startup: if the ML DB already holds enough records, kick off a
    hypertuning run in a background daemon thread.  Any failure in this check
    is logged and swallowed - ML readiness must never block API startup.
    """
    logger.info("[START] Starting Route Optimization API...")

    # -- On startup: if enough data exists, train immediately in background --
    try:
        from app.services.ml.ml_data_collector import get_collector

        count = get_collector().count_records()
        if count >= MIN_RECORDS_TO_TRAIN:
            logger.info(f"[Startup] {count} records found -> launching startup hypertuning...")
            t = threading.Thread(target=_run_training_background, daemon=True, name="ml-startup")
            t.start()
        else:
            logger.info(
                f"[Startup] {count}/{MIN_RECORDS_TO_TRAIN} records in ML DB - "
                f"will auto-train after every {TRAIN_EVERY_N_CALLS} /riderassign calls."
            )
    except Exception as e:
        # Deliberate best-effort: a broken ML DB must not prevent serving.
        logger.warning(f"[Startup] ML status check failed (non-fatal): {e}")

    logger.info(
        f"[OK] Application initialized - "
        f"ML trains every {TRAIN_EVERY_N_CALLS} calls "
        f"(cooldown {COOLDOWN_SECONDS}s, min {MIN_RECORDS_TO_TRAIN} records)"
    )
    yield
    logger.info(" Shutting down Route Optimization API...")


# Create FastAPI application with professional configuration
app = FastAPI(
    title="Route Optimization API",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    openapi_url="/api/v1/openapi.json",
    lifespan=lifespan
)

# Add Request ID middleware (must be first)
app.add_middleware(RequestIDMiddleware)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure specific domains in production
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["X-Request-ID", "X-Process-Time"]
)

# Add GZIP compression
app.add_middleware(GZipMiddleware, minimum_size=1000)


# Add request timing middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Add performance monitoring headers (wall time spent, API version)."""
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(round(process_time, 4))
    response.headers["X-API-Version"] = "2.0.0"
    return response


# Register exception handlers (most-specific first; bare Exception is the
# last-resort catch-all).
app.add_exception_handler(APIException, api_exception_handler)
app.add_exception_handler(StarletteHTTPException, http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, general_exception_handler)

# Include routers
app.include_router(optimization_router)
app.include_router(health_router)
app.include_router(cache_router)
app.include_router(ml_router)
app.include_router(ml_web_router)


@app.get("/", tags=["Root"])
async def root(request: Request):
    """
    API root endpoint with service information.

    Returns API metadata, available endpoints, and usage information.
    """
    # request_id is set by RequestIDMiddleware; getattr guards the case where
    # the middleware did not run (e.g. direct handler invocation in tests).
    request_id = getattr(request.state, "request_id", None)
    return {
        "service": "Route Optimization API",
        "version": "2.0.0",
        "status": "operational",
        "documentation": {
            "swagger": "/docs",
            "redoc": "/redoc",
            "openapi": "/api/v1/openapi.json"
        },
        "endpoints": {
            "createdeliveries": {
                "url": "/api/v1/optimization/createdeliveries",
                "method": "POST",
                "description": "Accept provider array, optimize order, add step/previouskms/cumulativekms, forward upstream"
            },
            "health": {
                "url": "/api/v1/health",
                "method": "GET",
                "description": "Health check endpoint"
            }
        },
        "features": {
            "algorithm": "Greedy Nearest-Neighbor",
            "optimization": "Provider array reordering with distance metrics",
            "added_fields": ["step", "previouskms", "cumulativekms", "actualkms"]
        },
        "request_id": request_id
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", host="0.0.0.0", port=8002, reload=True)