from __future__ import annotations import smtplib import sys import os from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import List, Tuple from datetime import date from datetime import date import pandas as pd from sqlalchemy import text import streamlit as st # Add the project root to Python path project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) if project_root not in sys.path: sys.path.insert(0, project_root) from app_core.config.settings import AppSettings, STORES from app_core.db.database import engine, SessionLocal from app_core.db.models import EmailLog class MailerService: def __init__(self, settings: AppSettings | None = None) -> None: self.settings = settings or AppSettings() def fetch_daily_rows(self, report_date) -> pd.DataFrame: sql = ( 'SELECT * FROM "tenantpostings" ' 'WHERE "created_at"::date = %(d)s ' 'ORDER BY "id" DESC ' 'LIMIT 10000' ) with engine.connect() as conn: df = pd.read_sql(sql, conn, params={"d": report_date}) return df def select_report_date(self, preferred: date | None = None) -> date | None: """Return preferred date if it has data; else most recent date with data; else None.""" with engine.connect() as conn: dates_df = pd.read_sql( 'SELECT "created_at"::date AS d, COUNT(*) AS c\n' 'FROM "tenantpostings"\n' 'GROUP BY d\n' 'ORDER BY d DESC', conn, ) if dates_df.empty: return None # Normalize if 'd' not in dates_df.columns: return None dates_df['d'] = pd.to_datetime(dates_df['d'], errors='coerce') available = [d.date() for d in dates_df['d'].dropna().tolist()] if preferred and preferred in available: return preferred return available[0] if available else None def build_email_html(self, row: dict, df: pd.DataFrame | None = None) -> str: outlet = row.get("outlet_name") or row.get("register_name") or "Outlet" division = row.get("division_code") or "PC" status = (row.get("triumph_status") or "Posted successfully").capitalize() register_close_id = row.get("register_close_id", "—") register_id = row.get("register_id", "—") def lines_for(ptype: str) -> list[str]: """Return formatted lines for all rows of a processing_type. Example line: 3,616.19 (Event ID: 2904783) """ if df is None or df.empty or 'processing_type' not in df.columns: return [] sub = df[df['processing_type'].astype(str).str.upper() == ptype.upper()] if 'processing_type' in df.columns else pd.DataFrame() if sub.empty: return [] # De-duplicate by triumph_event to avoid double-counting retries if 'triumph_event' in sub.columns: sub = sub.sort_values(['triumph_event', 'id'], ascending=[True, False]).drop_duplicates(subset=['triumph_event'], keep='first') result: list[str] = [] for _, r in sub.sort_values('id', ascending=False).iterrows(): amt = r.get('total_amount') evt = r.get('triumph_event', '—') try: amt_str = f"{float(amt):,.2f}" except Exception: amt_str = str(amt) if amt is not None else '—' result.append(f"{amt_str} (Event ID: {evt})") return result journal_lines = lines_for('JOURNAL') bank_journal_lines = lines_for('BANKING_JOURNAL') invoice_lines = lines_for('INVOICE') receipt_lines = lines_for('RECEIPT') # Optional: transaction summary by store (single table) store_summary_table_html = "" events_matrix_html = "" if isinstance(df, pd.DataFrame) and not df.empty and ('tenant_id' in df.columns): def summarize_for(store: dict) -> dict[str, str]: sid = store.get('tenant_id') name = store.get('label') sub = df[df['tenant_id'] == sid] # De-duplicate each processing type context within the store if not sub.empty and 'triumph_event' in sub.columns and 'processing_type' in sub.columns: # Filter out non-event rows before dedupe if necessary, but here we just dedupe everything with an event ID # We keep the one with highest ID for each event has_event = sub['triumph_event'].fillna('').astype(str).str.strip() != '' sub_with_events = sub[has_event].sort_values(['processing_type', 'triumph_event', 'id'], ascending=[True, True, False]).drop_duplicates(subset=['processing_type', 'triumph_event'], keep='first') sub_no_events = sub[~has_event] sub = pd.concat([sub_with_events, sub_no_events]).sort_values('id', ascending=False) def pick_total(kind: str) -> tuple[str, int]: if sub.empty or 'processing_type' not in sub.columns: return ("0.00", 0) s = sub[sub['processing_type'].astype(str).str.upper() == kind] if s.empty: return ("0.00", 0) try: total = float(s['total_amount'].fillna(0).sum()) if 'total_amount' in s.columns else 0.0 except Exception: total = 0.0 return (f"{total:,.2f}", len(s)) def has_rows(kind: str) -> bool: if sub.empty or 'processing_type' not in sub.columns: return False s = sub[sub['processing_type'].astype(str).str.upper() == kind] return not s.empty def latest_event(kind: str) -> str: if sub.empty or 'processing_type' not in sub.columns: return "—" s = sub[sub['processing_type'].astype(str).str.upper() == kind] if s.empty: return "—" series = s.get('triumph_event') if 'triumph_event' in s.columns else None if series is None or series.empty: return "—" try: return str(series.dropna().astype(str).iloc[0]) except Exception: return "—" def latest_status_emoji(kind: str) -> str: if sub.empty or 'processing_type' not in sub.columns: return "" s = sub[sub['processing_type'].astype(str).str.upper() == kind] if s.empty: return "" status_series = s.get('triumph_status') if 'triumph_status' in s.columns else None if status_series is None or status_series.empty: return "" try: val = str(status_series.iloc[0]).strip().lower() except Exception: val = "" if any(x in val for x in ["success", "ok", "completed", "done"]): return " ✅" if any(x in val for x in ["fail", "error", "invalid", "dead"]): return " ❌" if any(x in val for x in ["pending", "queue", "waiting", "processing"]): return " ⚠️" return "" j_total, _ = pick_total('JOURNAL') b_total, _ = pick_total('BANKING_JOURNAL') i_total, _ = pick_total('INVOICE') r_total, _ = pick_total('RECEIPT') j_eid = latest_event('JOURNAL'); j_stat = latest_status_emoji('JOURNAL') b_eid = latest_event('BANKING_JOURNAL'); b_stat = latest_status_emoji('BANKING_JOURNAL') i_eid = latest_event('INVOICE'); i_stat = latest_status_emoji('INVOICE') r_eid = latest_event('RECEIPT'); r_stat = latest_status_emoji('RECEIPT') def render_cell(exists: bool, total: str, eid: str, stat: str, ptype: str = "") -> str: if not exists: return "Nill" # For INVOICE and RECEIPT, show individual line items if multiple exist if ptype.upper() in ['INVOICE', 'RECEIPT'] and sub is not None and not sub.empty: type_sub = sub[sub['processing_type'].astype(str).str.upper() == ptype.upper()] if len(type_sub) > 1: # Multiple transactions individual_lines = [] for _, r in type_sub.sort_values('id', ascending=False).iterrows(): amt = r.get('total_amount') evt = r.get('triumph_event', '—') status_val = str(r.get('triumph_status', '')).strip().lower() status_emoji = "" if any(x in status_val for x in ["success", "ok", "completed", "done"]): status_emoji = " ✅" elif any(x in status_val for x in ["fail", "error", "invalid", "dead"]): status_emoji = " ❌" elif any(x in status_val for x in ["pending", "queue", "waiting", "processing"]): status_emoji = " ⚠️" try: amt_str = f"{float(amt):,.2f}" except Exception: amt_str = str(amt) if amt is not None else '—' individual_lines.append(f"
{amt_str} ({evt}){status_emoji}
") return f"{total}
Total ({len(type_sub)} items)
{''.join(individual_lines)}" return f"{total}
({eid}) {stat}" return { "name": name, "journal": render_cell(has_rows('JOURNAL'), j_total, j_eid, j_stat), "banking": render_cell(has_rows('BANKING_JOURNAL'), b_total, b_eid, b_stat), "invoice": render_cell(has_rows('INVOICE'), i_total, i_eid, i_stat, 'INVOICE'), "receipt": render_cell(has_rows('RECEIPT'), r_total, r_eid, r_stat, 'RECEIPT'), } rows = [summarize_for(s) for s in STORES] # Build single HTML table header = ( "" "Store Name" "Journal" "Banking Journal" "Account Sales" "Account Payments" "" ) body = [] for r in rows: body.append( "" f"{r['name']}" f"{r['journal']}" f"{r['banking']}" f"{r['invoice']}" f"{r['receipt']}" "" ) store_summary_table_html = ( "
" "
Transaction Summary by Store
" "" + header + "".join(body) + "
" ) html = f"""

