"""Provider payload optimization endpoints."""

import asyncio
import logging
import os
import time
from datetime import timedelta

from fastapi import APIRouter, Request, Depends, status, HTTPException, Query

from app.controllers.route_controller import RouteController
from app.core.exceptions import APIException
from app.core.arrow_utils import save_optimized_route_parquet

logger = logging.getLogger(__name__)

# Pricing fallbacks used when the pricing service returns nothing usable
# (default: 30 base pay + 2.5 per km).
DEFAULT_FUEL_CHARGE = 2.5
DEFAULT_BASE_PAY = 30.0

# Strategy names accepted for the per-request ``hypertuning_params`` override.
VALID_STRATEGIES = ["balanced", "fuel_saver", "aggressive_speed", "zone_strict"]

router = APIRouter(
    prefix="/api/v1/optimization",
    tags=["Route Optimization"],
    responses={
        400: {"description": "Bad request - Invalid input parameters"},
        422: {"description": "Validation error - Request validation failed"},
        500: {"description": "Internal server error"},
    },
)


def get_route_controller() -> RouteController:
    """Dependency injection for route controller."""
    return RouteController()


# Legacy single-route endpoint removed; provider flow only.


@router.post(
    "/createdeliveries",
    status_code=status.HTTP_200_OK,
    summary="Optimize provider payload (forwarding paused)",
    description="""
    Accepts the provider's orders array, reorders it using greedy nearest-neighbor, adds only:
    - step (1..N)
    - previouskms (distance from previous stop in km)
    - cumulativekms (total distance so far in km)
    - actualkms (direct pickup-to-delivery distance)

    Forwarding is temporarily paused: returns the optimized array in the response.
    """,
    responses={
        200: {
            "description": "Upstream response",
            "content": {
                "application/json": {
                    "example": {"code": 200, "details": [], "message": "Success", "status": True}
                }
            },
        }
    },
)
async def provider_optimize_forward(
    body: list[dict],
    controller: RouteController = Depends(get_route_controller)
):
    """
    Optimize a provider JSON array of orders via the route controller.

    The payload and the upstream URL are handed to
    ``controller.optimize_and_forward_provider_payload``; its result is
    returned unchanged. According to the route description the controller
    annotates each item with ``step``, ``previouskms``, ``cumulativekms`` and
    ``actualkms``.

    NOTE(review): the route description says forwarding is paused while the
    controller method name says "forward" - which one applies is decided inside
    the controller and cannot be confirmed from this module.

    After a successful call a Parquet snapshot of ``body`` is written on a
    best-effort basis; a snapshot failure is logged and never fails the request.

    Raises:
        APIException: re-raised untouched.
        HTTPException: 500 for any other unexpected error.
    """
    try:
        url = "https://jupiter.nearle.app/live/api/v1/deliveries/createdeliveries"
        result = await controller.optimize_and_forward_provider_payload(body, url)

        # Performance Logging: Save a Parquet Snapshot (best-effort backup).
        try:
            os.makedirs("data/snapshots", exist_ok=True)
            snapshot_path = f"data/snapshots/route_{int(time.time())}.parquet"
            save_optimized_route_parquet(body, snapshot_path)
            logger.info(f"Apache Arrow: Snapshot saved to {snapshot_path}")
        except Exception as e:
            logger.warning(f"Could not save Arrow snapshot: {e}")

        return result
    except APIException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in provider_optimize_forward: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error") from e


@router.get(
    "/createdeliveries",
    summary="Usage info for provider optimize forward"
)
async def provider_optimize_forward_info():
    """Return usage info; this endpoint accepts POST only for processing."""
    return {
        "message": "Use POST with a JSON array of orders to optimize and forward.",
        "method": "POST",
        # Must match the route actually registered on this router
        # (prefix "/api/v1/optimization" + "/createdeliveries").
        "path": "/api/v1/optimization/createdeliveries"
    }


