/*
 * Source: Krow-workspace/backend/command-api/src/services/mobile-command-service.js
 * 3311 lines, 101 KiB, JavaScript
 */

import crypto from 'node:crypto';
import { AppError } from '../lib/errors.js';
import { query, withTransaction } from './db.js';
import { loadActorContext, requireClientContext, requireStaffContext } from './actor-context.js';
import { recordGeofenceIncident } from './attendance-monitoring.js';
import { distanceMeters, resolveEffectiveClockInPolicy } from './clock-in-policy.js';
import { uploadLocationBatch } from './location-log-storage.js';
import { enqueueHubManagerAlert, enqueueUserAlert } from './notification-outbox.js';
import { registerPushToken, unregisterPushToken } from './notification-device-tokens.js';
import {
clockIn as clockInCommand,
clockOut as clockOutCommand,
createOrder as createOrderCommand,
} from './command-service.js';
// Assignment statuses that cancelFutureOrderSlice moves to CANCELLED for the shifts it cancels.
const MOBILE_CANCELLABLE_ASSIGNMENT_STATUSES = ['ASSIGNED', 'ACCEPTED'];
// Application statuses that cancelFutureOrderSlice moves to CANCELLED for the shifts it cancels.
const MOBILE_CANCELLABLE_APPLICATION_STATUSES = ['PENDING', 'CONFIRMED'];
/**
 * Converts a date-like value into an ISO-8601 string.
 *
 * @param {*} value - Anything accepted by the `Date` constructor.
 * @returns {string|null} The ISO string, or `null` when `value` is falsy.
 * @throws {RangeError} When a truthy `value` does not parse to a valid date.
 */
function toIsoOrNull(value) {
  if (!value) {
    return null;
  }
  const parsed = new Date(value);
  return parsed.toISOString();
}
/**
 * Builds a URL-safe slug: lower-cased, with every run of characters outside
 * `a-z0-9` collapsed to a single hyphen, and at most 60 characters long.
 *
 * @param {*} input - Value to slugify; falsy values produce an empty slug.
 * @returns {string} The slug, never starting or ending with a hyphen.
 */
function normalizeSlug(input) {
  return `${input || ''}`
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-+|-+$/g, '')
    .slice(0, 60)
    // Truncation can cut right after a separator; strip it so the slug never ends in '-'.
    .replace(/-+$/, '');
}
/**
 * Normalizes a phone value by stringifying and trimming it.
 *
 * @param {*} value - Raw phone input.
 * @returns {string|null} The trimmed string, or `null` when the input is falsy
 *   or contains only whitespace (so blank input is never returned as `''`).
 */
function normalizePhone(value) {
  if (!value) return null;
  const trimmed = `${value}`.trim();
  return trimmed.length > 0 ? trimmed : null;
}
/**
 * Returns `value` unchanged when it is an array, otherwise a new empty array.
 *
 * @param {*} value - Candidate array.
 * @returns {Array} The same array instance, or `[]`.
 */
function ensureArray(value) {
  if (Array.isArray(value)) {
    return value;
  }
  return [];
}
/**
 * Throws when a `staff_blocks` row exists for the given tenant, business and staff member.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} scope
 * @param {string} scope.tenantId - Tenant the block must belong to.
 * @param {string} scope.businessId - Business the block must belong to.
 * @param {string} scope.staffId - Staff member being checked.
 * @returns {Promise<void>} Resolves without a value when no block row is found.
 * @throws {AppError} `STAFF_BLOCKED` (409) carrying the block id, reason and issue flags.
 */
async function ensureStaffNotBlockedByBusiness(client, { tenantId, businessId, staffId }) {
const blocked = await client.query(
`
SELECT id, reason, issue_flags
FROM staff_blocks
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
LIMIT 1
`,
[tenantId, businessId, staffId]
);
if (blocked.rowCount > 0) {
throw new AppError('STAFF_BLOCKED', 'Staff is blocked from future shift assignments for this business', 409, {
businessId,
staffId,
blockId: blocked.rows[0].id,
reason: blocked.rows[0].reason || null,
// issue_flags is always reported as an array, even when the column is null.
issueFlags: ensureArray(blocked.rows[0].issue_flags || []),
});
}
}
/**
 * Maps a snake_case assignment row onto the camelCase reference payload.
 *
 * @param {object} assignment - Row with `id`, `shift_id`, `business_id`,
 *   `vendor_id`, `staff_id` and `clock_point_id`.
 * @returns {object} `{ assignmentId, shiftId, businessId, vendorId, staffId, clockPointId }`.
 */
function buildAssignmentReferencePayload(assignment) {
  const {
    id: assignmentId,
    shift_id: shiftId,
    business_id: businessId,
    vendor_id: vendorId,
    staff_id: staffId,
    clock_point_id: clockPointId,
  } = assignment;
  return { assignmentId, shiftId, businessId, vendorId, staffId, clockPointId };
}
/**
 * Generates an order number of the form `<prefix>-<8 digits><3 digits>`.
 *
 * The first eight digits are the tail of the current epoch-millisecond
 * timestamp; the last three are a random value in 100-999.
 *
 * @param {string} [prefix='ORD'] - Leading tag of the order number.
 * @returns {string} The generated order number.
 */
function generateOrderNumber(prefix = 'ORD') {
  const stamp = Date.now().toString().slice(-8);
  // crypto.randomInt's upper bound is exclusive, so 1000 is needed for 999 to be reachable.
  const random = crypto.randomInt(100, 1000);
  return `${prefix}-${stamp}${random}`;
}
/**
 * Reads the requested head-count from a position.
 *
 * @param {object} position - Position carrying `workerCount` and/or `workersNeeded`.
 * @returns {*} `workerCount` when it is not null/undefined, else `workersNeeded`
 *   when it is not null/undefined, else `1`.
 */
function normalizeWorkerCount(position) {
  const { workerCount, workersNeeded } = position;
  if (workerCount != null) {
    return workerCount;
  }
  if (workersNeeded != null) {
    return workersNeeded;
  }
  return 1;
}
/**
 * Derives an upper-case role code from a role name.
 *
 * Runs of characters outside `A-Z0-9` become a single underscore, leading and
 * trailing underscores are dropped, and the result is cut to 50 characters.
 *
 * @param {*} name - Role name; falsy values are treated as `'role'`.
 * @returns {string} The derived code, or `'ROLE'` when nothing usable remains.
 */
function roleCodeFromName(name) {
  const label = `${name || 'role'}`;
  const underscored = label.toUpperCase().replace(/[^A-Z0-9]+/g, '_');
  const trimmed = underscored.replace(/^_+|_+$/g, '');
  const code = trimmed.slice(0, 50);
  return code || 'ROLE';
}
/**
 * Normalizes a list of weekday indexes into unique integers in the range 0-6.
 *
 * Only numbers and non-blank strings are considered. Values such as `null`,
 * `''`, `true` or `[]` are discarded rather than being coerced by `Number()`
 * into 0 or 1, which would otherwise be accepted as valid days.
 *
 * @param {*} values - Candidate list; used when it is a non-empty array.
 * @param {Array} [fallback=[]] - List used when `values` is missing or empty.
 * @returns {number[]} Unique integers between 0 and 6, in first-seen order.
 */
function toArrayOfUniqueIntegers(values, fallback = []) {
  const hasValues = Array.isArray(values) && values.length > 0;
  // A non-array fallback is treated as empty instead of crashing on `.map`.
  const source = hasValues ? values : (Array.isArray(fallback) ? fallback : []);
  const days = source
    .filter((value) => typeof value === 'number' || (typeof value === 'string' && value.trim() !== ''))
    .map((value) => Number(value))
    .filter((value) => Number.isInteger(value) && value >= 0 && value <= 6);
  return [...new Set(days)];
}
/**
 * Combines a calendar date and a wall-clock time into a UTC `Date`.
 *
 * @param {string} dateValue - Date in `YYYY-MM-DD` form; it is interpolated
 *   into `<dateValue>T00:00:00.000Z`.
 * @param {string} timeValue - Time as `HH:MM`; any further `:`-separated parts are ignored.
 * @returns {Date} The date at the given hour and minute, in UTC.
 * @throws {AppError} `VALIDATION_ERROR` (400) when the date is invalid, the
 *   minutes are missing, or the hour/minute fall outside 0-23 / 0-59.
 */
function combineDateAndTime(dateValue, timeValue) {
  const [hours, minutes] = `${timeValue}`.split(':').map((value) => Number.parseInt(value, 10));
  const date = new Date(`${dateValue}T00:00:00.000Z`);
  // Number.isNaN(undefined) is false, so a missing minutes part must be rejected
  // with an integer check; the range checks stop setUTCHours from silently
  // rolling the result over into another day.
  const isValidTime = Number.isInteger(hours) && hours >= 0 && hours <= 23
    && Number.isInteger(minutes) && minutes >= 0 && minutes <= 59;
  if (Number.isNaN(date.getTime()) || !isValidTime) {
    throw new AppError('VALIDATION_ERROR', 'Invalid date/time combination for order schedule', 400, {
      date: dateValue,
      time: timeValue,
    });
  }
  date.setUTCHours(hours, minutes, 0, 0);
  return date;
}
/**
 * Computes the ISO start/end instants of a shift on a given date.
 *
 * When the end time is not after the start time, the end is moved to the
 * following UTC day, so the shift is treated as running past midnight.
 *
 * @param {string} dateValue - Shift date, passed to `combineDateAndTime`.
 * @param {string} startTime - Start time, passed to `combineDateAndTime`.
 * @param {string} endTime - End time, passed to `combineDateAndTime`.
 * @returns {{ startsAt: string, endsAt: string }} ISO-8601 timestamps.
 */
function buildShiftWindow(dateValue, startTime, endTime) {
  const shiftStart = combineDateAndTime(dateValue, startTime);
  const shiftEnd = combineDateAndTime(dateValue, endTime);
  const endsOnNextDay = shiftEnd <= shiftStart;
  if (endsOnNextDay) {
    shiftEnd.setUTCDate(shiftEnd.getUTCDate() + 1);
  }
  return {
    startsAt: shiftStart.toISOString(),
    endsAt: shiftEnd.toISOString(),
  };
}
/**
 * Expands an order schedule into the list of calendar dates that get shifts.
 *
 * - `ONE_TIME`: returns `[orderDate]` as given, without validating it.
 * - `RECURRING`: walks `startDate`..`endDate` and keeps days listed in `recurrenceDays`.
 * - any other type: walks `startDate`..`endDate` (or `startDate` plus
 *   `horizonDays`, default 28, when `endDate` is absent) and keeps days listed
 *   in `daysOfWeek`, defaulting to `[1, 2, 3, 4, 5]`.
 *
 * Day indexes are compared against `Date.prototype.getUTCDay()`.
 *
 * @param {object} schedule
 * @param {string} schedule.orderType
 * @param {string} [schedule.orderDate]
 * @param {string} [schedule.startDate]
 * @param {string} [schedule.endDate]
 * @param {Array} [schedule.recurrenceDays]
 * @param {Array} [schedule.daysOfWeek]
 * @param {number} [schedule.horizonDays]
 * @returns {string[]} Dates formatted as `YYYY-MM-DD` (or `[orderDate]` for `ONE_TIME`).
 * @throws {AppError} `VALIDATION_ERROR` (400) for an invalid start date, an
 *   invalid or reversed window, or a schedule that yields no dates.
 */
function materializeScheduleDates({ orderType, orderDate, startDate, endDate, recurrenceDays, daysOfWeek, horizonDays }) {
  if (orderType === 'ONE_TIME') {
    return [orderDate];
  }

  const isRecurring = orderType === 'RECURRING';

  const windowStart = new Date(`${startDate}T00:00:00.000Z`);
  if (Number.isNaN(windowStart.getTime())) {
    throw new AppError('VALIDATION_ERROR', 'Invalid startDate', 400, { startDate });
  }

  let windowEnd;
  if (isRecurring || endDate) {
    // A recurring order without an endDate yields an invalid Date, rejected below.
    windowEnd = new Date(`${endDate}T00:00:00.000Z`);
  } else {
    windowEnd = new Date(windowStart);
    windowEnd.setUTCDate(windowEnd.getUTCDate() + (horizonDays || 28));
  }
  if (Number.isNaN(windowEnd.getTime()) || windowEnd < windowStart) {
    throw new AppError('VALIDATION_ERROR', 'Invalid scheduling window', 400, {
      startDate,
      endDate: endDate || null,
    });
  }

  const activeDays = isRecurring
    ? toArrayOfUniqueIntegers(recurrenceDays, [])
    : toArrayOfUniqueIntegers(daysOfWeek, [1, 2, 3, 4, 5]);

  const dates = [];
  for (const day = new Date(windowStart); day <= windowEnd; day.setUTCDate(day.getUTCDate() + 1)) {
    if (activeDays.includes(day.getUTCDay())) {
      dates.push(day.toISOString().slice(0, 10));
    }
  }

  if (dates.length === 0) {
    throw new AppError('VALIDATION_ERROR', 'Schedule did not produce any shifts', 400, {
      orderType,
      startDate,
      endDate: endDate || null,
      activeDays,
    });
  }
  return dates;
}
/**
 * Loads an ACTIVE clock point (hub) within a tenant and business.
 *
 * Uses the module-level `query` helper rather than a transaction client.
 *
 * @param {string} tenantId
 * @param {string} businessId
 * @param {string} hubId - `clock_points.id` to load.
 * @returns {Promise<object>} Row with `id`, `label`, `address`, `latitude`,
 *   `longitude`, `geofence_radius_meters` and `metadata`.
 * @throws {AppError} `NOT_FOUND` (404) when no ACTIVE hub matches the scope.
 */
async function loadHubDetails(tenantId, businessId, hubId) {
const result = await query(
`
SELECT
cp.id,
cp.label,
cp.address,
cp.latitude,
cp.longitude,
cp.geofence_radius_meters,
cp.metadata
FROM clock_points cp
WHERE cp.tenant_id = $1
AND cp.business_id = $2
AND cp.id = $3
AND cp.status = 'ACTIVE'
`,
[tenantId, businessId, hubId]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Hub not found in business scope', 404, { hubId });
}
return result.rows[0];
}
/**
 * Looks up ACTIVE `roles_catalog` rows referenced by the given positions.
 *
 * @param {string} tenantId
 * @param {Array<object>} positions - Positions that may carry `roleId` and/or `roleCode`.
 * @returns {Promise<Map<string, object>>} Map in which every matched row
 *   (`id`, `code`, `name`) is stored under both its id and its code. Empty
 *   when no position references a role.
 */
async function resolveRoleCatalogEntries(tenantId, positions) {
const roleIds = positions.map((position) => position.roleId).filter(Boolean);
const roleCodes = positions.map((position) => position.roleCode).filter(Boolean);
// Nothing to resolve: skip the database round trip.
if (roleIds.length === 0 && roleCodes.length === 0) {
return new Map();
}
const result = await query(
`
SELECT id, code, name
FROM roles_catalog
WHERE tenant_id = $1
AND status = 'ACTIVE'
AND (
(cardinality($2::uuid[]) > 0 AND id = ANY($2::uuid[]))
OR
(cardinality($3::text[]) > 0 AND code = ANY($3::text[]))
)
`,
[tenantId, roleIds, roleCodes]
);
const lookup = new Map();
for (const row of result.rows) {
// Index each row twice so callers can resolve by either identifier.
lookup.set(row.id, row);
lookup.set(row.code, row);
}
return lookup;
}
/**
 * Normalizes one mobile position input into a shift-role entry.
 *
 * Catalog values (looked up by `roleId`, then `roleCode`) take precedence over
 * the values supplied on the position. Pay and bill rates each fall back to
 * `hourlyRateCents` and then to `0`.
 *
 * @param {object} position - Raw position from the mobile payload.
 * @param {Map<string, object>} roleLookup - Catalog rows keyed by id and code.
 * @returns {object} Entry with `startTime`, `endTime`, `roleId`, `roleCode`,
 *   `roleName`, `workersNeeded`, `payRateCents`, `billRateCents` and `metadata`.
 */
function buildShiftRoleEntry(position, roleLookup) {
  const catalogRole = roleLookup.get(position.roleId) || roleLookup.get(position.roleCode);
  const { hourlyRateCents } = position;

  // Defaults first, so anything in position.metadata overrides them.
  const metadata = {
    lunchBreakMinutes: position.lunchBreakMinutes ?? 0,
    paidBreak: position.paidBreak ?? false,
    instantBook: position.instantBook ?? false,
    ...position.metadata,
  };

  return {
    startTime: position.startTime,
    endTime: position.endTime,
    roleId: catalogRole?.id || position.roleId || null,
    roleCode: catalogRole?.code || position.roleCode || roleCodeFromName(position.roleName),
    roleName: catalogRole?.name || position.roleName || 'Role',
    workersNeeded: normalizeWorkerCount(position),
    payRateCents: position.payRateCents ?? hourlyRateCents ?? 0,
    billRateCents: position.billRateCents ?? hourlyRateCents ?? 0,
    metadata,
  };
}
/**
 * Builds one shift per (date, start/end time slot) for an order.
 *
 * Positions sharing the same `startTime|endTime` are grouped into a single
 * shift whose `requiredWorkers` is the sum of their worker counts.
 *
 * @param {object} args
 * @param {string[]} args.dates - Dates in `YYYY-MM-DD` form.
 * @param {Array<object>} args.positions - Normalized positions with `startTime` and `endTime`.
 * @param {string} [args.timezone] - Timezone label stored on each shift; defaults to `'UTC'`.
 * @param {object} args.hub - Clock-point row (`id`, `label`, `address`,
 *   `latitude`, `longitude`, `geofence_radius_meters`, `metadata`).
 * @returns {Array<object>} Shifts ordered by date, then by first appearance of each time slot.
 */