Hello Tucker Fresh,

Here’s your daily digest of posted transactions:

{store_summary_table_html}

Thank you for staying updated with us.

Best regards,
Workolik Team

""" return html def send_email(self, recipients: List[str], subject: str, html: str) -> Tuple[bool, str]: s = self.settings if not all([s.smtp_host, s.smtp_port, s.smtp_user, s.smtp_password, s.smtp_from_email]): return False, "SMTP settings are incomplete." # Optional BCC via env (comma-separated), default empty bcc_env = os.getenv("BCC_RECIPIENTS", "").strip() bcc_recipients = [e.strip() for e in bcc_env.split(',') if e.strip()] if bcc_env else [] all_recipients = recipients + bcc_recipients msg = MIMEMultipart("alternative") msg["From"] = f"{s.smtp_from_name} <{s.smtp_from_email}>" msg["To"] = ", ".join(recipients) msg["Subject"] = subject msg.attach(MIMEText(html, "html")) try: server = smtplib.SMTP(s.smtp_host, s.smtp_port, timeout=30) if s.smtp_use_tls: server.starttls() server.login(s.smtp_user, s.smtp_password) server.sendmail(s.smtp_from_email, all_recipients, msg.as_string()) server.quit() return True, "sent" except Exception as e: return False, str(e) def log_email(self, recipients: List[str], subject: str, date_for: str, status: str, error: str | None = None) -> None: with SessionLocal() as db: entry = EmailLog( recipients=", ".join(recipients), subject=subject, status=status, error=error, date_for=date_for, ) db.add(entry) db.commit() def has_sent_for_date(self, date_for: str) -> bool: """Return True if a successful send log exists for the given date.""" with SessionLocal() as db: row = ( db.query(EmailLog) .filter(EmailLog.date_for == date_for, EmailLog.status == "sent") .order_by(EmailLog.sent_at.desc()) .first() ) return row is not None def recent_logs(self, limit: int = 50) -> list[dict]: return _get_recent_logs_cached(limit) @st.cache_data(ttl=60) # Cache for 1 minute def _get_recent_logs_cached(limit: int = 50) -> list[dict]: """Cached function to get recent email logs.""" with SessionLocal() as db: rows = ( db.query(EmailLog) .order_by(EmailLog.sent_at.desc()) .limit(limit) .all() ) return [ { "id": r.id, "sent_at": r.sent_at, "recipients": r.recipients, "subject": r.subject, "status": r.status, "error": r.error, "date_for": r.date_for, } for r in rows ]