def _resolve_pricing(pricing) -> tuple[float, float]:
    """Return ``(fuel_charge, base_pay)`` from the shift-1 pricing row, or the defaults."""
    fuel_charge = DEFAULT_FUEL_CHARGE
    base_pay = DEFAULT_BASE_PAY
    if pricing:
        shift_1 = next((p for p in pricing if p.get("shiftid") == 1), None)
        if shift_1:
            fuel_charge = float(shift_1.get("fuelcharge", DEFAULT_FUEL_CHARGE))
            base_pay = float(
                shift_1.get("basepay") or shift_1.get("base_pay") or DEFAULT_BASE_PAY
            )
    return fuel_charge, base_pay


def _total_route_kms(route) -> float:
    """Return the total distance of one optimized route (0.0 for an empty route)."""
    if not route:
        return 0.0
    try:
        # Usually the last order has the max cumulative kms if steps are 1..N.
        return max(float(o.get("cumulativekms", 0)) for o in route)
    except (TypeError, ValueError):
        # cumulativekms missing/non-numeric: fall back to summing per-order kms.
        return sum(float(o.get("actualkms", o.get("kms", 0))) for o in route)


def _annotate_order_eta(order: dict, eta_calculator) -> None:
    """Add ETA fields to ``order`` in place when it carries a pickup slot (best-effort)."""
    # Kept as a lazy import, as in the original handler.
    from dateutil.parser import parse as parse_date

    # Try various cases and names for pickup slot.
    pickup_slot_str = (
        order.get("pickupSlot")
        or order.get("pickupslot")
        or order.get("pickup_slot")
        or order.get("pickuptime")
    )
    if not pickup_slot_str:
        return

    try:
        # Numeric coercion lives inside the try so that one malformed field
        # only skips this order's ETA instead of failing the whole request.
        dist_km = float(
            order.get("cumulativekms") or order.get("actualkms", order.get("kms", 0))
        )
        step = int(order.get("step", 1))
        order_type = order.get("ordertype", "Economy")

        pickup_time = parse_date(str(pickup_slot_str))
        eta_mins = eta_calculator.calculate_eta(
            distance_km=dist_km,
            is_first_order=(step == 1),
            order_type=order_type,
            time_of_day="normal"
        )
        expected_time = pickup_time + timedelta(minutes=eta_mins)

        # Format output as requested: "2026-03-24 08:25 AM"
        order["expectedDeliveryTime"] = expected_time.strftime("%Y-%m-%d %I:%M %p")
        order["transitMinutes"] = eta_mins
        order["calculationDistanceKm"] = round(dist_km, 2)
    except Exception as e:
        logger.warning(f"Could not calculate ETA from pickupSlot '{pickup_slot_str}': {e}")