function buildOrderShifts({ dates, positions, timezone, hub }) {
  // The grouping depends only on `positions`, so compute it once instead of
  // rebuilding an identical Map (and worker totals) for every date.
  const buckets = new Map();
  for (const position of positions) {
    const key = `${position.startTime}|${position.endTime}`;
    const bucket = buckets.get(key) || [];
    bucket.push(position);
    buckets.set(key, bucket);
  }
  const timeSlots = [...buckets.entries()].map(([timeKey, bucketPositions]) => {
    const [startTime, endTime] = timeKey.split('|');
    return {
      startTime,
      endTime,
      bucketPositions,
      requiredWorkers: bucketPositions.reduce((sum, position) => sum + normalizeWorkerCount(position), 0),
    };
  });

  const shifts = [];
  for (const dateValue of dates) {
    timeSlots.forEach(({ startTime, endTime, bucketPositions, requiredWorkers }, slotIndex) => {
      const shiftWindow = buildShiftWindow(dateValue, startTime, endTime);
      shifts.push({
        // Shift codes are numbered from 1 within each date.
        shiftCode: `SFT-${dateValue.replaceAll('-', '')}-${slotIndex + 1}`,
        title: `${hub.label} ${startTime}-${endTime}`,
        startsAt: shiftWindow.startsAt,
        endsAt: shiftWindow.endsAt,
        timezone: timezone || 'UTC',
        clockPointId: hub.id,
        locationName: hub.label,
        locationAddress: hub.address || null,
        latitude: hub.latitude == null ? undefined : Number(hub.latitude),
        longitude: hub.longitude == null ? undefined : Number(hub.longitude),
        geofenceRadiusMeters: hub.geofence_radius_meters || undefined,
        requiredWorkers,
        metadata: {
          city: hub.metadata?.city || null,
          state: hub.metadata?.state || null,
          zipCode: hub.metadata?.zipCode || null,
        },
        // Copy so every shift owns its roles array, as it did when buckets were per-date.
        roles: [...bucketPositions],
      });
    });
  }
  return shifts;
}
/**
 * Builds the metadata object stored on an order created through the mobile API.
 *
 * Later layers override earlier ones: computed defaults, then
 * `payload.metadata`, then `extra`.
 *
 * @param {string} orderType
 * @param {object} payload - Mobile request payload.
 * @param {object} [extra={}] - Final overrides.
 * @returns {object} Merged metadata.
 */
function buildMobileOrderMetadata(orderType, payload, extra = {}) {
  const { recurrenceDays, daysOfWeek, startDate, orderDate, endDate, metadata } = payload;
  const defaults = {
    orderType,
    source: 'mobile-api',
    recurrenceDays: recurrenceDays || daysOfWeek || null,
    startDate: startDate || orderDate || null,
    endDate: endDate || null,
  };
  return { ...defaults, ...metadata, ...extra };
}
/**
 * Determines the attendance source type for a clock event.
 *
 * @param {object} payload - Clock payload.
 * @returns {*} `payload.sourceType` when truthy; otherwise `'NFC'` when an
 *   `nfcTagId` is present, `'GEO'` when both coordinates are non-null, else `'MANUAL'`.
 */
function inferAttendanceSourceType(payload) {
  const { sourceType, nfcTagId, latitude, longitude } = payload;
  if (sourceType) {
    return sourceType;
  }
  if (nfcTagId) {
    return 'NFC';
  }
  const hasCoordinates = latitude != null && longitude != null;
  return hasCoordinates ? 'GEO' : 'MANUAL';
}
/**
 * Translates a mobile order request into the payload for order creation.
 *
 * Loads the hub, resolves catalog roles, materializes the schedule dates and
 * expands them into shifts. Most fields are read from `payload` first and
 * fall back to `extra`.
 *
 * @param {object} actor - Calling actor; not read by this function.
 * @param {object} context - Client context providing `tenant.tenantId` and `business.businessId`.
 * @param {object} payload - Mobile request body.
 * @param {string} orderType - `'ONE_TIME'`, `'RECURRING'`, or another type
 *   (which receives the `PRM` order-number prefix).
 * @param {object} [extra={}] - Fallback values and additional metadata.
 * @returns {Promise<object>} Order creation payload including its `shifts`.
 * @throws {AppError} Propagated from hub lookup and schedule validation.
 */
async function buildOrderCreatePayloadFromMobile(actor, context, payload, orderType, extra = {}) {
  const { tenantId } = context.tenant;
  const { businessId } = context.business;
  // Truthy-first lookup: mirrors `payload.x || extra.x`.
  const pick = (key) => payload[key] || extra[key];

  const hub = await loadHubDetails(tenantId, businessId, pick('hubId'));

  const positionInputs = pick('positions') || [];
  const roleLookup = await resolveRoleCatalogEntries(tenantId, positionInputs);
  const normalizedPositions = positionInputs.map((position) => buildShiftRoleEntry(position, roleLookup));

  const dates = materializeScheduleDates({
    orderType,
    orderDate: pick('orderDate'),
    startDate: pick('startDate'),
    endDate: pick('endDate'),
    recurrenceDays: pick('recurrenceDays'),
    daysOfWeek: pick('daysOfWeek'),
    horizonDays: pick('horizonDays'),
  });
  const shifts = buildOrderShifts({
    dates,
    positions: normalizedPositions,
    timezone: pick('timezone'),
    hub,
  });

  let numberPrefix = 'PRM';
  if (orderType === 'ONE_TIME') {
    numberPrefix = 'ORD';
  } else if (orderType === 'RECURRING') {
    numberPrefix = 'RCR';
  }

  // The order spans from the first shift's start to the last shift's end.
  const firstShift = shifts[0];
  const lastShift = shifts[shifts.length - 1];

  const orderShifts = shifts.map((shift, index) => {
    const roles = shift.roles.map(
      ({ roleId, roleCode, roleName, workersNeeded, payRateCents, billRateCents, metadata }) => ({
        roleId,
        roleCode,
        roleName,
        workersNeeded,
        payRateCents,
        billRateCents,
        metadata,
      })
    );
    return {
      ...shift,
      shiftCode: shift.shiftCode || `SFT-${index + 1}`,
      roles,
    };
  });

  return {
    tenantId,
    businessId,
    vendorId: payload.vendorId ?? extra.vendorId ?? null,
    orderNumber: generateOrderNumber(numberPrefix),
    title: pick('eventName') || 'Untitled Order',
    description: payload.description ?? extra.description ?? null,
    status: 'OPEN',
    serviceType: payload.serviceType ?? extra.serviceType ?? 'EVENT',
    startsAt: firstShift?.startsAt || null,
    endsAt: lastShift?.endsAt || null,
    locationName: hub.label,
    locationAddress: hub.address || null,
    latitude: hub.latitude == null ? undefined : Number(hub.latitude),
    longitude: hub.longitude == null ? undefined : Number(hub.longitude),
    notes: payload.notes ?? extra.notes ?? null,
    metadata: buildMobileOrderMetadata(orderType, payload, extra.metadata || {}),
    shifts: orderShifts,
  };
}
/**
 * Loads an order together with its still-editable (future, not cancelled or
 * completed) shifts, shaped as a template for the mobile edit flow.
 *
 * Shift dates and times are rendered in UTC. Each role exposes
 * `bill_rate_cents` under both `hourlyRateCents` and `billRateCents`.
 *
 * @param {string} actorUid - Acting user; resolved through `requireClientContext`.
 * @param {string} tenantId - Must equal the actor's tenant.
 * @param {string} businessId - Must equal the actor's business.
 * @param {string} orderId
 * @returns {Promise<object>} Row with `orderId`, `eventName`, `description`,
 *   `notes`, `vendorId`, `serviceType`, `metadata` and a `shifts` JSON array.
 * @throws {AppError} `FORBIDDEN` (403) when the tenant or business differs
 *   from the actor's context; `ORDER_EDIT_BLOCKED` (409) when the query returns no row.
 */
async function loadEditableOrderTemplate(actorUid, tenantId, businessId, orderId) {
const context = await requireClientContext(actorUid);
if (context.tenant.tenantId !== tenantId || context.business.businessId !== businessId) {
throw new AppError('FORBIDDEN', 'Order is outside the current client context', 403, { orderId });
}
const result = await query(
`
SELECT
o.id AS "orderId",
o.title AS "eventName",
o.description,
o.notes,
o.vendor_id AS "vendorId",
o.service_type AS "serviceType",
o.metadata,
COALESCE(
json_agg(
json_build_object(
'shiftId', s.id,
'clockPointId', s.clock_point_id,
'date', to_char(s.starts_at AT TIME ZONE 'UTC', 'YYYY-MM-DD'),
'startTime', to_char(s.starts_at AT TIME ZONE 'UTC', 'HH24:MI'),
'endTime', to_char(s.ends_at AT TIME ZONE 'UTC', 'HH24:MI'),
'roles', (
SELECT json_agg(
json_build_object(
'roleId', sr.role_id,
'roleCode', sr.role_code,
'roleName', sr.role_name,
'workerCount', sr.workers_needed,
'hourlyRateCents', sr.bill_rate_cents,
'payRateCents', sr.pay_rate_cents,
'billRateCents', sr.bill_rate_cents,
'startTime', to_char(s.starts_at AT TIME ZONE 'UTC', 'HH24:MI'),
'endTime', to_char(s.ends_at AT TIME ZONE 'UTC', 'HH24:MI')
)
ORDER BY sr.role_name ASC
)
FROM shift_roles sr
WHERE sr.shift_id = s.id
)
)
ORDER BY s.starts_at ASC
),
'[]'::json
) AS shifts
FROM orders o
JOIN shifts s ON s.order_id = o.id
WHERE o.id = $1
AND o.tenant_id = $2
AND o.business_id = $3
AND s.starts_at > NOW()
AND s.status NOT IN ('CANCELLED', 'COMPLETED')
GROUP BY o.id
`,
[orderId, tenantId, businessId]
);
// The inner join means an unknown order id and an order without editable
// shifts both yield zero rows, so both surface as ORDER_EDIT_BLOCKED.
if (result.rowCount === 0) {
throw new AppError('ORDER_EDIT_BLOCKED', 'Order has no future shifts available for edit', 409, { orderId });
}
return result.rows[0];
}
/**
 * Cancels the future part of an order: every shift that has not started yet
 * and is neither CANCELLED nor COMPLETED, plus the cancellable assignments
 * and applications on those shifts.
 *
 * Steps, all on the supplied client:
 *   1. select the order row FOR UPDATE;
 *   2. select the future shifts FOR UPDATE;
 *   3. merge cancellation details into `orders.metadata`;
 *   4. set shifts, assignments and applications to CANCELLED;
 *   5. refresh the per-role and per-shift assigned counters;
 *   6. record an `ORDER_FUTURE_SLICE_CANCELLED` domain event.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} args
 * @param {string} args.actorUid - User performing the cancellation.
 * @param {string} args.tenantId
 * @param {string} args.businessId
 * @param {string} args.orderId
 * @param {string} [args.reason] - Stored as `futureCancellationReason`.
 * @param {object} [args.metadata={}] - Extra keys merged into `orders.metadata`;
 *   they override the `futureCancellation*` keys on collision.
 * @returns {Promise<object>} `{ orderId, orderNumber, status, futureOnly, cancelledShiftCount }`,
 *   plus `alreadyCancelled: true` when there was nothing to cancel.
 * @throws {AppError} `NOT_FOUND` (404) when the order is not in the given scope.
 */
