516 lines
23 KiB
Python
516 lines
23 KiB
Python
|
|
import logging
|
|
import random
|
|
import time
|
|
from math import radians, cos, sin, asin, sqrt
|
|
from typing import List, Dict, Any, Optional
|
|
from collections import defaultdict
|
|
from app.config.rider_preferences import RIDER_PREFERRED_KITCHENS
|
|
from app.services.routing.kalman_filter import smooth_rider_locations, smooth_order_coordinates
|
|
from app.config.dynamic_config import get_config
|
|
from app.services.ml.ml_data_collector import get_collector
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class AssignmentService:
    """Assign delivery orders to riders via kitchen clustering and load balancing.

    The public entry point is :meth:`assign_orders`. Hyperparameters are read
    from the config object returned by ``get_config()`` and copied onto the
    instance by :meth:`_load_config` at the start of every assignment call.
    """

    # Rider IDs that must never receive orders (test / blocked accounts).
    _BLOCKED_RIDER_IDS = frozenset({1242, 1266, 1245, 1232, 1240, 1007})

    # Rider "status" values that count as available.
    _ACTIVE_STATUSES = ("active", "idle", "online")

    def __init__(self):
        self.rider_preferences = RIDER_PREFERRED_KITCHENS
        self.earth_radius_km = 6371
        self._cfg = get_config()

    def _load_config(self):
        """Copy the tunable hyperparameters from the config object onto ``self``.

        Called at the start of every ``assign_orders`` call. The values are
        read through the config handle obtained once in ``__init__``.
        NOTE(review): whether this yields freshly tuned values on each call
        depends on how that config object refreshes itself -- confirm.
        """
        cfg = self._cfg
        self.MAX_PICKUP_DISTANCE_KM = cfg.get("max_pickup_distance_km")
        self.MAX_KITCHEN_DISTANCE_KM = cfg.get("max_kitchen_distance_km")
        self.MAX_ORDERS_PER_RIDER = int(cfg.get("max_orders_per_rider"))
        self.IDEAL_LOAD = int(cfg.get("ideal_load"))
        self.WORKLOAD_BALANCE_THRESHOLD = cfg.get("workload_balance_threshold")
        self.WORKLOAD_PENALTY_WEIGHT = cfg.get("workload_penalty_weight")
        self.DISTANCE_PENALTY_WEIGHT = cfg.get("distance_penalty_weight")
        self.PREFERENCE_BONUS = cfg.get("preference_bonus")
        self.HOME_ZONE_BONUS_4KM = cfg.get("home_zone_bonus_4km")
        self.HOME_ZONE_BONUS_2KM = cfg.get("home_zone_bonus_2km")
        self.EMERGENCY_LOAD_PENALTY = cfg.get("emergency_load_penalty")

    def haversine(self, lat1, lon1, lat2, lon2):
        """Return the great-circle distance between two points.

        Inputs are degrees (anything ``float()`` accepts); the result is in
        the unit of ``self.earth_radius_km``.
        """
        lon1, lat1, lon2, lat2 = map(radians, [float(lon1), float(lat1), float(lon2), float(lat2)])
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Clamp to 1.0 so rounding error cannot push asin() out of its domain.
        c = 2 * asin(min(1.0, sqrt(a)))
        return c * self.earth_radius_km

    def get_lat_lon(self, obj: Dict[str, Any], prefix: str = "") -> tuple[float, float]:
        """Extract a ``(lat, lon)`` pair from a dict with inconsistently named keys.

        Prefixed key pairs are tried first, then a fixed list of common key
        pairs, and finally a nested ``pickup_location`` dict. A pair is only
        accepted when both values are truthy and convertible to ``float``;
        zero therefore counts as "missing".

        Returns:
            ``(lat, lon)`` as floats, or ``(0.0, 0.0)`` when nothing usable
            was found.
        """
        candidates = [
            # Prefixed variants first (e.g. prefix="pickup" -> "pickuplat").
            (f"{prefix}lat", f"{prefix}lon"),
            (f"{prefix}lat", f"{prefix}long"),
            (f"{prefix}latitude", f"{prefix}longitude"),
            # Standard keys as a fallback when the prefixed ones are absent.
            ("lat", "lon"), ("latitude", "longitude"),
            ("pickuplat", "pickuplon"), ("pickuplat", "pickuplong"),
            ("deliverylat", "deliverylong"), ("droplat", "droplon"),
        ]

        for lat_key, lon_key in candidates:
            if lat_key in obj and lon_key in obj and obj[lat_key] and obj[lon_key]:
                try:
                    return float(obj[lat_key]), float(obj[lon_key])
                except (TypeError, ValueError, OverflowError):
                    # Unparseable value: fall through to the next key pair.
                    continue

        # Special case: coordinates nested under 'pickup_location'. Only
        # recurse into a dict; None or a scalar there would raise on lookup.
        nested = obj["pickup_location"] if "pickup_location" in obj else None
        if isinstance(nested, dict):
            return self.get_lat_lon(nested)

        return 0.0, 0.0

    def get_order_kitchen(self, order: Dict[str, Any]) -> str:
        """Return the stripped kitchen/store name of ``order``, or ``"Unknown"``."""
        possible_keys = ['storename', 'restaurantname', 'kitchenname', 'partnername', 'store_name']
        for key in possible_keys:
            if key in order and order[key]:
                return str(order[key]).strip()
        return "Unknown"

    @staticmethod
    def _extract_rider_id(rider: Dict[str, Any]) -> Optional[int]:
        """Return the rider's integer ID, or ``None`` when absent or non-numeric."""
        rid_raw = rider.get("userid") or rider.get("riderid") or rider.get("id") or rider.get("_id")
        try:
            return int(rid_raw)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _has_kitchen_preference(prefs, kitchen_names) -> bool:
        """Return True if any preferred kitchen name matches any cluster kitchen.

        A match is a case-insensitive substring match in either direction.
        """
        for k_name in kitchen_names:
            k_lower = k_name.lower()
            if any(p.lower() in k_lower or k_lower in p.lower() for p in prefs):
                return True
        return False

    def assign_orders(self, orders: List[Dict[str, Any]], riders: List[Dict[str, Any]], reshuffle: bool = False) -> tuple[Dict[int, List[Dict[str, Any]]], List[Dict[str, Any]]]:
        """Cluster-based, load-balanced assignment of orders to riders.

        Steps:
            1. Filter riders (not blocked, on duty or active) and seed their
               workload from persisted state.
            2. Cluster orders by kitchen proximity.
            3. Give each cluster to the best-scoring riders (lower score is
               better), splitting the cluster across riders when needed.
            4. Force-assign whatever is still unassigned to the "least bad"
               rider (distance plus a per-order load penalty).
            5. Rebalance, persist rider state and log an ML data event.

        Args:
            orders: Order dicts. They may be mutated: an ``unassigned_reason``
                key is set on orders that cannot be placed.
            riders: Rider dicts as delivered by the caller.
            reshuffle: When True, random noise in [-15, 15] is added to every
                rider score so that retries can explore other assignments.

        Returns:
            ``(assignments, unassigned_orders)`` where ``assignments`` maps
            rider ID to the list of orders assigned to that rider.
        """
        # Function-scope imports, kept local as in the rest of this class.
        from app.config.rider_preferences import RIDER_HOME_LOCATIONS
        from app.services.rider.rider_state_manager import RiderStateManager
        from app.services.routing.clustering_service import ClusteringService

        # -- Load hyperparameters ------------------------------------------
        self._load_config()
        call_start = time.time()

        # 0. Prep
        assignments: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
        unassigned_orders: List[Dict[str, Any]] = []
        rider_states = {}  # Live per-rider load / position while assigning.

        # 0a. Smooth rider GPS locations before scoring.
        riders = smooth_rider_locations(list(riders))
        # 0b. Smooth order coordinates before clustering.
        orders = smooth_order_coordinates(list(orders))

        # 1. Parse and filter riders
        valid_riders = []
        state_mgr = RiderStateManager()  # Persisted state from earlier calls.

        for r in riders:
            rid = self._extract_rider_id(r)
            if rid is None or rid in self._BLOCKED_RIDER_IDS:
                continue

            # Keep if onduty is 1 / "1" / True, OR status is active/idle/online.
            is_onduty = str(r.get("onduty")) in ("1", "True") or r.get("onduty") is True
            is_active = r.get("status") in self._ACTIVE_STATUSES
            if not (is_onduty or is_active):
                continue

            lat, lon = self.get_lat_lon(r)

            # Previous state tells us whether the rider is already busy.
            p_state = state_mgr.get_rider_state(rid)

            # No valid GPS: fall back to the last drop, then to the home location.
            if lat == 0 or lon == 0:
                drop_lat = p_state.get('last_drop_lat')
                drop_lon = p_state.get('last_drop_lon')
                if drop_lat and drop_lon:
                    lat, lon = drop_lat, drop_lon
                else:
                    lat, lon = RIDER_HOME_LOCATIONS.get(rid, (0.0, 0.0))

            valid_riders.append({"id": rid, "lat": lat, "lon": lon, "obj": r})

            # Convert remaining minutes to an order estimate (15 per order).
            existing_load = (p_state.get('minutes_remaining') or 0) / 15

            rider_states[rid] = {
                'lat': lat,
                'lon': lon,
                'kitchens': set(),
                'count': int(existing_load),      # Start with existing workload
                'workload_score': existing_load,  # For prioritization
            }

        if not valid_riders:
            logger.warning("No riders passed on-duty filter. Retrying with all available riders as emergency rescue...")
            # Emergency rescue: take ANY non-blocked rider with a usable location.
            for r in riders:
                rid = self._extract_rider_id(r)
                if rid is None or rid in self._BLOCKED_RIDER_IDS:
                    continue

                lat, lon = self.get_lat_lon(r)
                if lat == 0 or lon == 0:
                    lat, lon = RIDER_HOME_LOCATIONS.get(rid, (0.0, 0.0))

                if lat != 0:
                    valid_riders.append({"id": rid, "lat": lat, "lon": lon, "obj": r})
                    rider_states[rid] = {
                        'lat': lat, 'lon': lon, 'kitchens': set(),
                        'count': 0, 'workload_score': 0
                    }

        if not valid_riders:
            logger.error("DANGER: Absolutely no riders available for assignment.")
            for o in orders:
                o["unassigned_reason"] = "No riders found (check partner online status)."
                unassigned_orders.append(o)
            return assignments, unassigned_orders

        logger.info(f"Found {len(valid_riders)} active riders")

        # 2. Cluster orders by kitchen proximity (radius from config).
        clustering_service = ClusteringService()
        clusters = clustering_service.cluster_orders_by_kitchen(orders, max_cluster_radius_km=self.MAX_KITCHEN_DISTANCE_KM)

        logger.info(f"Created {len(clusters)} order clusters")

        # 3. Assign clusters to riders (load-balanced)
        for cluster_idx, cluster in enumerate(clusters):
            centroid_lat, centroid_lon = cluster['centroid']
            cluster_orders = cluster['orders']
            cluster_size = len(cluster_orders)
            kitchen_names = cluster['kitchen_names']

            logger.info(f"Assigning cluster {cluster_idx+1}/{len(clusters)}: {cluster_size} orders at ({centroid_lat:.4f}, {centroid_lon:.4f})")

            candidate_riders = []

            for r in valid_riders:
                rid = r["id"]
                r_state = rider_states[rid]

                dist = self.haversine(r_state['lat'], r_state['lon'], centroid_lat, centroid_lon)

                has_preference = self._has_kitchen_preference(
                    self.rider_preferences.get(rid, []), kitchen_names
                )

                # Preferred kitchens extend the allowed pickup radius to at least 10 km.
                allowed_dist = self.MAX_PICKUP_DISTANCE_KM
                if has_preference:
                    allowed_dist = max(allowed_dist, 10.0)

                if dist > allowed_dist:
                    continue  # Too far from this cluster.

                # Score components (lower total is better).
                utilization = r_state['count'] / self.MAX_ORDERS_PER_RIDER
                workload_penalty = utilization * self.WORKLOAD_PENALTY_WEIGHT
                distance_penalty = dist * self.DISTANCE_PENALTY_WEIGHT

                # NOTE(review): both bonuses are ADDED to a lower-is-better
                # score, so they only help the rider if the configured values
                # are negative -- confirm against the config defaults.
                preference_bonus = self.PREFERENCE_BONUS if has_preference else 0

                h_lat, h_lon = RIDER_HOME_LOCATIONS.get(rid, (0.0, 0.0))
                home_bonus = 0
                if h_lat != 0:
                    home_dist = self.haversine(h_lat, h_lon, centroid_lat, centroid_lon)
                    if home_dist <= 2.0:
                        home_bonus = self.HOME_ZONE_BONUS_2KM
                    elif home_dist <= 4.0:
                        home_bonus = self.HOME_ZONE_BONUS_4KM

                # RESHUFFLE: controlled noise so retries explore different riders.
                noise = random.uniform(-15.0, 15.0) if reshuffle else 0.0

                # Everything except the workload term is fixed for this cluster;
                # keep it so re-scoring after a batch stays consistent.
                static_score = distance_penalty + preference_bonus + home_bonus + noise

                candidate_riders.append({
                    'id': rid,
                    'score': workload_penalty + static_score,
                    'static_score': static_score,
                    'distance': dist,
                    'utilization': utilization,
                    'current_load': r_state['count'],
                })

            if not candidate_riders:
                logger.warning(f"No riders available for cluster {cluster_idx+1}")
                for o in cluster_orders:
                    o["unassigned_reason"] = f"No riders within {self.MAX_PICKUP_DISTANCE_KM}km radius of kitchen."
                    unassigned_orders.append(o)
                continue

            candidate_riders.sort(key=lambda c: c['score'])  # Best first.

            # SMART DISTRIBUTION: split the cluster across riders if needed.
            remaining_orders = cluster_orders[:]

            while remaining_orders and candidate_riders:
                best_rider = candidate_riders[0]
                rid = best_rider['id']
                r_state = rider_states[rid]

                available_capacity = self.MAX_ORDERS_PER_RIDER - r_state['count']
                if available_capacity <= 0:
                    candidate_riders.pop(0)  # Rider is full.
                    continue

                if best_rider['utilization'] < self.WORKLOAD_BALANCE_THRESHOLD:
                    # Under-utilized rider: fill up to capacity.
                    batch_size = min(available_capacity, len(remaining_orders))
                else:
                    # Busy rider: top up only to the ideal load.
                    batch_size = min(self.IDEAL_LOAD - r_state['count'], len(remaining_orders), available_capacity)
                # Always hand over at least one order so the loop makes progress.
                batch_size = max(1, batch_size)

                batch = remaining_orders[:batch_size]
                remaining_orders = remaining_orders[batch_size:]

                assignments[rid].extend(batch)

                # The rider is now considered to be at the cluster centroid.
                r_state['count'] += len(batch)
                r_state['lat'] = centroid_lat
                r_state['lon'] = centroid_lon
                r_state['kitchens'].update(kitchen_names)
                r_state['workload_score'] = r_state['count'] / self.MAX_ORDERS_PER_RIDER

                logger.info(f" -> Assigned {len(batch)} orders to Rider {rid} (load: {r_state['count']}/{self.MAX_ORDERS_PER_RIDER})")

                # Re-score this rider with the same weights and bonuses that
                # produced the initial ranking, then re-sort.
                new_utilization = r_state['count'] / self.MAX_ORDERS_PER_RIDER
                for candidate in candidate_riders:
                    if candidate['id'] == rid:
                        candidate['utilization'] = new_utilization
                        candidate['current_load'] = r_state['count']
                        candidate['score'] = (
                            new_utilization * self.WORKLOAD_PENALTY_WEIGHT
                            + candidate['static_score']
                        )

                candidate_riders.sort(key=lambda c: c['score'])

            # Orders left after all candidates filled up go to the mandatory pool.
            if remaining_orders:
                unassigned_orders.extend(remaining_orders)

        # 4. EMERGENCY MANDATORY ASSIGNMENT (aims for 0 unassigned if riders exist)
        if unassigned_orders and valid_riders:
            logger.info(f"[ALERT] Starting Emergency Mandatory Assignment for {len(unassigned_orders)} orders...")
            force_pool = unassigned_orders[:]
            unassigned_orders.clear()

            for o in force_pool:
                o_lat, o_lon = self.get_lat_lon(o, prefix="pickup")
                if o_lat == 0:
                    o["unassigned_reason"] = "Could not geolocate order (0,0)."
                    unassigned_orders.append(o)
                    continue

                # Find the 'least bad' rider: distance plus a penalty per
                # order already held, so one rider does not take everything.
                best_emergency_rider = None
                best_emergency_score = float('inf')

                for r in valid_riders:
                    rid = r["id"]
                    r_state = rider_states[rid]

                    dist = self.haversine(r_state['lat'], r_state['lon'], o_lat, o_lon)
                    e_score = dist + (r_state['count'] * self.EMERGENCY_LOAD_PENALTY)

                    if e_score < best_emergency_score:
                        best_emergency_score = e_score
                        best_emergency_rider = rid

                # Compare against None: a rider ID of 0 is falsy but valid.
                if best_emergency_rider is not None:
                    assignments[best_emergency_rider].append(o)
                    rider_states[best_emergency_rider]['count'] += 1
                    # The order is assigned now; drop any stale failure reason.
                    o.pop("unassigned_reason", None)
                    logger.info(f" Force-Assigned order {o.get('orderid')} to Rider {best_emergency_rider} (Score: {best_emergency_score:.2f})")
                else:
                    unassigned_orders.append(o)

        # 5. Final rebalancing: overloaded riders hand orders to idle ones.
        self._rebalance_workload(assignments, rider_states, valid_riders)

        # 6. Commit state and history
        self._post_process(assignments, rider_states)

        # 7. ML data collection (best effort: never fail the assignment).
        try:
            elapsed_ms = (time.time() - call_start) * 1000
            get_collector().log_assignment_event(
                num_orders=len(orders),
                num_riders=len(riders),
                hyperparams=self._cfg.get_all(),
                assignments=assignments,
                unassigned_count=len(unassigned_orders),
                elapsed_ms=elapsed_ms,
            )
        except Exception as _ml_err:
            logger.debug(f"ML logging skipped: {_ml_err}")

        # Log final distribution
        logger.info("=" * 50)
        logger.info("FINAL ASSIGNMENT DISTRIBUTION:")
        for rid, rider_orders in sorted(assignments.items()):
            logger.info(f" Rider {rid}: {len(rider_orders)} orders")

        if unassigned_orders:
            logger.warning(f" [ALERT] STILL UNASSIGNED: {len(unassigned_orders)} (Reason: No riders online or invalid coords)")
        else:
            logger.info(" [OK] ALL ORDERS ASSIGNED SUCCESSFULLY")
        logger.info("=" * 50)

        return assignments, unassigned_orders

    def _rebalance_workload(self, assignments: Dict[int, List], rider_states: Dict, valid_riders: List):
        """Move orders from overloaded riders to under-utilized ones, in place.

        A rider is overloaded when its load is more than 1.5x the average AND
        above ``IDEAL_LOAD``; under-utilized when below 0.5x the average. An
        order is moved only if the receiving rider is within
        ``MAX_PICKUP_DISTANCE_KM`` of its pickup point and at most 20% farther
        away than the current rider. An overloaded rider is never reduced
        below ``IDEAL_LOAD``.

        Args:
            assignments: rider ID -> assigned orders; mutated in place.
            rider_states: rider ID -> state dict; ``count`` is updated.
            valid_riders: rider dicts carrying an ``id`` key.
        """
        if not assignments:
            return

        total_orders = sum(len(rider_orders) for rider_orders in assignments.values())
        avg_load = total_orders / len(valid_riders) if valid_riders else 0

        overloaded = []
        underutilized = []
        for r in valid_riders:
            rid = r['id']
            load = rider_states[rid]['count']
            if load > avg_load * 1.5 and load > self.IDEAL_LOAD:  # 50% above average
                overloaded.append(rid)
            elif load < avg_load * 0.5:  # 50% below average
                underutilized.append(rid)

        if not overloaded or not underutilized:
            return

        logger.info(f"Rebalancing: {len(overloaded)} overloaded, {len(underutilized)} underutilized riders")

        for over_rid in overloaded:
            # setdefault works for both a plain dict and a defaultdict.
            over_orders = assignments.setdefault(over_rid, [])
            over_state = rider_states[over_rid]

            for under_rid in underutilized:
                # Stop once this rider is back at (or below) the ideal load.
                surplus = over_state['count'] - self.IDEAL_LOAD
                if surplus <= 0:
                    break

                under_state = rider_states[under_rid]
                under_capacity = self.MAX_ORDERS_PER_RIDER - under_state['count']
                if under_capacity <= 0:
                    continue

                # Orders the under-utilized rider can reasonably pick up.
                transferable = []
                for order in over_orders:
                    o_lat, o_lon = self.get_lat_lon(order, prefix="pickup")
                    if o_lat == 0:
                        continue

                    dist_to_under = self.haversine(under_state['lat'], under_state['lon'], o_lat, o_lon)
                    dist_to_over = self.haversine(over_state['lat'], over_state['lon'], o_lat, o_lon)

                    if dist_to_under <= self.MAX_PICKUP_DISTANCE_KM and dist_to_under <= dist_to_over * 1.2:
                        transferable.append(order)

                if not transferable:
                    continue

                transfer_batch = transferable[:min(under_capacity, surplus)]

                # Remove by identity so an equal-looking duplicate dict is
                # never removed by mistake; slice-assign keeps the same list
                # object that `assignments` references.
                moved_ids = {id(order) for order in transfer_batch}
                over_orders[:] = [order for order in over_orders if id(order) not in moved_ids]
                assignments.setdefault(under_rid, []).extend(transfer_batch)

                over_state['count'] -= len(transfer_batch)
                under_state['count'] += len(transfer_batch)

                logger.info(f" Rebalanced: {len(transfer_batch)} orders from Rider {over_rid} -> {under_rid}")

    def _post_process(self, assignments, rider_states):
        """Update rider history and persist the new rider state.

        For every rider with at least one assigned order, the rider stats are
        updated and the stored state is overwritten, then all states are saved.
        """
        from app.services.rider.rider_history_service import RiderHistoryService
        from app.services.rider.rider_state_manager import RiderStateManager

        history_service = RiderHistoryService()
        state_mgr = RiderStateManager()

        ts = time.time()

        for rid, rider_orders in assignments.items():
            if not rider_orders:
                continue

            history_service.update_rider_stats(rid, 5.0, len(rider_orders))

            st = rider_states[rid]
            state_mgr.states[rid] = {
                # 15 minutes per newly assigned order.
                'minutes_remaining': len(rider_orders) * 15,
                'last_drop_lat': st['lat'],
                'last_drop_lon': st['lon'],
                # NOTE(review): this is a set; confirm that _save_states()
                # can serialize it.
                'active_kitchens': st['kitchens'],
                'last_updated_ts': ts,
            }

        state_mgr._save_states()
|