159 lines
5.2 KiB
Python
159 lines
5.2 KiB
Python
"""
|
|
Realistic ETA Calculator for Delivery Operations
|
|
|
|
Accounts for:
|
|
- City traffic conditions
|
|
- Stop time at pickup/delivery
|
|
- Navigation time
|
|
- Parking/finding address time
|
|
- Different speeds for different order types
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RealisticETACalculator:
    """
    Calculates realistic ETAs accounting for real-world delivery conditions.

    Speeds, stop-time buffers and distance factors are read once from the
    dynamic configuration when the calculator is constructed. Values that are
    missing, non-numeric or out of range fall back to built-in defaults so a
    bad configuration row cannot cause a division by zero or a negative ETA.
    """

    def __init__(self):
        # Imported lazily so the module can be imported without the app config.
        from app.config.dynamic_config import get_config

        cfg = get_config()

        # BASE SPEED (km/h) - driven by the DB configuration. Must be strictly
        # positive because it is later used as a divisor.
        base_speed = self._config_number(
            cfg, "avg_speed_kmh", 18.0, allow_zero=False
        )

        # REALISTIC SPEEDS based on traffic level (figures assume the 18 km/h default)
        self.CITY_SPEED_HEAVY_TRAFFIC = base_speed * 0.7  # ~12.6 km/h
        self.CITY_SPEED_MODERATE = base_speed             # ~18 km/h
        self.CITY_SPEED_LIGHT = base_speed * 1.2          # ~21.6 km/h

        # TIME BUFFERS (minutes) - zero is allowed, negative values are not
        self.PICKUP_TIME = self._config_number(cfg, "eta_pickup_time_min", 3.0)
        self.DELIVERY_TIME = self._config_number(cfg, "eta_delivery_time_min", 4.0)
        self.NAVIGATION_BUFFER = self._config_number(
            cfg, "eta_navigation_buffer_min", 1.5
        )

        # DISTANCE-BASED SPEED SELECTION
        # Short distances (<2km) are slower due to more stops/starts
        # Long distances (>8km) might have highway portions
        # Both factors multiply the speed, so they must be strictly positive.
        self.SHORT_TRIP_FACTOR = self._config_number(
            cfg, "eta_short_trip_factor", 0.8, allow_zero=False
        )
        self.LONG_TRIP_FACTOR = self._config_number(
            cfg, "eta_long_trip_factor", 1.1, allow_zero=False
        )

    @staticmethod
    def _config_number(cfg, key: str, default: float, allow_zero: bool = True) -> float:
        """
        Read a numeric setting, falling back to ``default`` when it is unusable.

        Args:
            cfg: Configuration object exposing ``get(key, default)``
            key: Name of the setting to read
            default: Value returned when the setting is missing or invalid
            allow_zero: If False, zero is rejected as well as negatives

        Returns:
            The setting as a float, or ``default`` if the stored value cannot
            be converted to a float, is NaN, is negative, or is zero while
            ``allow_zero`` is False.
        """
        raw = cfg.get(key, default)
        try:
            value = float(raw)
        except (TypeError, ValueError):
            value = None

        # NaN fails both comparisons below, so it is rejected as well.
        if value is not None and (value > 0 or (allow_zero and value == 0)):
            return value

        logging.getLogger(__name__).warning(
            "Invalid value %r for config key %r; using default %s",
            raw, key, default,
        )
        return default

    def calculate_eta(self,
                      distance_km: float,
                      is_first_order: bool = False,
                      order_type: str = "Economy",
                      time_of_day: str = "peak") -> int:
        """
        Calculate realistic ETA in minutes.

        Args:
            distance_km: Distance to travel in kilometers
            is_first_order: If True, includes pickup time
            order_type: "Economy", "Premium", or "Risky". Accepted for
                interface compatibility; it does not currently affect the
                result.
            time_of_day: "peak" or "light" traffic; any other value
                (e.g. "normal") selects the moderate speed

        Returns:
            ETA in whole minutes, or 0 when ``distance_km`` is zero or
            negative. The raw estimate is truncated and then incremented by
            one, so an estimate that is already a whole number of minutes
            still gains one minute.
        """
        if distance_km <= 0:
            return 0

        # 1. SELECT SPEED BASED ON CONDITIONS
        if time_of_day == "peak":
            base_speed = self.CITY_SPEED_HEAVY_TRAFFIC
        elif time_of_day == "light":
            base_speed = self.CITY_SPEED_LIGHT
        else:
            base_speed = self.CITY_SPEED_MODERATE

        # 2. ADJUST SPEED BASED ON DISTANCE
        # Short trips are slower (more intersections, traffic lights)
        if distance_km < 2.0:
            effective_speed = base_speed * self.SHORT_TRIP_FACTOR
        elif distance_km > 8.0:
            effective_speed = base_speed * self.LONG_TRIP_FACTOR
        else:
            effective_speed = base_speed

        # 3. CALCULATE TRAVEL TIME
        travel_time = (distance_km / effective_speed) * 60  # Convert to minutes

        # 4. ADD BUFFERS
        total_time = travel_time

        # Pickup time (only for first order in sequence)
        if is_first_order:
            total_time += self.PICKUP_TIME

        # Delivery time (always)
        total_time += self.DELIVERY_TIME

        # Navigation buffer (flat amount, applied only to trips over 3 km)
        if distance_km > 3.0:
            total_time += self.NAVIGATION_BUFFER

        # 5. SAFETY MARGIN
        # Truncate and add one minute: riders prefer to arrive early than late
        eta_minutes = int(total_time) + 1

        return eta_minutes

    def calculate_batch_eta(self, orders: list) -> list:
        """
        Calculate ETAs for a batch of orders in sequence.

        Args:
            orders: List of order dicts with 'previouskms' and 'step' fields
                and an optional 'ordertype' field. A missing, None or empty
                'previouskms' is treated as a distance of 0.

        Returns:
            The same list, with each dict updated in place: 'eta' is set to
            the ETA in minutes as a string and 'eta_realistic' is set to True.
        """
        for order in orders:
            # `or 0` covers a key that is present but None/empty, which
            # float() would otherwise reject.
            distance_km = float(order.get('previouskms') or 0)
            step = order.get('step', 1)
            order_type = order.get('ordertype', 'Economy')

            # First order includes pickup time
            is_first = (step == 1)

            # Moderate traffic is assumed for every order (can be made dynamic)
            eta = self.calculate_eta(
                distance_km=distance_km,
                is_first_order=is_first,
                order_type=order_type,
                time_of_day="normal"
            )

            order['eta'] = str(eta)
            order['eta_realistic'] = True  # Flag to indicate realistic calculation

        return orders
|
|
|
|
|
|
def get_time_of_day_category(now=None) -> str:
    """
    Determine traffic conditions based on the hour of day.

    Args:
        now: Optional ``datetime.datetime`` (or any object with an ``hour``
            attribute) to classify. Defaults to the current local time from
            ``datetime.now()``.

    Returns:
        "peak", "normal", or "light"
    """
    from datetime import datetime

    current_hour = (now if now is not None else datetime.now()).hour

    # Peak hours: 8-10 AM, 12-2 PM, 5-8 PM (end hour exclusive)
    if (8 <= current_hour < 10) or (12 <= current_hour < 14) or (17 <= current_hour < 20):
        return "peak"

    # Light traffic: late night/early morning (before 7 AM, from 10 PM)
    if current_hour < 7 or current_hour >= 22:
        return "light"

    # Everything else, including the 7-8 AM hour
    return "normal"
|