async function cancelFutureOrderSlice(client, {
actorUid,
tenantId,
businessId,
orderId,
reason,
metadata = {},
}) {
const orderResult = await client.query(
`
SELECT id, order_number, status
FROM orders
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
FOR UPDATE
`,
[orderId, tenantId, businessId]
);
if (orderResult.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Order not found for cancel flow', 404, { orderId });
}
const order = orderResult.rows[0];
const futureShiftsResult = await client.query(
`
SELECT id
FROM shifts
WHERE order_id = $1
AND starts_at > NOW()
AND status NOT IN ('CANCELLED', 'COMPLETED')
ORDER BY starts_at ASC
FOR UPDATE
`,
[order.id]
);
// NOTE(review): this branch is reached whenever no future, non-cancelled,
// non-completed shift exists (for example when every shift is in the past),
// so `alreadyCancelled: true` may be reported for orders that were never
// cancelled - confirm that is what callers expect.
if (futureShiftsResult.rowCount === 0) {
return {
orderId: order.id,
orderNumber: order.order_number,
status: order.status,
futureOnly: true,
cancelledShiftCount: 0,
alreadyCancelled: true,
};
}
const shiftIds = futureShiftsResult.rows.map((row) => row.id);
// Only metadata and updated_at are written on the order; orders.status is
// not changed by this function.
await client.query(
`
UPDATE orders
SET metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
order.id,
JSON.stringify({
futureCancellationReason: reason || null,
futureCancellationBy: actorUid,
futureCancellationAt: new Date().toISOString(),
...metadata,
}),
]
);
await client.query(
`
UPDATE shifts
SET status = 'CANCELLED',
updated_at = NOW()
WHERE id = ANY($1::uuid[])
`,
[shiftIds]
);
await client.query(
`
UPDATE assignments
SET status = 'CANCELLED',
updated_at = NOW()
WHERE shift_id = ANY($1::uuid[])
AND status = ANY($2::text[])
`,
[shiftIds, MOBILE_CANCELLABLE_ASSIGNMENT_STATUSES]
);
await client.query(
`
UPDATE applications
SET status = 'CANCELLED',
updated_at = NOW()
WHERE shift_id = ANY($1::uuid[])
AND status = ANY($2::text[])
`,
[shiftIds, MOBILE_CANCELLABLE_APPLICATION_STATUSES]
);
// Recompute the denormalized counters now that assignments have changed.
// This issues one role lookup per shift and one refresh per role.
for (const shiftId of shiftIds) {
const roleIds = await client.query(
'SELECT id FROM shift_roles WHERE shift_id = $1',
[shiftId]
);
for (const role of roleIds.rows) {
await refreshShiftRoleCounts(client, role.id);
}
await refreshShiftCounts(client, shiftId);
}
await insertDomainEvent(client, {
tenantId,
aggregateType: 'order',
aggregateId: order.id,
eventType: 'ORDER_FUTURE_SLICE_CANCELLED',
actorUserId: actorUid,
payload: {
reason: reason || null,
shiftIds,
futureOnly: true,
},
});
// NOTE(review): 'CANCELLED' is a literal in the response; the order row's
// status column is left untouched above - confirm this is intended.
return {
orderId: order.id,
orderNumber: order.order_number,
status: 'CANCELLED',
futureOnly: true,
cancelledShiftCount: shiftIds.length,
};
}
/**
 * Resolves which assignment a staff clock action refers to.
 *
 * Resolution order: explicit `payload.assignmentId`, then the newest
 * assignment linked to `payload.applicationId`, then the newest assignment on
 * `payload.shiftId`, then (only with `requireOpenSession`) the most recently
 * updated OPEN attendance session. The three lookups are restricted to
 * assignments whose staff record belongs to `actorUid`.
 *
 * @param {string} actorUid - Acting user; must resolve to a staff context.
 * @param {string} tenantId
 * @param {object} payload - May carry `assignmentId`, `applicationId` and/or `shiftId`.
 * @param {object} [options]
 * @param {boolean} [options.requireOpenSession=false] - Enables the open-session fallback.
 * @returns {Promise<{ assignmentId: string, context: object }>}
 * @throws {AppError} `NOT_FOUND` (404) when nothing matches.
 */
async function resolveStaffAssignmentForClock(actorUid, tenantId, payload, { requireOpenSession = false } = {}) {
const context = await requireStaffContext(actorUid);
// NOTE(review): an explicit assignmentId is returned without any ownership
// query in this function - confirm that downstream code verifies it belongs
// to the actor.
if (payload.assignmentId) {
return { assignmentId: payload.assignmentId, context };
}
if (payload.applicationId) {
const fromApplication = await query(
`
SELECT a.id AS "assignmentId"
FROM assignments a
JOIN applications app ON app.id = a.application_id
JOIN staffs s ON s.id = a.staff_id
WHERE a.tenant_id = $1
AND app.id = $2
AND s.user_id = $3
ORDER BY a.created_at DESC
LIMIT 1
`,
[tenantId, payload.applicationId, actorUid]
);
if (fromApplication.rowCount > 0) {
return { assignmentId: fromApplication.rows[0].assignmentId, context };
}
}
if (payload.shiftId) {
const fromShift = await query(
`
SELECT a.id AS "assignmentId"
FROM assignments a
JOIN staffs s ON s.id = a.staff_id
WHERE a.tenant_id = $1
AND a.shift_id = $2
AND s.user_id = $3
ORDER BY a.created_at DESC
LIMIT 1
`,
[tenantId, payload.shiftId, actorUid]
);
if (fromShift.rowCount > 0) {
return { assignmentId: fromShift.rows[0].assignmentId, context };
}
}
if (requireOpenSession) {
const openSession = await query(
`
SELECT attendance_sessions.assignment_id AS "assignmentId"
FROM attendance_sessions
JOIN staffs s ON s.id = attendance_sessions.staff_id
WHERE attendance_sessions.tenant_id = $1
AND s.user_id = $2
AND attendance_sessions.status = 'OPEN'
ORDER BY attendance_sessions.updated_at DESC
LIMIT 1
`,
[tenantId, actorUid]
);
if (openSession.rowCount > 0) {
return { assignmentId: openSession.rows[0].assignmentId, context };
}
}
// The whole request payload is attached to the error as its details.
throw new AppError('NOT_FOUND', 'No assignment found for the current staff clock action', 404, payload);
}
/**
 * Loads an assignment with the shift and clock-point settings needed to
 * evaluate a clock event, selecting the assignment and shift rows FOR UPDATE.
 *
 * Expected coordinates and geofence radius come from the shift and fall back
 * to the clock point (via COALESCE). The clock point is LEFT JOINed, so its
 * columns are null when the shift has none.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} assignmentId
 * @param {string} actorUid - Must match the user of the assignment's staff record.
 * @returns {Promise<object>} The joined row.
 * @throws {AppError} `NOT_FOUND` (404) when no assignment matches the tenant, id and actor.
 */
async function loadAssignmentMonitoringContext(client, tenantId, assignmentId, actorUid) {
const result = await client.query(
`
SELECT
a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.staff_id,
a.status,
s.clock_point_id,
s.title AS shift_title,
s.starts_at,
s.ends_at,
s.clock_in_mode,
s.allow_clock_in_override,
cp.default_clock_in_mode,
cp.allow_clock_in_override AS default_allow_clock_in_override,
cp.nfc_tag_uid AS expected_nfc_tag_uid,
COALESCE(s.latitude, cp.latitude) AS expected_latitude,
COALESCE(s.longitude, cp.longitude) AS expected_longitude,
COALESCE(s.geofence_radius_meters, cp.geofence_radius_meters) AS geofence_radius_meters
FROM assignments a
JOIN staffs st ON st.id = a.staff_id
JOIN shifts s ON s.id = a.shift_id
LEFT JOIN clock_points cp ON cp.id = s.clock_point_id
WHERE a.tenant_id = $1
AND a.id = $2
AND st.user_id = $3
FOR UPDATE OF a, s
`,
[tenantId, assignmentId, actorUid]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Assignment not found in staff scope', 404, {
assignmentId,
});
}
return result.rows[0];
}
/**
 * Upserts the `users` row for the acting user.
 *
 * On conflict, email, display name and phone are replaced only when the new
 * value is non-null, and the supplied metadata is merged over the stored metadata.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} actor - Acting user; reads `uid` and `email`.
 * @param {object} [fields={}] - Optional `email`, `displayName`, `phone` and `metadata`.
 * @returns {Promise<void>}
 */
async function ensureActorUser(client, actor, fields = {}) {
await client.query(
`
INSERT INTO users (id, email, display_name, phone, status, metadata)
VALUES ($1, $2, $3, $4, 'ACTIVE', COALESCE($5::jsonb, '{}'::jsonb))
ON CONFLICT (id) DO UPDATE
SET email = COALESCE(EXCLUDED.email, users.email),
display_name = COALESCE(EXCLUDED.display_name, users.display_name),
phone = COALESCE(EXCLUDED.phone, users.phone),
metadata = COALESCE(users.metadata, '{}'::jsonb) || COALESCE(EXCLUDED.metadata, '{}'::jsonb),
updated_at = NOW()
`,
[
actor.uid,
fields.email ?? actor.email ?? null,
// With no explicit display name, the actor's email is used as the display name.
fields.displayName ?? actor.email ?? null,
fields.phone ?? null,
JSON.stringify(fields.metadata || {}),
]
);
}
/**
 * Appends a row to `domain_events` for an aggregate.
 *
 * The sequence is computed in the same statement as `MAX(sequence) + 1` over
 * the existing events for the (tenant, aggregate type, aggregate id), starting at 1.
 *
 * NOTE(review): two concurrent inserts for the same aggregate could compute
 * the same sequence unless the caller holds a lock on the aggregate or the
 * table has a unique constraint - confirm against the schema.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} event
 * @param {string} event.tenantId
 * @param {string} event.aggregateType
 * @param {string} event.aggregateId
 * @param {string} event.eventType
 * @param {string} event.actorUserId
 * @param {object} [event.payload] - Serialized as JSON; defaults to `{}`.
 * @returns {Promise<void>}
 */
async function insertDomainEvent(client, {
tenantId,
aggregateType,
aggregateId,
eventType,
actorUserId,
payload,
}) {
await client.query(
`
INSERT INTO domain_events (
tenant_id,
aggregate_type,
aggregate_id,
sequence,
event_type,
actor_user_id,
payload
)
SELECT
$1,
$2,
$3,
COALESCE(MAX(sequence) + 1, 1),
$4,
$5,
$6::jsonb
FROM domain_events
WHERE tenant_id = $1
AND aggregate_type = $2
AND aggregate_id = $3
`,
[tenantId, aggregateType, aggregateId, eventType, actorUserId, JSON.stringify(payload || {})]
);
}
/**
 * Loads the ACTIVE business membership of a user for a business.
 *
 * The query filters on business and user only (no tenant filter) and has no
 * LIMIT; the first returned row is used.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} businessId
 * @param {string} userId
 * @returns {Promise<object>} Row with `id`, `tenant_id`, `business_id` and `user_id`.
 * @throws {AppError} `FORBIDDEN` (403) when no ACTIVE membership exists.
 */
async function requireBusinessMembership(client, businessId, userId) {
const result = await client.query(
`
SELECT bm.id, bm.tenant_id, bm.business_id, bm.user_id
FROM business_memberships bm
WHERE bm.business_id = $1
AND bm.user_id = $2
AND bm.membership_status = 'ACTIVE'
`,
[businessId, userId]
);
if (result.rowCount === 0) {
throw new AppError('FORBIDDEN', 'Business membership not found for current user', 403, {
businessId,
userId,
});
}
return result.rows[0];
}
/**
 * Loads a clock point (hub) within a tenant and business, optionally selecting it FOR UPDATE.
 *
 * Unlike `loadHubDetails`, this query does not filter on `status`, so hubs in
 * any status are returned.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} businessId
 * @param {string} hubId - `clock_points.id`.
 * @param {object} [options]
 * @param {boolean} [options.forUpdate=false] - Appends `FOR UPDATE` to the query.
 * @returns {Promise<object>} The clock-point row.
 * @throws {AppError} `NOT_FOUND` (404) when no hub matches the scope.
 */
async function requireClockPoint(client, tenantId, businessId, hubId, { forUpdate = false } = {}) {
const result = await client.query(
`
SELECT
id,
tenant_id,
business_id,
label,
status,
cost_center_id,
nfc_tag_uid,
latitude,
longitude,
geofence_radius_meters,
default_clock_in_mode,
allow_clock_in_override,
metadata
FROM clock_points
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
${forUpdate ? 'FOR UPDATE' : ''}
`,
[hubId, tenantId, businessId]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Hub not found in business scope', 404, {
tenantId,
businessId,
hubId,
});
}
return result.rows[0];
}
/**
 * Loads an invoice within a tenant and business, selecting the row FOR UPDATE.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} businessId
 * @param {string} invoiceId
 * @returns {Promise<object>} Row with `id`, `tenant_id`, `business_id`, `status` and `metadata`.
 * @throws {AppError} `NOT_FOUND` (404) when no invoice matches the scope.
 */
async function requireInvoice(client, tenantId, businessId, invoiceId) {
const result = await client.query(
`
SELECT id, tenant_id, business_id, status, metadata
FROM invoices
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
FOR UPDATE
`,
[invoiceId, tenantId, businessId]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Invoice not found in business scope', 404, {
tenantId,
businessId,
invoiceId,
});
}
return result.rows[0];
}
/**
 * Loads the oldest staff profile of the acting user in a tenant, together
 * with workforce columns when a workforce row exists (LEFT JOIN). The staff
 * row is selected FOR UPDATE.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} actorUid - `staffs.user_id` to match.
 * @returns {Promise<object>} Staff row extended with `workforce_id`, `vendor_id`
 *   and `workforce_number` (null without a workforce row).
 * @throws {AppError} `FORBIDDEN` (403) when the user has no staff profile in the tenant.
 */
async function requireStaffByActor(client, tenantId, actorUid) {
const result = await client.query(
`
SELECT s.id, s.tenant_id, s.user_id, s.full_name, s.email, s.phone, s.metadata, s.primary_role,
w.id AS workforce_id, w.vendor_id, w.workforce_number
FROM staffs s
LEFT JOIN workforce w ON w.staff_id = s.id
WHERE s.tenant_id = $1
AND s.user_id = $2
ORDER BY s.created_at ASC
LIMIT 1
FOR UPDATE OF s
`,
[tenantId, actorUid]
);
if (result.rowCount === 0) {
throw new AppError('FORBIDDEN', 'Staff profile not found for current user', 403, {
tenantId,
actorUid,
});
}
return result.rows[0];
}
/**
 * Finds a shift role the staff member can still apply to, selecting the shift
 * and shift-role rows FOR UPDATE.
 *
 * The shift must be OPEN, PENDING_CONFIRMATION or ASSIGNED, and the staff
 * member must not already have a PENDING, CONFIRMED, CHECKED_IN or COMPLETED
 * application for that role. With no `roleId`, the oldest eligible role on
 * the shift is chosen.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} shiftId
 * @param {string|null} roleId - Matched against `shift_roles.id`; falsy means any role.
 * @param {string} staffId
 * @returns {Promise<object>} Joined shift / shift-role row.
 * @throws {AppError} `NOT_FOUND` (404) when no eligible role exists, which
 *   includes the case where the staff member has already applied.
 */
async function requireShiftRoleForStaffApply(client, tenantId, shiftId, roleId, staffId) {
const result = await client.query(
`
SELECT
s.id AS shift_id,
s.tenant_id,
s.business_id,
s.vendor_id,
s.status AS shift_status,
s.starts_at,
s.ends_at,
sr.id AS shift_role_id,
sr.role_code,
sr.role_name,
sr.workers_needed,
sr.assigned_count,
sr.pay_rate_cents
FROM shifts s
JOIN shift_roles sr ON sr.shift_id = s.id
WHERE s.id = $1
AND s.tenant_id = $2
AND ($3::uuid IS NULL OR sr.id = $3)
AND s.status IN ('OPEN', 'PENDING_CONFIRMATION', 'ASSIGNED')
AND NOT EXISTS (
SELECT 1
FROM applications a
WHERE a.shift_role_id = sr.id
AND a.staff_id = $4
AND a.status IN ('PENDING', 'CONFIRMED', 'CHECKED_IN', 'COMPLETED')
)
ORDER BY sr.created_at ASC
LIMIT 1
FOR UPDATE OF s, sr
`,
[shiftId, tenantId, roleId || null, staffId]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Open shift role not found or already applied', 404, {
tenantId,
shiftId,
roleId: roleId || null,
});
}
return result.rows[0];
}
/**
 * Loads the acting user's oldest assignment on a shift that is still in
 * status ASSIGNED, selecting the assignment row FOR UPDATE.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} shiftId
 * @param {string} actorUid - Must match the user of the assignment's staff record.
 * @returns {Promise<object>} The assignment row.
 * @throws {AppError} `NOT_FOUND` (404) when no ASSIGNED assignment exists for the actor.
 */
async function requirePendingAssignmentForActor(client, tenantId, shiftId, actorUid) {
const result = await client.query(
`
SELECT
a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.workforce_id,
a.staff_id,
a.status,
a.metadata
FROM assignments a
JOIN staffs s ON s.id = a.staff_id
WHERE a.tenant_id = $1
AND a.shift_id = $2
AND s.user_id = $3
AND a.status = 'ASSIGNED'
ORDER BY a.created_at ASC
LIMIT 1
FOR UPDATE OF a
`,
[tenantId, shiftId, actorUid]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Pending shift assignment not found for current user', 404, {
tenantId,
shiftId,
actorUid,
});
}
return result.rows[0];
}
/**
 * Loads the acting user's oldest assignment on a shift regardless of its
 * status, selecting the assignment row FOR UPDATE.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} shiftId
 * @param {string} actorUid - Must match the user of the assignment's staff record.
 * @returns {Promise<object>} The assignment row.
 * @throws {AppError} `NOT_FOUND` (404) when the actor has no assignment on the shift.
 */
async function requireAnyAssignmentForActor(client, tenantId, shiftId, actorUid) {
const result = await client.query(
`
SELECT
a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.workforce_id,
a.staff_id,
a.status,
a.metadata
FROM assignments a
JOIN staffs s ON s.id = a.staff_id
WHERE a.tenant_id = $1
AND a.shift_id = $2
AND s.user_id = $3
ORDER BY a.created_at ASC
LIMIT 1
FOR UPDATE OF a
`,
[tenantId, shiftId, actorUid]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Shift assignment not found for current user', 404, {
tenantId,
shiftId,
actorUid,
});
}
return result.rows[0];
}
/**
 * Recomputes `shift_roles.assigned_count` for one shift role.
 *
 * Counts assignments in status ASSIGNED, ACCEPTED, SWAP_REQUESTED,
 * CHECKED_IN, CHECKED_OUT or COMPLETED. The aggregate subquery always yields
 * one row, so a role without matching assignments is set to 0.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} shiftRoleId - `shift_roles.id`.
 * @returns {Promise<void>}
 */
async function refreshShiftRoleCounts(client, shiftRoleId) {
await client.query(
`
UPDATE shift_roles sr
SET assigned_count = counts.assigned_count,
updated_at = NOW()
FROM (
SELECT $1::uuid AS shift_role_id,
COUNT(*) FILTER (
WHERE status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
)::INTEGER AS assigned_count
FROM assignments
WHERE shift_role_id = $1
) counts
WHERE sr.id = counts.shift_role_id
`,
[shiftRoleId]
);
}
/**
 * Recomputes `shifts.assigned_workers` for one shift.
 *
 * Counts assignments in status ASSIGNED, ACCEPTED, SWAP_REQUESTED,
 * CHECKED_IN, CHECKED_OUT or COMPLETED. The aggregate subquery always yields
 * one row, so a shift without matching assignments is set to 0.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} shiftId - `shifts.id`.
 * @returns {Promise<void>}
 */
async function refreshShiftCounts(client, shiftId) {
await client.query(
`
UPDATE shifts s
SET assigned_workers = counts.assigned_workers,
updated_at = NOW()
FROM (
SELECT $1::uuid AS shift_id,
COUNT(*) FILTER (
WHERE status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
)::INTEGER AS assigned_workers
FROM assignments
WHERE shift_id = $1
) counts
WHERE s.id = counts.shift_id
`,
[shiftId]
);
}
/**
 * Determines the tenant and vendor under which a staff member is onboarded.
 *
 * Precedence:
 *   1. the actor's existing context, when it already has both a tenant and a
 *      vendor (any supplied ids are then ignored);
 *   2. the supplied `tenantId` / `vendorId`, after checking that both are
 *      ACTIVE and the vendor belongs to the tenant;
 *   3. the oldest ACTIVE tenant with its oldest ACTIVE vendor.
 *
 * NOTE(review): step 3 places the actor in a globally chosen tenant/vendor
 * when no ids are supplied - confirm this is intended in multi-tenant deployments.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} actorUid - Acting user, passed to `loadActorContext`.
 * @param {string} [tenantId]
 * @param {string} [vendorId]
 * @returns {Promise<{ tenantId: string, vendorId: string }>}
 * @throws {AppError} `VALIDATION_ERROR` (400) when the supplied ids do not
 *   resolve to an active pair; `CONFIGURATION_ERROR` (500) when no active pair exists at all.
 */
async function resolveStaffOnboardingScope(client, actorUid, tenantId, vendorId) {
const existing = await loadActorContext(actorUid);
if (existing.tenant?.tenantId && existing.vendor?.vendorId) {
return {
tenantId: existing.tenant.tenantId,
vendorId: existing.vendor.vendorId,
};
}
if (tenantId && vendorId) {
const verify = await client.query(
`
SELECT t.id AS tenant_id, v.id AS vendor_id
FROM tenants t
JOIN vendors v ON v.tenant_id = t.id
WHERE t.id = $1
AND v.id = $2
AND t.status = 'ACTIVE'
AND v.status = 'ACTIVE'
`,
[tenantId, vendorId]
);
if (verify.rowCount === 0) {
throw new AppError('VALIDATION_ERROR', 'tenantId and vendorId do not resolve to an active onboarding scope', 400, {
tenantId,
vendorId,
});
}
return {
tenantId,
vendorId,
};
}
// Also reached when only one of tenantId / vendorId was supplied.
const fallback = await client.query(
`
SELECT t.id AS tenant_id, v.id AS vendor_id
FROM tenants t
JOIN vendors v ON v.tenant_id = t.id
WHERE t.status = 'ACTIVE'
AND v.status = 'ACTIVE'
ORDER BY t.created_at ASC, v.created_at ASC
LIMIT 1
`
);
if (fallback.rowCount === 0) {
throw new AppError('CONFIGURATION_ERROR', 'No active tenant/vendor onboarding scope is configured', 500);
}
return {
tenantId: fallback.rows[0].tenant_id,
vendorId: fallback.rows[0].vendor_id,
};
}
/**
 * Produces a new staff metadata object with the profile fields present on
 * `payload` applied over `existing`.
 *
 * Only `bio`, `firstName`, `lastName`, `preferredLocations`,
 * `maxDistanceMiles`, `industries`, `skills` and `profileVisible` are copied,
 * and only when their value is not `undefined` (so `null` and `false` do
 * overwrite). `existing` is not mutated.
 *
 * @param {object|null|undefined} existing - Current metadata.
 * @param {object} payload - Incoming profile update.
 * @returns {object} The patched metadata copy.
 */
function buildStaffMetadataPatch(existing, payload) {
  const patchableKeys = [
    'bio',
    'firstName',
    'lastName',
    'preferredLocations',
    'maxDistanceMiles',
    'industries',
    'skills',
    'profileVisible',
  ];
  const patched = { ...existing };
  for (const key of patchableKeys) {
    const incoming = payload[key];
    if (incoming !== undefined) {
      patched[key] = incoming;
    }
  }
  return patched;
}
/**
 * Creates a hub (clock point) for the acting client's business.
 *
 * Inside one transaction: upserts the actor's user row, requires an ACTIVE
 * business membership, validates the optional cost center against the
 * business scope, inserts the clock point as ACTIVE and records a
 * `HUB_CREATED` domain event carrying the raw request payload.
 *
 * Column defaults applied in SQL when the payload omits them: geofence radius
 * 120, clock-in mode 'EITHER', allow clock-in override TRUE.
 *
 * @param {object} actor - Acting user; reads `uid`.
 * @param {object} payload - Hub fields: `name`, `fullAddress`, `latitude`,
 *   `longitude`, `geofenceRadiusMeters`, `nfcTagId`, `clockInMode`,
 *   `allowClockInOverride`, `costCenterId`, and the address parts stored in
 *   metadata (`placeId`, `street`, `city`, `state`, `country`, `zipCode`).
 * @returns {Promise<{ hubId: string, created: true }>}
 * @throws {AppError} `FORBIDDEN` (403) without an active membership;
 *   `NOT_FOUND` (404) when the cost center is outside the business scope.
 */
export async function createHub(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const businessMembership = await requireBusinessMembership(client, context.business.businessId, actor.uid);
let costCenterId = payload.costCenterId || null;
if (costCenterId) {
const costCenter = await client.query(
`
SELECT id
FROM cost_centers
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
`,
[costCenterId, context.tenant.tenantId, context.business.businessId]
);
if (costCenter.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Cost center not found in business scope', 404, {
costCenterId,
});
}
}
const result = await client.query(
`
INSERT INTO clock_points (
tenant_id,
business_id,
label,
address,
latitude,
longitude,
geofence_radius_meters,
nfc_tag_uid,
default_clock_in_mode,
allow_clock_in_override,
cost_center_id,
status,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, 120), $8, COALESCE($9, 'EITHER'), COALESCE($10, TRUE), $11, 'ACTIVE', $12::jsonb)
RETURNING id
`,
[
context.tenant.tenantId,
context.business.businessId,
payload.name,
payload.fullAddress || null,
payload.latitude ?? null,
payload.longitude ?? null,
payload.geofenceRadiusMeters ?? null,
payload.nfcTagId || null,
payload.clockInMode || null,
payload.allowClockInOverride ?? null,
costCenterId,
JSON.stringify({
placeId: payload.placeId || null,
street: payload.street || null,
city: payload.city || null,
state: payload.state || null,
country: payload.country || null,
zipCode: payload.zipCode || null,
// The creating membership is recorded under both keys.
clockInPolicyConfiguredBy: businessMembership.id,
createdByMembershipId: businessMembership.id,
}),
]
);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'clock_point',
aggregateId: result.rows[0].id,
eventType: 'HUB_CREATED',
actorUserId: actor.uid,
payload,
});
return { hubId: result.rows[0].id, created: true };
});
}
/**
 * Updates a hub (clock point) belonging to the acting client's business.
 *
 * Inside one transaction: upserts the actor's user row, selects the hub FOR
 * UPDATE, validates the optional cost center, applies the changes and records
 * a `HUB_UPDATED` domain event carrying the raw request payload.
 *
 * Scalar columns are updated through COALESCE, so an omitted or null value
 * leaves the stored value unchanged; this also means a column cannot be reset
 * to NULL through this function. Address parts in metadata are overwritten
 * only when the payload property is not `undefined`.
 *
 * @param {object} actor - Acting user; reads `uid`.
 * @param {object} payload - Must carry `hubId`; other hub fields are optional.
 * @returns {Promise<{ hubId: string, updated: true }>}
 * @throws {AppError} `NOT_FOUND` (404) when the hub or the cost center is
 *   outside the business scope.
 */
export async function updateHub(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const hub = await requireClockPoint(client, context.tenant.tenantId, context.business.businessId, payload.hubId, { forUpdate: true });
let costCenterId = payload.costCenterId;
if (costCenterId) {
const costCenter = await client.query(
`
SELECT id
FROM cost_centers
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
`,
[costCenterId, context.tenant.tenantId, context.business.businessId]
);
if (costCenter.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Cost center not found in business scope', 404, {
costCenterId,
});
}
}
// Start from the stored metadata and overlay only the address parts that
// were explicitly provided.
const nextMetadata = {
...(hub.metadata || {}),
...(payload.placeId !== undefined ? { placeId: payload.placeId } : {}),
...(payload.street !== undefined ? { street: payload.street } : {}),
...(payload.city !== undefined ? { city: payload.city } : {}),
...(payload.state !== undefined ? { state: payload.state } : {}),
...(payload.country !== undefined ? { country: payload.country } : {}),
...(payload.zipCode !== undefined ? { zipCode: payload.zipCode } : {}),
};
await client.query(
`
UPDATE clock_points
SET label = COALESCE($2, label),
address = COALESCE($3, address),
latitude = COALESCE($4, latitude),
longitude = COALESCE($5, longitude),
geofence_radius_meters = COALESCE($6, geofence_radius_meters),
cost_center_id = COALESCE($7, cost_center_id),
default_clock_in_mode = COALESCE($8, default_clock_in_mode),
allow_clock_in_override = COALESCE($9, allow_clock_in_override),
metadata = $10::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
hub.id,
payload.name || null,
payload.fullAddress || null,
payload.latitude ?? null,
payload.longitude ?? null,
payload.geofenceRadiusMeters ?? null,
costCenterId || null,
payload.clockInMode || null,
payload.allowClockInOverride ?? null,
JSON.stringify(nextMetadata),
]
);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'clock_point',
aggregateId: hub.id,
eventType: 'HUB_UPDATED',
actorUserId: actor.uid,
payload,
});
return { hubId: hub.id, updated: true };
});
}
/**
 * Archives a hub (clock point) by setting its status to INACTIVE; the row is
 * not physically deleted.
 *
 * The operation is refused while any shift on the hub is in status DRAFT,
 * OPEN, PENDING_CONFIRMATION, ASSIGNED or ACTIVE. On success a `HUB_ARCHIVED`
 * domain event is recorded with the optional reason.
 *
 * @param {object} actor - Acting user; reads `uid`.
 * @param {object} payload - Must carry `hubId`; may carry `reason`.
 * @returns {Promise<{ hubId: string, deleted: true }>}
 * @throws {AppError} `NOT_FOUND` (404) when the hub is outside the business
 *   scope; `HUB_DELETE_BLOCKED` (409) while such shifts exist.
 */
