"""
ML Hypertuner - Production Grade
===================================
XGBoost surrogate model + Optuna TPE Bayesian optimization.

Key upgrades over the original
--------------------------------
1. Persistent Optuna study - stores trial history in SQLite so every
   retrain warm-starts from the previous study (progressively smarter).
2. Multi-objective optimization - optimizes quality score AND latency
   simultaneously using Pareto-front search (NSGA-II sampler).
3. Segment-aware training - trains separate surrogates for peak vs
   off-peak hours (very different operating regimes).
4. Lag features - rolling_avg_quality_5 and quality_delta_10
   added to the feature matrix for trend-awareness.
5. SHAP feature importance - uses TreeExplainer when available;
   falls back to XGBoost fscore.
6. Warm-start incremental fit - adds trees on top of existing model
   instead of cold retraining every time.
7. Staleness detection - warns if model is older than 24h.
8. Richer audit reports - JSON report includes Pareto frontier,
   segment stats, improvement proof, and top-10 trial params.
"""

import json
import logging
import math
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from sklearn.model_selection import KFold
from sklearn.metrics import r2_score, mean_absolute_error

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Optional dependencies
#
# Each heavy dependency is imported defensively so that importing this module
# never fails; the *_AVAILABLE flags record which ones can actually be used.
# ---------------------------------------------------------------------------

try:
    import xgboost as xgb
    XGB_AVAILABLE = True
except ImportError:
    XGB_AVAILABLE = False
    logger.warning("[Hypertuner] xgboost not installed.")

try:
    import optuna
    # Keep Optuna's per-trial chatter out of the application log.
    optuna.logging.set_verbosity(optuna.logging.WARNING)
    OPTUNA_AVAILABLE = True
except ImportError:
    OPTUNA_AVAILABLE = False
    logger.warning("[Hypertuner] optuna not installed.")

try:
    import shap
    SHAP_AVAILABLE = True
except ImportError:
    SHAP_AVAILABLE = False
    # shap is optional (only used for feature importance), hence info level.
    logger.info("[Hypertuner] shap not installed - using XGBoost fscore for feature importance.")


# ---------------------------------------------------------------------------
# Feature columns
# ---------------------------------------------------------------------------

# Raw per-record features: time context, load context, and tunable parameters.
BASE_FEATURE_COLS = [
    "hour", "day_of_week", "is_peak",
    "num_orders", "num_riders",
    "max_pickup_distance_km", "max_kitchen_distance_km",
    "max_orders_per_rider", "ideal_load",
    "workload_balance_threshold", "workload_penalty_weight",
    "distance_penalty_weight", "cluster_radius_km",
    "search_time_limit_seconds", "road_factor",
]

# Trend features derived from the sequence of quality scores.
LAG_FEATURE_COLS = [
    "rolling_avg_quality_5",   # rolling mean of last 5 quality scores
    "quality_delta_10",        # quality[i] - quality[i-10]
]

# Column order here defines the column order of every feature matrix.
ALL_FEATURE_COLS = BASE_FEATURE_COLS + LAG_FEATURE_COLS
LABEL_COL = "quality_score"

# Tunable parameters: name -> (type, low, high), bounds inclusive.
SEARCH_SPACE = {
    "max_pickup_distance_km":     ("float", 4.0, 15.0),
    "max_kitchen_distance_km":    ("float", 1.0, 8.0),
    "max_orders_per_rider":       ("int", 6, 20),
    "ideal_load":                 ("int", 2, 10),
    "workload_balance_threshold": ("float", 0.3, 0.95),
    "workload_penalty_weight":    ("float", 20.0, 200.0),
    "distance_penalty_weight":    ("float", 0.5, 10.0),
    "cluster_radius_km":          ("float", 1.0, 8.0),
    "search_time_limit_seconds":  ("int", 2, 15),
    "road_factor":                ("float", 1.1, 1.6),
}

# SQLite file backing the persistent Optuna studies (override via ML_DB_PATH).
_STUDY_DB_PATH = os.getenv("ML_DB_PATH", "ml_data/ml_store.db")
# Directory that receives one JSON audit report per successful tuning run.
_REPORT_DIR = "ml_data/reports"
# Age (hours) beyond which a trained model is considered stale.
_MAX_MODEL_AGE_H = 24