@router.post(
    "/riderassign",
    status_code=status.HTTP_200_OK,
    summary="Assign created orders to active riders",
    description="""
    Assigns orders to riders based on kitchen preferences, proximity, and load.
    - If a payload of orders is provided, processes those.
    - If payload is empty, fetches all 'created' orders from the external API.
    - Fetches active riders and matches them.
    """,
    responses={
        200: {
            "description": "Assignment Result",
            "content": {
                "application/json": {
                    "example": {
                        "code": 200,
                        "details": {"1234": [{"orderid": "..."}]},
                        "message": "Success",
                        "status": True
                    }
                }
            },
        }
    },
)
async def assign_orders_to_riders(
    request: Request,
    body: list[dict] = None,
    resuffle: bool = Query(False),
    reshuffle: bool = Query(False),
    rehuffle: bool = Query(False),
    hypertuning_params: str = None
):
    """
    Smart assignment of orders to riders.

    Steps:
      1. Fetch active riders and rider pricing concurrently.
      2. Use ``body`` as the order list, or fetch created orders when it is empty.
      3. Run ``AssignmentService.assign_orders`` (optionally under a per-request
         strategy override given by ``hypertuning_params``).
      4. Optimize each rider's route concurrently and flatten the results,
         annotating every order with rider details, rider kms and ETA fields.
      5. Group the result into zones via ``ZoneService``.

    Args:
        request: Incoming request; its raw query params are inspected so the
            mere presence of a reshuffle key counts as enabling reshuffle.
        body: Optional list of order dicts.
        resuffle / reshuffle / rehuffle: Accepted spellings of the reshuffle flag.
        hypertuning_params: Optional strategy name; applied only when it is one
            of ``VALID_STRATEGIES``.

    Returns:
        A dict with ``code``, ``details`` (flat order list), ``zones``,
        ``zone_summary``, ``message``, ``status`` and ``meta``.

    Raises:
        HTTPException: 500 when anything inside the assignment flow fails.
    """
    from app.services.rider.get_active_riders import (
        fetch_active_riders,
        fetch_created_orders,
        fetch_rider_pricing,
    )
    from app.services.core.assignment_service import AssignmentService
    from app.services.routing.route_optimizer import RouteOptimizer
    from app.services.routing.realistic_eta_calculator import RealisticETACalculator

    eta_calculator = RealisticETACalculator()

    try:
        # Check if any variant is present in query params (flag-style) or explicitly true.
        q_params = request.query_params
        do_reshuffle = (
            any(k in q_params for k in ["reshuffle", "resuffle", "rehuffle"])
            or resuffle
            or reshuffle
            or rehuffle
        )

        # 1. Fetch Riders and Pricing
        riders, pricing = await asyncio.gather(fetch_active_riders(), fetch_rider_pricing())
        fuel_charge, base_pay = _resolve_pricing(pricing)

        # 2. Determine Orders Source
        orders = body
        if not orders:
            logger.info("No payload provided, fetching created orders from external API.")
            orders = await fetch_created_orders()
        else:
            logger.info(f"Processing {len(orders)} orders from payload.")

        if not orders:
            return {
                "code": 200,
                "details": {},
                "message": "No orders found to assign.",
                "status": True,
                "meta": {
                    "active_riders_count": len(riders)
                }
            }

        # 3. Run Assignment (AssignmentService)
        # -- Per-request strategy override --
        from app.config.dynamic_config import get_config

        _cfg = get_config()
        _original_strategy = None

        if hypertuning_params and hypertuning_params in VALID_STRATEGIES:
            _original_strategy = _cfg.get("ml_strategy", "balanced")
            _cfg._cache["ml_strategy"] = hypertuning_params
            logger.info(f"[HYPERTUNE] Per-request strategy override: {hypertuning_params}")

        try:
            service = AssignmentService()
            assignments, unassigned_orders = await service.assign_orders(
                riders=riders,
                orders=orders,
                fuel_charge=fuel_charge,
                base_pay=base_pay,
                reshuffle=do_reshuffle
            )
        finally:
            # Restore the original strategy even when assignment fails, so a
            # failed request cannot leave its override in the config cache.
            # NOTE(review): if get_config() returns an object shared between
            # requests, concurrent requests can still observe the override
            # while this one is awaiting - confirm and consider passing the
            # strategy explicitly instead of mutating the cache.
            if _original_strategy is not None:
                _cfg._cache["ml_strategy"] = _original_strategy

        if do_reshuffle:
            logger.info("[RESHUFFLE] Retry mode active - exploring alternative rider assignments.")

        # 4. Optimize Routes for Each Rider and Flatten Response
        optimizer = RouteOptimizer()
        flat_orders_list = []

        # Prepare tasks for parallel execution.
        # We need to store context (rider_id) to map results back.
        optimization_tasks = []
        task_contexts = []

        for rider_id, rider_orders in assignments.items():
            if not rider_orders:
                continue

            # Align with createdeliveries model: Always optimize from the Pickup/Kitchen location.
            # This prevents route reversal if the rider is on the "far" side of the deliveries.
            # The rider's current location (rlat/rlon) is ignored for sequence optimization
            # to ensure the logical flow (Kitchen -> Stop 1 -> Stop 2 -> Stop 3) is followed.
            start_coords = None

            optimization_tasks.append(
                optimizer.optimize_provider_payload(rider_orders, start_coords=start_coords)
            )
            task_contexts.append(rider_id)

        total_assigned = 0

        # Execute all optimizations concurrently:
        # total time goes from Sum(RiderTimes) towards Max(RiderTime).
        if optimization_tasks:
            results = await asyncio.gather(*optimization_tasks)

            # Create a lookup for rider details.
            rider_info_map = {}
            for r in riders:
                # Use string conversion for robust ID matching.
                r_id = str(r.get("userid") or r.get("_id", ""))
                if r_id:
                    rider_info_map[r_id] = {
                        "name": r.get("username", ""),
                        "contactno": r.get("contactno", "")
                    }

            # Process results, matching them back to riders.
            for stored_rider_id, optimized_route in zip(task_contexts, results):
                r_info = rider_info_map.get(str(stored_rider_id), {})
                rider_name = r_info.get("name", "")
                rider_contact = r_info.get("contactno", "")

                # Total distance for this rider.
                total_rider_kms = _total_route_kms(optimized_route)

                for order in optimized_route:
                    order["userid"] = stored_rider_id
                    order["username"] = rider_name
                    # Populate the specific fields requested by the user.
                    order["rider"] = rider_name
                    order["ridercontactno"] = rider_contact
                    order["riderkms"] = str(round(total_rider_kms, 2))

                    # --- DYNAMIC ETA COMPUTATION ---
                    _annotate_order_eta(order, eta_calculator)

                    flat_orders_list.append(order)

                total_assigned += len(optimized_route)

        # 5. Zone Processing
        from app.services.routing.zone_service import ZoneService

        zone_service = ZoneService()
        zone_data = zone_service.group_by_zones(
            flat_orders_list,
            unassigned_orders,
            fuel_charge=fuel_charge,
            base_pay=base_pay
        )
        zones_structure = zone_data["detailed_zones"]
        zone_analysis = zone_data["zone_analysis"]

        return {
            "code": 200,
            "zone_summary": zone_analysis,   # High-level zone metrics
            "zones": zones_structure,        # Detailed data
            "details": flat_orders_list,     # Flat list
            "message": "Success",
            "status": True,
            "meta": {
                "total_orders": len(orders),
                "utilized_riders": len([rid for rid, rl in assignments.items() if rl]),
                "active_riders_pool": len(riders),
                "assigned_orders": total_assigned,
                "unassigned_orders": len(unassigned_orders),
                "total_profit": round(sum(z["total_profit"] for z in zone_analysis), 2),
                "fuel_charge_base": fuel_charge,
                "unassigned_details": [
                    {
                        "orderid": o.get("orderid") or o.get("_id"),
                        "reason": o.get("unassigned_reason", "Unknown capacity/proximity issue")
                    }
                    for o in unassigned_orders
                ],
                "distribution_summary": {rid: len(rl) for rid, rl in assignments.items() if rl},
                "resuffle_mode": do_reshuffle,
                "hypertuning_params": hypertuning_params or "default"
            }
        }

    except Exception as e:
        logger.error(f"Error in rider assignment: {e}", exc_info=True)
        raise HTTPException(
            status_code=500, detail="Internal server error during assignment"
        ) from e
    finally:
        # -- Fire ML training trigger (best-effort) --
        # Runs once the handler body has finished (success, early return or
        # error). Per the original note, every 10th call kicks off a background
        # thread that retrains the model. Imported lazily from app.main.
        try:
            from app.main import trigger_training_if_due
            trigger_training_if_due()
        except Exception:
            # Never crash the endpoint due to the ML trigger, but leave a trace.
            logger.debug("ML training trigger failed", exc_info=True)