export async function deleteHub(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const hub = await requireClockPoint(client, context.tenant.tenantId, context.business.businessId, payload.hubId, { forUpdate: true });
// Despite the variable name, this checks shifts attached to the hub.
const activeOrders = await client.query(
`
SELECT 1
FROM shifts
WHERE clock_point_id = $1
AND status IN ('DRAFT', 'OPEN', 'PENDING_CONFIRMATION', 'ASSIGNED', 'ACTIVE')
LIMIT 1
`,
[hub.id]
);
if (activeOrders.rowCount > 0) {
throw new AppError('HUB_DELETE_BLOCKED', 'Cannot delete a hub with active orders or shifts', 409, {
hubId: hub.id,
});
}
await client.query(
`
UPDATE clock_points
SET status = 'INACTIVE',
updated_at = NOW()
WHERE id = $1
`,
[hub.id]
);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'clock_point',
aggregateId: hub.id,
eventType: 'HUB_ARCHIVED',
actorUserId: actor.uid,
payload: { reason: payload.reason || null },
});
return { hubId: hub.id, deleted: true };
});
}
/**
 * Sets the NFC tag UID of a hub (clock point) and records a
 * `HUB_NFC_ASSIGNED` domain event carrying the raw request payload.
 *
 * NOTE(review): `payload.nfcTagId` is written as-is with no validation in
 * this function - confirm the caller validates it.
 *
 * @param {object} actor - Acting user; reads `uid`.
 * @param {object} payload - Must carry `hubId` and `nfcTagId`.
 * @returns {Promise<{ hubId: string, nfcTagId: * }>}
 * @throws {AppError} `NOT_FOUND` (404) when the hub is outside the business scope.
 */
export async function assignHubNfc(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const hub = await requireClockPoint(client, context.tenant.tenantId, context.business.businessId, payload.hubId, { forUpdate: true });
await client.query(
`
UPDATE clock_points
SET nfc_tag_uid = $2,
updated_at = NOW()
WHERE id = $1
`,
[hub.id, payload.nfcTagId]
);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'clock_point',
aggregateId: hub.id,
eventType: 'HUB_NFC_ASSIGNED',
actorUserId: actor.uid,
payload,
});
return { hubId: hub.id, nfcTagId: payload.nfcTagId };
});
}
/**
 * Links an active business membership to a hub as its manager.
 *
 * The membership is either verified by `payload.businessMembershipId` or
 * looked up by `payload.managerUserId`; in both cases it must be ACTIVE and
 * belong to the caller's tenant and business. The hub_managers row is
 * upserted and a HUB_MANAGER_ASSIGNED domain event is recorded.
 *
 * @param {object} actor - Caller identity; `actor.uid` is used for context lookup and auditing.
 * @param {object} payload - Carries `hubId` plus `businessMembershipId` or `managerUserId`.
 * @returns {Promise<{managerAssignmentId: *, hubId: *, businessMembershipId: *}>} Ids of the resulting link.
 * @throws {AppError} NOT_FOUND (404) when no matching active membership exists.
 */
export async function assignHubManager(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { businessId } = context.business;
    const hub = await requireClockPoint(client, tenantId, businessId, payload.hubId, { forUpdate: true });

    let businessMembershipId = payload.businessMembershipId || null;

    if (businessMembershipId) {
      // Explicit membership id: confirm it is active and inside the caller's scope.
      const verified = await client.query(
        `
SELECT id
FROM business_memberships
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
AND membership_status = 'ACTIVE'
`,
        [businessMembershipId, tenantId, businessId],
      );
      if (verified.rowCount === 0) {
        throw new AppError('NOT_FOUND', 'Business membership not found for hub manager assignment', 404, {
          businessMembershipId,
        });
      }
    } else {
      // No membership id supplied: resolve it from the manager's user id.
      const resolved = await client.query(
        `
SELECT id
FROM business_memberships
WHERE tenant_id = $1
AND business_id = $2
AND user_id = $3
AND membership_status = 'ACTIVE'
`,
        [tenantId, businessId, payload.managerUserId],
      );
      if (resolved.rowCount === 0) {
        throw new AppError('NOT_FOUND', 'Business team member not found for hub manager assignment', 404, {
          managerUserId: payload.managerUserId,
        });
      }
      businessMembershipId = resolved.rows[0].id;
    }

    // Re-assigning an existing pair only bumps updated_at.
    const upserted = await client.query(
      `
INSERT INTO hub_managers (tenant_id, hub_id, business_membership_id)
VALUES ($1, $2, $3)
ON CONFLICT (hub_id, business_membership_id) DO UPDATE
SET updated_at = NOW()
RETURNING id
`,
      [tenantId, hub.id, businessMembershipId],
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'clock_point',
      aggregateId: hub.id,
      eventType: 'HUB_MANAGER_ASSIGNED',
      actorUserId: actor.uid,
      payload: { businessMembershipId },
    });

    const [{ id: managerAssignmentId }] = upserted.rows;
    return { managerAssignmentId, hubId: hub.id, businessMembershipId };
  });
}
/**
 * Creates or refreshes a manager membership in the caller's business and
 * optionally attaches it to a hub.
 *
 * The email is trimmed and lower-cased, then matched against existing users
 * and against an existing membership (by invited email or by the matched
 * user id). An existing membership is updated in place; otherwise a new one
 * is inserted with status ACTIVE when a user was matched and INVITED when
 * not. When `payload.hubId` is present the membership is upserted into
 * hub_managers. A SHIFT_MANAGER_CREATED domain event is recorded either way.
 *
 * @param {object} actor - Caller identity; `actor.uid` is stored as `createdBy` in the metadata.
 * @param {object} payload - `email`, `firstName`, `lastName`, optional `phone`, `role`, `hubId`, `metadata`.
 * @returns {Promise<object>} Membership id and status, normalized email, full name, role and hub link id (or null).
 */
export async function createShiftManager(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const invitedEmail = payload.email.trim().toLowerCase();
    const fullName = `${payload.firstName} ${payload.lastName}`.trim();
    const role = payload.role || 'manager';

    const userLookup = await client.query(
      `
SELECT id
FROM users
WHERE LOWER(email) = $1
LIMIT 1
`,
      [invitedEmail],
    );
    const linkedUserId = userLookup.rows[0]?.id || null;

    const tenantId = context.tenant.tenantId;
    const businessId = context.business.businessId;
    const existingMembership = await client.query(
      `
SELECT id, user_id, membership_status, metadata
FROM business_memberships
WHERE tenant_id = $1
AND business_id = $2
AND (
LOWER(invited_email) = $3
OR ($4::text IS NOT NULL AND user_id = $4)
)
LIMIT 1
FOR UPDATE
`,
      [tenantId, businessId, invitedEmail, linkedUserId],
    );

    // Later keys win: stored metadata < fields derived here < caller-supplied metadata.
    const metadataJson = JSON.stringify({
      ...(existingMembership.rows[0]?.metadata || {}),
      firstName: payload.firstName,
      lastName: payload.lastName,
      fullName,
      phone: normalizePhone(payload.phone),
      source: 'mobile-api',
      createdBy: actor.uid,
      ...(payload.metadata || {}),
    });

    const saved = existingMembership.rowCount > 0
      ? await client.query(
        `
UPDATE business_memberships
SET user_id = COALESCE(user_id, $2),
invited_email = $3,
membership_status = CASE
WHEN COALESCE(user_id, $2) IS NOT NULL THEN 'ACTIVE'
ELSE membership_status
END,
business_role = $4,
metadata = COALESCE(metadata, '{}'::jsonb) || $5::jsonb,
updated_at = NOW()
WHERE id = $1
RETURNING id, membership_status
`,
        [existingMembership.rows[0].id, linkedUserId, invitedEmail, role, metadataJson],
      )
      : await client.query(
        `
INSERT INTO business_memberships (
tenant_id,
business_id,
user_id,
invited_email,
membership_status,
business_role,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
RETURNING id, membership_status
`,
        [
          tenantId,
          businessId,
          linkedUserId,
          invitedEmail,
          linkedUserId ? 'ACTIVE' : 'INVITED',
          role,
          metadataJson,
        ],
      );
    const { id: businessMembershipId, membership_status: membershipStatus } = saved.rows[0];

    let managerAssignmentId = null;
    if (payload.hubId) {
      const hub = await requireClockPoint(client, tenantId, businessId, payload.hubId, { forUpdate: true });
      const hubLink = await client.query(
        `
INSERT INTO hub_managers (tenant_id, hub_id, business_membership_id)
VALUES ($1, $2, $3)
ON CONFLICT (hub_id, business_membership_id) DO UPDATE
SET updated_at = NOW()
RETURNING id
`,
        [tenantId, hub.id, businessMembershipId],
      );
      managerAssignmentId = hubLink.rows[0].id;
    }

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'business_membership',
      aggregateId: businessMembershipId,
      eventType: 'SHIFT_MANAGER_CREATED',
      actorUserId: actor.uid,
      payload: {
        invitedEmail,
        fullName,
        hubId: payload.hubId || null,
        membershipStatus,
      },
    });

    return {
      businessMembershipId,
      membershipStatus,
      invitedEmail,
      fullName,
      role,
      managerAssignmentId,
    };
  });
}
/**
 * Marks an invoice of the caller's business as APPROVED.
 *
 * The approver's uid and an ISO timestamp are merged into the invoice
 * metadata, and an INVOICE_APPROVED domain event is recorded.
 *
 * @param {object} actor - Caller identity; `actor.uid` is stored as `approvedBy`.
 * @param {object} payload - Carries `invoiceId`.
 * @returns {Promise<{invoiceId: *, status: string}>} The invoice id with status 'APPROVED'.
 */
export async function approveInvoice(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { businessId } = context.business;
    const invoice = await requireInvoice(client, tenantId, businessId, payload.invoiceId);

    const approvalStamp = JSON.stringify({
      approvedBy: actor.uid,
      approvedAt: new Date().toISOString(),
    });
    await client.query(
      `
UPDATE invoices
SET status = 'APPROVED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [invoice.id, approvalStamp],
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'invoice',
      aggregateId: invoice.id,
      eventType: 'INVOICE_APPROVED',
      actorUserId: actor.uid,
      payload: { invoiceId: invoice.id },
    });

    return { invoiceId: invoice.id, status: 'APPROVED' };
  });
}
/**
 * Marks an invoice of the caller's business as DISPUTED.
 *
 * The disputing uid, an ISO timestamp and the supplied reason are merged
 * into the invoice metadata, and an INVOICE_DISPUTED domain event is recorded.
 *
 * @param {object} actor - Caller identity; `actor.uid` is stored as `disputedBy`.
 * @param {object} payload - Carries `invoiceId` and `reason`.
 * @returns {Promise<{invoiceId: *, status: string, reason: *}>} The invoice id, status 'DISPUTED' and the reason.
 */
export async function disputeInvoice(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { businessId } = context.business;
    const invoice = await requireInvoice(client, tenantId, businessId, payload.invoiceId);

    const disputeStamp = JSON.stringify({
      disputedBy: actor.uid,
      disputedAt: new Date().toISOString(),
      disputeReason: payload.reason,
    });
    await client.query(
      `
UPDATE invoices
SET status = 'DISPUTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [invoice.id, disputeStamp],
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'invoice',
      aggregateId: invoice.id,
      eventType: 'INVOICE_DISPUTED',
      actorUserId: actor.uid,
      payload: { invoiceId: invoice.id, reason: payload.reason },
    });

    return { invoiceId: invoice.id, status: 'DISPUTED', reason: payload.reason };
  });
}
/**
 * Records a client's review of a worker, optionally toggling favorite and
 * block flags for that worker within the business.
 *
 * Steps, all inside one transaction:
 *  1. Pick the worker's assignment in the caller's business: the one named
 *     by `payload.assignmentId`, or the most recently updated one.
 *  2. Upsert the staff_reviews row for (business, assignment, staff).
 *  3. `markAsFavorite` true upserts a favorite, false deletes it; any other
 *     value leaves favorites untouched. `markAsBlocked` works the same way
 *     against staff_blocks.
 *  4. Recompute the worker's average rating and rating count from
 *     staff_reviews.
 *  5. Record a STAFF_REVIEWED_FROM_COVERAGE domain event with the payload.
 *
 * @param {object} actor - Caller identity; `actor.uid` is stored as the reviewer.
 * @param {object} payload - `staffId`, `rating`, optional `assignmentId`, `feedback`, `issueFlags`, `markAsFavorite`, `markAsBlocked`.
 * @returns {Promise<object>} Review id, assignment id and an echo of the review inputs.
 * @throws {AppError} NOT_FOUND (404) when no assignment matches in the business scope.
 */
