Files
routesapi/app/services/core/assignment_service.py

516 lines
23 KiB
Python

import logging
import random
import time
from math import radians, cos, sin, asin, sqrt
from typing import List, Dict, Any, Optional
from collections import defaultdict
from app.config.rider_preferences import RIDER_PREFERRED_KITCHENS
from app.services.routing.kalman_filter import smooth_rider_locations, smooth_order_coordinates
from app.config.dynamic_config import get_config
from app.services.ml.ml_data_collector import get_collector
logger = logging.getLogger(__name__)


class AssignmentService:
    """Cluster-based, load-balanced assignment of orders to riders.

    All rider scores are "lower is better". Tunable weights and limits are read
    from the dynamic config in ``_load_config``; the fixed policy constants
    below were previously hard-coded inline and keep their original values.
    """

    # Rider IDs that never receive assignments (test/blocked accounts).
    BLOCKED_RIDERS = frozenset({1242, 1266, 1245, 1232, 1240, 1007})
    # Pickup radius (km) allowed when a cluster contains one of the rider's
    # preferred kitchens; applied only if it exceeds MAX_PICKUP_DISTANCE_KM.
    PREFERRED_KITCHEN_MAX_DISTANCE_KM = 10.0
    # Half-width of the uniform noise added to each score when reshuffle=True.
    RESHUFFLE_NOISE = 15.0
    # Minutes of work per order, used to convert persisted state to/from counts.
    MINUTES_PER_ORDER = 15

    def __init__(self):
        self.rider_preferences = RIDER_PREFERRED_KITCHENS
        self.earth_radius_km = 6371
        # The config object is fetched once; _load_config re-reads values from it.
        self._cfg = get_config()

    def _load_config(self):
        """Copy hyperparameters from the config object onto the instance.

        Called at the start of every ``assign_orders`` call. The config object
        itself is obtained once in ``__init__``; whether its values change
        between calls depends on ``get_config()`` (not visible here).
        """
        cfg = self._cfg
        self.MAX_PICKUP_DISTANCE_KM = cfg.get("max_pickup_distance_km")
        self.MAX_KITCHEN_DISTANCE_KM = cfg.get("max_kitchen_distance_km")
        self.MAX_ORDERS_PER_RIDER = int(cfg.get("max_orders_per_rider"))
        self.IDEAL_LOAD = int(cfg.get("ideal_load"))
        self.WORKLOAD_BALANCE_THRESHOLD = cfg.get("workload_balance_threshold")
        self.WORKLOAD_PENALTY_WEIGHT = cfg.get("workload_penalty_weight")
        self.DISTANCE_PENALTY_WEIGHT = cfg.get("distance_penalty_weight")
        self.PREFERENCE_BONUS = cfg.get("preference_bonus")
        self.HOME_ZONE_BONUS_4KM = cfg.get("home_zone_bonus_4km")
        self.HOME_ZONE_BONUS_2KM = cfg.get("home_zone_bonus_2km")
        self.EMERGENCY_LOAD_PENALTY = cfg.get("emergency_load_penalty")

    def haversine(self, lat1, lon1, lat2, lon2):
        """Return the great-circle distance in km between two points given in degrees.

        Inputs are coerced with ``float()``; the sphere radius is
        ``self.earth_radius_km``.
        """
        lon1, lat1, lon2, lat2 = map(radians, [float(lon1), float(lat1), float(lon2), float(lat2)])
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Clamp: rounding can push sqrt(a) slightly above 1, and asin would
        # then raise a math domain error.
        c = 2 * asin(min(1.0, sqrt(a)))
        return c * self.earth_radius_km

    def get_lat_lon(self, obj: Dict[str, Any], prefix: str = "") -> tuple[float, float]:
        """Extract a ``(lat, lon)`` pair from a dict with loosely named keys.

        Keys built from ``prefix`` are tried first (``{prefix}lat/lon``,
        ``{prefix}lat/long``, ``{prefix}latitude/longitude``), then a fixed list
        of common key pairs. A pair is skipped when either value is falsy or
        does not convert to ``float``. If nothing matches and ``obj`` holds a
        nested ``pickup_location`` dict, that dict is searched (without prefix).

        Returns:
            ``(lat, lon)`` as floats, or ``(0.0, 0.0)`` when no usable pair is
            found (callers treat 0 as "no location").
        """
        candidates = [
            (f"{prefix}lat", f"{prefix}lon"),
            (f"{prefix}lat", f"{prefix}long"),
            (f"{prefix}latitude", f"{prefix}longitude"),
            # Generic fallbacks when the prefixed keys are absent.
            ("lat", "lon"), ("latitude", "longitude"),
            ("pickuplat", "pickuplon"), ("pickuplat", "pickuplong"),
            ("deliverylat", "deliverylong"), ("droplat", "droplon"),
        ]
        for lat_key, lon_key in candidates:
            lat_val, lon_val = obj.get(lat_key), obj.get(lon_key)
            if not (lat_val and lon_val):
                continue
            try:
                return float(lat_val), float(lon_val)
            except (TypeError, ValueError):
                continue  # unparseable value: try the next key pair
        # Special case: nested 'pickup_location' (only when it is a dict; a
        # None/str value would otherwise raise TypeError during lookup).
        nested = obj.get("pickup_location")
        if isinstance(nested, dict):
            return self.get_lat_lon(nested)
        return 0.0, 0.0

    def get_order_kitchen(self, order: Dict[str, Any]) -> str:
        """Return the stripped kitchen/store name of an order, or ``"Unknown"``."""
        possible_keys = ['storename', 'restaurantname', 'kitchenname', 'partnername', 'store_name']
        for key in possible_keys:
            if key in order and order[key]:
                return str(order[key]).strip()
        return "Unknown"

    @staticmethod
    def _extract_rider_id(rider: Dict[str, Any]) -> Optional[int]:
        """Return the rider's integer id from the first truthy id key, or None if unparseable."""
        raw = rider.get("userid") or rider.get("riderid") or rider.get("id") or rider.get("_id")
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    def assign_orders(self, orders: List[Dict[str, Any]], riders: List[Dict[str, Any]], reshuffle: bool = False) -> tuple[Dict[int, List[Dict[str, Any]]], List[Dict[str, Any]]]:
        """
        ENHANCED: Cluster-Based Load-Balanced Assignment.

        Strategy:
        1. Cluster orders by kitchen proximity
        2. Calculate rider workload (current capacity usage)
        3. Assign clusters to best-fit riders (proximity + workload balance)
        4. Force-assign leftovers to the least-bad rider
        5. Rebalance if needed

        If reshuffle=True, controlled randomness is injected into rider scoring
        so that retrying the same input can explore alternative assignments.

        Args:
            orders: order dicts (coordinates are Kalman-smoothed first).
            riders: rider dicts (locations are Kalman-smoothed first).
            reshuffle: add uniform noise of +/- RESHUFFLE_NOISE to rider scores.

        Returns:
            ``(assignments, unassigned_orders)``: ``assignments`` is a
            ``defaultdict(list)`` of rider id -> orders; unassigned orders may
            carry an ``"unassigned_reason"`` key.

        Side effects: persists rider history/state via ``_post_process`` and
        logs a best-effort ML data-collection event.
        """
        from app.config.rider_preferences import RIDER_HOME_LOCATIONS
        from app.services.rider.rider_state_manager import RiderStateManager
        from app.services.routing.clustering_service import ClusteringService

        # -- Load ML-tuned hyperparameters (or defaults on first run) ------
        self._load_config()
        call_start = time.perf_counter()

        assignments: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
        unassigned_orders: List[Dict[str, Any]] = []
        # rider id -> live state: lat, lon, kitchens, count, workload_score
        rider_states: Dict[int, Dict[str, Any]] = {}

        # 0. Kalman-smooth rider GPS and order coordinates before any scoring.
        riders = smooth_rider_locations(list(riders))
        orders = smooth_order_coordinates(list(orders))

        # 1. On-duty/active riders; if none, fall back to any rider with a location.
        valid_riders = self._collect_active_riders(riders, RiderStateManager(), rider_states, RIDER_HOME_LOCATIONS)
        if not valid_riders:
            logger.warning("No riders passed on-duty filter. Retrying with all available riders as emergency rescue...")
            valid_riders = self._collect_rescue_riders(riders, rider_states, RIDER_HOME_LOCATIONS)
        if not valid_riders:
            logger.error("DANGER: Absolutely no riders available for assignment.")
            for o in orders:
                o["unassigned_reason"] = "No riders found (check partner online status)."
                unassigned_orders.append(o)
            return assignments, unassigned_orders
        logger.info(f"Found {len(valid_riders)} active riders")

        # 2. Cluster orders by kitchen proximity (radius from config).
        clusters = ClusteringService().cluster_orders_by_kitchen(orders, max_cluster_radius_km=self.MAX_KITCHEN_DISTANCE_KM)
        logger.info(f"Created {len(clusters)} order clusters")

        # 3. Load-balanced assignment per cluster; leftovers feed the emergency pool.
        for cluster_idx, cluster in enumerate(clusters):
            unassigned_orders.extend(self._assign_cluster(
                cluster_idx, len(clusters), cluster, valid_riders, rider_states,
                assignments, RIDER_HOME_LOCATIONS, reshuffle=reshuffle,
            ))

        # 4. Emergency mandatory assignment (valid_riders is non-empty here).
        if unassigned_orders:
            unassigned_orders = self._force_assign(unassigned_orders, valid_riders, rider_states, assignments)

        # 5. Move work from overloaded riders to underutilized ones.
        self._rebalance_workload(assignments, rider_states, valid_riders)

        # 6. Commit state and history.
        self._post_process(assignments, rider_states)

        # 7. ML data collection is best-effort and must never fail assignment.
        try:
            elapsed_ms = (time.perf_counter() - call_start) * 1000
            get_collector().log_assignment_event(
                num_orders=len(orders),
                num_riders=len(riders),
                hyperparams=self._cfg.get_all(),
                assignments=assignments,
                unassigned_count=len(unassigned_orders),
                elapsed_ms=elapsed_ms,
            )
        except Exception as ml_err:
            logger.debug(f"ML logging skipped: {ml_err}")

        self._log_distribution(assignments, unassigned_orders)
        return assignments, unassigned_orders

    def _collect_active_riders(self, riders, state_mgr, rider_states, home_locations):
        """Return on-duty/active, non-blocked riders and seed ``rider_states`` for each.

        Location priority: live GPS -> persisted last drop -> ``home_locations``
        -> (0, 0). Riders still at (0, 0) are kept (they simply score as far away).
        The persisted ``minutes_remaining`` becomes the initial order count.
        """
        valid_riders = []
        for r in riders:
            rid = self._extract_rider_id(r)
            if rid is None or rid in self.BLOCKED_RIDERS:
                continue
            # Keep if onduty is 1/"1"/True, or status is active/idle/online.
            onduty = r.get("onduty")
            is_onduty = str(onduty) in ("1", "True") or onduty is True
            is_active = r.get("status") in ("active", "idle", "online")
            if not (is_onduty or is_active):
                continue

            lat, lon = self.get_lat_lon(r)
            p_state = state_mgr.get_rider_state(rid) or {}
            if lat == 0 or lon == 0:
                drop_lat, drop_lon = p_state.get('last_drop_lat'), p_state.get('last_drop_lon')
                if drop_lat and drop_lon:
                    lat, lon = drop_lat, drop_lon
                else:
                    lat, lon = home_locations.get(rid, (0.0, 0.0))

            valid_riders.append({"id": rid, "lat": lat, "lon": lon, "obj": r})
            # Convert persisted minutes into an estimated number of orders in hand.
            existing_load = (p_state.get('minutes_remaining') or 0) / self.MINUTES_PER_ORDER
            rider_states[rid] = {
                'lat': lat,
                'lon': lon,
                'kitchens': set(),
                'count': int(existing_load),
                'workload_score': existing_load,
            }
        return valid_riders

    def _collect_rescue_riders(self, riders, rider_states, home_locations):
        """Return every non-blocked rider with a nonzero latitude, ignoring duty status.

        Used only when no rider passed the on-duty filter. Falls back to
        ``home_locations`` when GPS is missing; seeds ``rider_states`` with zero load.
        """
        rescued = []
        for r in riders:
            rid = self._extract_rider_id(r)
            if rid is None or rid in self.BLOCKED_RIDERS:
                continue
            lat, lon = self.get_lat_lon(r)
            if lat == 0 or lon == 0:
                lat, lon = home_locations.get(rid, (0.0, 0.0))
            if lat == 0:
                continue
            rescued.append({"id": rid, "lat": lat, "lon": lon, "obj": r})
            rider_states[rid] = {
                'lat': lat, 'lon': lon, 'kitchens': set(),
                'count': 0, 'workload_score': 0,
            }
        return rescued

    def _home_zone_bonus(self, rid, lat, lon, home_locations):
        """Return the home-zone score bonus for a rider relative to (lat, lon): 2 km tier, 4 km tier, else 0."""
        h_lat, h_lon = home_locations.get(rid, (0.0, 0.0))
        if h_lat == 0:
            return 0
        home_dist = self.haversine(h_lat, h_lon, lat, lon)
        if home_dist <= 2.0:
            return self.HOME_ZONE_BONUS_2KM
        if home_dist <= 4.0:
            return self.HOME_ZONE_BONUS_4KM
        return 0

    def _score_candidates(self, cluster, valid_riders, rider_states, home_locations, reshuffle):
        """Return unsorted candidate dicts for riders within pickup range of the cluster centroid.

        ``score = utilization * WORKLOAD_PENALTY_WEIGHT + base_score`` where
        ``base_score`` holds the parts that do not change while the cluster is
        distributed (distance penalty, preference/home bonuses, reshuffle noise).
        """
        centroid_lat, centroid_lon = cluster['centroid']
        candidates = []
        for r in valid_riders:
            rid = r["id"]
            r_state = rider_states[rid]
            dist = self.haversine(r_state['lat'], r_state['lon'], centroid_lat, centroid_lon)

            # Case-insensitive substring match either way between kitchen and preference names.
            prefs = self.rider_preferences.get(rid, [])
            has_preference = any(
                p.lower() in k_name.lower() or k_name.lower() in p.lower()
                for k_name in cluster['kitchen_names']
                for p in prefs
            )

            # Preferred kitchens extend the allowed pickup radius.
            allowed_dist = self.MAX_PICKUP_DISTANCE_KM
            if has_preference:
                allowed_dist = max(allowed_dist, self.PREFERRED_KITCHEN_MAX_DISTANCE_KM)
            if dist > allowed_dist:
                continue

            utilization = r_state['count'] / self.MAX_ORDERS_PER_RIDER
            base_score = dist * self.DISTANCE_PENALTY_WEIGHT
            if has_preference:
                base_score += self.PREFERENCE_BONUS
            base_score += self._home_zone_bonus(rid, centroid_lat, centroid_lon, home_locations)
            if reshuffle:
                # Noise is part of the base so it survives rescoring within the cluster.
                base_score += random.uniform(-self.RESHUFFLE_NOISE, self.RESHUFFLE_NOISE)

            candidates.append({
                'id': rid,
                'score': utilization * self.WORKLOAD_PENALTY_WEIGHT + base_score,
                'base_score': base_score,
                'distance': dist,
                'utilization': utilization,
                'current_load': r_state['count'],
            })
        return candidates

    def _assign_cluster(self, cluster_idx, n_clusters, cluster, valid_riders, rider_states,
                        assignments, home_locations, reshuffle=False):
        """Distribute one cluster's orders over the best-scoring riders.

        Mutates ``assignments`` and ``rider_states``. Returns the orders that
        could not be placed (all of them, with an ``unassigned_reason``, when no
        rider is in range; otherwise whatever remained when candidates ran out).
        """
        centroid_lat, centroid_lon = cluster['centroid']
        cluster_orders = cluster['orders']
        logger.info(f"Assigning cluster {cluster_idx+1}/{n_clusters}: {len(cluster_orders)} orders at ({centroid_lat:.4f}, {centroid_lon:.4f})")

        candidates = self._score_candidates(cluster, valid_riders, rider_states, home_locations, reshuffle)
        if not candidates:
            logger.warning(f"No riders available for cluster {cluster_idx+1}")
            for o in cluster_orders:
                o["unassigned_reason"] = f"No riders within {self.MAX_PICKUP_DISTANCE_KM}km radius of kitchen."
            return list(cluster_orders)

        candidates.sort(key=lambda c: c['score'])
        remaining = list(cluster_orders)
        while remaining and candidates:
            best = candidates[0]
            rid = best['id']
            r_state = rider_states[rid]

            available_capacity = self.MAX_ORDERS_PER_RIDER - r_state['count']
            if available_capacity <= 0:
                candidates.pop(0)  # rider is full
                continue

            # Underutilized riders may take everything they can hold; busier
            # riders are only topped up towards IDEAL_LOAD (always at least 1).
            if best['utilization'] < self.WORKLOAD_BALANCE_THRESHOLD:
                batch_size = min(available_capacity, len(remaining))
            else:
                batch_size = min(self.IDEAL_LOAD - r_state['count'], len(remaining), available_capacity)
            batch_size = max(1, batch_size)

            batch, remaining = remaining[:batch_size], remaining[batch_size:]
            assignments[rid].extend(batch)

            r_state['count'] += len(batch)
            r_state['lat'] = centroid_lat
            r_state['lon'] = centroid_lon
            r_state['kitchens'].update(cluster['kitchen_names'])
            r_state['workload_score'] = r_state['count'] / self.MAX_ORDERS_PER_RIDER
            logger.info(f" -> Assigned {len(batch)} orders to Rider {rid} (load: {r_state['count']}/{self.MAX_ORDERS_PER_RIDER})")

            # Rescore with the SAME formula/weights as the initial score so the
            # updated rider stays comparable with untouched candidates.
            for cand in candidates:
                if cand['id'] == rid:
                    cand['utilization'] = r_state['count'] / self.MAX_ORDERS_PER_RIDER
                    cand['current_load'] = r_state['count']
                    cand['score'] = cand['utilization'] * self.WORKLOAD_PENALTY_WEIGHT + cand['base_score']
            candidates.sort(key=lambda c: c['score'])
        return remaining

    def _force_assign(self, pool, valid_riders, rider_states, assignments):
        """Give each geolocatable order in ``pool`` to its lowest-cost rider, ignoring capacity.

        Cost = km from the rider's current position to the pickup
        + ``EMERGENCY_LOAD_PENALTY`` * rider's current count. Orders that
        already carry an ``unassigned_reason`` keep it even when assigned.

        Returns:
            The orders that still could not be assigned.
        """
        logger.info(f"[ALERT] Starting Emergency Mandatory Assignment for {len(pool)} orders...")
        still_unassigned = []
        for o in pool:
            o_lat, o_lon = self.get_lat_lon(o, prefix="pickup")
            if o_lat == 0:
                o["unassigned_reason"] = "Could not geolocate order (0,0)."
                still_unassigned.append(o)
                continue

            best_rid, best_score = None, float('inf')
            for r in valid_riders:
                rid = r["id"]
                r_state = rider_states[rid]
                e_score = (self.haversine(r_state['lat'], r_state['lon'], o_lat, o_lon)
                           + r_state['count'] * self.EMERGENCY_LOAD_PENALTY)
                if e_score < best_score:
                    best_score, best_rid = e_score, rid

            # `is not None`: a rider id of 0 is valid and must not be skipped.
            if best_rid is not None:
                assignments[best_rid].append(o)
                rider_states[best_rid]['count'] += 1
                logger.info(f" Force-Assigned order {o.get('orderid')} to Rider {best_rid} (Score: {best_score:.2f})")
            else:
                o.setdefault("unassigned_reason", "No rider could be scored for emergency assignment.")
                still_unassigned.append(o)
        return still_unassigned

    def _log_distribution(self, assignments, unassigned_orders):
        """Log per-rider order counts and the unassigned summary."""
        logger.info("=" * 50)
        logger.info("FINAL ASSIGNMENT DISTRIBUTION:")
        for rid, rider_orders in sorted(assignments.items()):
            logger.info(f" Rider {rid}: {len(rider_orders)} orders")
        if unassigned_orders:
            logger.warning(f" [ALERT] STILL UNASSIGNED: {len(unassigned_orders)} (Reason: No riders online or invalid coords)")
        else:
            logger.info(" [OK] ALL ORDERS ASSIGNED SUCCESSFULLY")
        logger.info("=" * 50)

    def _rebalance_workload(self, assignments: Dict[int, List], rider_states: Dict, valid_riders: List):
        """
        Rebalance if workload is heavily skewed.

        A rider is overloaded when its count exceeds 1.5x the average count and
        IDEAL_LOAD; underutilized when below 0.5x the average. Orders assigned
        in this call move from overloaded riders to underutilized ones when the
        receiver is within MAX_PICKUP_DISTANCE_KM of the pickup and no more than
        1.2x farther than the current rider, never pushing the sender below
        IDEAL_LOAD or the receiver above MAX_ORDERS_PER_RIDER.
        Mutates ``assignments`` and the ``count`` fields of ``rider_states``.
        """
        if not assignments or not valid_riders:
            return

        # Average over the same `count` values being compared (they include any
        # persisted workload), not just the orders assigned in this call.
        counts = [rider_states[r['id']]['count'] for r in valid_riders]
        avg_load = sum(counts) / len(counts)

        overloaded = []
        underutilized = []
        for r, load in zip(valid_riders, counts):
            if load > avg_load * 1.5 and load > self.IDEAL_LOAD:
                overloaded.append(r['id'])
            elif load < avg_load * 0.5:
                underutilized.append(r['id'])
        if not overloaded or not underutilized:
            return

        logger.info(f"Rebalancing: {len(overloaded)} overloaded, {len(underutilized)} underutilized riders")
        for over_rid in overloaded:
            # .get(): don't create empty entries for riders with only prior load.
            over_orders = assignments.get(over_rid)
            if not over_orders:
                continue
            over_state = rider_states[over_rid]

            for under_rid in underutilized:
                excess = over_state['count'] - self.IDEAL_LOAD
                if excess <= 0:
                    break  # sender is down to its ideal load
                under_state = rider_states[under_rid]
                under_capacity = self.MAX_ORDERS_PER_RIDER - under_state['count']
                if under_capacity <= 0:
                    continue

                transferable = []
                for order in over_orders:
                    o_lat, o_lon = self.get_lat_lon(order, prefix="pickup")
                    if o_lat == 0:
                        continue
                    dist_to_under = self.haversine(under_state['lat'], under_state['lon'], o_lat, o_lon)
                    dist_to_over = self.haversine(over_state['lat'], over_state['lon'], o_lat, o_lon)
                    if dist_to_under <= self.MAX_PICKUP_DISTANCE_KM and dist_to_under <= dist_to_over * 1.2:
                        transferable.append(order)
                if not transferable:
                    continue

                transfer_batch = transferable[:min(len(transferable), under_capacity, excess)]
                moved_ids = {id(order) for order in transfer_batch}
                # Remove by identity, in place (over_orders is the list in `assignments`).
                over_orders[:] = [order for order in over_orders if id(order) not in moved_ids]
                assignments.setdefault(under_rid, []).extend(transfer_batch)

                over_state['count'] -= len(transfer_batch)
                under_state['count'] += len(transfer_batch)
                logger.info(f" Rebalanced: {len(transfer_batch)} orders from Rider {over_rid} -> {under_rid}")

    def _post_process(self, assignments, rider_states):
        """Update rider history and persist rider state for riders that received orders.

        For each rider with at least one order, calls
        ``RiderHistoryService.update_rider_stats`` and writes an entry into
        ``RiderStateManager.states``; all states are saved once at the end.
        """
        from app.services.rider.rider_history_service import RiderHistoryService
        from app.services.rider.rider_state_manager import RiderStateManager

        history_service = RiderHistoryService()
        state_mgr = RiderStateManager()
        ts = time.time()
        for rid, rider_orders in assignments.items():
            if not rider_orders:
                continue
            # NOTE(review): meaning of the 5.0 argument is not visible here — confirm.
            history_service.update_rider_stats(rid, 5.0, len(rider_orders))
            st = rider_states[rid]
            state_mgr.states[rid] = {
                # Only this call's orders are counted; previously persisted load is replaced.
                'minutes_remaining': len(rider_orders) * self.MINUTES_PER_ORDER,
                # Position after the last cluster assignment (emergency/rebalance moves don't update it).
                'last_drop_lat': st['lat'],
                'last_drop_lon': st['lon'],
                # NOTE(review): this is a set — confirm _save_states can serialize it.
                'active_kitchens': st['kitchens'],
                'last_updated_ts': ts,
            }
        state_mgr._save_states()