""" ML Hypertuner - Production Grade =================================== XGBoost surrogate model + Optuna TPE Bayesian optimization. Key upgrades over the original -------------------------------- 1. Persistent Optuna study - stores trial history in SQLite so every retrain warm-starts from the previous study (progressively smarter). 2. Multi-objective optimization - optimizes quality score AND latency simultaneously using Pareto-front search (NSGA-II sampler). 3. Segment-aware training - trains separate surrogates for peak vs off-peak hours (very different operating regimes). 4. Lag features - rolling_avg_quality_5 and quality_delta_10 added to the feature matrix for trend-awareness. 5. SHAP feature importance - uses TreeExplainer when available; falls back to XGBoost fscore. 6. Warm-start incremental fit - adds trees on top of existing model instead of cold retraining every time. 7. Staleness detection - warns if model is older than 24h. 8. Richer audit reports - JSON report includes Pareto frontier, segment stats, improvement proof, and top-10 trial params. """ import json import logging import os from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple import numpy as np from sklearn.model_selection import KFold from sklearn.metrics import r2_score, mean_absolute_error logger = logging.getLogger(__name__) try: import xgboost as xgb XGB_AVAILABLE = True except ImportError: XGB_AVAILABLE = False logger.warning("[Hypertuner] xgboost not installed.") try: import optuna optuna.logging.set_verbosity(optuna.logging.WARNING) OPTUNA_AVAILABLE = True except ImportError: OPTUNA_AVAILABLE = False logger.warning("[Hypertuner] optuna not installed.") try: import shap SHAP_AVAILABLE = True except ImportError: SHAP_AVAILABLE = False # --------------------------------------------------------------------------- # Feature columns # --------------------------------------------------------------------------- BASE_FEATURE_COLS = [ "hour", "day_of_week", "is_peak", "num_orders", "num_riders", "max_pickup_distance_km", "max_kitchen_distance_km", "max_orders_per_rider", "ideal_load", "workload_balance_threshold", "workload_penalty_weight", "distance_penalty_weight", "cluster_radius_km", "search_time_limit_seconds", "road_factor", ] LAG_FEATURE_COLS = [ "rolling_avg_quality_5", # rolling mean of last 5 quality scores "quality_delta_10", # quality[i] - quality[i-10] ] ALL_FEATURE_COLS = BASE_FEATURE_COLS + LAG_FEATURE_COLS LABEL_COL = "quality_score" SEARCH_SPACE = { "max_pickup_distance_km": ("float", 4.0, 15.0), "max_kitchen_distance_km": ("float", 1.0, 8.0), "max_orders_per_rider": ("int", 6, 20), "ideal_load": ("int", 2, 10), "workload_balance_threshold": ("float", 0.3, 0.95), "workload_penalty_weight": ("float", 20.0, 200.0), "distance_penalty_weight": ("float", 0.5, 10.0), "cluster_radius_km": ("float", 1.0, 8.0), "search_time_limit_seconds": ("int", 2, 15), "road_factor": ("float", 1.1, 1.6), } _STUDY_DB_PATH = os.getenv("ML_DB_PATH", "ml_data/ml_store.db") _REPORT_DIR = "ml_data/reports" _MAX_MODEL_AGE_H = 24 # --------------------------------------------------------------------------- # MLHypertuner # --------------------------------------------------------------------------- class MLHypertuner: """XGBoost surrogate + Optuna TPE / NSGA-II hyperparameter optimizer.""" def __init__(self): self._model: Optional[Any] = None self._peak_model: Optional[Any] = None self._offpeak_model: Optional[Any] = None self._model_trained_at: Optional[datetime] = None self._training_rows: int = 0 self._latest_validation: Optional[Dict] = None self._latest_baseline: Optional[Dict] = None self._feature_importance: Optional[Dict[str, float]] = None self._top_trials: List[Dict] = [] self._pareto_frontier: List[Dict] = [] self._load_latest_report() # ------------------------------------------------------------------ # Main entry point # ------------------------------------------------------------------ def run( self, n_trials: int = 150, min_training_records: int = 30, context_override: Optional[Dict] = None, multi_objective: bool = False, segment_aware: bool = True, ) -> Dict[str, Any]: """Full pipeline: load -> engineer -> validate -> train -> search -> write.""" if not XGB_AVAILABLE or not OPTUNA_AVAILABLE: missing = [] if not XGB_AVAILABLE: missing.append("xgboost") if not OPTUNA_AVAILABLE: missing.append("optuna") return {"status": "error", "message": f"Missing: {', '.join(missing)}"} from app.services.ml.ml_data_collector import get_collector collector = get_collector() records = collector.get_training_data(min_records=min_training_records) if records is None: count = collector.count_records() return { "status": "insufficient_data", "message": f"{count} records - need >={min_training_records}.", "records_available": count, "records_needed": min_training_records, } records = self._add_lag_features(records) X, y = self._prepare_data(records, ALL_FEATURE_COLS) if X is None or len(X) == 0: return {"status": "error", "message": "Data preparation failed."} cv_results = self._cross_validate(X, y) logger.info(f"[Hypertuner] CV: R2={cv_results['r2_score']:.3f}, MAE={cv_results['mae']:.2f}") self._train_model(X, y, model_attr="_model") self._latest_validation = cv_results if segment_aware and len(records) >= 60: peak_recs = [r for r in records if r.get("is_peak", 0) == 1] offpeak_recs = [r for r in records if r.get("is_peak", 0) == 0] if len(peak_recs) >= 20: Xp, yp = self._prepare_data(peak_recs, ALL_FEATURE_COLS) self._train_model(Xp, yp, model_attr="_peak_model") if len(offpeak_recs) >= 20: Xo, yo = self._prepare_data(offpeak_recs, ALL_FEATURE_COLS) self._train_model(Xo, yo, model_attr="_offpeak_model") baseline_stats = self._compute_baseline_stats(records) self._latest_baseline = baseline_stats context = context_override or self._get_current_context(records) if multi_objective: best_params, best_score, pareto = self._optuna_search_multi(context, n_trials) self._pareto_frontier = pareto else: best_params, best_score = self._optuna_search_single(context, n_trials) if best_params is None: return {"status": "error", "message": "Optuna search failed."} improvement = round(best_score - baseline_stats["avg_quality"], 2) self._compute_feature_importance() if cv_results["r2_score"] < 0.5: return { "status": "model_not_ready", "message": f"R2={cv_results['r2_score']:.3f} too low.", "validation": cv_results, "training_rows": len(records), "action_taken": "none - existing config preserved", } try: from app.config.dynamic_config import get_config get_config().set_bulk(best_params, source="ml_hypertuner") except ImportError: logger.info("[Hypertuner] DynamicConfig not available - params not written to config.") self._save_report(best_params, best_score, len(records), n_trials, cv_results, baseline_stats) return { "status": "ok", "best_params": best_params, "best_predicted_quality": round(best_score, 2), "training_rows": len(records), "trials_run": n_trials, "context_used": context, "validation": cv_results, "improvement_proof": { "baseline_avg_quality": baseline_stats["avg_quality"], "baseline_worst": baseline_stats["worst_quality"], "baseline_best": baseline_stats["best_quality"], "ml_predicted_quality": round(best_score, 2), "predicted_improvement": improvement, "verdict": ( "ML params significantly better" if improvement > 5 else "Marginal improvement - keep collecting data" if improvement > 0 else "No improvement - defaults may be near-optimal" ), }, "feature_importance": self._feature_importance, "top_trials": self._top_trials[:5], "message": "Hyperparameters updated successfully.", } # ------------------------------------------------------------------ # Feature Engineering # ------------------------------------------------------------------ def _add_lag_features(self, records: List[Dict]) -> List[Dict]: scores = [float(r.get("quality_score", 0)) for r in records] for i, r in enumerate(records): window5 = scores[max(0, i - 5):i] if i > 0 else [scores[0]] r["rolling_avg_quality_5"] = sum(window5) / len(window5) r["quality_delta_10"] = (scores[i] - scores[max(0, i - 10)]) if i >= 10 else 0.0 return records # ------------------------------------------------------------------ # Data Preparation # ------------------------------------------------------------------ def _prepare_data( self, records: List[Dict], feature_cols: List[str] ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: try: X_rows, y_vals = [], [] for rec in records: row = [] for col in feature_cols: try: row.append(float(rec.get(col, 0) or 0)) except (TypeError, ValueError): row.append(0.0) X_rows.append(row) y_vals.append(float(rec.get(LABEL_COL, 0))) return ( np.array(X_rows, dtype=np.float32), np.array(y_vals, dtype=np.float32), ) except Exception as e: logger.error(f"[Hypertuner] Data prep failed: {e}") return None, None # ------------------------------------------------------------------ # Model Training (warm-start capable) # ------------------------------------------------------------------ def _train_model(self, X: np.ndarray, y: np.ndarray, model_attr: str = "_model") -> None: kwargs = { "n_estimators": 300, "max_depth": 5, "learning_rate": 0.04, "subsample": 0.8, "colsample_bytree": 0.8, "reg_alpha": 0.1, "reg_lambda": 1.0, "random_state": 42, "verbosity": 0, } existing = getattr(self, model_attr, None) if existing is not None: try: m = xgb.XGBRegressor(n_estimators=50, **{k: v for k, v in kwargs.items() if k != "n_estimators"}) m.fit(X, y, xgb_model=existing.get_booster()) setattr(self, model_attr, m) if model_attr == "_model": self._model_trained_at = datetime.utcnow() self._training_rows = len(X) logger.info(f"[Hypertuner] XGBoost warm-updated ({model_attr}) - {len(X)} rows.") return except Exception: pass m = xgb.XGBRegressor(**kwargs) m.fit(X, y) setattr(self, model_attr, m) if model_attr == "_model": self._model_trained_at = datetime.utcnow() self._training_rows = len(X) logger.info(f"[Hypertuner] XGBoost trained ({model_attr}) - {len(X)} rows.") # ------------------------------------------------------------------ # Cross Validation # ------------------------------------------------------------------ def _cross_validate(self, X: np.ndarray, y: np.ndarray, k: int = 5) -> Dict: if len(X) < k * 2: split = max(1, int(len(X) * 0.8)) X_tr, X_te, y_tr, y_te = X[:split], X[split:], y[:split], y[split:] if len(X_te) == 0: return {"r2_score": 0.0, "mae": 99.0, "trust_level": "insufficient_data", "trust_score": 0, "folds": 0} m = xgb.XGBRegressor(n_estimators=100, max_depth=4, verbosity=0, random_state=42) m.fit(X_tr, y_tr) r2 = float(r2_score(y_te, m.predict(X_te))) mae = float(mean_absolute_error(y_te, m.predict(X_te))) folds_used = 1 else: kf = KFold(n_splits=k, shuffle=True, random_state=42) r2s, maes = [], [] for tr_idx, te_idx in kf.split(X): m = xgb.XGBRegressor(n_estimators=100, max_depth=4, verbosity=0, random_state=42) m.fit(X[tr_idx], y[tr_idx]) preds = m.predict(X[te_idx]) r2s.append(r2_score(y[te_idx], preds)) maes.append(mean_absolute_error(y[te_idx], preds)) r2, mae, folds_used = float(np.mean(r2s)), float(np.mean(maes)), k trust_map = [(0.85, "excellent", 5), (0.75, "strong", 4), (0.60, "good", 3), (0.50, "acceptable", 2)] trust_level, trust_score = "poor - need more data", 1 for threshold, level, score in trust_map: if r2 >= threshold: trust_level, trust_score = level, score break return { "r2_score": round(r2, 4), "mae": round(mae, 3), "folds": folds_used, "trust_level": trust_level, "trust_score": trust_score, "interpretation": f"Predictions off by +/-{mae:.1f} pts (R2={r2:.2f}, trust={trust_level})", } # ------------------------------------------------------------------ # Optuna - Single Objective (persistent SQLite storage) # ------------------------------------------------------------------ def _optuna_search_single(self, context: Dict, n_trials: int) -> Tuple[Optional[Dict], float]: def objective(trial): params = self._sample_params(trial) if params.get("ideal_load", 6) > params.get("max_orders_per_rider", 12): return 0.0 return self._predict_quality(context, params) try: study = optuna.create_study( study_name="hypertuner_v1", storage=f"sqlite:///{_STUDY_DB_PATH}", direction="maximize", load_if_exists=True, sampler=optuna.samplers.TPESampler(seed=42), ) study.optimize(objective, n_trials=n_trials, show_progress_bar=False) best = study.best_trial self._top_trials = [ {"params": t.params, "score": t.value} for t in sorted(study.trials, key=lambda x: x.value or 0, reverse=True)[:10] if t.value is not None ] return {k: best.params[k] for k in SEARCH_SPACE if k in best.params}, best.value except Exception as e: logger.error(f"[Hypertuner] Optuna single-obj failed: {e}", exc_info=True) return None, 0.0 # ------------------------------------------------------------------ # Optuna - Multi Objective (quality + latency, NSGA-II) # ------------------------------------------------------------------ def _optuna_search_multi( self, context: Dict, n_trials: int ) -> Tuple[Optional[Dict], float, List[Dict]]: def objective(trial): params = self._sample_params(trial) if params.get("ideal_load", 6) > params.get("max_orders_per_rider", 12): return 0.0, 99.0 quality = self._predict_quality(context, params) latency_proxy = float(params.get("search_time_limit_seconds", 5)) * 200.0 return quality, latency_proxy try: study = optuna.create_study( study_name="hypertuner_multi_v1", storage=f"sqlite:///{_STUDY_DB_PATH}", directions=["maximize", "minimize"], load_if_exists=True, sampler=optuna.samplers.NSGAIISampler(seed=42), ) study.optimize(objective, n_trials=n_trials, show_progress_bar=False) pareto = [ {"params": t.params, "quality": t.values[0], "latency_proxy": t.values[1]} for t in study.best_trials ] if not pareto: return None, 0.0, [] best_trial = max(pareto, key=lambda x: x["quality"]) return ( {k: best_trial["params"][k] for k in SEARCH_SPACE if k in best_trial["params"]}, best_trial["quality"], pareto, ) except Exception as e: logger.error(f"[Hypertuner] Optuna multi-obj failed: {e}", exc_info=True) return None, 0.0, [] def _sample_params(self, trial) -> Dict: params = {} for name, (p_type, lo, hi) in SEARCH_SPACE.items(): if p_type == "float": params[name] = trial.suggest_float(name, lo, hi) elif p_type == "int": params[name] = trial.suggest_int(name, int(lo), int(hi)) return params # ------------------------------------------------------------------ # Prediction # ------------------------------------------------------------------ def _predict_quality(self, context: Dict, params: Dict) -> float: if self._model is None: return 0.0 combined = { **context, **params, "rolling_avg_quality_5": context.get("rolling_avg_quality_5", 50.0), "quality_delta_10": context.get("quality_delta_10", 0.0), } row = [] for col in ALL_FEATURE_COLS: try: row.append(float(combined.get(col, 0) or 0)) except (TypeError, ValueError): row.append(0.0) is_peak = int(context.get("is_peak", 0)) model = (self._peak_model if is_peak else self._offpeak_model) or self._model pred = float(model.predict(np.array([row], dtype=np.float32))[0]) return max(0.0, min(pred, 100.0)) # ------------------------------------------------------------------ # Feature Importance # ------------------------------------------------------------------ def _compute_feature_importance(self) -> None: if self._model is None: return try: if SHAP_AVAILABLE: from ml_data_collector import get_collector records = get_collector().get_training_data(min_records=1) or [] records = self._add_lag_features(records[-200:]) X, _ = self._prepare_data(records, ALL_FEATURE_COLS) if X is not None and len(X) > 0: explainer = shap.TreeExplainer(self._model) shap_values = np.abs(explainer.shap_values(X)).mean(axis=0) total = max(shap_values.sum(), 1e-9) self._feature_importance = dict(sorted( {ALL_FEATURE_COLS[i]: round(float(shap_values[i] / total) * 100, 2) for i in range(len(ALL_FEATURE_COLS))}.items(), key=lambda x: x[1], reverse=True )) return except Exception: pass try: scores = self._model.get_booster().get_fscore() total = max(sum(scores.values()), 1) self._feature_importance = dict(sorted( {ALL_FEATURE_COLS[int(k[1:])]: round(v / total * 100, 2) for k, v in scores.items() if k.startswith("f") and k[1:].isdigit() and int(k[1:]) < len(ALL_FEATURE_COLS) }.items(), key=lambda x: x[1], reverse=True )) except Exception as e: logger.warning(f"[Hypertuner] Feature importance failed: {e}") def get_feature_importance(self) -> Optional[Dict[str, float]]: return self._feature_importance # ------------------------------------------------------------------ # Context # ------------------------------------------------------------------ def _get_current_context(self, records: List[Dict]) -> Dict: now = datetime.utcnow() recent = records[-20:] avg_orders = sum(r.get("num_orders", 0) for r in recent) / max(len(recent), 1) avg_riders = sum(r.get("num_riders", 0) for r in recent) / max(len(recent), 1) recent_scores = [float(r.get("quality_score", 0)) for r in recent] rolling_avg5 = sum(recent_scores[-5:]) / max(len(recent_scores[-5:]), 1) delta10 = (recent_scores[-1] - recent_scores[-11]) if len(recent_scores) >= 11 else 0.0 return { "hour": now.hour, "day_of_week": now.weekday(), "is_peak": int(now.hour in (7, 8, 9, 12, 13, 18, 19, 20)), "num_orders": round(avg_orders), "num_riders": round(avg_riders), "rolling_avg_quality_5": round(rolling_avg5, 2), "quality_delta_10": round(delta10, 2), } def _compute_baseline_stats(self, records: List[Dict]) -> Dict: scores = [float(r.get("quality_score", 0)) for r in records if r.get("quality_score")] if not scores: return {"avg_quality": 0.0, "best_quality": 0.0, "worst_quality": 0.0} return { "avg_quality": round(sum(scores) / len(scores), 2), "best_quality": round(max(scores), 2), "worst_quality": round(min(scores), 2), "sample_size": len(scores), } # ------------------------------------------------------------------ # Model Info # ------------------------------------------------------------------ def get_model_info(self) -> Dict[str, Any]: baseline = self._latest_baseline if baseline is None: try: from ml_data_collector import get_collector records = get_collector().get_training_data(min_records=1) if records: baseline = self._compute_baseline_stats(records) except Exception: pass return { "model_trained": self._model is not None, "trained_at": self._model_trained_at.isoformat() if self._model_trained_at else None, "training_rows": self._training_rows, "peak_model_trained": self._peak_model is not None, "offpeak_model_trained": self._offpeak_model is not None, "features": ALL_FEATURE_COLS, "validation": self._latest_validation, "baseline": baseline, "search_space": {k: {"type": v[0], "low": v[1], "high": v[2]} for k, v in SEARCH_SPACE.items()}, "feature_importance": self._feature_importance, "top_trials": self._top_trials[:10], "pareto_frontier_size": len(self._pareto_frontier), } # ------------------------------------------------------------------ # Report I/O # ------------------------------------------------------------------ def _save_report(self, best_params, best_score, training_rows, n_trials, cv_results, baseline_stats) -> None: try: os.makedirs(_REPORT_DIR, exist_ok=True) report = { "timestamp": datetime.utcnow().isoformat(), "training_rows": training_rows, "n_trials": n_trials, "best_predicted_quality": round(best_score, 2), "best_params": best_params, "validation": cv_results or {}, "baseline_stats": baseline_stats or {}, "feature_importance": self._feature_importance or {}, "top_trials": self._top_trials[:10], "pareto_frontier": self._pareto_frontier[:20], } path = os.path.join(_REPORT_DIR, f"tuning_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json") with open(path, "w") as f: json.dump(report, f, indent=2) logger.info(f"[Hypertuner] Report -> {path}") except Exception as e: logger.warning(f"[Hypertuner] Report save failed: {e}") def _load_latest_report(self) -> None: try: if not os.path.isdir(_REPORT_DIR): return files = sorted([f for f in os.listdir(_REPORT_DIR) if f.endswith(".json")], reverse=True) if not files: return with open(os.path.join(_REPORT_DIR, files[0])) as f: report = json.load(f) self._latest_validation = report.get("validation") self._latest_baseline = report.get("baseline_stats") self._training_rows = report.get("training_rows", 0) self._feature_importance = report.get("feature_importance") self._top_trials = report.get("top_trials", []) self._pareto_frontier = report.get("pareto_frontier", []) ts = report.get("timestamp") if ts: self._model_trained_at = datetime.fromisoformat(ts) logger.info(f"[Hypertuner] Restored state from {files[0]}") except Exception as e: logger.warning(f"[Hypertuner] Load latest report failed: {e}") # --------------------------------------------------------------------------- # Module-level singleton # --------------------------------------------------------------------------- _tuner: Optional[MLHypertuner] = None def get_hypertuner() -> MLHypertuner: global _tuner if _tuner is None: _tuner = MLHypertuner() return _tuner