export async function rateWorkerFromCoverage(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const tenantId = context.tenant.tenantId;
    const businessId = context.business.businessId;

    const assignmentLookup = await client.query(
      `
SELECT a.id, a.tenant_id, a.business_id, a.staff_id
FROM assignments a
WHERE a.tenant_id = $1
AND a.business_id = $2
AND a.staff_id = $3
AND ($4::uuid IS NULL OR a.id = $4)
ORDER BY a.updated_at DESC
LIMIT 1
FOR UPDATE OF a
`,
      [tenantId, businessId, payload.staffId, payload.assignmentId || null],
    );
    if (assignmentLookup.rowCount === 0) {
      throw new AppError('NOT_FOUND', 'Assignment not found for worker review in business scope', 404, payload);
    }
    const [assignment] = assignmentLookup.rows;

    const issueFlags = ensureArray(payload.issueFlags || []);
    const issueFlagsJson = JSON.stringify(issueFlags);
    const feedback = payload.feedback || null;

    const reviewUpsert = await client.query(
      `
INSERT INTO staff_reviews (
tenant_id,
business_id,
staff_id,
assignment_id,
reviewer_user_id,
rating,
review_text,
tags
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb)
ON CONFLICT (business_id, assignment_id, staff_id) DO UPDATE
SET reviewer_user_id = EXCLUDED.reviewer_user_id,
rating = EXCLUDED.rating,
review_text = EXCLUDED.review_text,
tags = EXCLUDED.tags,
updated_at = NOW()
RETURNING id
`,
      [tenantId, businessId, payload.staffId, assignment.id, actor.uid, payload.rating, feedback, issueFlagsJson],
    );
    const reviewId = reviewUpsert.rows[0].id;

    // Favorites: only an explicit boolean changes anything.
    if (payload.markAsFavorite === true) {
      await client.query(
        `
INSERT INTO staff_favorites (tenant_id, business_id, staff_id, created_by_user_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (business_id, staff_id) DO UPDATE
SET created_by_user_id = EXCLUDED.created_by_user_id
`,
        [tenantId, businessId, payload.staffId, actor.uid],
      );
    } else if (payload.markAsFavorite === false) {
      await client.query(
        `
DELETE FROM staff_favorites
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
`,
        [tenantId, businessId, payload.staffId],
      );
    }

    // Blocks: same tri-state handling; the review text doubles as the block reason.
    if (payload.markAsBlocked === true) {
      const blockMetadataJson = JSON.stringify({
        blockedByCoverageReview: true,
        assignmentId: assignment.id,
      });
      await client.query(
        `
INSERT INTO staff_blocks (
tenant_id,
business_id,
staff_id,
created_by_user_id,
reason,
issue_flags,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb)
ON CONFLICT (business_id, staff_id) DO UPDATE
SET reason = EXCLUDED.reason,
issue_flags = EXCLUDED.issue_flags,
metadata = COALESCE(staff_blocks.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
`,
        [tenantId, businessId, payload.staffId, actor.uid, feedback, issueFlagsJson, blockMetadataJson],
      );
    } else if (payload.markAsBlocked === false) {
      await client.query(
        `
DELETE FROM staff_blocks
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
`,
        [tenantId, businessId, payload.staffId],
      );
    }

    // Refresh the denormalized rating stats from every review of this worker.
    await client.query(
      `
UPDATE staffs
SET average_rating = review_stats.avg_rating,
rating_count = review_stats.rating_count,
updated_at = NOW()
FROM (
SELECT staff_id,
ROUND(AVG(rating)::numeric, 2) AS avg_rating,
COUNT(*)::INTEGER AS rating_count
FROM staff_reviews
WHERE staff_id = $1
GROUP BY staff_id
) review_stats
WHERE staffs.id = review_stats.staff_id
`,
      [payload.staffId],
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff_review',
      aggregateId: reviewId,
      eventType: 'STAFF_REVIEWED_FROM_COVERAGE',
      actorUserId: actor.uid,
      payload,
    });

    return {
      reviewId,
      assignmentId: assignment.id,
      staffId: payload.staffId,
      rating: payload.rating,
      markAsFavorite: payload.markAsFavorite ?? null,
      markAsBlocked: payload.markAsBlocked ?? null,
      issueFlags,
      feedback,
    };
  });
}
/**
 * Cancels a worker's assignment for lateness and notifies the hub managers
 * and the worker.
 *
 * The assignment is loaded together with its shift and staff rows, scoped to
 * the caller's tenant and business, with `FOR UPDATE OF a, s`. Cancellation
 * is allowed only when the assignment is not CHECKED_IN / CHECKED_OUT /
 * COMPLETED and at least one of these holds:
 *  - the current time is 10 minutes or more past the shift start, or
 *  - a geofence incident (OUTSIDE_GEOFENCE, LOCATION_UNAVAILABLE or
 *    CLOCK_IN_OVERRIDE) exists for the assignment with `occurred_at` no
 *    earlier than 30 minutes before the shift start.
 *
 * On success the assignment is set to CANCELLED with cancellation metadata
 * merged in, shift-role and shift counts are refreshed, a
 * LATE_WORKER_CANCELLED domain event is recorded, and two alerts are
 * enqueued (hub managers and the worker).
 *
 * NOTE(review): an assignment that is already CANCELLED is not rejected by
 * the status guard below, so a repeated call re-runs the update and the
 * alert enqueueing — confirm whether that is intended.
 *
 * @param {object} actor - Caller identity; `actor.uid` is stored as `cancelledBy`.
 * @param {object} payload - Carries `assignmentId` and an optional `reason`.
 * @returns {Promise<object>} `{ assignmentId, shiftId, replacementSearchTriggered: true, status: 'CANCELLED' }`.
 * @throws {AppError} NOT_FOUND (404), LATE_WORKER_CANCEL_BLOCKED (409) or LATE_WORKER_NOT_CONFIRMED (409).
 */
