from __future__ import annotations import smtplib import sys import os from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import List, Tuple from datetime import date from datetime import date import pandas as pd from sqlalchemy import text import streamlit as st # Add the project root to Python path project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) if project_root not in sys.path: sys.path.insert(0, project_root) from app_core.config.settings import AppSettings, STORES from app_core.db.database import engine, SessionLocal from app_core.db.models import EmailLog class MailerService: def __init__(self, settings: AppSettings | None = None) -> None: self.settings = settings or AppSettings() def fetch_daily_rows(self, report_date) -> pd.DataFrame: sql = ( 'SELECT * FROM "tenantpostings" ' 'WHERE "created_at"::date = %(d)s ' 'ORDER BY "id" DESC ' 'LIMIT 10000' ) with engine.connect() as conn: df = pd.read_sql(sql, conn, params={"d": report_date}) return df def select_report_date(self, preferred: date | None = None) -> date | None: """Return preferred date if it has data; else most recent date with data; else None.""" with engine.connect() as conn: dates_df = pd.read_sql( 'SELECT "created_at"::date AS d, COUNT(*) AS c\n' 'FROM "tenantpostings"\n' 'GROUP BY d\n' 'ORDER BY d DESC', conn, ) if dates_df.empty: return None # Normalize if 'd' not in dates_df.columns: return None dates_df['d'] = pd.to_datetime(dates_df['d'], errors='coerce') available = [d.date() for d in dates_df['d'].dropna().tolist()] if preferred and preferred in available: return preferred return available[0] if available else None def build_email_html(self, row: dict, df: pd.DataFrame | None = None) -> str: # Robust de-duplication of the entire dataframe before processing if df is not None and not df.empty: def get_priority(status): val = str(status).lower() if any(x in val for x in ["success", "ok", "posted", "completed", "done"]): return 0 if any(x in val for x in ["pending", "queue", "waiting", "processing"]): return 1 return 2 df = df.copy() if 'triumph_status' in df.columns: df['_priority'] = df['triumph_status'].apply(get_priority) else: df['_priority'] = 2 # Sort by priority (success first) and then by ID (newest first) sort_cols = ['_priority', 'id'] df = df.sort_values(sort_cols, ascending=[True, False]) # 1. Deduplicate by triumph_event (if present and not empty) if 'triumph_event' in df.columns: has_event = (df['triumph_event'].fillna('').astype(str).str.strip() != '') & (df['triumph_event'].astype(str) != '-') df_with_ev = df[has_event].drop_duplicates(subset=['tenant_id', 'processing_type', 'triumph_event'], keep='first') df_no_ev = df[~has_event] df = pd.concat([df_with_ev, df_no_ev]).sort_values(sort_cols, ascending=[True, False]) # 2. Deduplicate by register_close_id (for Journals/Banking Journals) if 'register_close_id' in df.columns: has_rc = (df['register_close_id'].fillna('').astype(str).str.strip() != '') & (df['register_close_id'].astype(str) != '-') df_with_rc = df[has_rc].drop_duplicates(subset=['tenant_id', 'processing_type', 'register_close_id'], keep='first') df_no_rc = df[~has_rc] df = pd.concat([df_with_rc, df_no_rc]).sort_values(sort_cols, ascending=[True, False]) # 3. Deduplicate by sale_ids (for Invoices/Receipts) if 'sale_ids' in df.columns: has_sales = (df['sale_ids'].fillna('').astype(str).str.strip() != '') df_with_sales = df[has_sales].drop_duplicates(subset=['tenant_id', 'processing_type', 'sale_ids'], keep='first') df_no_sales = df[~has_sales] df = pd.concat([df_with_sales, df_no_sales]).sort_values(sort_cols, ascending=[True, False]) df = df.drop(columns=['_priority'], errors='ignore') outlet = row.get("outlet_name") or row.get("register_name") or "Outlet" division = row.get("division_code") or "PC" status = (row.get("triumph_status") or "Posted successfully").capitalize() register_close_id = row.get("register_close_id", "-") register_id = row.get("register_id", "-") def lines_for(ptype: str) -> list[str]: """Return formatted lines for all rows of a processing_type. Example line: 3,616.19 (Event ID: 2904783) """ if df is None or df.empty or 'processing_type' not in df.columns: return [] sub = df[df['processing_type'].astype(str).str.upper() == ptype.upper()] if 'processing_type' in df.columns else pd.DataFrame() if sub.empty: return [] # Data is already deduplicated at the start of build_email_html sub = sub.sort_values('id', ascending=False) result: list[str] = [] for _, r in sub.sort_values('id', ascending=False).iterrows(): amt = r.get('total_amount') evt = r.get('triumph_event', '-') try: amt_str = f"{float(amt):,.2f}" except Exception: amt_str = str(amt) if amt is not None else '-' result.append(f"{amt_str} (Event ID: {evt})") return result journal_lines = lines_for('JOURNAL') bank_journal_lines = lines_for('BANKING_JOURNAL') invoice_lines = lines_for('INVOICE') receipt_lines = lines_for('RECEIPT') # Optional: transaction summary by store (single table) store_summary_table_html = "" events_matrix_html = "" if isinstance(df, pd.DataFrame) and not df.empty and ('tenant_id' in df.columns): def summarize_for(store: dict) -> dict[str, str]: sid = store.get('tenant_id') name = store.get('label') sub = df[df['tenant_id'] == sid] # Data is already deduplicated at the start of build_email_html sub = sub.sort_values('id', ascending=False) def pick_total(kind: str) -> tuple[str, int]: if sub.empty or 'processing_type' not in sub.columns: return ("0.00", 0) s = sub[sub['processing_type'].astype(str).str.upper() == kind] if s.empty: return ("0.00", 0) try: total = float(s['total_amount'].fillna(0).sum()) if 'total_amount' in s.columns else 0.0 except Exception: total = 0.0 return (f"{total:,.2f}", len(s)) def has_rows(kind: str) -> bool: if sub.empty or 'processing_type' not in sub.columns: return False s = sub[sub['processing_type'].astype(str).str.upper() == kind] return not s.empty def latest_event(kind: str) -> str: if sub.empty or 'processing_type' not in sub.columns: return "-" s = sub[sub['processing_type'].astype(str).str.upper() == kind] if s.empty: return "-" series = s.get('triumph_event') if 'triumph_event' in s.columns else None if series is None or series.empty: return "-" try: return str(series.dropna().astype(str).iloc[0]) except Exception: return "-" def latest_status_emoji(kind: str) -> str: if sub.empty or 'processing_type' not in sub.columns: return "" s = sub[sub['processing_type'].astype(str).str.upper() == kind] if s.empty: return "" status_series = s.get('triumph_status') if 'triumph_status' in s.columns else None if status_series is None or status_series.empty: return "" try: val = str(status_series.iloc[0]).strip().lower() except Exception: val = "" if any(x in val for x in ["success", "ok", "completed", "done"]): return " ✅" if any(x in val for x in ["fail", "error", "invalid", "dead"]): return " ❌" if any(x in val for x in ["pending", "queue", "waiting", "processing"]): return " ⚠️" return "" j_total, _ = pick_total('JOURNAL') b_total, _ = pick_total('BANKING_JOURNAL') i_total, _ = pick_total('INVOICE') r_total, _ = pick_total('RECEIPT') j_eid = latest_event('JOURNAL'); j_stat = latest_status_emoji('JOURNAL') b_eid = latest_event('BANKING_JOURNAL'); b_stat = latest_status_emoji('BANKING_JOURNAL') i_eid = latest_event('INVOICE'); i_stat = latest_status_emoji('INVOICE') r_eid = latest_event('RECEIPT'); r_stat = latest_status_emoji('RECEIPT') def render_cell(exists: bool, total: str, eid: str, stat: str, ptype: str = "") -> str: if not exists: return "Nill" # For INVOICE and RECEIPT, show individual line items if multiple exist if ptype.upper() in ['INVOICE', 'RECEIPT'] and sub is not None and not sub.empty: type_sub = sub[sub['processing_type'].astype(str).str.upper() == ptype.upper()] if len(type_sub) > 1: # Multiple transactions individual_lines = [] for _, r in type_sub.sort_values('id', ascending=False).iterrows(): amt = r.get('total_amount') evt = r.get('triumph_event', '-') status_val = str(r.get('triumph_status', '')).strip().lower() status_emoji = "" if any(x in status_val for x in ["success", "ok", "completed", "done"]): status_emoji = " ✅" elif any(x in status_val for x in ["fail", "error", "invalid", "dead"]): status_emoji = " ❌" elif any(x in status_val for x in ["pending", "queue", "waiting", "processing"]): status_emoji = " ⚠️" try: amt_str = f"{float(amt):,.2f}" except Exception: amt_str = str(amt) if amt is not None else '-' individual_lines.append(f"
Hello Tucker Fresh,
Here's your daily digest of posted transactions:
{store_summary_table_html}Thank you for staying updated with us.
Best regards,
Workolik Team