# ---------------------------------------------------------------------------
# MLHypertuner
# ---------------------------------------------------------------------------

class MLHypertuner:
    """XGBoost surrogate + Optuna TPE / NSGA-II hyperparameter optimizer.

    The tuner fits an XGBoost regressor that predicts ``quality_score`` from
    the columns in ``ALL_FEATURE_COLS``, then lets Optuna search
    ``SEARCH_SPACE`` for the parameter set with the highest predicted quality
    under the current operating context. Results are written to the dynamic
    config (when importable) and to a JSON report in ``_REPORT_DIR``.

    Trained models live in memory only; summary state (validation, baseline,
    feature importance, top trials, Pareto front) is restored from the latest
    report on construction.
    """

    def __init__(self):
        # Surrogate models: one global model plus optional per-segment models.
        self._model: Optional[Any] = None
        self._peak_model: Optional[Any] = None
        self._offpeak_model: Optional[Any] = None
        # Bookkeeping for the global model.
        self._model_trained_at: Optional[datetime] = None
        self._training_rows: int = 0
        # Summary state surfaced through get_model_info() and the reports.
        self._latest_validation: Optional[Dict] = None
        self._latest_baseline: Optional[Dict] = None
        self._feature_importance: Optional[Dict[str, float]] = None
        self._top_trials: List[Dict] = []
        self._pareto_frontier: List[Dict] = []
        self._load_latest_report()

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _get_collector():
        """Return the data collector, trying the package path then the bare module.

        ``run`` historically imported ``app.services.ml.ml_data_collector``
        while other methods imported ``ml_data_collector``; resolving both in
        one place keeps every caller consistent.

        Raises:
            ImportError: if neither import path resolves.
        """
        try:
            from app.services.ml.ml_data_collector import get_collector
        except ImportError:
            from ml_data_collector import get_collector
        return get_collector()

    @staticmethod
    def _to_float(value: Any, default: float = 0.0) -> float:
        """Convert ``value`` to float; falsy or unparseable values yield ``default``."""
        try:
            return float(value or default)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _is_feasible(params: Dict) -> bool:
        """Return True when ``ideal_load`` does not exceed ``max_orders_per_rider``."""
        return params.get("ideal_load", 6) <= params.get("max_orders_per_rider", 12)

    @staticmethod
    def _study_storage_url() -> str:
        """Return the SQLite URL for Optuna, creating the parent directory first.

        SQLite cannot create missing directories, so without this the very
        first run on a fresh checkout fails to open the database file.
        """
        db_dir = os.path.dirname(_STUDY_DB_PATH)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        return f"sqlite:///{_STUDY_DB_PATH}"

    @staticmethod
    def _rank_importance(raw: Dict[str, float]) -> Dict[str, float]:
        """Return ``raw`` as a dict ordered by descending importance."""
        return dict(sorted(raw.items(), key=lambda kv: kv[1], reverse=True))

    def _model_age_hours(self) -> Optional[float]:
        """Return hours since the global model timestamp, or None if unknown."""
        trained = self._model_trained_at
        if trained is None:
            return None
        if trained.tzinfo is not None:
            # Normalise an aware timestamp to naive UTC so it can be compared
            # with datetime.utcnow(), which is what this class writes.
            trained = (trained - trained.utcoffset()).replace(tzinfo=None)
        return (datetime.utcnow() - trained) / timedelta(hours=1)

    def _is_stale(self) -> bool:
        """Return True when the model timestamp is older than ``_MAX_MODEL_AGE_H``."""
        age = self._model_age_hours()
        return age is not None and age > _MAX_MODEL_AGE_H

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------

    def run(
        self,
        n_trials: int = 150,
        min_training_records: int = 30,
        context_override: Optional[Dict] = None,
        multi_objective: bool = False,
        segment_aware: bool = True,
    ) -> Dict[str, Any]:
        """Full pipeline: load -> engineer -> validate -> train -> search -> write.

        Args:
            n_trials: Number of Optuna trials to add in this run.
            min_training_records: Passed to the collector as the minimum
                number of records required.
            context_override: Operating context to optimise for; when falsy
                the context is derived from the most recent records.
            multi_objective: Use the NSGA-II quality/latency search instead
                of the single-objective TPE search.
            segment_aware: Also train peak / off-peak surrogates when at
                least 60 records are available.

        Returns:
            A dict whose ``status`` is one of ``"ok"``, ``"error"``,
            ``"insufficient_data"`` or ``"model_not_ready"``, plus
            status-specific details.
        """
        if not (XGB_AVAILABLE and OPTUNA_AVAILABLE):
            missing = [
                name
                for name, available in (("xgboost", XGB_AVAILABLE), ("optuna", OPTUNA_AVAILABLE))
                if not available
            ]
            return {"status": "error", "message": f"Missing: {', '.join(missing)}"}

        collector = self._get_collector()
        records = collector.get_training_data(min_records=min_training_records)

        if records is None:
            count = collector.count_records()
            return {
                "status": "insufficient_data",
                "message": f"{count} records - need >={min_training_records}.",
                "records_available": count,
                "records_needed": min_training_records,
            }

        records = self._add_lag_features(records)
        X, y = self._prepare_data(records, ALL_FEATURE_COLS)
        if X is None or len(X) == 0:
            return {"status": "error", "message": "Data preparation failed."}

        cv_results = self._cross_validate(X, y)
        logger.info(
            "[Hypertuner] CV: R2=%.3f, MAE=%.2f", cv_results["r2_score"], cv_results["mae"]
        )

        self._train_model(X, y, model_attr="_model")
        self._latest_validation = cv_results

        if segment_aware and len(records) >= 60:
            self._train_segment_models(records)

        baseline_stats = self._compute_baseline_stats(records)
        self._latest_baseline = baseline_stats
        # Importance depends only on the trained model, so compute it before
        # the trust gate: it is still useful for diagnosing a weak model.
        self._compute_feature_importance()

        # Gate BEFORE searching: an untrusted surrogate must neither drive the
        # config nor add misleading trials to the persistent Optuna study.
        if cv_results["r2_score"] < 0.5:
            return {
                "status": "model_not_ready",
                "message": f"R2={cv_results['r2_score']:.3f} too low.",
                "validation": cv_results,
                "training_rows": len(records),
                "action_taken": "none - existing config preserved",
            }

        context = context_override or self._get_current_context(records)

        if multi_objective:
            best_params, best_score, pareto = self._optuna_search_multi(context, n_trials)
            self._pareto_frontier = pareto
        else:
            best_params, best_score = self._optuna_search_single(context, n_trials)

        if best_params is None:
            return {"status": "error", "message": "Optuna search failed."}

        improvement = round(best_score - baseline_stats["avg_quality"], 2)

        try:
            from app.config.dynamic_config import get_config
            get_config().set_bulk(best_params, source="ml_hypertuner")
        except ImportError:
            logger.info("[Hypertuner] DynamicConfig not available - params not written to config.")

        self._save_report(best_params, best_score, len(records), n_trials, cv_results, baseline_stats)

        if improvement > 5:
            verdict = "ML params significantly better"
        elif improvement > 0:
            verdict = "Marginal improvement - keep collecting data"
        else:
            verdict = "No improvement - defaults may be near-optimal"

        return {
            "status": "ok",
            "best_params": best_params,
            "best_predicted_quality": round(best_score, 2),
            "training_rows": len(records),
            "trials_run": n_trials,
            "context_used": context,
            "validation": cv_results,
            "improvement_proof": {
                "baseline_avg_quality": baseline_stats["avg_quality"],
                "baseline_worst": baseline_stats["worst_quality"],
                "baseline_best": baseline_stats["best_quality"],
                "ml_predicted_quality": round(best_score, 2),
                "predicted_improvement": improvement,
                "verdict": verdict,
            },
            "feature_importance": self._feature_importance,
            "top_trials": self._top_trials[:5],
            "message": "Hyperparameters updated successfully.",
        }

    def _train_segment_models(self, records: List[Dict]) -> None:
        """Train the peak / off-peak surrogates for segments with >= 20 records."""
        for model_attr, peak_flag in (("_peak_model", 1), ("_offpeak_model", 0)):
            subset = [r for r in records if r.get("is_peak", 0) == peak_flag]
            if len(subset) < 20:
                continue
            X_seg, y_seg = self._prepare_data(subset, ALL_FEATURE_COLS)
            if X_seg is None or len(X_seg) == 0:
                # _prepare_data already logged the cause; keep any older model.
                logger.warning("[Hypertuner] Skipping %s - data preparation failed.", model_attr)
                continue
            self._train_model(X_seg, y_seg, model_attr=model_attr)

    # ------------------------------------------------------------------
    # Feature Engineering
    # ------------------------------------------------------------------

    def _add_lag_features(self, records: List[Dict]) -> List[Dict]:
        """Add ``rolling_avg_quality_5`` and ``quality_delta_10`` to each record.

        Records are mutated in place and also returned. Missing, ``None`` or
        unparseable quality scores count as 0.0 instead of raising.

        * ``rolling_avg_quality_5``: mean of up to five preceding scores; the
          first record has no history and uses its own score.
        * ``quality_delta_10``: score minus the score ten records earlier,
          0.0 for the first ten records.
        """
        scores = [self._to_float(r.get("quality_score")) for r in records]
        for i, rec in enumerate(records):
            window = scores[max(0, i - 5):i] or [scores[i]]
            rec["rolling_avg_quality_5"] = sum(window) / len(window)
            rec["quality_delta_10"] = (scores[i] - scores[i - 10]) if i >= 10 else 0.0
        return records

    # ------------------------------------------------------------------
    # Data Preparation
    # ------------------------------------------------------------------

    def _prepare_data(
        self, records: List[Dict], feature_cols: List[str]
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """Build the float32 feature matrix and label vector from ``records``.

        Missing, falsy or unparseable values (features and label alike)
        become 0.0.

        Returns:
            ``(X, y)`` with one row per record and one column per entry of
            ``feature_cols``, or ``(None, None)`` if conversion fails.
        """
        try:
            X_rows = [
                [self._to_float(rec.get(col)) for col in feature_cols]
                for rec in records
            ]
            y_vals = [self._to_float(rec.get(LABEL_COL)) for rec in records]
            return (
                np.array(X_rows, dtype=np.float32),
                np.array(y_vals, dtype=np.float32),
            )
        except Exception as e:
            logger.error(f"[Hypertuner] Data prep failed: {e}")
            return None, None

    # ------------------------------------------------------------------
    # Model Training (warm-start capable)
    # ------------------------------------------------------------------

    def _train_model(self, X: np.ndarray, y: np.ndarray, model_attr: str = "_model") -> None:
        """Fit an XGBoost regressor and store it on ``self.<model_attr>``.

        If a model already exists under ``model_attr``, 50 additional trees
        are fitted on top of its booster (warm start). If that fails, or no
        model exists, a fresh 300-tree model is trained instead.
        """
        base_kwargs = {
            "n_estimators": 300, "max_depth": 5, "learning_rate": 0.04,
            "subsample": 0.8, "colsample_bytree": 0.8,
            "reg_alpha": 0.1, "reg_lambda": 1.0, "random_state": 42, "verbosity": 0,
        }
        model = None
        action = "trained"

        existing = getattr(self, model_attr, None)
        if existing is not None:
            try:
                candidate = xgb.XGBRegressor(**{**base_kwargs, "n_estimators": 50})
                candidate.fit(X, y, xgb_model=existing.get_booster())
                model, action = candidate, "warm-updated"
            except Exception as e:
                # Best-effort: fall through to a cold retrain, but say why.
                logger.warning(
                    "[Hypertuner] Warm-start failed for %s (%s) - cold retraining.", model_attr, e
                )

        if model is None:
            model = xgb.XGBRegressor(**base_kwargs)
            model.fit(X, y)

        setattr(self, model_attr, model)
        if model_attr == "_model":
            self._model_trained_at = datetime.utcnow()
            self._training_rows = len(X)
        logger.info(f"[Hypertuner] XGBoost {action} ({model_attr}) - {len(X)} rows.")

    # ------------------------------------------------------------------
    # Cross Validation
    # ------------------------------------------------------------------

    def _cross_validate(self, X: np.ndarray, y: np.ndarray, k: int = 5) -> Dict:
        """Estimate surrogate accuracy and map R2 to a trust level.

        Uses shuffled ``k``-fold CV when there are at least ``2 * k`` rows,
        otherwise a single 80/20 hold-out split. A non-finite R2 (which
        scikit-learn returns for a single-sample test set) is reported as 0.0
        so it cannot slip past the caller's ``r2 < 0.5`` gate.

        Returns:
            Dict with ``r2_score``, ``mae``, ``folds``, ``trust_level``,
            ``trust_score`` and ``interpretation``.
        """
        def new_model():
            return xgb.XGBRegressor(n_estimators=100, max_depth=4, verbosity=0, random_state=42)

        if len(X) < k * 2:
            split = max(1, int(len(X) * 0.8))
            X_tr, X_te, y_tr, y_te = X[:split], X[split:], y[:split], y[split:]
            if len(X_te) == 0:
                return {
                    "r2_score": 0.0, "mae": 99.0, "trust_level": "insufficient_data",
                    "trust_score": 0, "folds": 0,
                    "interpretation": "Not enough data to validate the model.",
                }
            model = new_model()
            model.fit(X_tr, y_tr)
            preds = model.predict(X_te)
            r2 = float(r2_score(y_te, preds))
            mae = float(mean_absolute_error(y_te, preds))
            folds_used = 1
        else:
            r2s, maes = [], []
            for tr_idx, te_idx in KFold(n_splits=k, shuffle=True, random_state=42).split(X):
                model = new_model()
                model.fit(X[tr_idx], y[tr_idx])
                preds = model.predict(X[te_idx])
                r2s.append(r2_score(y[te_idx], preds))
                maes.append(mean_absolute_error(y[te_idx], preds))
            r2, mae, folds_used = float(np.mean(r2s)), float(np.mean(maes)), k

        if not math.isfinite(r2):
            # NaN compares False against every threshold, including the
            # caller's gate; treat an undefined score as "no evidence".
            r2 = 0.0

        trust_map = [(0.85, "excellent", 5), (0.75, "strong", 4),
                     (0.60, "good", 3), (0.50, "acceptable", 2)]
        trust_level, trust_score = next(
            ((level, score) for threshold, level, score in trust_map if r2 >= threshold),
            ("poor - need more data", 1),
        )

        return {
            "r2_score": round(r2, 4),
            "mae": round(mae, 3),
            "folds": folds_used,
            "trust_level": trust_level,
            "trust_score": trust_score,
            "interpretation": f"Predictions off by +/-{mae:.1f} pts (R2={r2:.2f}, trust={trust_level})",
        }

    # ------------------------------------------------------------------
    # Optuna - Single Objective (persistent SQLite storage)
    # ------------------------------------------------------------------

    def _optuna_search_single(self, context: Dict, n_trials: int) -> Tuple[Optional[Dict], float]:
        """Maximise predicted quality with TPE on the persistent study.

        Returns:
            ``(best_params, best_value)``, or ``(None, 0.0)`` on failure.
        """
        def objective(trial):
            params = self._sample_params(trial)
            if not self._is_feasible(params):
                return 0.0
            return self._predict_quality(context, params)

        try:
            study = optuna.create_study(
                study_name="hypertuner_v1",
                storage=self._study_storage_url(),
                direction="maximize",
                load_if_exists=True,
                sampler=optuna.samplers.TPESampler(seed=42),
            )
            study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
            # NOTE(review): the study persists across runs, so best_trial may
            # have been scored by an earlier surrogate/context - consider
            # re-scoring the top candidates with the current model.
            best = study.best_trial
            scored = sorted(
                (t for t in study.trials if t.value is not None),
                key=lambda t: t.value,
                reverse=True,
            )
            self._top_trials = [{"params": t.params, "score": t.value} for t in scored[:10]]
            return {k: best.params[k] for k in SEARCH_SPACE if k in best.params}, best.value
        except Exception as e:
            logger.error(f"[Hypertuner] Optuna single-obj failed: {e}", exc_info=True)
            return None, 0.0

    # ------------------------------------------------------------------
    # Optuna - Multi Objective (quality + latency, NSGA-II)
    # ------------------------------------------------------------------

    def _optuna_search_multi(
        self, context: Dict, n_trials: int
    ) -> Tuple[Optional[Dict], float, List[Dict]]:
        """Search the quality / latency trade-off with NSGA-II.

        Latency is approximated as ``search_time_limit_seconds * 200``.

        Returns:
            ``(best_params, best_quality, pareto)`` where the best entry is
            the highest-quality point on the Pareto front, or
            ``(None, 0.0, [])`` on failure.
        """
        # Infeasible trials must be dominated on BOTH axes. The previous
        # penalty latency (99.0) was below every feasible proxy (>= 400),
        # which put infeasible trials on the Pareto front.
        infeasible_latency = (SEARCH_SPACE["search_time_limit_seconds"][2] + 1) * 200.0

        def objective(trial):
            params = self._sample_params(trial)
            if not self._is_feasible(params):
                return 0.0, infeasible_latency
            quality = self._predict_quality(context, params)
            latency_proxy = float(params.get("search_time_limit_seconds", 5)) * 200.0
            return quality, latency_proxy

        try:
            study = optuna.create_study(
                study_name="hypertuner_multi_v1",
                storage=self._study_storage_url(),
                directions=["maximize", "minimize"],
                load_if_exists=True,
                sampler=optuna.samplers.NSGAIISampler(seed=42),
            )
            study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
            pareto = [
                {"params": t.params, "quality": t.values[0], "latency_proxy": t.values[1]}
                for t in study.best_trials
                # Also drops infeasible trials stored by earlier runs.
                if self._is_feasible(t.params)
            ]
            if not pareto:
                return None, 0.0, []
            best_trial = max(pareto, key=lambda entry: entry["quality"])
            return (
                {k: best_trial["params"][k] for k in SEARCH_SPACE if k in best_trial["params"]},
                best_trial["quality"],
                pareto,
            )
        except Exception as e:
            logger.error(f"[Hypertuner] Optuna multi-obj failed: {e}", exc_info=True)
            return None, 0.0, []

    def _sample_params(self, trial) -> Dict:
        """Draw one value per ``SEARCH_SPACE`` entry from the Optuna ``trial``."""
        params = {}
        for name, (p_type, lo, hi) in SEARCH_SPACE.items():
            if p_type == "float":
                params[name] = trial.suggest_float(name, lo, hi)
            elif p_type == "int":
                params[name] = trial.suggest_int(name, int(lo), int(hi))
        return params

    # ------------------------------------------------------------------
    # Prediction
    # ------------------------------------------------------------------

    def _predict_quality(self, context: Dict, params: Dict) -> float:
        """Predict quality for ``params`` under ``context``, clamped to [0, 100].

        Uses the peak / off-peak model matching ``context["is_peak"]`` when
        one is trained, otherwise the global model. Returns 0.0 when no
        global model exists.
        """
        if self._model is None:
            return 0.0

        combined = {
            **context, **params,
            "rolling_avg_quality_5": context.get("rolling_avg_quality_5", 50.0),
            "quality_delta_10": context.get("quality_delta_10", 0.0),
        }
        row = [self._to_float(combined.get(col)) for col in ALL_FEATURE_COLS]

        is_peak = int(self._to_float(context.get("is_peak")))
        segment_model = self._peak_model if is_peak else self._offpeak_model
        # Explicit None check: never rely on an estimator's truthiness.
        model = segment_model if segment_model is not None else self._model

        pred = float(model.predict(np.array([row], dtype=np.float32))[0])
        return max(0.0, min(pred, 100.0))

    # ------------------------------------------------------------------
    # Feature Importance
    # ------------------------------------------------------------------

    def _compute_feature_importance(self) -> None:
        """Populate ``self._feature_importance`` as percentages per feature.

        Prefers mean absolute SHAP values over the latest 200 records when
        ``shap`` is installed; otherwise (or if SHAP fails) falls back to the
        booster's fscore. Leaves the attribute unchanged if both fail.
        """
        if self._model is None:
            return

        if SHAP_AVAILABLE:
            try:
                records = self._get_collector().get_training_data(min_records=1) or []
                records = self._add_lag_features(records[-200:])
                X, _ = self._prepare_data(records, ALL_FEATURE_COLS)
                if X is not None and len(X) > 0:
                    explainer = shap.TreeExplainer(self._model)
                    shap_values = np.abs(explainer.shap_values(X)).mean(axis=0)
                    total = max(shap_values.sum(), 1e-9)  # avoid division by zero
                    self._feature_importance = self._rank_importance({
                        col: round(float(shap_values[i] / total) * 100, 2)
                        for i, col in enumerate(ALL_FEATURE_COLS)
                    })
                    return
            except Exception as e:
                logger.warning(
                    "[Hypertuner] SHAP importance failed (%s) - falling back to fscore.", e
                )

        try:
            scores = self._model.get_booster().get_fscore()
            total = max(sum(scores.values()), 1)
            # fscore keys look like "f<column index>"; ignore anything else.
            self._feature_importance = self._rank_importance({
                ALL_FEATURE_COLS[int(key[1:])]: round(value / total * 100, 2)
                for key, value in scores.items()
                if key.startswith("f") and key[1:].isdigit() and int(key[1:]) < len(ALL_FEATURE_COLS)
            })
        except Exception as e:
            logger.warning(f"[Hypertuner] Feature importance failed: {e}")

    def get_feature_importance(self) -> Optional[Dict[str, float]]:
        """Return the most recently computed (or restored) feature importance."""
        return self._feature_importance

    # ------------------------------------------------------------------
    # Context
    # ------------------------------------------------------------------

    def _get_current_context(self, records: List[Dict]) -> Dict:
        """Derive the operating context from the clock and the last 20 records.

        NOTE(review): ``hour`` / ``is_peak`` use the UTC clock - confirm that
        the training records' ``hour`` column is UTC as well.
        """
        now = datetime.utcnow()
        recent = records[-20:]
        denom = max(len(recent), 1)
        avg_orders = sum(self._to_float(r.get("num_orders")) for r in recent) / denom
        avg_riders = sum(self._to_float(r.get("num_riders")) for r in recent) / denom

        recent_scores = [self._to_float(r.get("quality_score")) for r in recent]
        last5 = recent_scores[-5:]
        rolling_avg5 = sum(last5) / max(len(last5), 1)
        delta10 = (recent_scores[-1] - recent_scores[-11]) if len(recent_scores) >= 11 else 0.0

        return {
            "hour": now.hour,
            "day_of_week": now.weekday(),
            "is_peak": int(now.hour in (7, 8, 9, 12, 13, 18, 19, 20)),
            "num_orders": round(avg_orders),
            "num_riders": round(avg_riders),
            "rolling_avg_quality_5": round(rolling_avg5, 2),
            "quality_delta_10": round(delta10, 2),
        }

    def _compute_baseline_stats(self, records: List[Dict]) -> Dict:
        """Summarise historical quality scores (avg / best / worst / count).

        Records with a falsy or unparseable score are skipped.
        NOTE(review): this also skips a genuine score of 0 - confirm that 0
        only ever means "missing".
        """
        scores = []
        for rec in records:
            raw = rec.get("quality_score")
            if not raw:
                continue
            try:
                scores.append(float(raw))
            except (TypeError, ValueError):
                continue

        if not scores:
            # Same keys as the populated result so callers can index safely.
            return {"avg_quality": 0.0, "best_quality": 0.0, "worst_quality": 0.0,
                    "sample_size": 0}
        return {
            "avg_quality": round(sum(scores) / len(scores), 2),
            "best_quality": round(max(scores), 2),
            "worst_quality": round(min(scores), 2),
            "sample_size": len(scores),
        }

    # ------------------------------------------------------------------
    # Model Info
    # ------------------------------------------------------------------

    def get_model_info(self) -> Dict[str, Any]:
        """Return a snapshot of model state for dashboards / diagnostics.

        If no baseline is cached, one is computed from the collector's data
        on a best-effort basis. ``is_stale`` is True when the model timestamp
        is older than ``_MAX_MODEL_AGE_H`` hours.
        """
        baseline = self._latest_baseline
        if baseline is None:
            try:
                records = self._get_collector().get_training_data(min_records=1)
                if records:
                    baseline = self._compute_baseline_stats(records)
            except Exception as e:
                # Informational endpoint: never fail, but leave a trace.
                logger.debug("[Hypertuner] Baseline unavailable for model info: %s", e)

        age_hours = self._model_age_hours()
        return {
            "model_trained": self._model is not None,
            "trained_at": self._model_trained_at.isoformat() if self._model_trained_at else None,
            "model_age_hours": round(age_hours, 2) if age_hours is not None else None,
            "is_stale": self._is_stale(),
            "training_rows": self._training_rows,
            "peak_model_trained": self._peak_model is not None,
            "offpeak_model_trained": self._offpeak_model is not None,
            "features": ALL_FEATURE_COLS,
            "validation": self._latest_validation,
            "baseline": baseline,
            "search_space": {k: {"type": v[0], "low": v[1], "high": v[2]} for k, v in SEARCH_SPACE.items()},
            "feature_importance": self._feature_importance,
            "top_trials": self._top_trials[:10],
            "pareto_frontier_size": len(self._pareto_frontier),
        }

    # ------------------------------------------------------------------
    # Report I/O
    # ------------------------------------------------------------------

    def _save_report(self, best_params, best_score, training_rows,
                     n_trials, cv_results, baseline_stats) -> None:
        """Write a timestamped JSON audit report to ``_REPORT_DIR`` (best-effort)."""
        try:
            os.makedirs(_REPORT_DIR, exist_ok=True)
            # One clock read so the embedded timestamp and file name agree.
            now = datetime.utcnow()
            report = {
                "timestamp": now.isoformat(),
                "training_rows": training_rows,
                "n_trials": n_trials,
                "best_predicted_quality": round(best_score, 2),
                "best_params": best_params,
                "validation": cv_results or {},
                "baseline_stats": baseline_stats or {},
                "feature_importance": self._feature_importance or {},
                "top_trials": self._top_trials[:10],
                "pareto_frontier": self._pareto_frontier[:20],
            }
            path = os.path.join(_REPORT_DIR, f"tuning_{now.strftime('%Y%m%d_%H%M%S')}.json")
            with open(path, "w", encoding="utf-8") as f:
                json.dump(report, f, indent=2)
            logger.info(f"[Hypertuner] Report -> {path}")
        except Exception as e:
            logger.warning(f"[Hypertuner] Report save failed: {e}")

    def _load_latest_report(self) -> None:
        """Restore summary state from the newest report in ``_REPORT_DIR``.

        "Newest" is the lexicographically last ``.json`` file name. Only
        summary fields are restored; the models themselves are not persisted.
        Warns when the restored timestamp is older than ``_MAX_MODEL_AGE_H``.
        """
        try:
            if not os.path.isdir(_REPORT_DIR):
                return
            files = sorted(
                (name for name in os.listdir(_REPORT_DIR) if name.endswith(".json")),
                reverse=True,
            )
            if not files:
                return
            with open(os.path.join(_REPORT_DIR, files[0]), encoding="utf-8") as f:
                report = json.load(f)

            self._latest_validation = report.get("validation")
            self._latest_baseline = report.get("baseline_stats")
            self._training_rows = report.get("training_rows", 0)
            self._feature_importance = report.get("feature_importance")
            self._top_trials = report.get("top_trials", [])
            self._pareto_frontier = report.get("pareto_frontier", [])
            ts = report.get("timestamp")
            if ts:
                self._model_trained_at = datetime.fromisoformat(ts)
            logger.info(f"[Hypertuner] Restored state from {files[0]}")
            if self._is_stale():
                logger.warning(
                    "[Hypertuner] Last tuning run is older than %dh - retrain recommended.",
                    _MAX_MODEL_AGE_H,
                )
        except Exception as e:
            logger.warning(f"[Hypertuner] Load latest report failed: {e}")


# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------

# Lazily created shared instance; stays None until get_hypertuner() is called.
_tuner: Optional[MLHypertuner] = None


def get_hypertuner() -> MLHypertuner:
    """Return the shared MLHypertuner, constructing it on first use."""
    global _tuner
    instance = _tuner
    if instance is None:
        instance = MLHypertuner()
        _tuner = instance
    return instance