export async function cancelLateWorker(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
// Load the assignment with its shift and the worker's user id, locking the assignment and shift rows.
const result = await client.query(
`
SELECT
a.id,
a.shift_id,
a.shift_role_id,
a.staff_id,
a.status,
s.required_workers,
s.assigned_workers,
s.tenant_id,
s.clock_point_id,
s.starts_at,
s.title AS shift_title,
st.user_id AS "staffUserId"
FROM assignments a
JOIN shifts s ON s.id = a.shift_id
JOIN staffs st ON st.id = a.staff_id
WHERE a.id = $1
AND a.tenant_id = $2
AND a.business_id = $3
FOR UPDATE OF a, s
`,
[payload.assignmentId, context.tenant.tenantId, context.business.businessId]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Late worker assignment not found in business scope', 404, {
assignmentId: payload.assignmentId,
});
}
const assignment = result.rows[0];
// A worker who has already clocked in (or finished) cannot be cancelled as late.
if (['CHECKED_IN', 'CHECKED_OUT', 'COMPLETED'].includes(assignment.status)) {
throw new AppError('LATE_WORKER_CANCEL_BLOCKED', 'Worker is already checked in or completed and cannot be cancelled as late', 409, {
assignmentId: assignment.id,
});
}
// Evidence option 1: a geofence incident from 30 minutes before the shift start onwards.
const hasRecentIncident = await client.query(
`
SELECT 1
FROM geofence_incidents
WHERE assignment_id = $1
AND incident_type IN ('OUTSIDE_GEOFENCE', 'LOCATION_UNAVAILABLE', 'CLOCK_IN_OVERRIDE')
AND occurred_at >= $2::timestamptz - INTERVAL '30 minutes'
LIMIT 1
`,
[assignment.id, assignment.starts_at]
);
// Evidence option 2: the shift started at least 10 minutes ago (false when starts_at is missing).
const shiftStartTime = assignment.starts_at ? new Date(assignment.starts_at).getTime() : null;
const startGraceElapsed = shiftStartTime != null
? Date.now() >= shiftStartTime + (10 * 60 * 1000)
: false;
if (!startGraceElapsed && hasRecentIncident.rowCount === 0) {
throw new AppError('LATE_WORKER_NOT_CONFIRMED', 'Late worker cancellation requires either a geofence incident or a started shift window', 409, {
assignmentId: assignment.id,
});
}
// Cancel the assignment and merge the who/when/why into its metadata.
await client.query(
`
UPDATE assignments
SET status = 'CANCELLED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[assignment.id, JSON.stringify({
cancellationReason: payload.reason || 'Cancelled for lateness',
cancelledBy: actor.uid,
cancelledAt: new Date().toISOString(),
})]
);
// Recompute the role-level and shift-level counts after the status change.
await refreshShiftRoleCounts(client, assignment.shift_role_id);
await refreshShiftCounts(client, assignment.shift_id);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'assignment',
aggregateId: assignment.id,
eventType: 'LATE_WORKER_CANCELLED',
actorUserId: actor.uid,
payload,
});
// Alert for the managers of the shift's hub.
await enqueueHubManagerAlert(client, {
tenantId: context.tenant.tenantId,
businessId: context.business.businessId,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
hubId: assignment.clock_point_id,
notificationType: 'LATE_WORKER_CANCELLED',
priority: 'HIGH',
subject: 'Late worker was removed from shift',
body: `${assignment.shift_title}: a late worker was cancelled and replacement search should begin`,
payload: {
assignmentId: assignment.id,
shiftId: assignment.shift_id,
reason: payload.reason || 'Cancelled for lateness',
},
dedupeScope: assignment.id,
});
// Alert for the cancelled worker.
await enqueueUserAlert(client, {
tenantId: context.tenant.tenantId,
businessId: context.business.businessId,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
recipientUserId: assignment.staffUserId,
notificationType: 'SHIFT_ASSIGNMENT_CANCELLED_LATE',
priority: 'HIGH',
subject: 'Shift assignment cancelled',
body: `${assignment.shift_title}: your assignment was cancelled because you were marked late`,
payload: {
assignmentId: assignment.id,
shiftId: assignment.shift_id,
reason: payload.reason || 'Cancelled for lateness',
},
dedupeScope: assignment.id,
});
// replacementSearchTriggered is a constant here; this function itself only enqueues the alerts above.
return {
assignmentId: assignment.id,
shiftId: assignment.shift_id,
replacementSearchTriggered: true,
status: 'CANCELLED',
};
});
}
/**
 * Creates a ONE_TIME order for the caller's business from a mobile payload.
 *
 * @param {object} actor - Caller identity; `actor.uid` is used for the client context lookup.
 * @param {object} payload - Mobile order payload, translated by buildOrderCreatePayloadFromMobile.
 * @returns {Promise<*>} Whatever the order-creation command returns.
 */
export async function createClientOneTimeOrder(actor, payload) {
  const clientContext = await requireClientContext(actor.uid);
  const orderCommand = await buildOrderCreatePayloadFromMobile(actor, clientContext, payload, 'ONE_TIME');
  const created = createOrderCommand(actor, orderCommand);
  return created;
}
/**
 * Creates a RECURRING order for the caller's business from a mobile payload.
 *
 * @param {object} actor - Caller identity; `actor.uid` is used for the client context lookup.
 * @param {object} payload - Mobile order payload, translated by buildOrderCreatePayloadFromMobile.
 * @returns {Promise<*>} Whatever the order-creation command returns.
 */
export async function createClientRecurringOrder(actor, payload) {
  const clientContext = await requireClientContext(actor.uid);
  const orderCommand = await buildOrderCreatePayloadFromMobile(actor, clientContext, payload, 'RECURRING');
  const created = createOrderCommand(actor, orderCommand);
  return created;
}
/**
 * Creates a PERMANENT order for the caller's business from a mobile payload.
 *
 * @param {object} actor - Caller identity; `actor.uid` is used for the client context lookup.
 * @param {object} payload - Mobile order payload, translated by buildOrderCreatePayloadFromMobile.
 * @returns {Promise<*>} Whatever the order-creation command returns.
 */
export async function createClientPermanentOrder(actor, payload) {
  const clientContext = await requireClientContext(actor.uid);
  const orderCommand = await buildOrderCreatePayloadFromMobile(actor, clientContext, payload, 'PERMANENT');
  const created = createOrderCommand(actor, orderCommand);
  return created;
}
/**
 * Creates a new order that starts from an existing order ("template") with
 * the caller's edits layered on top.
 *
 * Values in `payload` win; anything missing is taken from the template:
 *  - positions: the template's shift roles, de-duplicated on role identity,
 *    start/end time, worker count and pay/bill rates (a role without its own
 *    start/end time inherits the shift's);
 *  - dates: first shift's date for order/start date, last shift's date for
 *    the end date;
 *  - recurrence days: distinct UTC weekdays of the template's shift dates;
 *  - order type: `payload.orderType`, then `template.metadata.orderType`,
 *    then 'ONE_TIME'.
 *
 * @param {object} actor - Caller identity; `actor.uid` is used for context and template lookup.
 * @param {object} payload - Carries `orderId` of the template plus any overriding order fields.
 * @returns {Promise<*>} Whatever the order-creation command returns.
 */
export async function createEditedOrderCopy(actor, payload) {
  const context = await requireClientContext(actor.uid);
  const template = await loadEditableOrderTemplate(
    actor.uid,
    context.tenant.tenantId,
    context.business.businessId,
    payload.orderId,
  );
  const templateShifts = Array.isArray(template.shifts) ? template.shifts : [];

  // First occurrence of each distinct position wins; Map keeps insertion order.
  const positionsByKey = new Map();
  for (const shift of templateShifts) {
    const shiftRoles = Array.isArray(shift.roles) ? shift.roles : [];
    for (const role of shiftRoles) {
      const position = {
        ...role,
        startTime: role.startTime || shift.startTime,
        endTime: role.endTime || shift.endTime,
      };
      const dedupeKey = [
        position.roleId || '',
        position.roleCode || '',
        position.roleName || '',
        position.startTime || '',
        position.endTime || '',
        position.workerCount ?? '',
        position.payRateCents ?? '',
        position.billRateCents ?? '',
      ].join('|');
      if (!positionsByKey.has(dedupeKey)) {
        positionsByKey.set(dedupeKey, position);
      }
    }
  }
  const templatePositions = [...positionsByKey.values()];

  const firstShift = templateShifts[0] || {};
  const lastShift = templateShifts[templateShifts.length - 1] || {};
  const inferredOrderType = payload.orderType || template.metadata?.orderType || 'ONE_TIME';

  // Unparseable dates yield NaN from getUTCDay() and are dropped.
  const weekdays = new Set();
  for (const shift of templateShifts) {
    const weekday = new Date(`${shift.date}T00:00:00.000Z`).getUTCDay();
    if (Number.isInteger(weekday)) {
      weekdays.add(weekday);
    }
  }
  const inferredDays = [...weekdays];

  const mergedPayload = {
    ...payload,
    hubId: payload.hubId || firstShift.clockPointId,
    vendorId: payload.vendorId ?? template.vendorId ?? undefined,
    eventName: payload.eventName || template.eventName,
    description: payload.description ?? template.description ?? undefined,
    notes: payload.notes ?? template.notes ?? undefined,
    serviceType: payload.serviceType ?? template.serviceType ?? undefined,
    positions: payload.positions || templatePositions,
    orderDate: payload.orderDate || firstShift.date,
    startDate: payload.startDate || firstShift.date,
    endDate: payload.endDate || lastShift.date || firstShift.date,
    recurrenceDays: payload.recurrenceDays || inferredDays,
    daysOfWeek: payload.daysOfWeek || inferredDays,
    metadata: {
      sourceOrderId: payload.orderId,
      ...template.metadata,
      ...payload.metadata,
    },
  };
  const buildOptions = {
    metadata: {
      editSourceOrderId: payload.orderId,
    },
  };

  const commandPayload = await buildOrderCreatePayloadFromMobile(
    actor,
    context,
    mergedPayload,
    inferredOrderType,
    buildOptions,
  );
  return createOrderCommand(actor, commandPayload);
}
/**
 * Cancels an order for the caller's business by delegating to
 * cancelFutureOrderSlice inside a transaction.
 *
 * @param {object} actor - Caller identity; `actor.uid` is forwarded as `actorUid`.
 * @param {object} payload - Carries `orderId`, `reason` and `metadata`.
 * @returns {Promise<*>} Whatever cancelFutureOrderSlice returns.
 */
export async function cancelClientOrder(actor, payload) {
  const context = await requireClientContext(actor.uid);

  const runCancellation = async (client) => {
    await ensureActorUser(client, actor);
    const { orderId, reason, metadata } = payload;
    const cancellation = {
      actorUid: actor.uid,
      tenantId: context.tenant.tenantId,
      businessId: context.business.businessId,
      orderId,
      reason,
      metadata,
    };
    return cancelFutureOrderSlice(client, cancellation);
  };

  return withTransaction(runCancellation);
}
/**
 * Clocks a staff member in by translating the mobile payload into the
 * clock-in command.
 *
 * The target assignment is resolved via resolveStaffAssignmentForClock and
 * the attendance source type via inferAttendanceSourceType. Optional string
 * fields fall back to null; `notes` doubles as `sourceReference` when no
 * explicit reference is supplied.
 *
 * @param {object} actor - Caller identity; `actor.uid` is used for the staff context lookup.
 * @param {object} payload - Mobile clock-in payload (location, NFC, device and attestation fields).
 * @returns {Promise<*>} Whatever the clock-in command returns.
 */
export async function staffClockIn(actor, payload) {
  const context = await requireStaffContext(actor.uid);
  const resolved = await resolveStaffAssignmentForClock(actor.uid, context.tenant.tenantId, payload);

  const sourceType = inferAttendanceSourceType(payload);
  // Caller-supplied rawPayload keys override the two derived ones.
  const rawPayload = {
    notes: payload.notes || null,
    isMockLocation: payload.isMockLocation ?? null,
    ...(payload.rawPayload || {}),
  };

  const command = {
    assignmentId: resolved.assignmentId,
    sourceType,
    sourceReference: payload.sourceReference || payload.notes || null,
    nfcTagUid: payload.nfcTagId || null,
    deviceId: payload.deviceId || null,
    latitude: payload.latitude,
    longitude: payload.longitude,
    accuracyMeters: payload.accuracyMeters,
    capturedAt: payload.capturedAt,
    overrideReason: payload.overrideReason || null,
    proofNonce: payload.proofNonce || null,
    proofTimestamp: payload.proofTimestamp || null,
    attestationProvider: payload.attestationProvider || null,
    attestationToken: payload.attestationToken || null,
    rawPayload,
  };
  return clockInCommand(actor, command);
}
/**
 * Clocks a staff member out by translating the mobile payload into the
 * clock-out command.
 *
 * The assignment is resolved with `requireOpenSession: true`. Optional
 * string fields fall back to null; `notes` doubles as `sourceReference` when
 * no explicit reference is supplied. Break minutes and application id travel
 * only inside `rawPayload`.
 *
 * @param {object} actor - Caller identity; `actor.uid` is used for the staff context lookup.
 * @param {object} payload - Mobile clock-out payload (location, NFC, device and attestation fields).
 * @returns {Promise<*>} Whatever the clock-out command returns.
 */
export async function staffClockOut(actor, payload) {
  const context = await requireStaffContext(actor.uid);
  const resolved = await resolveStaffAssignmentForClock(
    actor.uid,
    context.tenant.tenantId,
    payload,
    { requireOpenSession: true },
  );

  const sourceType = inferAttendanceSourceType(payload);
  // Caller-supplied rawPayload keys override the derived ones.
  const rawPayload = {
    notes: payload.notes || null,
    breakMinutes: payload.breakMinutes ?? null,
    applicationId: payload.applicationId || null,
    isMockLocation: payload.isMockLocation ?? null,
    ...(payload.rawPayload || {}),
  };

  const command = {
    assignmentId: resolved.assignmentId,
    sourceType,
    sourceReference: payload.sourceReference || payload.notes || null,
    nfcTagUid: payload.nfcTagId || null,
    deviceId: payload.deviceId || null,
    latitude: payload.latitude,
    longitude: payload.longitude,
    accuracyMeters: payload.accuracyMeters,
    capturedAt: payload.capturedAt,
    overrideReason: payload.overrideReason || null,
    proofNonce: payload.proofNonce || null,
    proofTimestamp: payload.proofTimestamp || null,
    attestationProvider: payload.attestationProvider || null,
    attestationToken: payload.attestationToken || null,
    rawPayload,
  };
  return clockOutCommand(actor, command);
}
/**
 * Registers a push-notification token for a client user, tied to the
 * caller's business membership.
 *
 * Optional device fields fall back to null, `notificationsEnabled` defaults
 * to true when null/undefined, and `metadata` defaults to an empty object.
 *
 * @param {object} actor - Caller identity; `actor.uid` becomes the token's `userId`.
 * @param {object} payload - `provider`, `platform`, `pushToken` and optional device details.
 * @returns {Promise<{tokenId: *, provider: *, platform: *, notificationsEnabled: *}>} Summary of the stored token.
 */
export async function registerClientPushToken(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const registration = {
      tenantId: context.tenant.tenantId,
      userId: actor.uid,
      businessMembershipId: context.business.membershipId,
      provider: payload.provider,
      platform: payload.platform,
      pushToken: payload.pushToken,
      deviceId: payload.deviceId || null,
      appVersion: payload.appVersion || null,
      appBuild: payload.appBuild || null,
      locale: payload.locale || null,
      timezone: payload.timezone || null,
      notificationsEnabled: payload.notificationsEnabled ?? true,
      metadata: payload.metadata || {},
    };
    const stored = await registerPushToken(client, registration);

    const { id: tokenId, provider, platform, notificationsEnabled } = stored;
    return { tokenId, provider, platform, notificationsEnabled };
  });
}
/**
 * Removes push tokens registered by a client user.
 *
 * Tokens are selected by `tokenId` and/or `pushToken` (each null when not
 * supplied); the removal reason defaults to 'CLIENT_SIGN_OUT'.
 *
 * @param {object} actor - Caller identity; `actor.uid` scopes the removal to this user.
 * @param {object} payload - Optional `tokenId`, `pushToken` and `reason`.
 * @returns {Promise<{removedCount: number, removed: Array}>} The removed entries and their count.
 */
export async function unregisterClientPushToken(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const selector = {
      tenantId: context.tenant.tenantId,
      userId: actor.uid,
      tokenId: payload.tokenId || null,
      pushToken: payload.pushToken || null,
      reason: payload.reason || 'CLIENT_SIGN_OUT',
    };
    const removed = await unregisterPushToken(client, selector);

    return { removedCount: removed.length, removed };
  });
}
/**
 * Registers a push-notification token for a staff user, tied to the
 * caller's staff record.
 *
 * Optional device fields fall back to null, `notificationsEnabled` defaults
 * to true when null/undefined, and `metadata` defaults to an empty object.
 *
 * @param {object} actor - Caller identity; `actor.uid` becomes the token's `userId`.
 * @param {object} payload - `provider`, `platform`, `pushToken` and optional device details.
 * @returns {Promise<{tokenId: *, provider: *, platform: *, notificationsEnabled: *}>} Summary of the stored token.
 */
export async function registerStaffPushToken(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const registration = {
      tenantId: context.tenant.tenantId,
      userId: actor.uid,
      staffId: context.staff.staffId,
      provider: payload.provider,
      platform: payload.platform,
      pushToken: payload.pushToken,
      deviceId: payload.deviceId || null,
      appVersion: payload.appVersion || null,
      appBuild: payload.appBuild || null,
      locale: payload.locale || null,
      timezone: payload.timezone || null,
      notificationsEnabled: payload.notificationsEnabled ?? true,
      metadata: payload.metadata || {},
    };
    const stored = await registerPushToken(client, registration);

    const { id: tokenId, provider, platform, notificationsEnabled } = stored;
    return { tokenId, provider, platform, notificationsEnabled };
  });
}
/**
 * Removes push tokens registered by a staff user.
 *
 * Tokens are selected by `tokenId` and/or `pushToken` (each null when not
 * supplied); the removal reason defaults to 'STAFF_SIGN_OUT'.
 *
 * @param {object} actor - Caller identity; `actor.uid` scopes the removal to this user.
 * @param {object} payload - Optional `tokenId`, `pushToken` and `reason`.
 * @returns {Promise<{removedCount: number, removed: Array}>} The removed entries and their count.
 */
export async function unregisterStaffPushToken(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const selector = {
      tenantId: context.tenant.tenantId,
      userId: actor.uid,
      tokenId: payload.tokenId || null,
      pushToken: payload.pushToken || null,
      reason: payload.reason || 'STAFF_SIGN_OUT',
    };
    const removed = await unregisterPushToken(client, selector);

    return { removedCount: removed.length, removed };
  });
}
/**
 * Summarizes a batch of location points against an assignment's clock point.
 *
 * A point lacking latitude or longitude counts as "missing" and is not
 * measured. Every other point is measured with distanceMeters against the
 * assignment's expected coordinates; when a distance is available it feeds
 * the running maximum, and it counts as "outside" when the assignment has a
 * geofence radius and the distance exceeds it. The `latest*` fields hold the
 * last matching point in iteration order.
 *
 * @param {Iterable<object>} points - Points with `latitude` / `longitude` (and any other fields).
 * @param {object} assignment - Provides `expected_latitude`, `expected_longitude`, `geofence_radius_meters`.
 * @returns {{outOfGeofenceCount: number, missingCoordinateCount: number, maxDistanceToClockPointMeters: (number|null), latestOutsidePoint: (object|null), latestMissingPoint: (object|null)}}
 */
function summarizeLocationPoints(points, assignment) {
  const summary = {
    outOfGeofenceCount: 0,
    missingCoordinateCount: 0,
    maxDistanceToClockPointMeters: null,
    latestOutsidePoint: null,
    latestMissingPoint: null,
  };

  for (const point of points) {
    const hasCoordinates = point.latitude != null && point.longitude != null;
    if (!hasCoordinates) {
      summary.missingCoordinateCount += 1;
      summary.latestMissingPoint = point;
      continue;
    }

    const reported = { latitude: point.latitude, longitude: point.longitude };
    const expected = { latitude: assignment.expected_latitude, longitude: assignment.expected_longitude };
    const distance = distanceMeters(reported, expected);
    if (distance == null) {
      continue;
    }

    const previousMax = summary.maxDistanceToClockPointMeters;
    summary.maxDistanceToClockPointMeters = previousMax == null ? distance : Math.max(previousMax, distance);

    const radius = assignment.geofence_radius_meters;
    if (radius != null && distance > radius) {
      summary.outOfGeofenceCount += 1;
      summary.latestOutsidePoint = { ...point, distanceToClockPointMeters: distance };
    }
  }

  return summary;
}
/**
 * Stores a batch of background location points for a staff member's open
 * assignment and raises incidents/alerts for geofence problems.
 *
 * Flow:
 *  1. Resolve the assignment via resolveStaffAssignmentForClock with
 *     `requireOpenSession: true` (before the transaction opens).
 *  2. Inside the transaction, load the assignment monitoring context and the
 *     effective clock-in policy.
 *  3. Normalize each point's `capturedAt` with toIsoOrNull and sort a copy
 *     of the points ascending by that timestamp.
 *  4. Upload the full batch via uploadLocationBatch and insert a summary row
 *     into location_stream_batches.
 *  5. When any point was outside the geofence, record an OUTSIDE_GEOFENCE
 *     incident and enqueue a GEOFENCE_BREACH_ALERT; when any point lacked
 *     coordinates, record a LOCATION_UNAVAILABLE incident and enqueue a
 *     LOCATION_SIGNAL_WARNING.
 *  6. Record a LOCATION_STREAM_BATCH_RECORDED domain event.
 *
 * NOTE(review): the upload happens inside the transaction callback, before
 * the INSERT; if a later statement fails the uploaded object presumably
 * remains in storage — confirm whether cleanup is handled elsewhere.
 *
 * @param {object} actor - Caller identity; `actor.uid` is stored as the batch's actor user.
 * @param {object} payload - `points` (iterable of location points), `sourceType`, optional `deviceId` and `metadata`.
 * @returns {Promise<object>} Batch id, assignment/shift ids, effective clock-in mode, counts, object URI and incident ids.
 */
export async function submitLocationStreamBatch(actor, payload) {
const context = await requireStaffContext(actor.uid);
const { assignmentId } = await resolveStaffAssignmentForClock(
actor.uid,
context.tenant.tenantId,
payload,
{ requireOpenSession: true }
);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const assignment = await loadAssignmentMonitoringContext(
client,
context.tenant.tenantId,
assignmentId,
actor.uid
);
const policy = resolveEffectiveClockInPolicy(assignment);
// Work on a sorted copy; a null capturedAt compares as epoch 0 (new Date(null)) and therefore sorts first.
const points = [...payload.points]
.map((point) => ({
...point,
capturedAt: toIsoOrNull(point.capturedAt),
}))
.sort((left, right) => new Date(left.capturedAt).getTime() - new Date(right.capturedAt).getTime());
const batchId = crypto.randomUUID();
const summary = summarizeLocationPoints(points, assignment);
// The raw points go to object storage; only the summary is stored in the database row below.
const objectUri = await uploadLocationBatch({
tenantId: assignment.tenant_id,
staffId: assignment.staff_id,
assignmentId: assignment.id,
batchId,
payload: {
...buildAssignmentReferencePayload(assignment),
effectiveClockInMode: policy.mode,
points,
metadata: payload.metadata || {},
},
});
await client.query(
`
INSERT INTO location_stream_batches (
id,
tenant_id,
business_id,
vendor_id,
shift_id,
assignment_id,
staff_id,
actor_user_id,
source_type,
device_id,
object_uri,
point_count,
out_of_geofence_count,
missing_coordinate_count,
max_distance_to_clock_point_meters,
started_at,
ended_at,
metadata
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16::timestamptz, $17::timestamptz, $18::jsonb
)
`,
[
batchId,
assignment.tenant_id,
assignment.business_id,
assignment.vendor_id,
assignment.shift_id,
assignment.id,
assignment.staff_id,
actor.uid,
payload.sourceType,
payload.deviceId || null,
objectUri,
points.length,
summary.outOfGeofenceCount,
summary.missingCoordinateCount,
summary.maxDistanceToClockPointMeters,
// started_at / ended_at come from the first and last point after sorting.
points[0]?.capturedAt || null,
points[points.length - 1]?.capturedAt || null,
JSON.stringify(payload.metadata || {}),
]
);
const incidentIds = [];
// One incident and one alert per batch, however many points were outside the geofence.
if (summary.outOfGeofenceCount > 0) {
const incidentId = await recordGeofenceIncident(client, {
assignment,
actorUserId: actor.uid,
locationStreamBatchId: batchId,
incidentType: 'OUTSIDE_GEOFENCE',
severity: 'CRITICAL',
effectiveClockInMode: policy.mode,
sourceType: payload.sourceType,
deviceId: payload.deviceId || null,
latitude: summary.latestOutsidePoint?.latitude ?? null,
longitude: summary.latestOutsidePoint?.longitude ?? null,
accuracyMeters: summary.latestOutsidePoint?.accuracyMeters ?? null,
distanceToClockPointMeters: summary.latestOutsidePoint?.distanceToClockPointMeters ?? null,
withinGeofence: false,
message: `${summary.outOfGeofenceCount} location points were outside the configured geofence`,
occurredAt: summary.latestOutsidePoint?.capturedAt || points[points.length - 1]?.capturedAt || null,
metadata: {
pointCount: points.length,
outOfGeofenceCount: summary.outOfGeofenceCount,
objectUri,
},
});
incidentIds.push(incidentId);
await enqueueHubManagerAlert(client, {
tenantId: assignment.tenant_id,
businessId: assignment.business_id,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
hubId: assignment.clock_point_id,
relatedIncidentId: incidentId,
notificationType: 'GEOFENCE_BREACH_ALERT',
priority: 'CRITICAL',
subject: 'Worker left the workplace geofence',
body: `${assignment.shift_title}: location stream shows the worker outside the geofence`,
payload: {
...buildAssignmentReferencePayload(assignment),
batchId,
objectUri,
outOfGeofenceCount: summary.outOfGeofenceCount,
},
dedupeScope: batchId,
});
}
// Missing coordinates are handled separately with a lower severity and a distinct dedupe scope.
if (summary.missingCoordinateCount > 0) {
const incidentId = await recordGeofenceIncident(client, {
assignment,
actorUserId: actor.uid,
locationStreamBatchId: batchId,
incidentType: 'LOCATION_UNAVAILABLE',
severity: 'WARNING',
effectiveClockInMode: policy.mode,
sourceType: payload.sourceType,
deviceId: payload.deviceId || null,
message: `${summary.missingCoordinateCount} location points were missing coordinates`,
occurredAt: summary.latestMissingPoint?.capturedAt || points[points.length - 1]?.capturedAt || null,
metadata: {
pointCount: points.length,
missingCoordinateCount: summary.missingCoordinateCount,
objectUri,
},
});
incidentIds.push(incidentId);
await enqueueHubManagerAlert(client, {
tenantId: assignment.tenant_id,
businessId: assignment.business_id,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
hubId: assignment.clock_point_id,
relatedIncidentId: incidentId,
notificationType: 'LOCATION_SIGNAL_WARNING',
priority: 'HIGH',
subject: 'Worker location signal unavailable',
body: `${assignment.shift_title}: background location tracking reported missing coordinates`,
payload: {
...buildAssignmentReferencePayload(assignment),
batchId,
objectUri,
missingCoordinateCount: summary.missingCoordinateCount,
},
dedupeScope: `${batchId}:missing`,
});
}
await insertDomainEvent(client, {
tenantId: assignment.tenant_id,
aggregateType: 'location_stream_batch',
aggregateId: batchId,
eventType: 'LOCATION_STREAM_BATCH_RECORDED',
actorUserId: actor.uid,
payload: {
...buildAssignmentReferencePayload(assignment),
batchId,
objectUri,
pointCount: points.length,
outOfGeofenceCount: summary.outOfGeofenceCount,
missingCoordinateCount: summary.missingCoordinateCount,
},
});
return {
batchId,
assignmentId: assignment.id,
shiftId: assignment.shift_id,
effectiveClockInMode: policy.mode,
pointCount: points.length,
outOfGeofenceCount: summary.outOfGeofenceCount,
missingCoordinateCount: summary.missingCoordinateCount,
objectUri,
incidentIds,
};
});
}
/**
 * Sets a staff member's availability for a single day of the week.
 *
 * The staff_availability row for (staff, day) is upserted: status and time
 * slots are replaced, metadata is merged into what is already stored. A
 * STAFF_AVAILABILITY_UPDATED domain event carrying the payload is recorded.
 *
 * @param {object} actor - Caller identity; `actor.uid` identifies the staff member.
 * @param {object} payload - `dayOfWeek`, `availabilityStatus`, optional `slots` and `metadata`.
 * @returns {Promise<{availabilityId: *, dayOfWeek: *, availabilityStatus: *, slots: Array}>} The stored day summary.
 */
export async function updateStaffAvailabilityDay(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const tenantId = context.tenant.tenantId;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);

    const slots = payload.slots || [];
    const upsertParams = [
      tenantId,
      staff.id,
      payload.dayOfWeek,
      payload.availabilityStatus,
      JSON.stringify(slots),
      JSON.stringify(payload.metadata || {}),
    ];
    const upserted = await client.query(
      `
INSERT INTO staff_availability (
tenant_id,
staff_id,
day_of_week,
availability_status,
time_slots,
metadata
)
VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb)
ON CONFLICT (staff_id, day_of_week) DO UPDATE
SET availability_status = EXCLUDED.availability_status,
time_slots = EXCLUDED.time_slots,
metadata = COALESCE(staff_availability.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id
`,
      upsertParams,
    );
    const availabilityId = upserted.rows[0].id;

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff_availability',
      aggregateId: availabilityId,
      eventType: 'STAFF_AVAILABILITY_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      availabilityId,
      dayOfWeek: payload.dayOfWeek,
      availabilityStatus: payload.availabilityStatus,
      slots,
    };
  });
}
/**
 * Applies an availability preset to all seven days of a staff member's week.
 *
 * Presets: `all` (every day), `weekdays` (Mon-Fri, days 1-5), `weekends`
 * (days 0 and 6) and `clear` (no day). Days inside the preset become
 * AVAILABLE with `payload.slots` (default 08:00-18:00); every other day
 * becomes UNAVAILABLE with no slots. Each day's row is upserted and a single
 * STAFF_AVAILABILITY_QUICK_SET domain event carrying the payload is recorded.
 *
 * An unsupported `quickSetType` is rejected before the transaction opens.
 * Previously such a value made the preset lookup yield no array and the
 * first `includes` call threw a TypeError mid-transaction.
 *
 * @param {object} actor - Caller identity; `actor.uid` identifies the staff member.
 * @param {object} payload - `quickSetType`, optional `slots`, `startDate`, `endDate`.
 * @returns {Promise<{quickSetType: string, appliedDays: number[]}>} The preset name and the days it marks available.
 * @throws {AppError} VALIDATION_ERROR (400) when `quickSetType` is not a known preset.
 */
export async function quickSetStaffAvailability(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  // A Map lookup cannot hit inherited keys such as "constructor" the way a plain object lookup can.
  const presets = new Map([
    ['all', [0, 1, 2, 3, 4, 5, 6]],
    ['weekdays', [1, 2, 3, 4, 5]],
    ['weekends', [0, 6]],
    ['clear', []],
  ]);
  const selectedDays = presets.get(payload.quickSetType);
  if (!selectedDays) {
    throw new AppError('VALIDATION_ERROR', 'Unsupported availability quick-set type', 400, {
      quickSetType: payload.quickSetType ?? null,
      allowedTypes: [...presets.keys()],
    });
  }

  // These values are identical for every day, so serialize them once.
  const availableSlotsJson = JSON.stringify(payload.slots || [{ start: '08:00', end: '18:00' }]);
  const unavailableSlotsJson = JSON.stringify([]);
  const metadataJson = JSON.stringify({
    quickSetType: payload.quickSetType,
    startDate: payload.startDate,
    endDate: payload.endDate,
  });

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const staff = await requireStaffByActor(client, context.tenant.tenantId, actor.uid);

    // Statements run one after another on the single transaction client.
    for (let day = 0; day <= 6; day += 1) {
      const active = selectedDays.includes(day);
      await client.query(
        `
INSERT INTO staff_availability (
tenant_id,
staff_id,
day_of_week,
availability_status,
time_slots,
metadata
)
VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb)
ON CONFLICT (staff_id, day_of_week) DO UPDATE
SET availability_status = EXCLUDED.availability_status,
time_slots = EXCLUDED.time_slots,
metadata = COALESCE(staff_availability.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
`,
        [
          context.tenant.tenantId,
          staff.id,
          day,
          active ? 'AVAILABLE' : 'UNAVAILABLE',
          active ? availableSlotsJson : unavailableSlotsJson,
          metadataJson,
        ],
      );
    }

    await insertDomainEvent(client, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'staff',
      aggregateId: staff.id,
      eventType: 'STAFF_AVAILABILITY_QUICK_SET',
      actorUserId: actor.uid,
      payload,
    });

    return {
      quickSetType: payload.quickSetType,
      appliedDays: selectedDays,
    };
  });
}
/**
 * Records a staff member's application for a shift role and, when instant
 * booking is requested and possible, creates the accepted assignment too.
 *
 * Steps, all inside one transaction:
 *  1. Resolve the staff row and the shift role being applied to.
 *  2. Reject if the business has blocked this staff member.
 *  3. Reject with `CONFLICT` (409) if an active assignment already exists for
 *     the same shift role and staff member.
 *  4. Upsert the application as `CONFIRMED` (instant book) or `PENDING`.
 *  5. For instant book, upsert an `ACCEPTED` assignment and refresh the role
 *     and shift counters.
 *  6. Emit `SHIFT_INSTANT_BOOKED` or `SHIFT_APPLIED`.
 *
 * Instant book only happens when the payload asks for it, the role still has
 * open slots (`assigned_count < workers_needed`) and the staff row carries a
 * `workforce_id`.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ shiftId, roleId, instantBook? }`.
 * @returns {Promise<object>} `{ applicationId, shiftId, roleId, status, assignmentId, assignmentStatus }`.
 * @throws {AppError} `CONFLICT` when the staff member is already assigned.
 */
export async function applyForShift(actor, payload) {
  const findActiveAssignmentSql = `
SELECT id
FROM assignments
WHERE shift_role_id = $1
AND staff_id = $2
AND status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
LIMIT 1
`;
  const upsertApplicationSql = `
INSERT INTO applications (
tenant_id,
shift_id,
shift_role_id,
staff_id,
status,
origin,
metadata
)
VALUES ($1, $2, $3, $4, $5, 'STAFF', $6::jsonb)
ON CONFLICT (shift_role_id, staff_id) DO UPDATE
SET status = EXCLUDED.status,
origin = EXCLUDED.origin,
metadata = COALESCE(applications.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id, status
`;
  const upsertAssignmentSql = `
INSERT INTO assignments (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
workforce_id,
staff_id,
application_id,
status,
assigned_at,
accepted_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'ACCEPTED', NOW(), NOW(), $9::jsonb)
ON CONFLICT (shift_role_id, workforce_id) DO UPDATE
SET application_id = EXCLUDED.application_id,
status = 'ACCEPTED',
accepted_at = COALESCE(assignments.accepted_at, NOW()),
updated_at = NOW()
RETURNING id, status
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);
    const shiftRole = await requireShiftRoleForStaffApply(client, tenantId, payload.shiftId, payload.roleId, staff.id);

    await ensureStaffNotBlockedByBusiness(client, {
      tenantId,
      businessId: shiftRole.business_id,
      staffId: staff.id,
    });

    const activeAssignment = await client.query(findActiveAssignmentSql, [shiftRole.shift_role_id, staff.id]);
    if (activeAssignment.rowCount > 0) {
      throw new AppError('CONFLICT', 'Staff is already assigned to this shift', 409, {
        shiftId: payload.shiftId,
        staffId: staff.id,
      });
    }

    const instantBookRequested = payload.instantBook === true;
    const roleHasOpenSlot = shiftRole.assigned_count < shiftRole.workers_needed;
    const instantBook = instantBookRequested && roleHasOpenSlot && Boolean(staff.workforce_id);

    const applicationResult = await client.query(upsertApplicationSql, [
      tenantId,
      shiftRole.shift_id,
      shiftRole.shift_role_id,
      staff.id,
      instantBook ? 'CONFIRMED' : 'PENDING',
      JSON.stringify({
        appliedBy: actor.uid,
        instantBookRequested,
      }),
    ]);
    const application = applicationResult.rows[0];

    let assignmentId = null;
    let assignmentStatus = null;
    if (instantBook) {
      const assignmentResult = await client.query(upsertAssignmentSql, [
        tenantId,
        shiftRole.business_id,
        shiftRole.vendor_id,
        shiftRole.shift_id,
        shiftRole.shift_role_id,
        staff.workforce_id,
        staff.id,
        application.id,
        JSON.stringify({ source: 'staff-apply-instant-book' }),
      ]);
      ({ id: assignmentId, status: assignmentStatus } = assignmentResult.rows[0]);
      // A new accepted assignment changes the fill counts on role and shift.
      await refreshShiftRoleCounts(client, shiftRole.shift_role_id);
      await refreshShiftCounts(client, shiftRole.shift_id);
    }

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'application',
      aggregateId: application.id,
      eventType: instantBook ? 'SHIFT_INSTANT_BOOKED' : 'SHIFT_APPLIED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      applicationId: application.id,
      shiftId: shiftRole.shift_id,
      roleId: shiftRole.shift_role_id,
      status: instantBook ? 'CONFIRMED' : application.status,
      assignmentId,
      assignmentStatus,
    };
  });
}
/**
 * Accepts the caller's pending assignment for a shift.
 *
 * Marks the assignment `ACCEPTED` (keeping any existing `accepted_at`), merges
 * `{ acceptedBy }` into its metadata and records a
 * `STAFF_PENDING_SHIFT_ACCEPTED` domain event.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ shiftId }` identifying the shift to accept.
 * @returns {Promise<{assignmentId: *, shiftId: *, status: 'ACCEPTED'}>}
 */
export async function acceptPendingShift(actor, payload) {
  const acceptAssignmentSql = `
UPDATE assignments
SET status = 'ACCEPTED',
accepted_at = COALESCE(accepted_at, NOW()),
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const assignment = await requirePendingAssignmentForActor(client, tenantId, payload.shiftId, actor.uid);

    const acceptancePatch = JSON.stringify({ acceptedBy: actor.uid });
    await client.query(acceptAssignmentSql, [assignment.id, acceptancePatch]);

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'assignment',
      aggregateId: assignment.id,
      eventType: 'STAFF_PENDING_SHIFT_ACCEPTED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      assignmentId: assignment.id,
      shiftId: assignment.shift_id,
      status: 'ACCEPTED',
    };
  });
}
/**
 * Declines the caller's pending assignment for a shift.
 *
 * Sets the assignment to `CANCELLED`, rejects the matching `PENDING` or
 * `CONFIRMED` applications for the same shift role and staff member, refreshes
 * the role and shift counters and records a `STAFF_PENDING_SHIFT_DECLINED`
 * domain event. The stored assignment status is `CANCELLED`, while the value
 * reported back to the caller is `DECLINED`.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ shiftId, reason? }`.
 * @returns {Promise<{assignmentId: *, shiftId: *, status: 'DECLINED'}>}
 */
export async function declinePendingShift(actor, payload) {
  const cancelAssignmentSql = `
UPDATE assignments
SET status = 'CANCELLED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`;
  const rejectApplicationsSql = `
UPDATE applications
SET status = 'REJECTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE shift_role_id = $1
AND staff_id = $3
AND status IN ('PENDING', 'CONFIRMED')
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const assignment = await requirePendingAssignmentForActor(client, tenantId, payload.shiftId, actor.uid);
    const reason = payload.reason || null;

    await client.query(cancelAssignmentSql, [
      assignment.id,
      JSON.stringify({ declinedBy: actor.uid, declineReason: reason }),
    ]);
    await client.query(rejectApplicationsSql, [
      assignment.shift_role_id,
      JSON.stringify({ rejectedBy: actor.uid, reason }),
      assignment.staff_id,
    ]);

    // The cancelled assignment frees a slot, so recompute the fill counts.
    await refreshShiftRoleCounts(client, assignment.shift_role_id);
    await refreshShiftCounts(client, assignment.shift_id);

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'assignment',
      aggregateId: assignment.id,
      eventType: 'STAFF_PENDING_SHIFT_DECLINED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      assignmentId: assignment.id,
      shiftId: assignment.shift_id,
      status: 'DECLINED',
    };
  });
}
/**
 * Flags the caller's assignment for a shift as needing a swap.
 *
 * Only assignments currently in `ACCEPTED`, `CHECKED_IN` or `CHECKED_OUT` may
 * be flagged. The assignment moves to `SWAP_REQUESTED`, the request time and
 * reason are merged into its metadata, the role and shift counters are
 * refreshed and a `SHIFT_SWAP_REQUESTED` domain event is recorded.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ shiftId, reason? }`.
 * @returns {Promise<{assignmentId: *, shiftId: *, status: 'SWAP_REQUESTED'}>}
 * @throws {AppError} `INVALID_SWAP_STATE` (409) for any other assignment status.
 */
export async function requestShiftSwap(actor, payload) {
  const markSwapRequestedSql = `
UPDATE assignments
SET status = 'SWAP_REQUESTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`;
  const swappableStatuses = new Set(['ACCEPTED', 'CHECKED_IN', 'CHECKED_OUT']);
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const assignment = await requireAnyAssignmentForActor(client, tenantId, payload.shiftId, actor.uid);

    const canSwap = swappableStatuses.has(assignment.status);
    if (!canSwap) {
      throw new AppError('INVALID_SWAP_STATE', 'Only accepted or worked shifts can be marked for swap', 409, {
        shiftId: payload.shiftId,
        assignmentStatus: assignment.status,
      });
    }

    const swapPatch = JSON.stringify({
      swapRequestedAt: new Date().toISOString(),
      swapReason: payload.reason || null,
    });
    await client.query(markSwapRequestedSql, [assignment.id, swapPatch]);

    await refreshShiftRoleCounts(client, assignment.shift_role_id);
    await refreshShiftCounts(client, assignment.shift_id);

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'assignment',
      aggregateId: assignment.id,
      eventType: 'SHIFT_SWAP_REQUESTED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      assignmentId: assignment.id,
      shiftId: assignment.shift_id,
      status: 'SWAP_REQUESTED',
    };
  });
}
/**
 * Submits the caller's worked shift as a timesheet for approval.
 *
 * The assignment must be `CHECKED_OUT` or `COMPLETED`. A timesheet row is
 * upserted per assignment: new rows start as `SUBMITTED`; existing rows keep
 * an `APPROVED` or `PAID` status and otherwise become `SUBMITTED`. Submission
 * details are merged into the timesheet metadata and a
 * `TIMESHEET_SUBMITTED_FOR_APPROVAL` domain event is recorded.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ shiftId, note? }`.
 * @returns {Promise<object>} `{ assignmentId, shiftId, timesheetId, status, submitted }`
 *   where `submitted` is true only when the resulting status is `SUBMITTED`.
 * @throws {AppError} `INVALID_TIMESHEET_STATE` (409) for any other assignment status.
 */
export async function submitCompletedShiftForApproval(actor, payload) {
  const upsertTimesheetSql = `
INSERT INTO timesheets (
tenant_id,
assignment_id,
staff_id,
status,
metadata
)
VALUES ($1, $2, $3, 'SUBMITTED', $4::jsonb)
ON CONFLICT (assignment_id) DO UPDATE
SET status = CASE
WHEN timesheets.status IN ('APPROVED', 'PAID') THEN timesheets.status
ELSE 'SUBMITTED'
END,
metadata = COALESCE(timesheets.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id, status, metadata
`;
  const submittableStatuses = new Set(['CHECKED_OUT', 'COMPLETED']);
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const assignment = await requireAnyAssignmentForActor(client, tenantId, payload.shiftId, actor.uid);

    const isSubmittable = submittableStatuses.has(assignment.status);
    if (!isSubmittable) {
      throw new AppError('INVALID_TIMESHEET_STATE', 'Only completed or checked-out shifts can be submitted for approval', 409, {
        shiftId: payload.shiftId,
        assignmentStatus: assignment.status,
      });
    }

    const submissionPatch = JSON.stringify({
      submittedAt: new Date().toISOString(),
      submittedBy: actor.uid,
      submissionNote: payload.note || null,
    });
    const timesheetResult = await client.query(upsertTimesheetSql, [
      tenantId,
      assignment.id,
      assignment.staff_id,
      submissionPatch,
    ]);
    const timesheet = timesheetResult.rows[0];

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'timesheet',
      aggregateId: timesheet.id,
      eventType: 'TIMESHEET_SUBMITTED_FOR_APPROVAL',
      actorUserId: actor.uid,
      payload,
    });

    return {
      assignmentId: assignment.id,
      shiftId: assignment.shift_id,
      timesheetId: timesheet.id,
      status: timesheet.status,
      submitted: timesheet.status === 'SUBMITTED',
    };
  });
}
/**
 * Completes staff onboarding for the calling user.
 *
 * In one transaction this resolves the tenant/vendor scope, upserts the user
 * record, activates tenant and vendor memberships, upserts the `staffs` row
 * (status `ACTIVE`, onboarding `COMPLETED`) and the `workforce` row, then
 * records a `STAFF_PROFILE_SETUP_COMPLETED` domain event.
 *
 * The trimmed full name is split on whitespace: the first token is stored as
 * `firstName`, the remainder as `lastName`. The primary role falls back to the
 * first skill and then to `GENERAL_EVENT_STAFF`.
 *
 * @param {object} actor - Caller identity (`uid`, optional `email`).
 * @param {object} payload - Onboarding data: `fullName`, `phoneNumber`, plus
 *   optional `email`, `tenantId`, `vendorId`, `bio`, `preferredLocations`,
 *   `maxDistanceMiles`, `industries`, `skills`, `primaryRole`.
 * @returns {Promise<object>} `{ staffId, workforceId, tenantId, vendorId, completed: true }`.
 */
