Files
Tuckerfresh-site/pages/see_payload.py
2026-04-07 12:44:06 +05:30

286 lines
11 KiB
Python

import streamlit as st
import pandas as pd
from sqlalchemy import text
from app_core.db.database import engine
from app_core.ui.layout import render_store_selector
@st.cache_data(ttl=300) # Cache for 5 minutes
def _load_tenant_data(tenant_id: int, limit: int = 10000):
"""Load data for a specific tenant with caching."""
with engine.connect() as conn:
df = pd.read_sql(
text('SELECT * FROM "tenantpostings" WHERE "tenant_id" = :t ORDER BY "id" DESC LIMIT :limit'),
conn,
params={"t": tenant_id, "limit": limit},
)
return df
def _detect_status_column(df: pd.DataFrame) -> str | None:
candidates = ["status", "state", "result", "triumph_status"]
lower_map = {c.lower(): c for c in df.columns}
for key in candidates:
if key in lower_map:
return lower_map[key]
for c in df.columns:
if "status" in c.lower():
return c
return None
def _normalize_name(name: str) -> str:
return "".join(ch for ch in name.lower() if ch.isalnum())
def _build_display_map(df: pd.DataFrame) -> dict[str, str]:
overrides = {
"triumph_status": "Status",
"triumph_event": "Event",
"outlet_name": "Outlet Name",
"tenant_id": "Tenant ID",
"processing_type": "Processing Type",
"total_amount": "Total Amount",
"created_at": "Date",
"updated_at": "Updated At",
"id": "SNo",
}
display_map: dict[str, str] = {}
used: set[str] = set()
for col in df.columns:
key = col.lower()
if key in overrides:
label = overrides[key]
else:
label = col.replace("_", " ").title()
base = label
suffix = 2
while label in used:
label = f"{base} {suffix}"
suffix += 1
used.add(label)
display_map[col] = label
return display_map
def _format_status_with_emoji(styler: "pd.io.formats.style.Styler", df: pd.DataFrame, status_col: str | None) -> "pd.io.formats.style.Styler":
if status_col is None or status_col not in df.columns:
return styler
def fmt(val):
v = str(val)
v_lower = v.lower()
if any(k in v_lower for k in ["success", "ok", "completed", "done", "active"]):
return f"{v}"
if any(k in v_lower for k in ["fail", "error", "dead", "invalid"]):
return f"{v}"
if any(k in v_lower for k in ["pending", "queue", "waiting", "processing"]):
return f"{v}"
return v
return styler.format({status_col: fmt})
def _badge_status_cells(styler: "pd.io.formats.style.Styler", df: pd.DataFrame, status_col: str | None) -> "pd.io.formats.style.Styler":
if status_col is None or status_col not in df.columns:
return styler
def badge(val):
v = str(val).lower()
bg = "#E2E8F0"; color = "#0F172A"
if any(k in v for k in ["success", "ok", "completed", "done", "active"]):
bg = "#E6F7EE"; color = "#166534"
elif any(k in v for k in ["fail", "error", "dead", "invalid"]):
bg = "#FDECEC"; color = "#991B1B"
elif any(k in v for k in ["pending", "queue", "waiting", "processing"]):
bg = "#FEF5E6"; color = "#92400E"
return f"background-color: {bg}; color:{color}; border-radius: 999px; padding: 4px 8px;"
return styler.map(badge, subset=pd.IndexSlice[:, [status_col]])
def _zebra_style(df: pd.DataFrame) -> "pd.io.formats.style.Styler":
df2 = df.reset_index(drop=True)
def zebra(row: pd.Series):
return ["background-color: rgba(2,6,23,0.03);" if (row.name % 2 == 0) else ""] * len(row)
styler = df2.style.apply(zebra, axis=1)
styler = styler.set_table_styles([
{"selector": "th", "props": "position: sticky; top: 0; background: #F0F6FF; color:#0F172A; font-weight:700;"},
{"selector": "tbody td", "props": "border-top: 1px solid rgba(15,23,42,0.06);"},
{"selector": "table", "props": "border-collapse: separate; border-spacing: 0;"},
])
styler = styler.hide(axis="index")
return styler
def _format_two_decimals_for_amounts(styler: "pd.io.formats.style.Styler", df: pd.DataFrame) -> "pd.io.formats.style.Styler":
candidates_norm = {"totalamount", "total_amount", "amount", "totalamounts", "totalamounttotals"}
targets = []
for c in df.columns:
if _normalize_name(c) in candidates_norm and pd.api.types.is_numeric_dtype(df[c]):
targets.append(c)
if targets:
styler = styler.format(formatter="{:.2f}", subset=pd.IndexSlice[:, targets])
return styler
def _format_date_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Format date columns to show only date part"""
df_formatted = df.copy()
for col in df_formatted.columns:
if 'created_at' in col.lower() or 'date' in col.lower():
if pd.api.types.is_datetime64_any_dtype(df_formatted[col]):
df_formatted[col] = df_formatted[col].dt.date
else:
# Try to convert to datetime first
try:
df_formatted[col] = pd.to_datetime(df_formatted[col]).dt.date
except:
pass
return df_formatted
def _pick_existing_columns(df: pd.DataFrame, names: list[str]) -> list[str]:
lower_map = {c.lower(): c for c in df.columns}
picked = []
for n in names:
if n.lower() in lower_map:
picked.append(lower_map[n.lower()])
return picked
def _stat_card(title: str, value: int | str, color: str, emoji: str) -> str:
return f"""
<div class=\"stat-card\" style=\"display:flex;align-items:center;gap:12px;padding:14px 16px;border-radius:14px;background:#fff;border:1px solid rgba(15,23,42,0.06);box-shadow:0 10px 24px rgba(2,6,23,0.08);transition:transform .15s ease, box-shadow .15s ease;\">
<div style=\"font-size:20px;\">{emoji}</div>
<div>
<div style=\"font-size:12px;color:#64748b;\">{title}</div>
<div style=\"font-size:20px;font-weight:800;color:{color};\">{value}</div>
</div>
</div>
"""
def render_page():
if st.session_state.get("auth_user") is None:
st.warning("Please login to continue.")
st.stop()
# Store selector (required before loading data view)
tenant_id, _ = render_store_selector()
if not tenant_id:
st.info("Please choose a store to view data.")
return
st.markdown(
"""
<style>
.stat-card:hover{transform:translateY(-2px);box-shadow:0 16px 36px rgba(2,6,23,0.12)}
.stat-row{margin-bottom:14px;}
.block-after-stats{margin-top:10px;}
</style>
""",
unsafe_allow_html=True,
)
st.title("DataHub")
st.caption("Inspect data from Warehouse.")
st.info("Connected to database ✅.")
df = _load_tenant_data(tenant_id)
status_col_global = _detect_status_column(df)
if status_col_global:
s = df[status_col_global].astype(str).str.lower()
ok = s.str_contains("success|ok|completed|done|active").sum() if hasattr(s, 'str_contains') else s.str.contains("success|ok|completed|done|active").sum()
bad = s.str_contains("fail|error|dead|invalid").sum() if hasattr(s, 'str_contains') else s.str.contains("fail|error|dead|invalid").sum()
pend = s.str_contains("pending|queue|waiting|processing").sum() if hasattr(s, 'str_contains') else s.str.contains("pending|queue|waiting|processing").sum()
total = len(df)
st.markdown('<div class="stat-row">', unsafe_allow_html=True)
c1, c2, c3, c4 = st.columns([1,1,1,2])
with c1: st.markdown(_stat_card("Success", ok, "#166534", ""), unsafe_allow_html=True)
with c2: st.markdown(_stat_card("Failed", bad, "#991B1B", ""), unsafe_allow_html=True)
with c3: st.markdown(_stat_card("Pending", pend, "#92400E", ""), unsafe_allow_html=True)
with c4: st.caption(f"Total rows: {total}")
st.markdown('</div>', unsafe_allow_html=True)
minimal_names = [
"id",
"created_at",
"outlet_name",
"processing_type",
"total_amount",
"triumph_status",
"triumph_event",
]
minimal_cols = _pick_existing_columns(df, minimal_names)
# Controls row: search only
q = st.text_input("Search", placeholder="Type to filter rows across all columns")
# Apply global search
filtered = df
if q:
q_lower = q.lower()
filtered = filtered[filtered.apply(lambda r: r.astype(str).str.lower().str.contains(q_lower).any(), axis=1)]
# Always use minimal columns
visible_cols = minimal_cols
if visible_cols:
filtered = filtered[visible_cols]
# Pagination (moved below the table; small controls)
total_rows = len(filtered)
default_page_size = 25
total_pages = max(1, (total_rows + default_page_size - 1) // default_page_size)
page_num_state_key = "payload_page_num"
if page_num_state_key not in st.session_state:
st.session_state[page_num_state_key] = 1
start = (st.session_state[page_num_state_key] - 1) * default_page_size
end = start + default_page_size
page_df = filtered.iloc[start:end]
# Build display names and style
display_map = _build_display_map(page_df)
display_df = page_df.rename(columns=display_map)
# Format date columns
display_df = _format_date_columns(display_df)
status_col_original = _detect_status_column(page_df)
status_col_display = display_map.get(status_col_original)
styled = _zebra_style(display_df)
styled = _format_two_decimals_for_amounts(styled, display_df)
# Always apply status badges
if status_col_display:
styled = _format_status_with_emoji(styled, display_df, status_col_display)
styled = _badge_status_cells(styled, display_df, status_col_display)
styled = _format_two_decimals_for_amounts(styled, display_df)
styled = styled.set_table_styles([
{"selector": "th", "props": "position: sticky; top: 0; background: #F0F6FF; color:#0F172A; font-weight:700;"},
{"selector": "tbody td", "props": "border-top: 1px solid rgba(15,23,42,0.06);"},
{"selector": "table", "props": "border-collapse: separate; border-spacing: 0;"},
]).hide(axis="index")
st.dataframe(styled, use_container_width=True, height=520)
# Bottom pagination controls
p1, p2, p3 = st.columns([1, 2, 1])
with p1:
st.caption(f"Showing {len(page_df)} of {total_rows} rows")
with p2:
st.caption("Page")
st.session_state[page_num_state_key] = st.number_input(
" ", min_value=1, max_value=total_pages, value=st.session_state[page_num_state_key], step=1, label_visibility="collapsed")
with p3:
download_df = filtered.rename(columns=_build_display_map(filtered))
st.download_button(
"Download filtered CSV",
data=download_df.to_csv(index=False).encode("utf-8"),
file_name="tenantpostings_filtered.csv",
use_container_width=True,
)