Files
Krow-workspace/app_core/services/scheduler_service.py
2026-04-07 12:31:22 +05:30

90 lines
3.0 KiB
Python

import logging
import os
from datetime import datetime
from zoneinfo import ZoneInfo
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from app_core.services.daily_report import main as run_daily_report
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SchedulerService:
def __init__(self):
self.scheduler = None
self.ist = ZoneInfo("Asia/Kolkata")
def start_scheduler(self):
"""Start the background scheduler for daily email reports."""
if self.scheduler and self.scheduler.running:
logger.info("Scheduler is already running")
return
# Configure job stores and executors
jobstores = {
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(20)
}
job_defaults = {
'coalesce': False,
'max_instances': 1
}
self.scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=self.ist
)
# Schedule daily email at 8:00 PM IST (20:00)
self.scheduler.add_job(
func=self._send_daily_report,
trigger=CronTrigger(hour=20, minute=0, timezone=self.ist),
id='daily_email_report',
name='Daily Email Report',
replace_existing=True
)
# Start the scheduler
self.scheduler.start()
logger.info("Daily email scheduler started - will send reports at 8:00 PM IST")
def stop_scheduler(self):
"""Stop the background scheduler."""
if self.scheduler and self.scheduler.running:
self.scheduler.shutdown()
logger.info("Daily email scheduler stopped")
def _send_daily_report(self):
"""Internal method to send daily report."""
try:
logger.info(f"Starting daily report at {datetime.now(self.ist)}")
result = run_daily_report()
if result == 0:
logger.info("Daily report sent successfully")
else:
logger.warning(f"Daily report failed with exit code: {result}")
except Exception as e:
logger.error(f"Error sending daily report: {str(e)}")
def get_next_run_time(self):
"""Get the next scheduled run time for the daily report."""
if not self.scheduler or not self.scheduler.running:
return None
job = self.scheduler.get_job('daily_email_report')
if job:
return job.next_run_time
return None
def is_running(self):
"""Check if scheduler is running."""
return self.scheduler is not None and self.scheduler.running