export async function setupStaffProfile(actor, payload) {
  const upsertTenantMembershipSql = `
INSERT INTO tenant_memberships (tenant_id, user_id, membership_status, base_role, metadata)
VALUES ($1, $2, 'ACTIVE', 'member', '{"source":"staff-profile-setup"}'::jsonb)
ON CONFLICT (tenant_id, user_id) DO UPDATE
SET membership_status = 'ACTIVE',
updated_at = NOW()
`;
  const upsertVendorMembershipSql = `
INSERT INTO vendor_memberships (tenant_id, vendor_id, user_id, membership_status, vendor_role, metadata)
VALUES ($1, $2, $3, 'ACTIVE', 'member', '{"source":"staff-profile-setup"}'::jsonb)
ON CONFLICT (vendor_id, user_id) DO UPDATE
SET membership_status = 'ACTIVE',
updated_at = NOW()
`;
  const upsertStaffSql = `
INSERT INTO staffs (
tenant_id,
user_id,
full_name,
email,
phone,
status,
primary_role,
onboarding_status,
metadata
)
VALUES ($1, $2, $3, $4, $5, 'ACTIVE', $6, 'COMPLETED', $7::jsonb)
ON CONFLICT (tenant_id, user_id) DO UPDATE
SET full_name = EXCLUDED.full_name,
email = COALESCE(EXCLUDED.email, staffs.email),
phone = COALESCE(EXCLUDED.phone, staffs.phone),
primary_role = COALESCE(EXCLUDED.primary_role, staffs.primary_role),
onboarding_status = 'COMPLETED',
metadata = COALESCE(staffs.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id
`;
  const upsertWorkforceSql = `
INSERT INTO workforce (
tenant_id,
vendor_id,
staff_id,
workforce_number,
employment_type,
status,
metadata
)
VALUES ($1, $2, $3, $4, 'TEMP', 'ACTIVE', '{"source":"staff-profile-setup"}'::jsonb)
ON CONFLICT (vendor_id, staff_id) DO UPDATE
SET status = 'ACTIVE',
updated_at = NOW()
RETURNING id
`;

  return withTransaction(async (client) => {
    const scope = await resolveStaffOnboardingScope(client, actor.uid, payload.tenantId, payload.vendorId);
    const email = payload.email ?? actor.email ?? null;

    await ensureActorUser(client, actor, {
      email,
      displayName: payload.fullName,
      phone: payload.phoneNumber,
      metadata: { source: 'staff-profile-setup' },
    });

    const { tenantId, vendorId } = scope;
    await client.query(upsertTenantMembershipSql, [tenantId, actor.uid]);
    await client.query(upsertVendorMembershipSql, [tenantId, vendorId, actor.uid]);

    const fullName = payload.fullName.trim();
    const nameTokens = fullName.split(/\s+/);
    const firstName = nameTokens[0];
    const lastName = nameTokens.slice(1).join(' ');
    const skills = ensureArray(payload.skills || []);
    const profileMetadata = {
      bio: payload.bio || null,
      firstName,
      lastName,
      preferredLocations: ensureArray(payload.preferredLocations || []),
      maxDistanceMiles: payload.maxDistanceMiles ?? null,
      industries: ensureArray(payload.industries || []),
      skills,
    };

    const staffResult = await client.query(upsertStaffSql, [
      tenantId,
      actor.uid,
      fullName,
      email,
      normalizePhone(payload.phoneNumber),
      payload.primaryRole || skills[0] || 'GENERAL_EVENT_STAFF',
      JSON.stringify(profileMetadata),
    ]);
    const staffId = staffResult.rows[0].id;

    // Workforce number: "WF-<FIRSTNAME SLUG>-<8 hex chars of a random UUID>".
    const nameSlug = normalizeSlug(firstName).toUpperCase();
    const randomSuffix = crypto.randomUUID().slice(0, 8).toUpperCase();
    const workforceResult = await client.query(upsertWorkforceSql, [
      tenantId,
      vendorId,
      staffId,
      `WF-${nameSlug}-${randomSuffix}`,
    ]);

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff',
      aggregateId: staffId,
      eventType: 'STAFF_PROFILE_SETUP_COMPLETED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      staffId,
      workforceId: workforceResult.rows[0].id,
      tenantId,
      vendorId,
      completed: true,
    };
  });
}
/**
 * Updates the calling staff member's personal details.
 *
 * The user record is upserted first, then the `staffs` row is updated with a
 * recomputed full name, the new email/phone (the stored values are kept when
 * none are supplied) and the metadata produced by `buildStaffMetadataPatch`.
 * A `STAFF_PERSONAL_INFO_UPDATED` domain event is recorded.
 *
 * Each name part is taken from the payload, then from stored metadata, then
 * from the stored full name split on single spaces. If that yields an empty
 * string, the stored full name is kept as-is.
 *
 * @param {object} actor - Caller identity (`uid`, optional `email`).
 * @param {object} payload - Fields to change, e.g. `firstName`, `lastName`,
 *   `email`, `phone`, `displayName`, plus whatever `buildStaffMetadataPatch` reads.
 * @returns {Promise<object>} `{ staffId, fullName, email, phone, metadata }`.
 */
export async function updatePersonalInfo(actor, payload) {
  const updateStaffSql = `
UPDATE staffs
SET full_name = $2,
email = COALESCE($3, email),
phone = COALESCE($4, phone),
metadata = $5::jsonb,
updated_at = NOW()
WHERE id = $1
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor, {
      email: payload.email ?? actor.email ?? null,
      displayName: payload.displayName || null,
      phone: payload.phone || null,
    });
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);

    const storedMetadata = staff.metadata || {};
    const nextMetadata = buildStaffMetadataPatch(storedMetadata, payload);

    // The stored full name is only split when neither payload nor metadata
    // provides the corresponding part.
    const firstName = payload.firstName || storedMetadata.firstName || staff.full_name.split(' ')[0] || '';
    const lastName = payload.lastName
      || storedMetadata.lastName
      || staff.full_name.split(' ').slice(1).join(' ')
      || '';
    const fullName = [firstName, lastName].filter(Boolean).join(' ').trim() || staff.full_name;

    const phone = normalizePhone(payload.phone);
    await client.query(updateStaffSql, [
      staff.id,
      fullName,
      payload.email ?? null,
      phone ?? null,
      JSON.stringify(nextMetadata),
    ]);

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff',
      aggregateId: staff.id,
      eventType: 'STAFF_PERSONAL_INFO_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      staffId: staff.id,
      fullName,
      email: payload.email ?? staff.email ?? null,
      phone: phone ?? staff.phone ?? null,
      metadata: nextMetadata,
    };
  });
}
/**
 * Updates the experience section of the calling staff member's profile.
 *
 * Stores the metadata produced by `buildStaffMetadataPatch`, optionally
 * replaces `primary_role` (the stored value is kept when none is supplied) and
 * records a `STAFF_EXPERIENCE_UPDATED` domain event.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - Experience fields; `primaryRole` is optional.
 * @returns {Promise<{staffId: *, industries: Array, skills: Array}>} The
 *   industries and skills present in the resulting metadata.
 */
export async function updateProfileExperience(actor, payload) {
  const updateExperienceSql = `
UPDATE staffs
SET metadata = $2::jsonb,
primary_role = COALESCE($3, primary_role),
updated_at = NOW()
WHERE id = $1
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);
    const nextMetadata = buildStaffMetadataPatch(staff.metadata || {}, payload);

    const queryParams = [staff.id, JSON.stringify(nextMetadata), payload.primaryRole || null];
    await client.query(updateExperienceSql, queryParams);

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff',
      aggregateId: staff.id,
      eventType: 'STAFF_EXPERIENCE_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    const { industries, skills } = nextMetadata;
    return {
      staffId: staff.id,
      industries: ensureArray(industries || []),
      skills: ensureArray(skills || []),
    };
  });
}
/**
 * Updates the calling staff member's preferred work locations.
 *
 * Delegates to `updatePersonalInfo`, forwarding only `preferredLocations` and
 * `maxDistanceMiles`; any other payload field is dropped.
 *
 * @param {object} actor - Caller identity, passed through unchanged.
 * @param {object} payload - `{ preferredLocations, maxDistanceMiles }`.
 * @returns {Promise<object>} Whatever `updatePersonalInfo` resolves with.
 */
export async function updatePreferredLocations(actor, payload) {
  const { preferredLocations, maxDistanceMiles } = payload;
  const locationPatch = { preferredLocations, maxDistanceMiles };
  return updatePersonalInfo(actor, locationPatch);
}
/**
 * Adds an emergency contact for the calling staff member.
 *
 * Inserts an `emergency_contacts` row (phone passed through `normalizePhone`,
 * `is_primary` defaulting to false) and records an
 * `EMERGENCY_CONTACT_CREATED` domain event.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ fullName, phone, relationshipType, isPrimary?, metadata? }`.
 * @returns {Promise<{contactId: *, created: true}>}
 */
export async function createEmergencyContact(actor, payload) {
  const insertContactSql = `
INSERT INTO emergency_contacts (
tenant_id,
staff_id,
full_name,
phone,
relationship_type,
is_primary,
metadata
)
VALUES ($1, $2, $3, $4, $5, COALESCE($6, FALSE), $7::jsonb)
RETURNING id
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);

    const { fullName, relationshipType } = payload;
    const inserted = await client.query(insertContactSql, [
      tenantId,
      staff.id,
      fullName,
      normalizePhone(payload.phone),
      relationshipType,
      payload.isPrimary ?? false,
      JSON.stringify(payload.metadata || {}),
    ]);
    const contactId = inserted.rows[0].id;

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'emergency_contact',
      aggregateId: contactId,
      eventType: 'EMERGENCY_CONTACT_CREATED',
      actorUserId: actor.uid,
      payload,
    });

    return { contactId, created: true };
  });
}
/**
 * Updates one of the calling staff member's emergency contacts.
 *
 * Only fields present in the payload are changed; omitted (or empty) values
 * keep the stored column via `COALESCE`. Supplied metadata is merged over the
 * stored metadata. The update is scoped to the caller's tenant and staff id.
 * An `EMERGENCY_CONTACT_UPDATED` domain event is recorded on success.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ contactId, fullName?, phone?, relationshipType?, isPrimary?, metadata? }`.
 * @returns {Promise<{contactId: *, updated: true}>}
 * @throws {AppError} `NOT_FOUND` (404) when no matching contact row was updated.
 */
export async function updateEmergencyContact(actor, payload) {
  const updateContactSql = `
UPDATE emergency_contacts
SET full_name = COALESCE($2, full_name),
phone = COALESCE($3, phone),
relationship_type = COALESCE($4, relationship_type),
is_primary = COALESCE($5, is_primary),
metadata = COALESCE(metadata, '{}'::jsonb) || $6::jsonb,
updated_at = NOW()
WHERE id = $1
AND tenant_id = $7
AND staff_id = $8
RETURNING id
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);

    const updated = await client.query(updateContactSql, [
      payload.contactId,
      payload.fullName || null,
      normalizePhone(payload.phone) || null,
      payload.relationshipType || null,
      payload.isPrimary ?? null,
      JSON.stringify(payload.metadata || {}),
      tenantId,
      staff.id,
    ]);

    const contactMissing = updated.rowCount === 0;
    if (contactMissing) {
      throw new AppError('NOT_FOUND', 'Emergency contact not found for current staff user', 404, {
        contactId: payload.contactId,
      });
    }
    const contactId = updated.rows[0].id;

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'emergency_contact',
      aggregateId: contactId,
      eventType: 'EMERGENCY_CONTACT_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    return { contactId, updated: true };
  });
}
/**
 * Returns the tenant's `TAX_FORM` document definition for a form type,
 * creating it if it does not exist yet.
 *
 * The form type is matched case-insensitively and ignoring separators, so
 * `I9`, `i9` and `I-9` all resolve to the `I-9` document. This matters because
 * callers echo the canonical name (`I-9`) back to clients as `formType`; with
 * the previous exact `I9` comparison, a client that sent that value back had
 * its I-9 silently filed under `W-4`.
 *
 * NOTE(review): every other value still resolves to `W-4`, as before. If
 * upstream validation does not already restrict the form type, rejecting
 * unknown types here would be safer — confirm against the request schema.
 *
 * @param {object} client - Transactional database client exposing `query`.
 * @param {*} tenantId - Tenant that owns the document definition.
 * @param {string} formType - Requested tax form type.
 * @returns {Promise<{id: *, name: string}>} The upserted document row.
 */
async function ensureTaxFormDocument(client, tenantId, formType) {
  const upsertDocumentSql = `
INSERT INTO documents (tenant_id, document_type, name, metadata)
VALUES ($1, 'TAX_FORM', $2, '{"required":true}'::jsonb)
ON CONFLICT (tenant_id, document_type, name) DO UPDATE
SET updated_at = NOW()
RETURNING id, name
`;
  // Strip everything except letters and digits so "I-9", "i 9" and "I9" compare equal.
  const compactType = formType.toUpperCase().replace(/[^A-Z0-9]/g, '');
  const normalizedName = compactType === 'I9' ? 'I-9' : 'W-4';
  const result = await client.query(upsertDocumentSql, [tenantId, normalizedName]);
  return result.rows[0];
}
/**
 * Saves an in-progress tax form for the calling staff member.
 *
 * Resolves the tax form document definition, then upserts the staff's
 * `staff_documents` row for it. New rows are created as `PENDING` with no
 * file; existing rows only have the draft metadata (`formStatus:
 * 'IN_PROGRESS'`, fields, save time) merged in. A `TAX_FORM_DRAFT_SAVED`
 * domain event is recorded.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ formType, fields }`.
 * @returns {Promise<{staffDocumentId: *, formType: string, status: 'IN_PROGRESS'}>}
 */
export async function saveTaxFormDraft(actor, payload) {
  const upsertStaffDocumentSql = `
INSERT INTO staff_documents (
tenant_id,
staff_id,
document_id,
file_uri,
status,
metadata
)
VALUES ($1, $2, $3, NULL, 'PENDING', $4::jsonb)
ON CONFLICT (staff_id, document_id) DO UPDATE
SET metadata = COALESCE(staff_documents.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);
    const taxDocument = await ensureTaxFormDocument(client, tenantId, payload.formType);

    const draftMetadata = JSON.stringify({
      formType: taxDocument.name,
      formStatus: 'IN_PROGRESS',
      fields: payload.fields,
      lastSavedAt: new Date().toISOString(),
    });
    const saved = await client.query(upsertStaffDocumentSql, [
      tenantId,
      staff.id,
      taxDocument.id,
      draftMetadata,
    ]);
    const staffDocumentId = saved.rows[0].id;

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff_document',
      aggregateId: staffDocumentId,
      eventType: 'TAX_FORM_DRAFT_SAVED',
      actorUserId: actor.uid,
      payload,
    });

    return { staffDocumentId, formType: taxDocument.name, status: 'IN_PROGRESS' };
  });
}
/**
 * Submits a tax form for the calling staff member.
 *
 * Resolves the tax form document definition, then upserts the staff's
 * `staff_documents` row for it. New rows are created as `PENDING` with no
 * file; existing rows only have the submission metadata (`formStatus:
 * 'SUBMITTED'`, submit time, fields) merged in. A `TAX_FORM_SUBMITTED` domain
 * event is recorded.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ formType, fields }`.
 * @returns {Promise<{staffDocumentId: *, formType: string, status: 'SUBMITTED'}>}
 */
export async function submitTaxForm(actor, payload) {
  const upsertStaffDocumentSql = `
INSERT INTO staff_documents (
tenant_id,
staff_id,
document_id,
file_uri,
status,
metadata
)
VALUES ($1, $2, $3, NULL, 'PENDING', $4::jsonb)
ON CONFLICT (staff_id, document_id) DO UPDATE
SET metadata = COALESCE(staff_documents.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);
    const taxDocument = await ensureTaxFormDocument(client, tenantId, payload.formType);

    const submissionMetadata = JSON.stringify({
      formType: taxDocument.name,
      formStatus: 'SUBMITTED',
      submittedAt: new Date().toISOString(),
      fields: payload.fields,
    });
    const submitted = await client.query(upsertStaffDocumentSql, [
      tenantId,
      staff.id,
      taxDocument.id,
      submissionMetadata,
    ]);
    const staffDocumentId = submitted.rows[0].id;

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff_document',
      aggregateId: staffDocumentId,
      eventType: 'TAX_FORM_SUBMITTED',
      actorUserId: actor.uid,
      payload,
    });

    return { staffDocumentId, formType: taxDocument.name, status: 'SUBMITTED' };
  });
}
/**
 * Registers a bank account for the calling staff member.
 *
 * Only the last four digits of the routing and account numbers are persisted
 * (in `provider_reference`, `last4` and the masked routing number in
 * metadata). The account is flagged primary when the staff member has no
 * primary account yet. The `STAFF_BANK_ACCOUNT_ADDED` domain event carries
 * only the account type and bank name, not the account numbers.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ bankName, routingNumber, accountNumber, accountType }`;
 *   both numbers are read with `.slice(-4)`.
 * @returns {Promise<{accountId: *, last4: *, isPrimary: *}>}
 */
export async function addStaffBankAccount(actor, payload) {
  const findPrimaryAccountSql = `
SELECT id
FROM accounts
WHERE tenant_id = $1
AND owner_staff_id = $2
AND is_primary = TRUE
LIMIT 1
`;
  const insertAccountSql = `
INSERT INTO accounts (
tenant_id,
owner_type,
owner_staff_id,
provider_name,
provider_reference,
last4,
is_primary,
metadata
)
VALUES ($1, 'STAFF', $2, $3, $4, $5, $6, $7::jsonb)
RETURNING id, last4, is_primary
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);

    const primaryLookup = await client.query(findPrimaryAccountSql, [tenantId, staff.id]);

    const { bankName, accountType } = payload;
    const routingLast4 = payload.routingNumber.slice(-4);
    const accountLast4 = payload.accountNumber.slice(-4);
    const isFirstPrimary = primaryLookup.rowCount === 0;

    const inserted = await client.query(insertAccountSql, [
      tenantId,
      staff.id,
      bankName,
      `manual:${routingLast4}:${accountLast4}`,
      accountLast4,
      isFirstPrimary,
      JSON.stringify({
        accountType,
        routingNumberMasked: `***${routingLast4}`,
      }),
    ]);
    const account = inserted.rows[0];

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'account',
      aggregateId: account.id,
      eventType: 'STAFF_BANK_ACCOUNT_ADDED',
      actorUserId: actor.uid,
      payload: { accountType, bankName },
    });

    return {
      accountId: account.id,
      last4: account.last4,
      isPrimary: account.is_primary,
    };
  });
}
/**
 * Updates the profile-visibility setting of the calling staff member.
 *
 * Builds new metadata by passing `{ profileVisible }` through
 * `buildStaffMetadataPatch`, stores it on the `staffs` row and records a
 * `STAFF_PRIVACY_UPDATED` domain event.
 *
 * @param {object} actor - Caller identity; `actor.uid` drives every lookup.
 * @param {object} payload - `{ profileVisible }`.
 * @returns {Promise<{staffId: *, profileVisible: boolean}>} `profileVisible`
 *   is the payload value coerced to a boolean.
 */
export async function updatePrivacyVisibility(actor, payload) {
  const updateMetadataSql = `
UPDATE staffs
SET metadata = $2::jsonb,
updated_at = NOW()
WHERE id = $1
`;
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const staff = await requireStaffByActor(client, tenantId, actor.uid);

    const visibilityPatch = { profileVisible: payload.profileVisible };
    const nextMetadata = buildStaffMetadataPatch(staff.metadata || {}, visibilityPatch);
    await client.query(updateMetadataSql, [staff.id, JSON.stringify(nextMetadata)]);

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff',
      aggregateId: staff.id,
      eventType: 'STAFF_PRIVACY_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      staffId: staff.id,
      profileVisible: Boolean(payload.profileVisible),
    };
  });
}