Files
Krow-workspace/backend/command-api/src/services/mobile-command-service.js

4544 lines
138 KiB
JavaScript

import crypto from 'node:crypto';
import { AppError } from '../lib/errors.js';
import { buildStaffOrderEligibilityBlockers, dedupeDocumentNames } from '../lib/staff-order-eligibility.js';
import { query, withTransaction } from './db.js';
import { loadActorContext, requireClientContext, requireStaffContext } from './actor-context.js';
import { recordGeofenceIncident } from './attendance-monitoring.js';
import { distanceMeters, resolveEffectiveClockInPolicy } from './clock-in-policy.js';
import { uploadLocationBatch } from './location-log-storage.js';
import { enqueueHubManagerAlert, enqueueUserAlert } from './notification-outbox.js';
import { registerPushToken, unregisterPushToken } from './notification-device-tokens.js';
import {
clockIn as clockInCommand,
clockOut as clockOutCommand,
createOrder as createOrderCommand,
} from './command-service.js';
// Assignment statuses that cancelFutureOrderSlice flips to CANCELLED for the shifts it cancels.
const MOBILE_CANCELLABLE_ASSIGNMENT_STATUSES = ['ASSIGNED', 'ACCEPTED'];
// Application statuses that cancelFutureOrderSlice flips to CANCELLED for the shifts it cancels.
const MOBILE_CANCELLABLE_APPLICATION_STATUSES = ['PENDING', 'CONFIRMED'];
// Numeric rank per dispatch team type, read through resolveDispatchPriority (unknown types map
// to the MARKETPLACE rank). The SQL CASE in loadDispatchMembership sorts the same ranking
// ascending, so presumably a lower number means a higher priority — confirm with callers.
const DISPATCH_TEAM_PRIORITY = {
CORE: 1,
CERTIFIED_LOCATION: 2,
MARKETPLACE: 3,
};
/**
 * Reads an environment variable as a strictly positive base-10 integer.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, empty,
 *   unparsable, or not greater than zero.
 * @returns {number} The parsed integer, or `fallback`.
 */
function parsePositiveIntEnv(name, fallback) {
  const rawValue = process.env[name] || fallback;
  const candidate = Number.parseInt(`${rawValue}`, 10);
  if (!Number.isFinite(candidate)) {
    return fallback;
  }
  if (candidate > 0) {
    return candidate;
  }
  return fallback;
}
// Maximum lifetime of a swap request measured from "now", in minutes (see computeSwapExpiry).
// Overridable through the SHIFT_SWAP_WINDOW_MINUTES environment variable; defaults to 120.
const SHIFT_SWAP_WINDOW_MINUTES = parsePositiveIntEnv('SHIFT_SWAP_WINDOW_MINUTES', 120);
// Minimum gap, in minutes, that computeSwapExpiry keeps between a swap expiry and the shift start.
// Overridable through the SHIFT_SWAP_MIN_LEAD_MINUTES environment variable; defaults to 15.
const SHIFT_SWAP_MIN_LEAD_MINUTES = parsePositiveIntEnv('SHIFT_SWAP_MIN_LEAD_MINUTES', 15);
/**
 * Converts a date-like value to an ISO-8601 string.
 *
 * @param {*} value - Anything accepted by the `Date` constructor.
 * @returns {string|null} ISO string, or `null` when `value` is falsy.
 * @throws {RangeError} When `value` is truthy but does not parse to a valid date.
 */
function toIsoOrNull(value) {
  if (!value) {
    return null;
  }
  const asDate = new Date(value);
  return asDate.toISOString();
}
/**
 * Builds a URL-safe slug: lower-cased, runs of non-alphanumerics collapsed to a
 * single dash, no leading/trailing dashes, at most 60 characters.
 *
 * @param {*} input - Source text; falsy values produce an empty slug.
 * @returns {string} The slug (possibly empty).
 */
function normalizeSlug(input) {
  return `${input || ''}`
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-+|-+$/g, '')
    .slice(0, 60)
    // Truncation can cut right after a separator, so trim the tail once more.
    .replace(/-+$/, '');
}
/**
 * Normalizes a phone value by stringifying and trimming it.
 *
 * @param {*} value - Raw phone input.
 * @returns {string|null} Trimmed phone string, or `null` when the input is
 *   falsy or contains only whitespace (so blank input never yields '').
 */
function normalizePhone(value) {
  if (!value) return null;
  const trimmed = `${value}`.trim();
  return trimmed === '' ? null : trimmed;
}
/**
 * Returns `value` unchanged when it is an array, otherwise a new empty array.
 *
 * @param {*} value - Candidate value.
 * @returns {Array} The same array instance, or `[]`.
 */
function ensureArray(value) {
  if (!Array.isArray(value)) {
    return [];
  }
  return value;
}
/**
 * Looks up the numeric rank for a dispatch team type.
 *
 * @param {string} teamType - Key into DISPATCH_TEAM_PRIORITY.
 * @returns {number} The configured rank, or the MARKETPLACE rank when the
 *   team type has no (truthy) entry.
 */
function resolveDispatchPriority(teamType) {
  const ranked = DISPATCH_TEAM_PRIORITY[teamType];
  if (ranked) {
    return ranked;
  }
  return DISPATCH_TEAM_PRIORITY.MARKETPLACE;
}
/**
 * Computes when a shift swap request should expire.
 *
 * The expiry is the earlier of "now + SHIFT_SWAP_WINDOW_MINUTES" and
 * "shift start - SHIFT_SWAP_MIN_LEAD_MINUTES".
 *
 * @param {*} startsAt - Shift start, in any form accepted by `Date`.
 * @returns {Date|null} The expiry, or `null` when `startsAt` is not a valid
 *   date or the computed expiry is not in the future.
 */
function computeSwapExpiry(startsAt) {
  const shiftStartMs = new Date(startsAt).getTime();
  if (!Number.isFinite(shiftStartMs)) {
    return null;
  }

  const msPerMinute = 60 * 1000;
  const nowMs = Date.now();
  const windowDeadlineMs = nowMs + (SHIFT_SWAP_WINDOW_MINUTES * msPerMinute);
  const leadDeadlineMs = shiftStartMs - (SHIFT_SWAP_MIN_LEAD_MINUTES * msPerMinute);
  const deadlineMs = Math.min(windowDeadlineMs, leadDeadlineMs);

  if (Number.isFinite(deadlineMs) && deadlineMs > nowMs) {
    return new Date(deadlineMs);
  }
  return null;
}
/**
 * Rejects when a `staff_blocks` row exists for the staff member and business.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} scope
 * @param {string} scope.tenantId
 * @param {string} scope.businessId
 * @param {string} scope.staffId
 * @returns {Promise<void>} Resolves when no block row is found.
 * @throws {AppError} `STAFF_BLOCKED` (409) carrying the block id, reason and issue flags.
 */
async function ensureStaffNotBlockedByBusiness(client, { tenantId, businessId, staffId }) {
  const { rowCount, rows } = await client.query(
    `
SELECT id, reason, issue_flags
FROM staff_blocks
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
LIMIT 1
`,
    [tenantId, businessId, staffId]
  );

  if (!(rowCount > 0)) {
    return;
  }

  const [block] = rows;
  throw new AppError('STAFF_BLOCKED', 'Staff is blocked from future shift assignments for this business', 409, {
    businessId,
    staffId,
    blockId: block.id,
    reason: block.reason || null,
    issueFlags: ensureArray(block.issue_flags || []),
  });
}
/**
 * Lists the names of role-required, non-ATTIRE documents for which the staff
 * member has no VERIFIED `staff_documents` row.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} scope
 * @param {string} scope.tenantId
 * @param {string} [scope.roleCode] - When falsy, no query runs and `[]` is returned.
 * @param {string} scope.staffId
 * @returns {Promise<Array>} Result of `dedupeDocumentNames` over the document names.
 */
async function loadMissingRequiredDocuments(client, { tenantId, roleCode, staffId }) {
  if (!roleCode) {
    return [];
  }

  const { rows } = await client.query(
    `
SELECT d.name
FROM documents d
WHERE d.tenant_id = $1
AND d.required_for_role_code = $2
AND d.document_type <> 'ATTIRE'
AND NOT EXISTS (
SELECT 1
FROM staff_documents sd
WHERE sd.tenant_id = d.tenant_id
AND sd.staff_id = $3
AND sd.document_id = d.id
AND sd.status = 'VERIFIED'
)
ORDER BY d.name ASC
`,
    [tenantId, roleCode, staffId]
  );

  const documentNames = rows.map(({ name }) => name);
  return dedupeDocumentNames(documentNames);
}
/**
 * Assembles the details object attached to "missing required documents" errors.
 *
 * @param {object} input
 * @param {string} [input.roleCode] - Falsy values are reported as `null`.
 * @param {string|null} [input.orderId=null]
 * @param {string|null} [input.shiftId=null]
 * @param {string|null} [input.roleId=null]
 * @param {Array} [input.missingDocumentNames=[]]
 * @returns {object} `{ orderId, shiftId, roleId, roleCode, blockers, missingDocuments }`.
 */
function buildMissingDocumentErrorDetails({
  roleCode,
  orderId = null,
  shiftId = null,
  roleId = null,
  missingDocumentNames = [],
}) {
  const details = {
    orderId,
    shiftId,
    roleId,
    roleCode: roleCode || null,
  };
  details.blockers = buildStaffOrderEligibilityBlockers({ missingDocumentNames });
  details.missingDocuments = dedupeDocumentNames(missingDocumentNames);
  return details;
}
/**
 * Maps a snake_case assignment row to a camelCase reference payload.
 *
 * @param {object} assignment - Row with `id`, `shift_id`, `business_id`,
 *   `vendor_id`, `staff_id` and `clock_point_id`.
 * @returns {object} `{ assignmentId, shiftId, businessId, vendorId, staffId, clockPointId }`.
 */
function buildAssignmentReferencePayload(assignment) {
  const {
    id: assignmentId,
    shift_id: shiftId,
    business_id: businessId,
    vendor_id: vendorId,
    staff_id: staffId,
    clock_point_id: clockPointId,
  } = assignment;

  return { assignmentId, shiftId, businessId, vendorId, staffId, clockPointId };
}
/**
 * Generates a human-readable order number: `<prefix>-<8 timestamp digits><3 random digits>`.
 *
 * The number is not guaranteed unique; it combines the last eight digits of
 * the current epoch-millisecond timestamp with a random three-digit suffix.
 *
 * @param {string} [prefix='ORD'] - Leading tag for the order number.
 * @returns {string} The generated order number.
 */
function generateOrderNumber(prefix = 'ORD') {
  const stamp = Date.now().toString().slice(-8);
  // randomInt's upper bound is exclusive; use 1000 so the full 100-999 range is reachable.
  const random = crypto.randomInt(100, 1000);
  return `${prefix}-${stamp}${random}`;
}
/**
 * Resolves how many workers a position asks for.
 *
 * @param {object} position - Position input; `workerCount` takes precedence
 *   over `workersNeeded`.
 * @returns {*} The first non-nullish of `workerCount`, `workersNeeded`, else `1`.
 */
function normalizeWorkerCount(position) {
  const preferred = position.workerCount;
  if (preferred != null) {
    return preferred;
  }
  const alternate = position.workersNeeded;
  return alternate != null ? alternate : 1;
}
/**
 * Derives an upper-snake-case role code from a display name.
 *
 * Non-alphanumeric runs become a single underscore, leading/trailing
 * underscores are removed, and the result is capped at 50 characters.
 *
 * @param {*} name - Role display name; falsy values are treated as 'role'.
 * @returns {string} The role code, or 'ROLE' when nothing usable remains.
 */
function roleCodeFromName(name) {
  const code = `${name || 'role'}`
    .toUpperCase()
    .replace(/[^A-Z0-9]+/g, '_')
    .replace(/^_+|_+$/g, '')
    .slice(0, 50)
    // Truncation can cut right after a separator, so trim the tail once more.
    .replace(/_+$/, '');
  return code || 'ROLE';
}
/**
 * Normalizes a list of weekday indexes to unique integers in the range 0-6.
 *
 * Only numbers and non-blank strings are considered, so `null`, `''`,
 * booleans and nested arrays are dropped instead of being coerced to 0 or 1.
 *
 * @param {*} values - Candidate list; used when it is a non-empty array.
 * @param {Array} [fallback=[]] - Used when `values` is not a non-empty array.
 * @returns {number[]} Unique integers between 0 and 6, in first-seen order.
 */
function toArrayOfUniqueIntegers(values, fallback = []) {
  const hasValues = Array.isArray(values) && values.length > 0;
  const source = hasValues ? values : (Array.isArray(fallback) ? fallback : []);
  const isNumericInput = (value) => typeof value === 'number'
    || (typeof value === 'string' && value.trim() !== '');
  const weekdays = source
    .filter(isNumericInput)
    .map((value) => Number(value))
    .filter((value) => Number.isInteger(value) && value >= 0 && value <= 6);
  return [...new Set(weekdays)];
}
/**
 * Combines a `YYYY-MM-DD` date and an `HH:MM` time into a Date, interpreting
 * both as UTC.
 *
 * @param {string} dateValue - Calendar date, e.g. '2024-01-31'.
 * @param {string} timeValue - Time of day, e.g. '09:30' (extra segments such
 *   as seconds are ignored).
 * @returns {Date} The combined instant.
 * @throws {AppError} `VALIDATION_ERROR` (400) when the date is unparsable, the
 *   time lacks an hour or minute part, or hours/minutes fall outside
 *   00-23 / 00-59.
 */
function combineDateAndTime(dateValue, timeValue) {
  const [hours, minutes] = `${timeValue}`.split(':').map((value) => Number.parseInt(value, 10));
  const date = new Date(`${dateValue}T00:00:00.000Z`);
  // Number.isInteger also rejects `undefined` (a missing minutes segment), which
  // Number.isNaN would let through and turn into an Invalid Date below.
  const isValidTime = Number.isInteger(hours) && hours >= 0 && hours <= 23
    && Number.isInteger(minutes) && minutes >= 0 && minutes <= 59;
  if (Number.isNaN(date.getTime()) || !isValidTime) {
    throw new AppError('VALIDATION_ERROR', 'Invalid date/time combination for order schedule', 400, {
      date: dateValue,
      time: timeValue,
    });
  }
  date.setUTCHours(hours, minutes, 0, 0);
  return date;
}
/**
 * Builds the ISO start/end pair for a shift on a given date.
 *
 * When the end time is not after the start time the shift is treated as
 * crossing midnight and the end is moved to the following UTC day.
 *
 * @param {string} dateValue - Shift date (`YYYY-MM-DD`).
 * @param {string} startTime - Start time of day.
 * @param {string} endTime - End time of day.
 * @returns {{startsAt: string, endsAt: string}} ISO-8601 timestamps.
 */
function buildShiftWindow(dateValue, startTime, endTime) {
  const shiftStart = combineDateAndTime(dateValue, startTime);
  const shiftEnd = combineDateAndTime(dateValue, endTime);

  const crossesMidnight = shiftEnd <= shiftStart;
  if (crossesMidnight) {
    shiftEnd.setUTCDate(shiftEnd.getUTCDate() + 1);
  }

  const startsAt = shiftStart.toISOString();
  const endsAt = shiftEnd.toISOString();
  return { startsAt, endsAt };
}
/**
 * Expands an order schedule into the list of calendar dates that get shifts.
 *
 * - `ONE_TIME` orders yield just `orderDate`.
 * - `RECURRING` orders walk `startDate..endDate` and keep days listed in
 *   `recurrenceDays`.
 * - Any other order type walks from `startDate` to `endDate`, or to
 *   `startDate + (horizonDays || 28)` when no end date is given, keeping days
 *   listed in `daysOfWeek` (default Monday-Friday).
 *
 * All dates are handled in UTC; weekday numbers follow `Date#getUTCDay`.
 *
 * @param {object} schedule
 * @returns {string[]} Dates formatted as `YYYY-MM-DD`.
 * @throws {AppError} `VALIDATION_ERROR` (400) for an invalid start date, an
 *   invalid/inverted window, or a schedule that produces no dates.
 */
function materializeScheduleDates({ orderType, orderDate, startDate, endDate, recurrenceDays, daysOfWeek, horizonDays }) {
  if (orderType === 'ONE_TIME') {
    return [orderDate];
  }

  const isRecurring = orderType === 'RECURRING';

  const from = new Date(`${startDate}T00:00:00.000Z`);
  if (Number.isNaN(from.getTime())) {
    throw new AppError('VALIDATION_ERROR', 'Invalid startDate', 400, { startDate });
  }

  let to;
  if (isRecurring || endDate) {
    to = new Date(`${endDate}T00:00:00.000Z`);
  } else {
    // Open-ended schedule: materialize a rolling horizon from the start date.
    to = new Date(from);
    to.setUTCDate(to.getUTCDate() + (horizonDays || 28));
  }
  if (Number.isNaN(to.getTime()) || to < from) {
    throw new AppError('VALIDATION_ERROR', 'Invalid scheduling window', 400, {
      startDate,
      endDate: endDate || null,
    });
  }

  const activeDays = isRecurring
    ? toArrayOfUniqueIntegers(recurrenceDays, [])
    : toArrayOfUniqueIntegers(daysOfWeek, [1, 2, 3, 4, 5]);

  const dates = [];
  for (const cursor = new Date(from); cursor <= to; cursor.setUTCDate(cursor.getUTCDate() + 1)) {
    if (activeDays.includes(cursor.getUTCDay())) {
      dates.push(cursor.toISOString().slice(0, 10));
    }
  }

  if (dates.length === 0) {
    throw new AppError('VALIDATION_ERROR', 'Schedule did not produce any shifts', 400, {
      orderType,
      startDate,
      endDate: endDate || null,
      activeDays,
    });
  }
  return dates;
}
/**
 * Loads an ACTIVE clock point ("hub") within a tenant/business scope.
 *
 * @param {string} tenantId
 * @param {string} businessId
 * @param {string} hubId - `clock_points.id`.
 * @returns {Promise<object>} Row with id, label, address, latitude, longitude,
 *   geofence_radius_meters and metadata.
 * @throws {AppError} `NOT_FOUND` (404) when no matching active hub exists.
 */
async function loadHubDetails(tenantId, businessId, hubId) {
  const { rowCount, rows } = await query(
    `
SELECT
cp.id,
cp.label,
cp.address,
cp.latitude,
cp.longitude,
cp.geofence_radius_meters,
cp.metadata
FROM clock_points cp
WHERE cp.tenant_id = $1
AND cp.business_id = $2
AND cp.id = $3
AND cp.status = 'ACTIVE'
`,
    [tenantId, businessId, hubId]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Hub not found in business scope', 404, { hubId });
  }

  const [hub] = rows;
  return hub;
}
/**
 * Fetches ACTIVE `roles_catalog` rows referenced by the given positions.
 *
 * @param {string} tenantId
 * @param {Array<object>} positions - Each may carry `roleId` and/or `roleCode`.
 * @returns {Promise<Map>} Lookup keyed by both role id and role code, each
 *   pointing at the same `{ id, code, name }` row. Empty when no position
 *   references a role (no query is issued in that case).
 */
async function resolveRoleCatalogEntries(tenantId, positions) {
  const roleIds = [];
  const roleCodes = [];
  for (const position of positions) {
    if (position.roleId) {
      roleIds.push(position.roleId);
    }
  }
  for (const position of positions) {
    if (position.roleCode) {
      roleCodes.push(position.roleCode);
    }
  }

  const nothingToResolve = roleIds.length === 0 && roleCodes.length === 0;
  if (nothingToResolve) {
    return new Map();
  }

  const { rows } = await query(
    `
SELECT id, code, name
FROM roles_catalog
WHERE tenant_id = $1
AND status = 'ACTIVE'
AND (
(cardinality($2::uuid[]) > 0 AND id = ANY($2::uuid[]))
OR
(cardinality($3::text[]) > 0 AND code = ANY($3::text[]))
)
`,
    [tenantId, roleIds, roleCodes]
  );

  // Index every row under both identifiers so callers can look up by either.
  return new Map(rows.flatMap((row) => [[row.id, row], [row.code, row]]));
}
/**
 * Normalizes one mobile position input into a shift-role entry.
 *
 * Catalog data (matched by `roleId`, then `roleCode`) wins over the values on
 * the position; `hourlyRateCents` acts as the fallback for both pay and bill
 * rates.
 *
 * @param {object} position - Raw position from the mobile payload.
 * @param {Map} roleLookup - Lookup of catalog rows keyed by role id and role code.
 * @returns {object} Entry with times, role identity, head count, rates and metadata.
 */
function buildShiftRoleEntry(position, roleLookup) {
  const catalogRole = roleLookup.get(position.roleId) || roleLookup.get(position.roleCode);
  const fallbackRateCents = position.hourlyRateCents;

  // Caller-supplied metadata is spread last so it can override the defaults.
  const metadata = {
    lunchBreakMinutes: position.lunchBreakMinutes ?? 0,
    paidBreak: position.paidBreak ?? false,
    instantBook: position.instantBook ?? false,
    ...position.metadata,
  };

  return {
    startTime: position.startTime,
    endTime: position.endTime,
    roleId: catalogRole?.id || position.roleId || null,
    roleCode: catalogRole?.code || position.roleCode || roleCodeFromName(position.roleName),
    roleName: catalogRole?.name || position.roleName || 'Role',
    workersNeeded: normalizeWorkerCount(position),
    payRateCents: position.payRateCents ?? fallbackRateCents ?? 0,
    billRateCents: position.billRateCents ?? fallbackRateCents ?? 0,
    metadata,
  };
}
/**
 * Builds the shift list for an order: one shift per (date, start|end time slot).
 *
 * Positions sharing the same start and end time are grouped into a single
 * shift whose `roles` are those positions and whose `requiredWorkers` is the
 * sum of their worker counts. Shifts are emitted date by date, with time slots
 * in the order they first appear in `positions`.
 *
 * @param {object} input
 * @param {string[]} input.dates - `YYYY-MM-DD` dates to create shifts for.
 * @param {Array<object>} input.positions - Normalized positions with `startTime`/`endTime`.
 * @param {string} [input.timezone] - Defaults to 'UTC' when falsy.
 * @param {object} input.hub - Clock point row (id, label, address, latitude,
 *   longitude, geofence_radius_meters, metadata).
 * @returns {Array<object>} Shift descriptors.
 */
function buildOrderShifts({ dates, positions, timezone, hub }) {
  // Grouping and head counts depend only on the positions, so compute them once
  // instead of once per date.
  const buckets = new Map();
  for (const position of positions) {
    const key = `${position.startTime}|${position.endTime}`;
    const bucket = buckets.get(key);
    if (bucket) {
      bucket.push(position);
    } else {
      buckets.set(key, [position]);
    }
  }
  const timeSlots = [...buckets.entries()].map(([timeKey, bucketPositions]) => {
    const [startTime, endTime] = timeKey.split('|');
    const requiredWorkers = bucketPositions.reduce((sum, position) => sum + normalizeWorkerCount(position), 0);
    return { startTime, endTime, bucketPositions, requiredWorkers };
  });

  const shifts = [];
  for (const dateValue of dates) {
    const compactDate = dateValue.replaceAll('-', '');
    timeSlots.forEach(({ startTime, endTime, bucketPositions, requiredWorkers }, slotIndex) => {
      const window = buildShiftWindow(dateValue, startTime, endTime);
      shifts.push({
        shiftCode: `SFT-${compactDate}-${slotIndex + 1}`,
        title: `${hub.label} ${startTime}-${endTime}`,
        startsAt: window.startsAt,
        endsAt: window.endsAt,
        timezone: timezone || 'UTC',
        clockPointId: hub.id,
        locationName: hub.label,
        locationAddress: hub.address || null,
        latitude: hub.latitude == null ? undefined : Number(hub.latitude),
        longitude: hub.longitude == null ? undefined : Number(hub.longitude),
        geofenceRadiusMeters: hub.geofence_radius_meters || undefined,
        requiredWorkers,
        metadata: {
          city: hub.metadata?.city || null,
          state: hub.metadata?.state || null,
          zipCode: hub.metadata?.zipCode || null,
        },
        // Copy so shifts on different dates never share one roles array.
        roles: [...bucketPositions],
      });
    });
  }
  return shifts;
}
/**
 * Builds the metadata object stored on orders created through the mobile API.
 *
 * Precedence (lowest to highest): generated fields, `payload.metadata`, `extra`.
 *
 * @param {string} orderType
 * @param {object} payload - Mobile request payload.
 * @param {object} [extra={}] - Additional fields that override everything else.
 * @returns {object} Merged metadata.
 */
function buildMobileOrderMetadata(orderType, payload, extra = {}) {
  const generated = {
    orderType,
    source: 'mobile-api',
    recurrenceDays: payload.recurrenceDays || payload.daysOfWeek || null,
    startDate: payload.startDate || payload.orderDate || null,
    endDate: payload.endDate || null,
  };
  const callerMetadata = payload.metadata;

  return { ...generated, ...callerMetadata, ...extra };
}
/**
 * Determines the attendance source type for a clock action payload.
 *
 * @param {object} payload - Clock-in/out payload.
 * @returns {string} `payload.sourceType` when set; otherwise 'NFC' when an NFC
 *   tag id is present, 'GEO' when both coordinates are present, else 'MANUAL'.
 */
function inferAttendanceSourceType(payload) {
  const explicitType = payload.sourceType;
  if (explicitType) {
    return explicitType;
  }
  if (payload.nfcTagId) {
    return 'NFC';
  }
  const hasCoordinates = payload.latitude != null && payload.longitude != null;
  return hasCoordinates ? 'GEO' : 'MANUAL';
}
/**
 * Translates a mobile order request into the payload for the order-creation command.
 *
 * Values are read from `payload` first and from `extra` as a fallback. The hub
 * is loaded for the client's tenant/business, positions are normalized against
 * the role catalog, the schedule is expanded to dates, and one shift per
 * (date, time slot) is generated.
 *
 * @param {object} actor - Authenticated actor (not read by this function).
 * @param {object} context - Client context providing `tenant.tenantId` and `business.businessId`.
 * @param {object} payload - Mobile request payload.
 * @param {string} orderType - 'ONE_TIME', 'RECURRING', or any other value
 *   (which gets the 'PRM' order-number prefix).
 * @param {object} [extra={}] - Fallback values and extra metadata.
 * @returns {Promise<object>} Order creation payload including `shifts`.
 */
async function buildOrderCreatePayloadFromMobile(actor, context, payload, orderType, extra = {}) {
  const { tenantId } = context.tenant;
  const { businessId } = context.business;

  const hub = await loadHubDetails(tenantId, businessId, payload.hubId || extra.hubId);

  const rawPositions = payload.positions || extra.positions || [];
  const roleLookup = await resolveRoleCatalogEntries(tenantId, rawPositions);
  const positions = rawPositions.map((position) => buildShiftRoleEntry(position, roleLookup));

  const dates = materializeScheduleDates({
    orderType,
    orderDate: payload.orderDate || extra.orderDate,
    startDate: payload.startDate || extra.startDate,
    endDate: payload.endDate || extra.endDate,
    recurrenceDays: payload.recurrenceDays || extra.recurrenceDays,
    daysOfWeek: payload.daysOfWeek || extra.daysOfWeek,
    horizonDays: payload.horizonDays || extra.horizonDays,
  });

  const shifts = buildOrderShifts({
    dates,
    positions,
    timezone: payload.timezone || extra.timezone,
    hub,
  });

  // The order spans from its first shift's start to its last shift's end.
  const firstShift = shifts[0];
  const lastShift = shifts[shifts.length - 1];

  let orderNumberPrefix = 'PRM';
  if (orderType === 'ONE_TIME') {
    orderNumberPrefix = 'ORD';
  } else if (orderType === 'RECURRING') {
    orderNumberPrefix = 'RCR';
  }

  const toRolePayload = (role) => ({
    roleId: role.roleId,
    roleCode: role.roleCode,
    roleName: role.roleName,
    workersNeeded: role.workersNeeded,
    payRateCents: role.payRateCents,
    billRateCents: role.billRateCents,
    metadata: role.metadata,
  });

  return {
    tenantId,
    businessId,
    vendorId: payload.vendorId ?? extra.vendorId ?? null,
    orderNumber: generateOrderNumber(orderNumberPrefix),
    title: payload.eventName || extra.eventName || 'Untitled Order',
    description: payload.description ?? extra.description ?? null,
    status: 'OPEN',
    serviceType: payload.serviceType ?? extra.serviceType ?? 'EVENT',
    startsAt: firstShift?.startsAt || null,
    endsAt: lastShift?.endsAt || null,
    locationName: hub.label,
    locationAddress: hub.address || null,
    latitude: hub.latitude == null ? undefined : Number(hub.latitude),
    longitude: hub.longitude == null ? undefined : Number(hub.longitude),
    notes: payload.notes ?? extra.notes ?? null,
    metadata: buildMobileOrderMetadata(orderType, payload, extra.metadata || {}),
    shifts: shifts.map((shift, index) => ({
      ...shift,
      shiftCode: shift.shiftCode || `SFT-${index + 1}`,
      roles: shift.roles.map(toRolePayload),
    })),
  };
}
/**
 * Loads an order together with its still-editable shifts for the edit flow.
 *
 * Only shifts that start in the future and are neither CANCELLED nor COMPLETED
 * are returned; dates and times are rendered in UTC.
 *
 * @param {string} actorUid - Acting user id; must resolve to a client context
 *   matching `tenantId` and `businessId`.
 * @param {string} tenantId
 * @param {string} businessId
 * @param {string} orderId
 * @returns {Promise<object>} Row with order fields plus a `shifts` JSON array.
 * @throws {AppError} `FORBIDDEN` (403) when the actor's context does not match;
 *   `ORDER_EDIT_BLOCKED` (409) when the order has no editable future shifts.
 */
async function loadEditableOrderTemplate(actorUid, tenantId, businessId, orderId) {
  const clientContext = await requireClientContext(actorUid);
  const sameTenant = clientContext.tenant.tenantId === tenantId;
  if (!sameTenant || clientContext.business.businessId !== businessId) {
    throw new AppError('FORBIDDEN', 'Order is outside the current client context', 403, { orderId });
  }

  const { rowCount, rows } = await query(
    `
SELECT
o.id AS "orderId",
o.title AS "eventName",
o.description,
o.notes,
o.vendor_id AS "vendorId",
o.service_type AS "serviceType",
o.metadata,
COALESCE(
json_agg(
json_build_object(
'shiftId', s.id,
'clockPointId', s.clock_point_id,
'date', to_char(s.starts_at AT TIME ZONE 'UTC', 'YYYY-MM-DD'),
'startTime', to_char(s.starts_at AT TIME ZONE 'UTC', 'HH24:MI'),
'endTime', to_char(s.ends_at AT TIME ZONE 'UTC', 'HH24:MI'),
'roles', (
SELECT json_agg(
json_build_object(
'roleId', sr.role_id,
'roleCode', sr.role_code,
'roleName', sr.role_name,
'workerCount', sr.workers_needed,
'hourlyRateCents', sr.bill_rate_cents,
'payRateCents', sr.pay_rate_cents,
'billRateCents', sr.bill_rate_cents,
'startTime', to_char(s.starts_at AT TIME ZONE 'UTC', 'HH24:MI'),
'endTime', to_char(s.ends_at AT TIME ZONE 'UTC', 'HH24:MI')
)
ORDER BY sr.role_name ASC
)
FROM shift_roles sr
WHERE sr.shift_id = s.id
)
)
ORDER BY s.starts_at ASC
),
'[]'::json
) AS shifts
FROM orders o
JOIN shifts s ON s.order_id = o.id
WHERE o.id = $1
AND o.tenant_id = $2
AND o.business_id = $3
AND s.starts_at > NOW()
AND s.status NOT IN ('CANCELLED', 'COMPLETED')
GROUP BY o.id
`,
    [orderId, tenantId, businessId]
  );

  // The inner JOIN means an order without editable shifts yields no row at all.
  if (rowCount === 0) {
    throw new AppError('ORDER_EDIT_BLOCKED', 'Order has no future shifts available for edit', 409, { orderId });
  }

  const [template] = rows;
  return template;
}
/**
 * Cancels the not-yet-started part of an order.
 *
 * Within the caller's transaction (`client`): locks the order row, locks every
 * shift of the order that starts in the future and is neither CANCELLED nor
 * COMPLETED, stamps cancellation info into `orders.metadata`, marks those
 * shifts CANCELLED, cancels their assignments/applications that are in a
 * cancellable status, refreshes the cached head counts, and records an
 * `ORDER_FUTURE_SLICE_CANCELLED` domain event.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} input
 * @param {string} input.actorUid - User performing the cancellation.
 * @param {string} input.tenantId
 * @param {string} input.businessId
 * @param {string} input.orderId
 * @param {string} [input.reason] - Stored as `futureCancellationReason` (null when falsy).
 * @param {object} [input.metadata={}] - Extra keys merged into the order metadata patch.
 * @returns {Promise<object>} Summary with orderId, orderNumber, status,
 *   futureOnly and cancelledShiftCount (plus `alreadyCancelled: true` when
 *   there was nothing to cancel).
 * @throws {AppError} `NOT_FOUND` (404) when the order is not in the given scope.
 */
async function cancelFutureOrderSlice(client, {
actorUid,
tenantId,
businessId,
orderId,
reason,
metadata = {},
}) {
// Lock the order row for the duration of the surrounding transaction.
const orderResult = await client.query(
`
SELECT id, order_number, status
FROM orders
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
FOR UPDATE
`,
[orderId, tenantId, businessId]
);
if (orderResult.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Order not found for cancel flow', 404, { orderId });
}
const order = orderResult.rows[0];
// Select (and lock) only shifts that have not started and are still active.
const futureShiftsResult = await client.query(
`
SELECT id
FROM shifts
WHERE order_id = $1
AND starts_at > NOW()
AND status NOT IN ('CANCELLED', 'COMPLETED')
ORDER BY starts_at ASC
FOR UPDATE
`,
[order.id]
);
// Nothing left to cancel: report the order's current status without writing anything.
// NOTE(review): `alreadyCancelled` is also true when the remaining shifts are merely
// past or COMPLETED — confirm callers do not read it as "order was cancelled".
if (futureShiftsResult.rowCount === 0) {
return {
orderId: order.id,
orderNumber: order.order_number,
status: order.status,
futureOnly: true,
cancelledShiftCount: 0,
alreadyCancelled: true,
};
}
const shiftIds = futureShiftsResult.rows.map((row) => row.id);
// Merge the cancellation audit fields into the existing order metadata; keys in
// the caller-supplied `metadata` are spread last and therefore take precedence.
await client.query(
`
UPDATE orders
SET metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
order.id,
JSON.stringify({
futureCancellationReason: reason || null,
futureCancellationBy: actorUid,
futureCancellationAt: new Date().toISOString(),
...metadata,
}),
]
);
await client.query(
`
UPDATE shifts
SET status = 'CANCELLED',
updated_at = NOW()
WHERE id = ANY($1::uuid[])
`,
[shiftIds]
);
// Only assignments in MOBILE_CANCELLABLE_ASSIGNMENT_STATUSES are cancelled.
await client.query(
`
UPDATE assignments
SET status = 'CANCELLED',
updated_at = NOW()
WHERE shift_id = ANY($1::uuid[])
AND status = ANY($2::text[])
`,
[shiftIds, MOBILE_CANCELLABLE_ASSIGNMENT_STATUSES]
);
// Only applications in MOBILE_CANCELLABLE_APPLICATION_STATUSES are cancelled.
await client.query(
`
UPDATE applications
SET status = 'CANCELLED',
updated_at = NOW()
WHERE shift_id = ANY($1::uuid[])
AND status = ANY($2::text[])
`,
[shiftIds, MOBILE_CANCELLABLE_APPLICATION_STATUSES]
);
// Recompute the cached assigned counts on every affected shift role and shift
// now that assignments have changed status.
for (const shiftId of shiftIds) {
const roleIds = await client.query(
'SELECT id FROM shift_roles WHERE shift_id = $1',
[shiftId]
);
for (const role of roleIds.rows) {
await refreshShiftRoleCounts(client, role.id);
}
await refreshShiftCounts(client, shiftId);
}
await insertDomainEvent(client, {
tenantId,
aggregateType: 'order',
aggregateId: order.id,
eventType: 'ORDER_FUTURE_SLICE_CANCELLED',
actorUserId: actorUid,
payload: {
reason: reason || null,
shiftIds,
futureOnly: true,
},
});
// NOTE(review): the returned status is the literal 'CANCELLED', but no statement in
// this function updates orders.status — confirm this is the intended contract.
return {
orderId: order.id,
orderNumber: order.order_number,
status: 'CANCELLED',
futureOnly: true,
cancelledShiftCount: shiftIds.length,
};
}
/**
 * Resolves which assignment a staff clock action refers to.
 *
 * Resolution order (first match wins):
 *   1. `payload.assignmentId`, returned as-is;
 *   2. the newest assignment linked to `payload.applicationId` for this user;
 *   3. the newest assignment on `payload.shiftId` for this user;
 *   4. when `requireOpenSession` is set, the assignment of the user's most
 *      recently updated OPEN attendance session.
 *
 * @param {string} actorUid - Acting user id; must resolve to a staff context.
 * @param {string} tenantId
 * @param {object} payload - May contain `assignmentId`, `applicationId`, `shiftId`.
 * @param {object} [options]
 * @param {boolean} [options.requireOpenSession=false] - Enables step 4.
 * @returns {Promise<{assignmentId: string, context: object}>}
 * @throws {AppError} `NOT_FOUND` (404) when no step produces an assignment; the
 *   whole `payload` is attached as the error details.
 */
async function resolveStaffAssignmentForClock(actorUid, tenantId, payload, { requireOpenSession = false } = {}) {
const context = await requireStaffContext(actorUid);
// NOTE(review): an explicit assignmentId is returned without an ownership check in
// this function — presumably verified downstream (e.g. loadAssignmentMonitoringContext
// filters on the actor's user id); confirm every caller does so.
if (payload.assignmentId) {
return { assignmentId: payload.assignmentId, context };
}
if (payload.applicationId) {
const fromApplication = await query(
`
SELECT a.id AS "assignmentId"
FROM assignments a
JOIN applications app ON app.id = a.application_id
JOIN staffs s ON s.id = a.staff_id
WHERE a.tenant_id = $1
AND app.id = $2
AND s.user_id = $3
ORDER BY a.created_at DESC
LIMIT 1
`,
[tenantId, payload.applicationId, actorUid]
);
if (fromApplication.rowCount > 0) {
return { assignmentId: fromApplication.rows[0].assignmentId, context };
}
}
// Falls through to the shift lookup when the application lookup found nothing.
if (payload.shiftId) {
const fromShift = await query(
`
SELECT a.id AS "assignmentId"
FROM assignments a
JOIN staffs s ON s.id = a.staff_id
WHERE a.tenant_id = $1
AND a.shift_id = $2
AND s.user_id = $3
ORDER BY a.created_at DESC
LIMIT 1
`,
[tenantId, payload.shiftId, actorUid]
);
if (fromShift.rowCount > 0) {
return { assignmentId: fromShift.rows[0].assignmentId, context };
}
}
// Last resort: use the staff member's currently open attendance session.
if (requireOpenSession) {
const openSession = await query(
`
SELECT attendance_sessions.assignment_id AS "assignmentId"
FROM attendance_sessions
JOIN staffs s ON s.id = attendance_sessions.staff_id
WHERE attendance_sessions.tenant_id = $1
AND s.user_id = $2
AND attendance_sessions.status = 'OPEN'
ORDER BY attendance_sessions.updated_at DESC
LIMIT 1
`,
[tenantId, actorUid]
);
if (openSession.rowCount > 0) {
return { assignmentId: openSession.rows[0].assignmentId, context };
}
}
throw new AppError('NOT_FOUND', 'No assignment found for the current staff clock action', 404, payload);
}
/**
 * Loads (and row-locks) an assignment with the shift and clock-point data
 * needed to evaluate a clock action.
 *
 * Shift-level coordinates and geofence radius take precedence over the clock
 * point's values (`COALESCE`). The assignment and shift rows are locked with
 * `FOR UPDATE OF a, s`.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} assignmentId
 * @param {string} actorUid - Must be the user behind the assignment's staff record.
 * @returns {Promise<object>} The joined row.
 * @throws {AppError} `NOT_FOUND` (404) when no matching assignment exists for this user.
 */
async function loadAssignmentMonitoringContext(client, tenantId, assignmentId, actorUid) {
  const { rowCount, rows } = await client.query(
    `
SELECT
a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.staff_id,
a.status,
s.clock_point_id,
s.title AS shift_title,
s.starts_at,
s.ends_at,
s.clock_in_mode,
s.allow_clock_in_override,
cp.default_clock_in_mode,
cp.allow_clock_in_override AS default_allow_clock_in_override,
cp.nfc_tag_uid AS expected_nfc_tag_uid,
COALESCE(s.latitude, cp.latitude) AS expected_latitude,
COALESCE(s.longitude, cp.longitude) AS expected_longitude,
COALESCE(s.geofence_radius_meters, cp.geofence_radius_meters) AS geofence_radius_meters
FROM assignments a
JOIN staffs st ON st.id = a.staff_id
JOIN shifts s ON s.id = a.shift_id
LEFT JOIN clock_points cp ON cp.id = s.clock_point_id
WHERE a.tenant_id = $1
AND a.id = $2
AND st.user_id = $3
FOR UPDATE OF a, s
`,
    [tenantId, assignmentId, actorUid]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Assignment not found in staff scope', 404, {
      assignmentId,
    });
  }

  const [monitoringContext] = rows;
  return monitoringContext;
}
/**
 * Upserts the `users` row for the authenticated actor.
 *
 * On conflict, non-null incoming email/display name/phone replace the stored
 * values and incoming metadata is merged over the stored metadata.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} actor - Authenticated actor with `uid` and optional `email`.
 * @param {object} [fields={}] - Optional `email`, `displayName`, `phone`, `metadata`.
 *   The display name falls back to the actor's email.
 * @returns {Promise<void>}
 */
async function ensureActorUser(client, actor, fields = {}) {
  const email = fields.email ?? actor.email ?? null;
  const displayName = fields.displayName ?? actor.email ?? null;
  const phone = fields.phone ?? null;
  const metadataJson = JSON.stringify(fields.metadata || {});

  await client.query(
    `
INSERT INTO users (id, email, display_name, phone, status, metadata)
VALUES ($1, $2, $3, $4, 'ACTIVE', COALESCE($5::jsonb, '{}'::jsonb))
ON CONFLICT (id) DO UPDATE
SET email = COALESCE(EXCLUDED.email, users.email),
display_name = COALESCE(EXCLUDED.display_name, users.display_name),
phone = COALESCE(EXCLUDED.phone, users.phone),
metadata = COALESCE(users.metadata, '{}'::jsonb) || COALESCE(EXCLUDED.metadata, '{}'::jsonb),
updated_at = NOW()
`,
    [actor.uid, email, displayName, phone, metadataJson]
  );
}
/**
 * Appends a row to `domain_events` for an aggregate.
 *
 * The event's `sequence` is computed in SQL as the aggregate's current maximum
 * sequence plus one (1 for the first event).
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} event
 * @param {string} event.tenantId
 * @param {string} event.aggregateType
 * @param {string} event.aggregateId
 * @param {string} event.eventType
 * @param {string} event.actorUserId
 * @param {object} [event.payload] - Serialized to JSON; falsy values become `{}`.
 * @returns {Promise<void>}
 */
async function insertDomainEvent(client, {
  tenantId,
  aggregateType,
  aggregateId,
  eventType,
  actorUserId,
  payload,
}) {
  const serializedPayload = JSON.stringify(payload || {});
  const bindings = [tenantId, aggregateType, aggregateId, eventType, actorUserId, serializedPayload];

  await client.query(
    `
INSERT INTO domain_events (
tenant_id,
aggregate_type,
aggregate_id,
sequence,
event_type,
actor_user_id,
payload
)
SELECT
$1,
$2,
$3,
COALESCE(MAX(sequence) + 1, 1),
$4,
$5,
$6::jsonb
FROM domain_events
WHERE tenant_id = $1
AND aggregate_type = $2
AND aggregate_id = $3
`,
    bindings
  );
}
/**
 * Fetches the user's ACTIVE membership in a business.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} businessId
 * @param {string} userId
 * @returns {Promise<object>} Row with id, tenant_id, business_id and user_id.
 * @throws {AppError} `FORBIDDEN` (403) when no active membership exists.
 */
async function requireBusinessMembership(client, businessId, userId) {
  const { rowCount, rows } = await client.query(
    `
SELECT bm.id, bm.tenant_id, bm.business_id, bm.user_id
FROM business_memberships bm
WHERE bm.business_id = $1
AND bm.user_id = $2
AND bm.membership_status = 'ACTIVE'
`,
    [businessId, userId]
  );

  if (rowCount === 0) {
    throw new AppError('FORBIDDEN', 'Business membership not found for current user', 403, {
      businessId,
      userId,
    });
  }

  const [membership] = rows;
  return membership;
}
/**
 * Fetches a clock point ("hub") within a tenant/business scope, regardless of status.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} businessId
 * @param {string} hubId - `clock_points.id`.
 * @param {object} [options]
 * @param {boolean} [options.forUpdate=false] - Appends `FOR UPDATE` to lock the row.
 * @returns {Promise<object>} The clock point row.
 * @throws {AppError} `NOT_FOUND` (404) when the hub is not in scope.
 */
async function requireClockPoint(client, tenantId, businessId, hubId, { forUpdate = false } = {}) {
  const lockClause = forUpdate ? 'FOR UPDATE' : '';
  const { rowCount, rows } = await client.query(
    `
SELECT
id,
tenant_id,
business_id,
label,
status,
cost_center_id,
nfc_tag_uid,
latitude,
longitude,
geofence_radius_meters,
default_clock_in_mode,
allow_clock_in_override,
metadata
FROM clock_points
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
${lockClause}
`,
    [hubId, tenantId, businessId]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Hub not found in business scope', 404, {
      tenantId,
      businessId,
      hubId,
    });
  }

  const [clockPoint] = rows;
  return clockPoint;
}
/**
 * Fetches and row-locks (`FOR UPDATE`) an invoice within a tenant/business scope.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} businessId
 * @param {string} invoiceId
 * @returns {Promise<object>} Row with id, tenant_id, business_id, status and metadata.
 * @throws {AppError} `NOT_FOUND` (404) when the invoice is not in scope.
 */
async function requireInvoice(client, tenantId, businessId, invoiceId) {
  const { rowCount, rows } = await client.query(
    `
SELECT id, tenant_id, business_id, status, metadata
FROM invoices
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
FOR UPDATE
`,
    [invoiceId, tenantId, businessId]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Invoice not found in business scope', 404, {
      tenantId,
      businessId,
      invoiceId,
    });
  }

  const [invoice] = rows;
  return invoice;
}
/**
 * Fetches and row-locks the staff profile belonging to the acting user.
 *
 * When several staff rows match, the oldest (`created_at ASC`) is used. Workforce
 * columns come from a LEFT JOIN and may be null; only the staff row is locked.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} actorUid - `staffs.user_id` of the acting user.
 * @returns {Promise<object>} Staff row plus workforce_id, vendor_id and workforce_number.
 * @throws {AppError} `FORBIDDEN` (403) when the user has no staff profile in the tenant.
 */
async function requireStaffByActor(client, tenantId, actorUid) {
  const { rowCount, rows } = await client.query(
    `
SELECT s.id, s.tenant_id, s.user_id, s.full_name, s.email, s.phone, s.metadata, s.primary_role,
w.id AS workforce_id, w.vendor_id, w.workforce_number
FROM staffs s
LEFT JOIN workforce w ON w.staff_id = s.id
WHERE s.tenant_id = $1
AND s.user_id = $2
ORDER BY s.created_at ASC
LIMIT 1
FOR UPDATE OF s
`,
    [tenantId, actorUid]
  );

  if (rowCount === 0) {
    throw new AppError('FORBIDDEN', 'Staff profile not found for current user', 403, {
      tenantId,
      actorUid,
    });
  }

  const [staff] = rows;
  return staff;
}
/**
 * Finds (and row-locks) a shift role the staff member is allowed to apply to.
 *
 * A role qualifies when its shift is OPEN or PENDING_CONFIRMATION, or is
 * ASSIGNED with an unexpired OPEN swap request on that role, and the staff
 * member has no PENDING/CONFIRMED/CHECKED_IN/COMPLETED application for it.
 * When `roleId` is omitted, the oldest qualifying role of the shift is used.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} shiftId
 * @param {string} [roleId] - `shift_roles.id`; falsy values mean "any role".
 * @param {string} staffId
 * @returns {Promise<object>} Joined shift/shift-role row including `swap_request_id`.
 * @throws {AppError} `NOT_FOUND` (404) when no qualifying role exists.
 */
async function requireShiftRoleForStaffApply(client, tenantId, shiftId, roleId, staffId) {
  const requestedRoleId = roleId || null;
  const { rowCount, rows } = await client.query(
    `
SELECT
s.id AS shift_id,
s.tenant_id,
s.business_id,
s.vendor_id,
s.clock_point_id,
s.status AS shift_status,
s.starts_at,
s.ends_at,
sr.id AS shift_role_id,
sr.role_code,
sr.role_name,
sr.workers_needed,
sr.assigned_count,
sr.pay_rate_cents,
swap_request.id AS swap_request_id
FROM shifts s
JOIN shift_roles sr ON sr.shift_id = s.id
LEFT JOIN LATERAL (
SELECT id
FROM shift_swap_requests
WHERE shift_role_id = sr.id
AND status = 'OPEN'
AND expires_at > NOW()
ORDER BY created_at DESC
LIMIT 1
) swap_request ON TRUE
WHERE s.id = $1
AND s.tenant_id = $2
AND ($3::uuid IS NULL OR sr.id = $3)
AND (
s.status IN ('OPEN', 'PENDING_CONFIRMATION')
OR (s.status = 'ASSIGNED' AND swap_request.id IS NOT NULL)
)
AND NOT EXISTS (
SELECT 1
FROM applications a
WHERE a.shift_role_id = sr.id
AND a.staff_id = $4
AND a.status IN ('PENDING', 'CONFIRMED', 'CHECKED_IN', 'COMPLETED')
)
ORDER BY sr.created_at ASC
LIMIT 1
FOR UPDATE OF s, sr
`,
    [shiftId, tenantId, requestedRoleId, staffId]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Open shift role not found or already applied', 404, {
      tenantId,
      shiftId,
      roleId: requestedRoleId,
    });
  }

  const [shiftRole] = rows;
  return shiftRole;
}
/**
 * Resolves the staff member's best currently-effective dispatch team
 * membership for a business (optionally scoped to a hub).
 *
 * Candidates are ordered by team type (CORE, CERTIFIED_LOCATION, others), then
 * hub-specific before business-wide, then oldest first. Without any matching
 * membership the staff member is treated as MARKETPLACE.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} scope
 * @param {string} scope.tenantId
 * @param {string} scope.businessId
 * @param {string} [scope.hubId] - Falsy values are sent as NULL.
 * @param {string} scope.staffId
 * @returns {Promise<object>} `{ membershipId, teamType, priority, source, scopedHubId }`.
 */
async function loadDispatchMembership(client, {
  tenantId,
  businessId,
  hubId,
  staffId,
}) {
  const { rowCount, rows } = await client.query(
    `
SELECT
dtm.id,
dtm.team_type,
dtm.hub_id,
dtm.source,
dtm.effective_at,
dtm.expires_at
FROM dispatch_team_memberships dtm
WHERE dtm.tenant_id = $1
AND dtm.business_id = $2
AND dtm.staff_id = $3
AND dtm.status = 'ACTIVE'
AND dtm.effective_at <= NOW()
AND (dtm.expires_at IS NULL OR dtm.expires_at > NOW())
AND (dtm.hub_id IS NULL OR dtm.hub_id = $4)
ORDER BY
CASE dtm.team_type
WHEN 'CORE' THEN 1
WHEN 'CERTIFIED_LOCATION' THEN 2
ELSE 3
END ASC,
CASE WHEN dtm.hub_id = $4 THEN 0 ELSE 1 END ASC,
dtm.created_at ASC
LIMIT 1
`,
    [tenantId, businessId, staffId, hubId || null]
  );

  if (rowCount !== 0) {
    const [membership] = rows;
    return {
      membershipId: membership.id,
      teamType: membership.team_type,
      priority: resolveDispatchPriority(membership.team_type),
      source: membership.source,
      scopedHubId: membership.hub_id,
    };
  }

  // No explicit membership: fall back to the marketplace tier.
  return {
    membershipId: null,
    teamType: 'MARKETPLACE',
    priority: resolveDispatchPriority('MARKETPLACE'),
    source: 'SYSTEM',
    scopedHubId: null,
  };
}
/**
 * Loads a shift swap request together with its original assignment, shift,
 * shift role and original staff member, locking the swap request, assignment,
 * shift and shift-role rows (`FOR UPDATE OF srq, a, s, sr`).
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} businessId
 * @param {string} swapRequestId
 * @returns {Promise<object>} The joined row.
 * @throws {AppError} `NOT_FOUND` (404) when the request is not in the business scope.
 */
async function requireSwapRequestForUpdate(client, tenantId, businessId, swapRequestId) {
  const { rowCount, rows } = await client.query(
    `
SELECT
srq.id,
srq.tenant_id,
srq.business_id,
srq.vendor_id,
srq.shift_id,
srq.shift_role_id,
srq.original_assignment_id,
srq.original_staff_id,
srq.requested_by_user_id,
srq.status,
srq.reason,
srq.expires_at,
srq.metadata,
a.status AS assignment_status,
a.application_id AS original_application_id,
s.clock_point_id,
s.starts_at,
s.ends_at,
s.title AS shift_title,
sr.role_name,
sr.role_code,
st.full_name AS original_staff_name,
st.user_id AS original_staff_user_id
FROM shift_swap_requests srq
JOIN assignments a ON a.id = srq.original_assignment_id
JOIN shifts s ON s.id = srq.shift_id
JOIN shift_roles sr ON sr.id = srq.shift_role_id
JOIN staffs st ON st.id = srq.original_staff_id
WHERE srq.id = $1
AND srq.tenant_id = $2
AND srq.business_id = $3
FOR UPDATE OF srq, a, s, sr
`,
    [swapRequestId, tenantId, businessId]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Shift swap request not found in business scope', 404, {
      swapRequestId,
      businessId,
    });
  }

  const [swapRequest] = rows;
  return swapRequest;
}
/**
 * Loads and validates the application chosen as the replacement for a swap request.
 *
 * The application must belong to the swap request's shift and shift role, be
 * PENDING or CONFIRMED, and come from someone other than the original staff
 * member. The application row is locked (`FOR UPDATE OF app`).
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} swapRequest - Row with `id`, `shift_id`, `shift_role_id`, `original_staff_id`.
 * @param {string} applicationId
 * @returns {Promise<object>} Application row with staff name/user id and active workforce id.
 * @throws {AppError} `NOT_FOUND` (404), `INVALID_SWAP_APPLICATION_STATE` (409)
 *   or `INVALID_SWAP_APPLICATION` (409).
 */
async function requireSwapCandidateApplication(client, swapRequest, applicationId) {
  const { rowCount, rows } = await client.query(
    `
SELECT
app.id,
app.staff_id,
app.status,
app.shift_id,
app.shift_role_id,
app.metadata,
st.full_name AS staff_name,
st.user_id AS staff_user_id,
w.id AS workforce_id
FROM applications app
JOIN staffs st ON st.id = app.staff_id
LEFT JOIN workforce w ON w.staff_id = st.id AND w.status = 'ACTIVE'
WHERE app.id = $1
AND app.shift_role_id = $2
AND app.shift_id = $3
FOR UPDATE OF app
`,
    [applicationId, swapRequest.shift_role_id, swapRequest.shift_id]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Swap candidate application not found for this shift', 404, {
      applicationId,
      swapRequestId: swapRequest.id,
    });
  }

  const [candidate] = rows;

  const isSelectable = candidate.status === 'PENDING' || candidate.status === 'CONFIRMED';
  if (!isSelectable) {
    throw new AppError('INVALID_SWAP_APPLICATION_STATE', 'Swap candidate must be pending or confirmed', 409, {
      applicationId,
      applicationStatus: candidate.status,
    });
  }

  const isOriginalStaff = candidate.staff_id === swapRequest.original_staff_id;
  if (isOriginalStaff) {
    throw new AppError('INVALID_SWAP_APPLICATION', 'Original staff cannot be selected as their own replacement', 409, {
      applicationId,
      swapRequestId: swapRequest.id,
    });
  }

  return candidate;
}
/**
 * Rejects every PENDING/CONFIRMED application on a shift role, optionally
 * sparing one selected application, and stamps rejection details into each
 * application's metadata.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} input
 * @param {string} input.shiftRoleId
 * @param {string|null} [input.selectedApplicationId=null] - Application to keep; null rejects all.
 * @param {string} input.reason - Stored as `rejectionReason`.
 * @param {string} [input.actorUid] - Stored as `rejectedBy`; 'system' when falsy.
 * @returns {Promise<void>}
 */
async function rejectOtherApplicationsForSwap(client, {
  shiftRoleId,
  selectedApplicationId = null,
  reason,
  actorUid,
}) {
  const rejectionStamp = JSON.stringify({
    rejectedBy: actorUid || 'system',
    rejectionReason: reason,
    rejectedAt: new Date().toISOString(),
  });

  await client.query(
    `
UPDATE applications
SET status = 'REJECTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $3::jsonb,
updated_at = NOW()
WHERE shift_role_id = $1
AND status IN ('PENDING', 'CONFIRMED')
AND ($2::uuid IS NULL OR id <> $2)
`,
    [shiftRoleId, selectedApplicationId, rejectionStamp]
  );
}
/**
 * Updates a shift swap request's status and resolution fields.
 *
 * `resolved_at` is set to NOW() only for the terminal statuses RESOLVED,
 * CANCELLED, EXPIRED and AUTO_CANCELLED. Null resolver/application/assignment
 * ids leave the stored values untouched; metadata is merged into the existing
 * JSON.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {object} input
 * @param {string} input.swapRequestId
 * @param {string} input.status - New status.
 * @param {string|null} [input.resolvedByUserId=null]
 * @param {string|null} [input.selectedApplicationId=null]
 * @param {string|null} [input.replacementAssignmentId=null]
 * @param {object} [input.metadata={}] - Merged into the stored metadata.
 * @returns {Promise<void>}
 */
async function markSwapRequestStatus(client, {
  swapRequestId,
  status,
  resolvedByUserId = null,
  selectedApplicationId = null,
  replacementAssignmentId = null,
  metadata = {},
}) {
  const metadataPatch = JSON.stringify(metadata || {});
  const bindings = [
    swapRequestId,
    status,
    resolvedByUserId,
    selectedApplicationId,
    replacementAssignmentId,
    metadataPatch,
  ];

  await client.query(
    `
UPDATE shift_swap_requests
SET status = $2,
resolved_at = CASE WHEN $2 IN ('RESOLVED', 'CANCELLED', 'EXPIRED', 'AUTO_CANCELLED') THEN NOW() ELSE resolved_at END,
resolved_by_user_id = COALESCE($3, resolved_by_user_id),
selected_application_id = COALESCE($4, selected_application_id),
replacement_assignment_id = COALESCE($5, replacement_assignment_id),
metadata = COALESCE(metadata, '{}'::jsonb) || $6::jsonb,
updated_at = NOW()
WHERE id = $1
`,
    bindings
  );
}
/**
 * Fetches and row-locks the acting user's oldest assignment on a shift that is
 * still in status ASSIGNED (i.e. awaiting the staff member's response).
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} shiftId
 * @param {string} actorUid - `staffs.user_id` of the acting user.
 * @returns {Promise<object>} The assignment row.
 * @throws {AppError} `NOT_FOUND` (404) when no such assignment exists.
 */
async function requirePendingAssignmentForActor(client, tenantId, shiftId, actorUid) {
  const { rowCount, rows } = await client.query(
    `
SELECT
a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.workforce_id,
a.staff_id,
a.status,
a.metadata
FROM assignments a
JOIN staffs s ON s.id = a.staff_id
WHERE a.tenant_id = $1
AND a.shift_id = $2
AND s.user_id = $3
AND a.status = 'ASSIGNED'
ORDER BY a.created_at ASC
LIMIT 1
FOR UPDATE OF a
`,
    [tenantId, shiftId, actorUid]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Pending shift assignment not found for current user', 404, {
      tenantId,
      shiftId,
      actorUid,
    });
  }

  const [pendingAssignment] = rows;
  return pendingAssignment;
}
/**
 * Fetches and row-locks the acting user's oldest assignment on a shift,
 * whatever its status.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} shiftId
 * @param {string} actorUid - `staffs.user_id` of the acting user.
 * @returns {Promise<object>} The assignment row.
 * @throws {AppError} `NOT_FOUND` (404) when the user has no assignment on the shift.
 */
async function requireAnyAssignmentForActor(client, tenantId, shiftId, actorUid) {
  const { rowCount, rows } = await client.query(
    `
SELECT
a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.workforce_id,
a.staff_id,
a.status,
a.metadata
FROM assignments a
JOIN staffs s ON s.id = a.staff_id
WHERE a.tenant_id = $1
AND a.shift_id = $2
AND s.user_id = $3
ORDER BY a.created_at ASC
LIMIT 1
FOR UPDATE OF a
`,
    [tenantId, shiftId, actorUid]
  );

  if (rowCount === 0) {
    throw new AppError('NOT_FOUND', 'Shift assignment not found for current user', 404, {
      tenantId,
      shiftId,
      actorUid,
    });
  }

  const [assignment] = rows;
  return assignment;
}
/**
 * Recomputes `shift_roles.assigned_count` from the assignments table.
 *
 * Assignments in ASSIGNED, ACCEPTED, SWAP_REQUESTED, CHECKED_IN, CHECKED_OUT or
 * COMPLETED status are counted.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} shiftRoleId - `shift_roles.id` to refresh.
 * @returns {Promise<void>}
 */
async function refreshShiftRoleCounts(client, shiftRoleId) {
  const bindings = [shiftRoleId];
  await client.query(
    `
UPDATE shift_roles sr
SET assigned_count = counts.assigned_count,
updated_at = NOW()
FROM (
SELECT $1::uuid AS shift_role_id,
COUNT(*) FILTER (
WHERE status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
)::INTEGER AS assigned_count
FROM assignments
WHERE shift_role_id = $1
) counts
WHERE sr.id = counts.shift_role_id
`,
    bindings
  );
}
/**
 * Recomputes `shifts.assigned_workers` from the assignments table.
 *
 * Assignments in ASSIGNED, ACCEPTED, SWAP_REQUESTED, CHECKED_IN, CHECKED_OUT or
 * COMPLETED status are counted.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} shiftId - `shifts.id` to refresh.
 * @returns {Promise<void>}
 */
async function refreshShiftCounts(client, shiftId) {
  const bindings = [shiftId];
  await client.query(
    `
UPDATE shifts s
SET assigned_workers = counts.assigned_workers,
updated_at = NOW()
FROM (
SELECT $1::uuid AS shift_id,
COUNT(*) FILTER (
WHERE status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
)::INTEGER AS assigned_workers
FROM assignments
WHERE shift_id = $1
) counts
WHERE s.id = counts.shift_id
`,
    bindings
  );
}
/**
 * Determines which tenant/vendor a staff member should be onboarded into.
 *
 * Resolution order:
 *   1. the tenant and vendor already present in the actor's context;
 *   2. the explicitly requested `tenantId`/`vendorId`, once verified to be an
 *      ACTIVE vendor of an ACTIVE tenant;
 *   3. the oldest ACTIVE tenant/vendor pair in the database.
 *
 * @param {object} client - Database client exposing `query(sql, params)`.
 * @param {string} actorUid
 * @param {string} [tenantId] - Requested tenant (used only together with `vendorId`).
 * @param {string} [vendorId] - Requested vendor (used only together with `tenantId`).
 * @returns {Promise<{tenantId: string, vendorId: string}>}
 * @throws {AppError} `VALIDATION_ERROR` (400) when the requested pair is not
 *   active; `CONFIGURATION_ERROR` (500) when no active pair exists at all.
 */
async function resolveStaffOnboardingScope(client, actorUid, tenantId, vendorId) {
  const actorContext = await loadActorContext(actorUid);
  const knownTenantId = actorContext.tenant?.tenantId;
  if (knownTenantId && actorContext.vendor?.vendorId) {
    return {
      tenantId: actorContext.tenant.tenantId,
      vendorId: actorContext.vendor.vendorId,
    };
  }

  const hasExplicitScope = Boolean(tenantId && vendorId);
  if (hasExplicitScope) {
    const { rowCount: matchCount } = await client.query(
      `
SELECT t.id AS tenant_id, v.id AS vendor_id
FROM tenants t
JOIN vendors v ON v.tenant_id = t.id
WHERE t.id = $1
AND v.id = $2
AND t.status = 'ACTIVE'
AND v.status = 'ACTIVE'
`,
      [tenantId, vendorId]
    );
    if (matchCount === 0) {
      throw new AppError('VALIDATION_ERROR', 'tenantId and vendorId do not resolve to an active onboarding scope', 400, {
        tenantId,
        vendorId,
      });
    }
    return { tenantId, vendorId };
  }

  // No usable hint from the actor or the request: take the oldest active pair.
  const { rowCount: fallbackCount, rows: fallbackRows } = await client.query(
    `
SELECT t.id AS tenant_id, v.id AS vendor_id
FROM tenants t
JOIN vendors v ON v.tenant_id = t.id
WHERE t.status = 'ACTIVE'
AND v.status = 'ACTIVE'
ORDER BY t.created_at ASC, v.created_at ASC
LIMIT 1
`
  );
  if (fallbackCount === 0) {
    throw new AppError('CONFIGURATION_ERROR', 'No active tenant/vendor onboarding scope is configured', 500);
  }

  const [defaultScope] = fallbackRows;
  return {
    tenantId: defaultScope.tenant_id,
    vendorId: defaultScope.vendor_id,
  };
}
/**
 * Produces a new staff metadata object: a shallow copy of `existing` overlaid with
 * the profile fields that are actually present on `payload`.
 *
 * A field is copied only when its payload value is not `undefined`, so `null`,
 * `false`, `0` and empty strings are applied as real values. Payload keys outside
 * the recognised list are ignored. Neither argument is mutated.
 *
 * @param {object|null|undefined} existing - Current metadata.
 * @param {object} payload - Incoming profile update.
 * @returns {object} The merged metadata.
 */
function buildStaffMetadataPatch(existing, payload) {
  const patchableFields = [
    'bio',
    'firstName',
    'lastName',
    'preferredLocations',
    'maxDistanceMiles',
    'industries',
    'skills',
    'profileVisible',
  ];
  const merged = { ...existing };
  for (const field of patchableFields) {
    if (payload[field] !== undefined) {
      merged[field] = payload[field];
    }
  }
  return merged;
}
/**
 * Creates a hub (a `clock_points` row) for the caller's business.
 *
 * Runs inside a single transaction: confirms the actor's business membership,
 * validates the optional cost center against the tenant and business, inserts the
 * hub with status ACTIVE, and records a HUB_CREATED domain event whose payload is
 * the request payload.
 *
 * SQL-side defaults apply when the payload omits a value: geofence radius 120,
 * clock-in mode 'EITHER', clock-in override allowed.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {object} payload - Hub fields (`name`, `fullAddress`, coordinates, `nfcTagId`, ...).
 * @returns {Promise<{hubId: string, created: true}>}
 * @throws {AppError} NOT_FOUND (404) when `costCenterId` is not in the business scope.
 */
export async function createHub(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const membership = await requireBusinessMembership(client, context.business.businessId, actor.uid);
    const { tenantId } = context.tenant;
    const { businessId } = context.business;
    const costCenterId = payload.costCenterId || null;

    if (costCenterId) {
      const costCenterLookup = await client.query(
        `
SELECT id
FROM cost_centers
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
`,
        [costCenterId, tenantId, businessId]
      );
      if (costCenterLookup.rowCount === 0) {
        throw new AppError('NOT_FOUND', 'Cost center not found in business scope', 404, {
          costCenterId,
        });
      }
    }

    // Address parts live in metadata, together with the membership that created the hub.
    const hubMetadata = {
      placeId: payload.placeId || null,
      street: payload.street || null,
      city: payload.city || null,
      state: payload.state || null,
      country: payload.country || null,
      zipCode: payload.zipCode || null,
      clockInPolicyConfiguredBy: membership.id,
      createdByMembershipId: membership.id,
    };
    const insertParams = [
      tenantId,
      businessId,
      payload.name,
      payload.fullAddress || null,
      payload.latitude ?? null,
      payload.longitude ?? null,
      payload.geofenceRadiusMeters ?? null,
      payload.nfcTagId || null,
      payload.clockInMode || null,
      payload.allowClockInOverride ?? null,
      costCenterId,
      JSON.stringify(hubMetadata),
    ];
    const inserted = await client.query(
      `
INSERT INTO clock_points (
tenant_id,
business_id,
label,
address,
latitude,
longitude,
geofence_radius_meters,
nfc_tag_uid,
default_clock_in_mode,
allow_clock_in_override,
cost_center_id,
status,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, 120), $8, COALESCE($9, 'EITHER'), COALESCE($10, TRUE), $11, 'ACTIVE', $12::jsonb)
RETURNING id
`,
      insertParams
    );
    const hubId = inserted.rows[0].id;

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'clock_point',
      aggregateId: hubId,
      eventType: 'HUB_CREATED',
      actorUserId: actor.uid,
      payload,
    });

    return { hubId, created: true };
  });
}
/**
 * Applies a partial update to an existing hub (`clock_points` row).
 *
 * Runs inside a single transaction: loads the hub via `requireClockPoint` with
 * `forUpdate: true`, validates the optional cost center, then updates the row.
 * Every column is written through `COALESCE($n, column)`, so a missing or null
 * value keeps the stored one; this endpoint therefore cannot clear a column.
 * Address parts are merged into the hub's metadata only when the payload defines them.
 * A HUB_UPDATED domain event carrying the request payload is recorded.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {object} payload - Must carry `hubId`; other fields are optional.
 * @returns {Promise<{hubId: string, updated: true}>}
 * @throws {AppError} NOT_FOUND (404) when `costCenterId` is not in the business scope.
 */
export async function updateHub(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { businessId } = context.business;
    const hub = await requireClockPoint(client, tenantId, businessId, payload.hubId, { forUpdate: true });

    const costCenterId = payload.costCenterId;
    if (costCenterId) {
      const costCenterLookup = await client.query(
        `
SELECT id
FROM cost_centers
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
`,
        [costCenterId, tenantId, businessId]
      );
      if (costCenterLookup.rowCount === 0) {
        throw new AppError('NOT_FOUND', 'Cost center not found in business scope', 404, {
          costCenterId,
        });
      }
    }

    // Overlay only the address parts the caller actually sent (null counts as sent).
    const nextMetadata = { ...(hub.metadata || {}) };
    for (const addressKey of ['placeId', 'street', 'city', 'state', 'country', 'zipCode']) {
      if (payload[addressKey] !== undefined) {
        nextMetadata[addressKey] = payload[addressKey];
      }
    }

    const updateParams = [
      hub.id,
      payload.name || null,
      payload.fullAddress || null,
      payload.latitude ?? null,
      payload.longitude ?? null,
      payload.geofenceRadiusMeters ?? null,
      costCenterId || null,
      payload.clockInMode || null,
      payload.allowClockInOverride ?? null,
      JSON.stringify(nextMetadata),
    ];
    await client.query(
      `
UPDATE clock_points
SET label = COALESCE($2, label),
address = COALESCE($3, address),
latitude = COALESCE($4, latitude),
longitude = COALESCE($5, longitude),
geofence_radius_meters = COALESCE($6, geofence_radius_meters),
cost_center_id = COALESCE($7, cost_center_id),
default_clock_in_mode = COALESCE($8, default_clock_in_mode),
allow_clock_in_override = COALESCE($9, allow_clock_in_override),
metadata = $10::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      updateParams
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'clock_point',
      aggregateId: hub.id,
      eventType: 'HUB_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    return { hubId: hub.id, updated: true };
  });
}
/**
 * Archives a hub by setting its `clock_points.status` to INACTIVE (the row is kept).
 *
 * The archive is refused while any shift pointing at the hub is in DRAFT, OPEN,
 * PENDING_CONFIRMATION, ASSIGNED or ACTIVE status. On success a HUB_ARCHIVED domain
 * event is recorded with the optional reason.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {{hubId: string, reason?: string}} payload - Hub to archive and why.
 * @returns {Promise<{hubId: string, deleted: true}>}
 * @throws {AppError} HUB_DELETE_BLOCKED (409) when a blocking shift exists.
 */
export async function deleteHub(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const hub = await requireClockPoint(client, tenantId, context.business.businessId, payload.hubId, { forUpdate: true });
    const hubId = hub.id;

    const blockingShifts = await client.query(
      `
SELECT 1
FROM shifts
WHERE clock_point_id = $1
AND status IN ('DRAFT', 'OPEN', 'PENDING_CONFIRMATION', 'ASSIGNED', 'ACTIVE')
LIMIT 1
`,
      [hubId]
    );
    if (blockingShifts.rowCount > 0) {
      throw new AppError('HUB_DELETE_BLOCKED', 'Cannot delete a hub with active orders or shifts', 409, {
        hubId,
      });
    }

    await client.query(
      `
UPDATE clock_points
SET status = 'INACTIVE',
updated_at = NOW()
WHERE id = $1
`,
      [hubId]
    );

    const archiveEvent = {
      tenantId,
      aggregateType: 'clock_point',
      aggregateId: hubId,
      eventType: 'HUB_ARCHIVED',
      actorUserId: actor.uid,
      payload: { reason: payload.reason || null },
    };
    await insertDomainEvent(client, archiveEvent);

    return { hubId, deleted: true };
  });
}
/**
 * Stores an NFC tag id on a hub (`clock_points.nfc_tag_uid`) and records a
 * HUB_NFC_ASSIGNED domain event carrying the request payload.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {{hubId: string, nfcTagId: string}} payload - Hub and tag to bind.
 * @returns {Promise<{hubId: string, nfcTagId: string}>}
 */
export async function assignHubNfc(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const hub = await requireClockPoint(client, tenantId, context.business.businessId, payload.hubId, { forUpdate: true });
    const { nfcTagId } = payload;

    await client.query(
      `
UPDATE clock_points
SET nfc_tag_uid = $2,
updated_at = NOW()
WHERE id = $1
`,
      [hub.id, nfcTagId]
    );

    const nfcEvent = {
      tenantId,
      aggregateType: 'clock_point',
      aggregateId: hub.id,
      eventType: 'HUB_NFC_ASSIGNED',
      actorUserId: actor.uid,
      payload,
    };
    await insertDomainEvent(client, nfcEvent);

    return { hubId: hub.id, nfcTagId };
  });
}
/**
 * Assigns a business team member as manager of a hub.
 *
 * The member is identified either by `payload.businessMembershipId` (verified to be
 * an ACTIVE membership of this tenant and business) or, when that is absent, by
 * `payload.managerUserId` (resolved to the user's ACTIVE membership). The
 * `hub_managers` row is upserted: an existing (hub, membership) pair only has its
 * `updated_at` refreshed. A HUB_MANAGER_ASSIGNED domain event is recorded.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {{hubId: string, businessMembershipId?: string, managerUserId?: string}} payload
 * @returns {Promise<{managerAssignmentId: string, hubId: string, businessMembershipId: string}>}
 * @throws {AppError} NOT_FOUND (404) when no active membership matches.
 */
export async function assignHubManager(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { businessId } = context.business;
    const hub = await requireClockPoint(client, tenantId, businessId, payload.hubId, { forUpdate: true });

    let businessMembershipId = payload.businessMembershipId || null;
    if (businessMembershipId) {
      // An explicit membership id was supplied: confirm it is active in this business.
      const byId = await client.query(
        `
SELECT id
FROM business_memberships
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
AND membership_status = 'ACTIVE'
`,
        [businessMembershipId, tenantId, businessId]
      );
      if (byId.rowCount === 0) {
        throw new AppError('NOT_FOUND', 'Business membership not found for hub manager assignment', 404, {
          businessMembershipId,
        });
      }
    } else {
      // No membership id: resolve it from the manager's user id.
      const byUser = await client.query(
        `
SELECT id
FROM business_memberships
WHERE tenant_id = $1
AND business_id = $2
AND user_id = $3
AND membership_status = 'ACTIVE'
`,
        [tenantId, businessId, payload.managerUserId]
      );
      if (byUser.rowCount === 0) {
        throw new AppError('NOT_FOUND', 'Business team member not found for hub manager assignment', 404, {
          managerUserId: payload.managerUserId,
        });
      }
      businessMembershipId = byUser.rows[0].id;
    }

    const upserted = await client.query(
      `
INSERT INTO hub_managers (tenant_id, hub_id, business_membership_id)
VALUES ($1, $2, $3)
ON CONFLICT (hub_id, business_membership_id) DO UPDATE
SET updated_at = NOW()
RETURNING id
`,
      [tenantId, hub.id, businessMembershipId]
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'clock_point',
      aggregateId: hub.id,
      eventType: 'HUB_MANAGER_ASSIGNED',
      actorUserId: actor.uid,
      payload: {
        businessMembershipId,
      },
    });

    return {
      managerAssignmentId: upserted.rows[0].id,
      hubId: hub.id,
      businessMembershipId,
    };
  });
}
/**
 * Creates or refreshes a business membership for a shift manager and optionally
 * makes that member manager of a hub.
 *
 * Steps, all inside one transaction:
 *  1. Look up a user whose email matches the lower-cased, trimmed invite email.
 *  2. Find and lock an existing membership of this business matching the invite
 *     email or that user id.
 *  3. Update that membership, or insert a new one. A membership linked to a user
 *     becomes ACTIVE; a new one with no matching user is INVITED.
 *  4. When `payload.hubId` is given, upsert a `hub_managers` row.
 *  5. Record a SHIFT_MANAGER_CREATED domain event.
 *
 * Metadata is built as: existing membership metadata, then name/phone/source/
 * createdBy, then `payload.metadata`, so later entries override earlier ones.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {object} payload - `email`, `firstName`, `lastName`, optional `phone`,
 *   `role` (defaults to 'manager'), `hubId`, `metadata`.
 * @returns {Promise<{businessMembershipId: string, membershipStatus: string,
 *   invitedEmail: string, fullName: string, role: string, managerAssignmentId: (string|null)}>}
 */
export async function createShiftManager(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const invitedEmail = payload.email.trim().toLowerCase();
    const fullName = `${payload.firstName} ${payload.lastName}`.trim();
    const { tenantId } = context.tenant;
    const { businessId } = context.business;
    const role = payload.role || 'manager';

    const userLookup = await client.query(
      `
SELECT id
FROM users
WHERE LOWER(email) = $1
LIMIT 1
`,
      [invitedEmail]
    );
    const matchedUserId = userLookup.rows[0]?.id || null;

    const membershipLookup = await client.query(
      `
SELECT id, user_id, membership_status, metadata
FROM business_memberships
WHERE tenant_id = $1
AND business_id = $2
AND (
LOWER(invited_email) = $3
OR ($4::text IS NOT NULL AND user_id = $4)
)
LIMIT 1
FOR UPDATE
`,
      [tenantId, businessId, invitedEmail, matchedUserId]
    );
    const priorMembership = membershipLookup.rows[0];

    const membershipMetadata = {
      ...(priorMembership?.metadata || {}),
      firstName: payload.firstName,
      lastName: payload.lastName,
      fullName,
      phone: normalizePhone(payload.phone),
      source: 'mobile-api',
      createdBy: actor.uid,
      ...(payload.metadata || {}),
    };
    const metadataJson = JSON.stringify(membershipMetadata);

    const saved = membershipLookup.rowCount > 0
      ? await client.query(
        `
UPDATE business_memberships
SET user_id = COALESCE(user_id, $2),
invited_email = $3,
membership_status = CASE
WHEN COALESCE(user_id, $2) IS NOT NULL THEN 'ACTIVE'
ELSE membership_status
END,
business_role = $4,
metadata = COALESCE(metadata, '{}'::jsonb) || $5::jsonb,
updated_at = NOW()
WHERE id = $1
RETURNING id, membership_status
`,
        [priorMembership.id, matchedUserId, invitedEmail, role, metadataJson]
      )
      : await client.query(
        `
INSERT INTO business_memberships (
tenant_id,
business_id,
user_id,
invited_email,
membership_status,
business_role,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
RETURNING id, membership_status
`,
        [
          tenantId,
          businessId,
          matchedUserId,
          invitedEmail,
          matchedUserId ? 'ACTIVE' : 'INVITED',
          role,
          metadataJson,
        ]
      );
    const { id: businessMembershipId, membership_status: membershipStatus } = saved.rows[0];

    let managerAssignmentId = null;
    if (payload.hubId) {
      const hub = await requireClockPoint(client, tenantId, businessId, payload.hubId, { forUpdate: true });
      const hubAssignment = await client.query(
        `
INSERT INTO hub_managers (tenant_id, hub_id, business_membership_id)
VALUES ($1, $2, $3)
ON CONFLICT (hub_id, business_membership_id) DO UPDATE
SET updated_at = NOW()
RETURNING id
`,
        [tenantId, hub.id, businessMembershipId]
      );
      managerAssignmentId = hubAssignment.rows[0].id;
    }

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'business_membership',
      aggregateId: businessMembershipId,
      eventType: 'SHIFT_MANAGER_CREATED',
      actorUserId: actor.uid,
      payload: {
        invitedEmail,
        fullName,
        hubId: payload.hubId || null,
        membershipStatus,
      },
    });

    return {
      businessMembershipId,
      membershipStatus,
      invitedEmail,
      fullName,
      role,
      managerAssignmentId,
    };
  });
}
/**
 * Marks an invoice of the caller's business as APPROVED.
 *
 * The approver's user id and the approval timestamp are merged into the invoice
 * metadata, and an INVOICE_APPROVED domain event is recorded.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {{invoiceId: string}} payload - Invoice to approve.
 * @returns {Promise<{invoiceId: string, status: 'APPROVED'}>}
 */
export async function approveInvoice(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { id: invoiceId } = await requireInvoice(client, tenantId, context.business.businessId, payload.invoiceId);

    const approvalStamp = JSON.stringify({
      approvedBy: actor.uid,
      approvedAt: new Date().toISOString(),
    });
    await client.query(
      `
UPDATE invoices
SET status = 'APPROVED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [invoiceId, approvalStamp]
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'invoice',
      aggregateId: invoiceId,
      eventType: 'INVOICE_APPROVED',
      actorUserId: actor.uid,
      payload: { invoiceId },
    });

    return { invoiceId, status: 'APPROVED' };
  });
}
/**
 * Marks an invoice of the caller's business as DISPUTED.
 *
 * The disputing user's id, the dispute timestamp and the reason are merged into the
 * invoice metadata, and an INVOICE_DISPUTED domain event is recorded.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {{invoiceId: string, reason: string}} payload - Invoice and dispute reason.
 * @returns {Promise<{invoiceId: string, status: 'DISPUTED', reason: string}>}
 */
export async function disputeInvoice(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { id: invoiceId } = await requireInvoice(client, tenantId, context.business.businessId, payload.invoiceId);
    const { reason } = payload;

    const disputeStamp = JSON.stringify({
      disputedBy: actor.uid,
      disputedAt: new Date().toISOString(),
      disputeReason: reason,
    });
    await client.query(
      `
UPDATE invoices
SET status = 'DISPUTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [invoiceId, disputeStamp]
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'invoice',
      aggregateId: invoiceId,
      eventType: 'INVOICE_DISPUTED',
      actorUserId: actor.uid,
      payload: { invoiceId, reason },
    });

    return { invoiceId, status: 'DISPUTED', reason };
  });
}
/**
 * Records a client's review of a worker and applies the optional favorite/block flags.
 *
 * Inside one transaction:
 *  1. Locks the worker's assignment in this business: the one named by
 *     `payload.assignmentId`, or the most recently updated one when no id is given.
 *  2. Upserts the `staff_reviews` row for (business, assignment, staff).
 *  3. `markAsFavorite`: `true` upserts a favorite, `false` deletes it, anything
 *     else leaves favorites untouched. `markAsBlocked` works the same way for blocks.
 *  4. Recomputes `staffs.average_rating` and `rating_count` from all of the
 *     worker's reviews.
 *  5. Records a STAFF_REVIEWED_FROM_COVERAGE domain event with the request payload.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {object} payload - `staffId`, `rating`, optional `assignmentId`, `feedback`,
 *   `issueFlags`, `markAsFavorite`, `markAsBlocked`.
 * @returns {Promise<object>} Review id, assignment id and an echo of the applied values.
 * @throws {AppError} NOT_FOUND (404) when no assignment matches in the business scope.
 */
export async function rateWorkerFromCoverage(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { businessId } = context.business;
    const { staffId } = payload;

    const assignmentLookup = await client.query(
      `
SELECT a.id, a.tenant_id, a.business_id, a.staff_id
FROM assignments a
WHERE a.tenant_id = $1
AND a.business_id = $2
AND a.staff_id = $3
AND ($4::uuid IS NULL OR a.id = $4)
ORDER BY a.updated_at DESC
LIMIT 1
FOR UPDATE OF a
`,
      [tenantId, businessId, staffId, payload.assignmentId || null]
    );
    if (assignmentLookup.rowCount === 0) {
      throw new AppError('NOT_FOUND', 'Assignment not found for worker review in business scope', 404, payload);
    }
    const [assignment] = assignmentLookup.rows;

    const issueFlags = ensureArray(payload.issueFlags || []);
    const issueFlagsJson = JSON.stringify(issueFlags);
    const feedback = payload.feedback || null;

    const savedReview = await client.query(
      `
INSERT INTO staff_reviews (
tenant_id,
business_id,
staff_id,
assignment_id,
reviewer_user_id,
rating,
review_text,
tags
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb)
ON CONFLICT (business_id, assignment_id, staff_id) DO UPDATE
SET reviewer_user_id = EXCLUDED.reviewer_user_id,
rating = EXCLUDED.rating,
review_text = EXCLUDED.review_text,
tags = EXCLUDED.tags,
updated_at = NOW()
RETURNING id
`,
      [tenantId, businessId, staffId, assignment.id, actor.uid, payload.rating, feedback, issueFlagsJson]
    );
    const reviewId = savedReview.rows[0].id;

    // Favorites: strict booleans only, so an omitted flag changes nothing.
    if (payload.markAsFavorite === true) {
      await client.query(
        `
INSERT INTO staff_favorites (tenant_id, business_id, staff_id, created_by_user_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (business_id, staff_id) DO UPDATE
SET created_by_user_id = EXCLUDED.created_by_user_id
`,
        [tenantId, businessId, staffId, actor.uid]
      );
    } else if (payload.markAsFavorite === false) {
      await client.query(
        `
DELETE FROM staff_favorites
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
`,
        [tenantId, businessId, staffId]
      );
    }

    // Blocks: same tri-state handling as favorites.
    if (payload.markAsBlocked === true) {
      const blockMetadata = JSON.stringify({
        blockedByCoverageReview: true,
        assignmentId: assignment.id,
      });
      await client.query(
        `
INSERT INTO staff_blocks (
tenant_id,
business_id,
staff_id,
created_by_user_id,
reason,
issue_flags,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb)
ON CONFLICT (business_id, staff_id) DO UPDATE
SET reason = EXCLUDED.reason,
issue_flags = EXCLUDED.issue_flags,
metadata = COALESCE(staff_blocks.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
`,
        [tenantId, businessId, staffId, actor.uid, feedback, issueFlagsJson, blockMetadata]
      );
    } else if (payload.markAsBlocked === false) {
      await client.query(
        `
DELETE FROM staff_blocks
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
`,
        [tenantId, businessId, staffId]
      );
    }

    await client.query(
      `
UPDATE staffs
SET average_rating = review_stats.avg_rating,
rating_count = review_stats.rating_count,
updated_at = NOW()
FROM (
SELECT staff_id,
ROUND(AVG(rating)::numeric, 2) AS avg_rating,
COUNT(*)::INTEGER AS rating_count
FROM staff_reviews
WHERE staff_id = $1
GROUP BY staff_id
) review_stats
WHERE staffs.id = review_stats.staff_id
`,
      [staffId]
    );

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff_review',
      aggregateId: reviewId,
      eventType: 'STAFF_REVIEWED_FROM_COVERAGE',
      actorUserId: actor.uid,
      payload,
    });

    return {
      reviewId,
      assignmentId: assignment.id,
      staffId,
      rating: payload.rating,
      markAsFavorite: payload.markAsFavorite ?? null,
      markAsBlocked: payload.markAsBlocked ?? null,
      issueFlags,
      feedback,
    };
  });
}
/**
 * Cancels a worker's assignment for lateness and notifies the hub manager and the worker.
 *
 * Inside one transaction:
 *  1. Locks the assignment and its shift (scoped to the caller's tenant and business).
 *  2. Refuses when the assignment is CHECKED_IN, CHECKED_OUT or COMPLETED.
 *  3. Requires lateness evidence: either 10 minutes have passed since the shift
 *     start, or a geofence incident (OUTSIDE_GEOFENCE, LOCATION_UNAVAILABLE,
 *     CLOCK_IN_OVERRIDE) exists from 30 minutes before the shift start onward.
 *  4. Sets the assignment to CANCELLED, merges the cancellation details into its
 *     metadata and refreshes the shift-role and shift counts.
 *  5. Records a LATE_WORKER_CANCELLED domain event and enqueues both alerts.
 *
 * NOTE(review): other non-active statuses (for example an already CANCELLED
 * assignment) are not rejected here - confirm whether repeat calls are expected.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {{assignmentId: string, reason?: string}} payload - Assignment to cancel.
 * @returns {Promise<{assignmentId: string, shiftId: string,
 *   replacementSearchTriggered: true, status: 'CANCELLED'}>}
 * @throws {AppError} NOT_FOUND (404), LATE_WORKER_CANCEL_BLOCKED (409) or
 *   LATE_WORKER_NOT_CONFIRMED (409).
 */
export async function cancelLateWorker(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { tenantId } = context.tenant;
    const { businessId } = context.business;

    const lookup = await client.query(
      `
SELECT
a.id,
a.shift_id,
a.shift_role_id,
a.staff_id,
a.status,
s.required_workers,
s.assigned_workers,
s.tenant_id,
s.clock_point_id,
s.starts_at,
s.title AS shift_title,
st.user_id AS "staffUserId"
FROM assignments a
JOIN shifts s ON s.id = a.shift_id
JOIN staffs st ON st.id = a.staff_id
WHERE a.id = $1
AND a.tenant_id = $2
AND a.business_id = $3
FOR UPDATE OF a, s
`,
      [payload.assignmentId, tenantId, businessId]
    );
    if (lookup.rowCount === 0) {
      throw new AppError('NOT_FOUND', 'Late worker assignment not found in business scope', 404, {
        assignmentId: payload.assignmentId,
      });
    }
    const [assignment] = lookup.rows;
    const assignmentId = assignment.id;
    const shiftId = assignment.shift_id;

    const attendedStatuses = ['CHECKED_IN', 'CHECKED_OUT', 'COMPLETED'];
    if (attendedStatuses.includes(assignment.status)) {
      throw new AppError('LATE_WORKER_CANCEL_BLOCKED', 'Worker is already checked in or completed and cannot be cancelled as late', 409, {
        assignmentId,
      });
    }

    const recentIncidents = await client.query(
      `
SELECT 1
FROM geofence_incidents
WHERE assignment_id = $1
AND incident_type IN ('OUTSIDE_GEOFENCE', 'LOCATION_UNAVAILABLE', 'CLOCK_IN_OVERRIDE')
AND occurred_at >= $2::timestamptz - INTERVAL '30 minutes'
LIMIT 1
`,
      [assignmentId, assignment.starts_at]
    );

    // Lateness is confirmed by the clock (10 minutes past start) or by an incident.
    const graceMs = 10 * 60 * 1000;
    const shiftStartMs = assignment.starts_at ? new Date(assignment.starts_at).getTime() : null;
    const startGraceElapsed = shiftStartMs != null && Date.now() >= shiftStartMs + graceMs;
    const latenessConfirmed = startGraceElapsed || recentIncidents.rowCount !== 0;
    if (!latenessConfirmed) {
      throw new AppError('LATE_WORKER_NOT_CONFIRMED', 'Late worker cancellation requires either a geofence incident or a started shift window', 409, {
        assignmentId,
      });
    }

    const reason = payload.reason || 'Cancelled for lateness';
    const cancellationStamp = JSON.stringify({
      cancellationReason: reason,
      cancelledBy: actor.uid,
      cancelledAt: new Date().toISOString(),
    });
    await client.query(
      `
UPDATE assignments
SET status = 'CANCELLED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [assignmentId, cancellationStamp]
    );
    await refreshShiftRoleCounts(client, assignment.shift_role_id);
    await refreshShiftCounts(client, shiftId);

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'assignment',
      aggregateId: assignmentId,
      eventType: 'LATE_WORKER_CANCELLED',
      actorUserId: actor.uid,
      payload,
    });

    await enqueueHubManagerAlert(client, {
      tenantId,
      businessId,
      shiftId,
      assignmentId,
      hubId: assignment.clock_point_id,
      notificationType: 'LATE_WORKER_CANCELLED',
      priority: 'HIGH',
      subject: 'Late worker was removed from shift',
      body: `${assignment.shift_title}: a late worker was cancelled and replacement search should begin`,
      payload: {
        assignmentId,
        shiftId,
        reason,
      },
      dedupeScope: assignmentId,
    });

    await enqueueUserAlert(client, {
      tenantId,
      businessId,
      shiftId,
      assignmentId,
      recipientUserId: assignment.staffUserId,
      notificationType: 'SHIFT_ASSIGNMENT_CANCELLED_LATE',
      priority: 'HIGH',
      subject: 'Shift assignment cancelled',
      body: `${assignment.shift_title}: your assignment was cancelled because you were marked late`,
      payload: {
        assignmentId,
        shiftId,
        reason,
      },
      dedupeScope: assignmentId,
    });

    return {
      assignmentId,
      shiftId,
      replacementSearchTriggered: true,
      status: 'CANCELLED',
    };
  });
}
/**
 * Creates a ONE_TIME order from a mobile client payload.
 *
 * The payload is translated by `buildOrderCreatePayloadFromMobile` and handed to
 * the shared create-order command.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {object} payload - Mobile order payload.
 * @returns {Promise<*>} Whatever the create-order command returns.
 */
export async function createClientOneTimeOrder(actor, payload) {
  const clientContext = await requireClientContext(actor.uid);
  const orderCommand = await buildOrderCreatePayloadFromMobile(
    actor,
    clientContext,
    payload,
    'ONE_TIME'
  );
  return createOrderCommand(actor, orderCommand);
}
/**
 * Creates a RECURRING order from a mobile client payload.
 *
 * The payload is translated by `buildOrderCreatePayloadFromMobile` and handed to
 * the shared create-order command.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {object} payload - Mobile order payload.
 * @returns {Promise<*>} Whatever the create-order command returns.
 */
export async function createClientRecurringOrder(actor, payload) {
  const clientContext = await requireClientContext(actor.uid);
  const orderCommand = await buildOrderCreatePayloadFromMobile(
    actor,
    clientContext,
    payload,
    'RECURRING'
  );
  return createOrderCommand(actor, orderCommand);
}
/**
 * Creates a PERMANENT order from a mobile client payload.
 *
 * The payload is translated by `buildOrderCreatePayloadFromMobile` and handed to
 * the shared create-order command.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {object} payload - Mobile order payload.
 * @returns {Promise<*>} Whatever the create-order command returns.
 */
export async function createClientPermanentOrder(actor, payload) {
  const clientContext = await requireClientContext(actor.uid);
  const orderCommand = await buildOrderCreatePayloadFromMobile(
    actor,
    clientContext,
    payload,
    'PERMANENT'
  );
  return createOrderCommand(actor, orderCommand);
}
/**
 * Creates a new order by copying an existing one and applying the caller's overrides.
 *
 * The source order is loaded with `loadEditableOrderTemplate`. Values missing from
 * `payload` fall back to the template:
 *  - positions: the template's shift roles, de-duplicated on role identity, times,
 *    worker count and rates (a role without its own times inherits the shift's);
 *  - dates: the first and last template shift;
 *  - recurrence days: the UTC weekdays of the template shift dates;
 *  - order type: `payload.orderType`, then `template.metadata.orderType`, then 'ONE_TIME'.
 *
 * Metadata is `{ sourceOrderId, ...template.metadata, ...payload.metadata }`, so a
 * `sourceOrderId` key in either metadata object overrides the one set here.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {object} payload - Must carry `orderId`; every other field is an optional override.
 * @returns {Promise<*>} Whatever the create-order command returns.
 */
export async function createEditedOrderCopy(actor, payload) {
  const context = await requireClientContext(actor.uid);
  const template = await loadEditableOrderTemplate(
    actor.uid,
    context.tenant.tenantId,
    context.business.businessId,
    payload.orderId
  );
  const templateShifts = Array.isArray(template.shifts) ? template.shifts : [];

  // Collapse identical roles across shifts into one position each (first one wins).
  const positionsByKey = new Map();
  for (const shift of templateShifts) {
    const shiftRoles = Array.isArray(shift.roles) ? shift.roles : [];
    for (const role of shiftRoles) {
      const position = {
        ...role,
        startTime: role.startTime || shift.startTime,
        endTime: role.endTime || shift.endTime,
      };
      const dedupeKey = [
        position.roleId || '',
        position.roleCode || '',
        position.roleName || '',
        position.startTime || '',
        position.endTime || '',
        position.workerCount ?? '',
        position.payRateCents ?? '',
        position.billRateCents ?? '',
      ].join('|');
      if (!positionsByKey.has(dedupeKey)) {
        positionsByKey.set(dedupeKey, position);
      }
    }
  }
  const templatePositions = [...positionsByKey.values()];

  // Distinct UTC weekdays of the template shifts; unparseable dates are dropped.
  const weekdays = new Set();
  for (const shift of templateShifts) {
    const weekday = new Date(`${shift.date}T00:00:00.000Z`).getUTCDay();
    if (Number.isInteger(weekday)) {
      weekdays.add(weekday);
    }
  }
  const inferredDays = [...weekdays];

  const firstShift = templateShifts[0] || {};
  const lastShift = templateShifts[templateShifts.length - 1] || {};
  const inferredOrderType = payload.orderType || template.metadata?.orderType || 'ONE_TIME';

  const mergedPayload = {
    ...payload,
    hubId: payload.hubId || firstShift.clockPointId,
    vendorId: payload.vendorId ?? template.vendorId ?? undefined,
    eventName: payload.eventName || template.eventName,
    description: payload.description ?? template.description ?? undefined,
    notes: payload.notes ?? template.notes ?? undefined,
    serviceType: payload.serviceType ?? template.serviceType ?? undefined,
    positions: payload.positions || templatePositions,
    orderDate: payload.orderDate || firstShift.date,
    startDate: payload.startDate || firstShift.date,
    endDate: payload.endDate || lastShift.date || firstShift.date,
    recurrenceDays: payload.recurrenceDays || inferredDays,
    daysOfWeek: payload.daysOfWeek || inferredDays,
    metadata: {
      sourceOrderId: payload.orderId,
      ...template.metadata,
      ...payload.metadata,
    },
  };
  const buildOptions = {
    metadata: {
      editSourceOrderId: payload.orderId,
    },
  };

  const commandPayload = await buildOrderCreatePayloadFromMobile(
    actor,
    context,
    mergedPayload,
    inferredOrderType,
    buildOptions
  );
  return createOrderCommand(actor, commandPayload);
}
/**
 * Cancels an order on behalf of a client by delegating to `cancelFutureOrderSlice`
 * inside a transaction.
 *
 * @param {{uid: string}} actor - Authenticated caller.
 * @param {{orderId: string, reason?: string, metadata?: object}} payload - Order to cancel.
 * @returns {Promise<*>} Whatever `cancelFutureOrderSlice` returns.
 */
export async function cancelClientOrder(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const { orderId, reason, metadata } = payload;
    const cancellation = {
      actorUid: actor.uid,
      tenantId: context.tenant.tenantId,
      businessId: context.business.businessId,
      orderId,
      reason,
      metadata,
    };
    return cancelFutureOrderSlice(client, cancellation);
  });
}
/**
 * Clocks a staff member in from the mobile app.
 *
 * Resolves the target assignment with `resolveStaffAssignmentForClock`, maps the
 * mobile payload onto the clock-in command shape and delegates to that command.
 * `sourceReference` falls back to `notes`; keys in `payload.rawPayload` override the
 * generated `notes` / `isMockLocation` entries of the command's `rawPayload`.
 *
 * @param {{uid: string}} actor - Authenticated staff user.
 * @param {object} payload - Mobile clock-in payload.
 * @returns {Promise<*>} Whatever the clock-in command returns.
 */
export async function staffClockIn(actor, payload) {
  const staffContext = await requireStaffContext(actor.uid);
  const resolved = await resolveStaffAssignmentForClock(actor.uid, staffContext.tenant.tenantId, payload);
  const sourceType = inferAttendanceSourceType(payload);

  const rawPayload = {
    notes: payload.notes || null,
    isMockLocation: payload.isMockLocation ?? null,
    ...(payload.rawPayload || {}),
  };
  const command = {
    assignmentId: resolved.assignmentId,
    sourceType,
    sourceReference: payload.sourceReference || payload.notes || null,
    nfcTagUid: payload.nfcTagId || null,
    deviceId: payload.deviceId || null,
    latitude: payload.latitude,
    longitude: payload.longitude,
    accuracyMeters: payload.accuracyMeters,
    capturedAt: payload.capturedAt,
    overrideReason: payload.overrideReason || null,
    proofNonce: payload.proofNonce || null,
    proofTimestamp: payload.proofTimestamp || null,
    attestationProvider: payload.attestationProvider || null,
    attestationToken: payload.attestationToken || null,
    rawPayload,
  };
  return clockInCommand(actor, command);
}
/**
 * Clocks a staff member out from the mobile app.
 *
 * Resolves the target assignment with `resolveStaffAssignmentForClock`, passing
 * `requireOpenSession: true`, maps the mobile payload onto the clock-out command
 * shape and delegates to that command. `sourceReference` falls back to `notes`;
 * keys in `payload.rawPayload` override the generated entries of the command's
 * `rawPayload` (`notes`, `breakMinutes`, `applicationId`, `isMockLocation`).
 *
 * @param {{uid: string}} actor - Authenticated staff user.
 * @param {object} payload - Mobile clock-out payload.
 * @returns {Promise<*>} Whatever the clock-out command returns.
 */
export async function staffClockOut(actor, payload) {
  const staffContext = await requireStaffContext(actor.uid);
  const resolved = await resolveStaffAssignmentForClock(
    actor.uid,
    staffContext.tenant.tenantId,
    payload,
    { requireOpenSession: true }
  );
  const sourceType = inferAttendanceSourceType(payload);

  const rawPayload = {
    notes: payload.notes || null,
    breakMinutes: payload.breakMinutes ?? null,
    applicationId: payload.applicationId || null,
    isMockLocation: payload.isMockLocation ?? null,
    ...(payload.rawPayload || {}),
  };
  const command = {
    assignmentId: resolved.assignmentId,
    sourceType,
    sourceReference: payload.sourceReference || payload.notes || null,
    nfcTagUid: payload.nfcTagId || null,
    deviceId: payload.deviceId || null,
    latitude: payload.latitude,
    longitude: payload.longitude,
    accuracyMeters: payload.accuracyMeters,
    capturedAt: payload.capturedAt,
    overrideReason: payload.overrideReason || null,
    proofNonce: payload.proofNonce || null,
    proofTimestamp: payload.proofTimestamp || null,
    attestationProvider: payload.attestationProvider || null,
    attestationToken: payload.attestationToken || null,
    rawPayload,
  };
  return clockOutCommand(actor, command);
}
/**
 * Registers a push-notification token for a client user, linked to the caller's
 * business membership.
 *
 * Optional device fields default to `null`, `notificationsEnabled` defaults to
 * `true` and `metadata` defaults to `{}`.
 *
 * @param {{uid: string}} actor - Authenticated client user.
 * @param {object} payload - `provider`, `platform`, `pushToken` and optional device details.
 * @returns {Promise<{tokenId: string, provider: string, platform: string,
 *   notificationsEnabled: boolean}>}
 */
export async function registerClientPushToken(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const registration = {
      tenantId: context.tenant.tenantId,
      userId: actor.uid,
      businessMembershipId: context.business.membershipId,
      provider: payload.provider,
      platform: payload.platform,
      pushToken: payload.pushToken,
      deviceId: payload.deviceId || null,
      appVersion: payload.appVersion || null,
      appBuild: payload.appBuild || null,
      locale: payload.locale || null,
      timezone: payload.timezone || null,
      notificationsEnabled: payload.notificationsEnabled ?? true,
      metadata: payload.metadata || {},
    };
    const saved = await registerPushToken(client, registration);
    const { id: tokenId, provider, platform, notificationsEnabled } = saved;
    return { tokenId, provider, platform, notificationsEnabled };
  });
}
/**
 * Unregisters push tokens of a client user, selected by `tokenId` and/or `pushToken`.
 *
 * The removal reason defaults to 'CLIENT_SIGN_OUT'.
 *
 * @param {{uid: string}} actor - Authenticated client user.
 * @param {{tokenId?: string, pushToken?: string, reason?: string}} payload
 * @returns {Promise<{removedCount: number, removed: Array}>} The removed entries
 *   as returned by `unregisterPushToken`, and how many there were.
 */
export async function unregisterClientPushToken(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const removal = {
      tenantId: context.tenant.tenantId,
      userId: actor.uid,
      tokenId: payload.tokenId || null,
      pushToken: payload.pushToken || null,
      reason: payload.reason || 'CLIENT_SIGN_OUT',
    };
    const removed = await unregisterPushToken(client, removal);
    return { removedCount: removed.length, removed };
  });
}
/**
 * Registers a push-notification token for a staff user, linked to the caller's
 * staff record.
 *
 * Optional device fields default to `null`, `notificationsEnabled` defaults to
 * `true` and `metadata` defaults to `{}`.
 *
 * @param {{uid: string}} actor - Authenticated staff user.
 * @param {object} payload - `provider`, `platform`, `pushToken` and optional device details.
 * @returns {Promise<{tokenId: string, provider: string, platform: string,
 *   notificationsEnabled: boolean}>}
 */
export async function registerStaffPushToken(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const registration = {
      tenantId: context.tenant.tenantId,
      userId: actor.uid,
      staffId: context.staff.staffId,
      provider: payload.provider,
      platform: payload.platform,
      pushToken: payload.pushToken,
      deviceId: payload.deviceId || null,
      appVersion: payload.appVersion || null,
      appBuild: payload.appBuild || null,
      locale: payload.locale || null,
      timezone: payload.timezone || null,
      notificationsEnabled: payload.notificationsEnabled ?? true,
      metadata: payload.metadata || {},
    };
    const saved = await registerPushToken(client, registration);
    const { id: tokenId, provider, platform, notificationsEnabled } = saved;
    return { tokenId, provider, platform, notificationsEnabled };
  });
}
/**
 * Unregisters push tokens of a staff user, selected by `tokenId` and/or `pushToken`.
 *
 * The removal reason defaults to 'STAFF_SIGN_OUT'.
 *
 * @param {{uid: string}} actor - Authenticated staff user.
 * @param {{tokenId?: string, pushToken?: string, reason?: string}} payload
 * @returns {Promise<{removedCount: number, removed: Array}>} The removed entries
 *   as returned by `unregisterPushToken`, and how many there were.
 */
export async function unregisterStaffPushToken(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const removal = {
      tenantId: context.tenant.tenantId,
      userId: actor.uid,
      tokenId: payload.tokenId || null,
      pushToken: payload.pushToken || null,
      reason: payload.reason || 'STAFF_SIGN_OUT',
    };
    const removed = await unregisterPushToken(client, removal);
    return { removedCount: removed.length, removed };
  });
}
/**
 * Summarises a batch of location points relative to an assignment's expected clock point.
 *
 * For each point, in the order given:
 *  - a point lacking latitude or longitude is counted as missing and remembered
 *    as the latest missing point;
 *  - otherwise its distance to the expected coordinates is computed with
 *    `distanceMeters`; a non-null distance updates the running maximum and, when
 *    the assignment has a geofence radius and the distance exceeds it, the point
 *    is counted as outside and remembered (with its distance attached) as the
 *    latest outside point.
 *
 * "Latest" means last encountered in iteration order.
 *
 * @param {Array<object>} points - Location points with `latitude` / `longitude`.
 * @param {object} assignment - Reads `expected_latitude`, `expected_longitude`
 *   and `geofence_radius_meters`.
 * @returns {{outOfGeofenceCount: number, missingCoordinateCount: number,
 *   maxDistanceToClockPointMeters: (number|null), latestOutsidePoint: (object|null),
 *   latestMissingPoint: (object|null)}}
 */
function summarizeLocationPoints(points, assignment) {
  const summary = {
    outOfGeofenceCount: 0,
    missingCoordinateCount: 0,
    maxDistanceToClockPointMeters: null,
    latestOutsidePoint: null,
    latestMissingPoint: null,
  };

  for (const point of points) {
    const hasCoordinates = point.latitude != null && point.longitude != null;
    if (!hasCoordinates) {
      summary.missingCoordinateCount += 1;
      summary.latestMissingPoint = point;
      continue;
    }

    const reported = { latitude: point.latitude, longitude: point.longitude };
    const expected = {
      latitude: assignment.expected_latitude,
      longitude: assignment.expected_longitude,
    };
    const distance = distanceMeters(reported, expected);
    if (distance == null) {
      continue;
    }

    summary.maxDistanceToClockPointMeters = summary.maxDistanceToClockPointMeters == null
      ? distance
      : Math.max(summary.maxDistanceToClockPointMeters, distance);

    const radius = assignment.geofence_radius_meters;
    if (radius != null && distance > radius) {
      summary.outOfGeofenceCount += 1;
      summary.latestOutsidePoint = { ...point, distanceToClockPointMeters: distance };
    }
  }

  return summary;
}
/**
 * Records a batch of location points streamed by a staff device for one of
 * the actor's assignments.
 *
 * Flow: resolve the assignment for the actor, normalize and time-sort the
 * points, upload the batch payload through `uploadLocationBatch`, insert a
 * `location_stream_batches` row, raise an incident plus a hub-manager alert
 * for out-of-geofence points and for missing-coordinate points, and emit a
 * `LOCATION_STREAM_BATCH_RECORDED` domain event.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` is used as the user id.
 * @param {object} payload - Batch request. Reads `points` (each with
 *   `capturedAt`, `latitude`, `longitude` and optionally `accuracyMeters`),
 *   `sourceType`, `deviceId` and `metadata`. The whole payload is also handed
 *   to `resolveStaffAssignmentForClock`.
 * @returns {Promise<object>} `{ batchId, assignmentId, shiftId,
 *   effectiveClockInMode, pointCount, outOfGeofenceCount,
 *   missingCoordinateCount, objectUri, incidentIds }`.
 */
export async function submitLocationStreamBatch(actor, payload) {
const context = await requireStaffContext(actor.uid);
// Resolved before the transaction opens.
// NOTE(review): `requireOpenSession: true` presumably limits this to assignments
// with an active clock-in session — confirm in resolveStaffAssignmentForClock.
const { assignmentId } = await resolveStaffAssignmentForClock(
actor.uid,
context.tenant.tenantId,
payload,
{ requireOpenSession: true }
);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const assignment = await loadAssignmentMonitoringContext(
client,
context.tenant.tenantId,
assignmentId,
actor.uid
);
const policy = resolveEffectiveClockInPolicy(assignment);
// Work on a copy: convert capturedAt to an ISO string (or null) and sort
// ascending by capture time so the first/last entries bound the batch window.
// A null capturedAt sorts as epoch 0 because `new Date(null).getTime()` is 0.
const points = [...payload.points]
.map((point) => ({
...point,
capturedAt: toIsoOrNull(point.capturedAt),
}))
.sort((left, right) => new Date(left.capturedAt).getTime() - new Date(right.capturedAt).getTime());
const batchId = crypto.randomUUID();
const summary = summarizeLocationPoints(points, assignment);
// The upload runs inside the transaction callback; its return value is stored
// as `object_uri` on the batch row below.
// NOTE(review): if a later statement fails, the uploaded object is presumably
// left behind — confirm whether cleanup is needed.
const objectUri = await uploadLocationBatch({
tenantId: assignment.tenant_id,
staffId: assignment.staff_id,
assignmentId: assignment.id,
batchId,
payload: {
...buildAssignmentReferencePayload(assignment),
effectiveClockInMode: policy.mode,
points,
metadata: payload.metadata || {},
},
});
await client.query(
`
INSERT INTO location_stream_batches (
id,
tenant_id,
business_id,
vendor_id,
shift_id,
assignment_id,
staff_id,
actor_user_id,
source_type,
device_id,
object_uri,
point_count,
out_of_geofence_count,
missing_coordinate_count,
max_distance_to_clock_point_meters,
started_at,
ended_at,
metadata
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16::timestamptz, $17::timestamptz, $18::jsonb
)
`,
[
batchId,
assignment.tenant_id,
assignment.business_id,
assignment.vendor_id,
assignment.shift_id,
assignment.id,
assignment.staff_id,
actor.uid,
payload.sourceType,
payload.deviceId || null,
objectUri,
points.length,
summary.outOfGeofenceCount,
summary.missingCoordinateCount,
summary.maxDistanceToClockPointMeters,
points[0]?.capturedAt || null,
points[points.length - 1]?.capturedAt || null,
JSON.stringify(payload.metadata || {}),
]
);
const incidentIds = [];
// One incident and one alert per batch (not per point), anchored on the most
// recent point that fell outside the geofence.
if (summary.outOfGeofenceCount > 0) {
const incidentId = await recordGeofenceIncident(client, {
assignment,
actorUserId: actor.uid,
locationStreamBatchId: batchId,
incidentType: 'OUTSIDE_GEOFENCE',
severity: 'CRITICAL',
effectiveClockInMode: policy.mode,
sourceType: payload.sourceType,
deviceId: payload.deviceId || null,
latitude: summary.latestOutsidePoint?.latitude ?? null,
longitude: summary.latestOutsidePoint?.longitude ?? null,
accuracyMeters: summary.latestOutsidePoint?.accuracyMeters ?? null,
distanceToClockPointMeters: summary.latestOutsidePoint?.distanceToClockPointMeters ?? null,
withinGeofence: false,
message: `${summary.outOfGeofenceCount} location points were outside the configured geofence`,
occurredAt: summary.latestOutsidePoint?.capturedAt || points[points.length - 1]?.capturedAt || null,
metadata: {
pointCount: points.length,
outOfGeofenceCount: summary.outOfGeofenceCount,
objectUri,
},
});
incidentIds.push(incidentId);
await enqueueHubManagerAlert(client, {
tenantId: assignment.tenant_id,
businessId: assignment.business_id,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
hubId: assignment.clock_point_id,
relatedIncidentId: incidentId,
notificationType: 'GEOFENCE_BREACH_ALERT',
priority: 'CRITICAL',
subject: 'Worker left the workplace geofence',
body: `${assignment.shift_title}: location stream shows the worker outside the geofence`,
payload: {
...buildAssignmentReferencePayload(assignment),
batchId,
objectUri,
outOfGeofenceCount: summary.outOfGeofenceCount,
},
dedupeScope: batchId,
});
}
// Missing coordinates are reported separately as a WARNING incident; no
// latitude/longitude fields are passed because none are available.
if (summary.missingCoordinateCount > 0) {
const incidentId = await recordGeofenceIncident(client, {
assignment,
actorUserId: actor.uid,
locationStreamBatchId: batchId,
incidentType: 'LOCATION_UNAVAILABLE',
severity: 'WARNING',
effectiveClockInMode: policy.mode,
sourceType: payload.sourceType,
deviceId: payload.deviceId || null,
message: `${summary.missingCoordinateCount} location points were missing coordinates`,
occurredAt: summary.latestMissingPoint?.capturedAt || points[points.length - 1]?.capturedAt || null,
metadata: {
pointCount: points.length,
missingCoordinateCount: summary.missingCoordinateCount,
objectUri,
},
});
incidentIds.push(incidentId);
await enqueueHubManagerAlert(client, {
tenantId: assignment.tenant_id,
businessId: assignment.business_id,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
hubId: assignment.clock_point_id,
relatedIncidentId: incidentId,
notificationType: 'LOCATION_SIGNAL_WARNING',
priority: 'HIGH',
subject: 'Worker location signal unavailable',
body: `${assignment.shift_title}: background location tracking reported missing coordinates`,
payload: {
...buildAssignmentReferencePayload(assignment),
batchId,
objectUri,
missingCoordinateCount: summary.missingCoordinateCount,
},
// Differs from the breach alert's scope (plain batchId), presumably so the two
// alerts for the same batch are not deduplicated against each other — confirm.
dedupeScope: `${batchId}:missing`,
});
}
await insertDomainEvent(client, {
tenantId: assignment.tenant_id,
aggregateType: 'location_stream_batch',
aggregateId: batchId,
eventType: 'LOCATION_STREAM_BATCH_RECORDED',
actorUserId: actor.uid,
payload: {
...buildAssignmentReferencePayload(assignment),
batchId,
objectUri,
pointCount: points.length,
outOfGeofenceCount: summary.outOfGeofenceCount,
missingCoordinateCount: summary.missingCoordinateCount,
},
});
return {
batchId,
assignmentId: assignment.id,
shiftId: assignment.shift_id,
effectiveClockInMode: policy.mode,
pointCount: points.length,
outOfGeofenceCount: summary.outOfGeofenceCount,
missingCoordinateCount: summary.missingCoordinateCount,
objectUri,
incidentIds,
};
});
}
/**
 * Creates or updates the calling staff member's availability for one day of
 * the week and records a `STAFF_AVAILABILITY_UPDATED` domain event.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` identifies the user.
 * @param {object} payload - Reads `dayOfWeek`, `availabilityStatus`, and the
 *   optional `slots` (defaults to `[]`) and `metadata` (defaults to `{}`).
 *   The payload is also stored verbatim as the event payload.
 * @returns {Promise<{availabilityId: *, dayOfWeek: *, availabilityStatus: *, slots: Array}>}
 */
export async function updateStaffAvailabilityDay(actor, payload) {
  const staffContext = await requireStaffContext(actor.uid);

  // Upsert keyed on (staff_id, day_of_week); on conflict the incoming metadata
  // is merged into the existing JSON value with `||`.
  const upsertSql = `
INSERT INTO staff_availability (
tenant_id,
staff_id,
day_of_week,
availability_status,
time_slots,
metadata
)
VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb)
ON CONFLICT (staff_id, day_of_week) DO UPDATE
SET availability_status = EXCLUDED.availability_status,
time_slots = EXCLUDED.time_slots,
metadata = COALESCE(staff_availability.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id
`;

  const saveDay = async (client) => {
    await ensureActorUser(client, actor);
    const staff = await requireStaffByActor(client, staffContext.tenant.tenantId, actor.uid);

    const upsertParams = [
      staffContext.tenant.tenantId,
      staff.id,
      payload.dayOfWeek,
      payload.availabilityStatus,
      JSON.stringify(payload.slots || []),
      JSON.stringify(payload.metadata || {}),
    ];
    const upserted = await client.query(upsertSql, upsertParams);

    await insertDomainEvent(client, {
      tenantId: staffContext.tenant.tenantId,
      aggregateType: 'staff_availability',
      aggregateId: upserted.rows[0].id,
      eventType: 'STAFF_AVAILABILITY_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      availabilityId: upserted.rows[0].id,
      dayOfWeek: payload.dayOfWeek,
      availabilityStatus: payload.availabilityStatus,
      slots: payload.slots || [],
    };
  };

  return withTransaction(saveDay);
}
/**
 * Applies an availability preset to all seven days of the week for the
 * calling staff member.
 *
 * Days in the preset are stored as `AVAILABLE` with `payload.slots` (or a
 * default 08:00-18:00 slot when `slots` is falsy). Every other day is stored
 * as `UNAVAILABLE` with no slots. A single `STAFF_AVAILABILITY_QUICK_SET`
 * domain event is recorded for the staff aggregate.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` identifies the user.
 * @param {object} payload - Reads `quickSetType` (`'all'`, `'weekdays'`,
 *   `'weekends'` or `'clear'`), optional `slots`, and `startDate`/`endDate`,
 *   which are only stored in the row metadata.
 * @returns {Promise<{quickSetType: string, appliedDays: number[]}>}
 * @throws {AppError} `VALIDATION_ERROR` (400) when `quickSetType` is not a
 *   known preset. Raised before any transaction is opened.
 */
export async function quickSetStaffAvailability(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  // A Map (instead of a plain object) so inherited members such as
  // `constructor` can never be mistaken for a preset.
  const presets = new Map([
    ['all', [0, 1, 2, 3, 4, 5, 6]],
    ['weekdays', [1, 2, 3, 4, 5]],
    ['weekends', [0, 6]],
    ['clear', []],
  ]);
  const selectedDays = presets.get(payload.quickSetType);
  if (!selectedDays) {
    // Previously an unknown type surfaced as a TypeError from
    // `selectedDays.includes` inside the transaction.
    throw new AppError('VALIDATION_ERROR', `quickSetType must be one of: ${[...presets.keys()].join(', ')}`, 400, {
      quickSetType: payload.quickSetType ?? null,
    });
  }

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const staff = await requireStaffByActor(client, context.tenant.tenantId, actor.uid);

    // Every day is written, so days outside the preset are explicitly reset.
    for (let day = 0; day <= 6; day += 1) {
      const active = selectedDays.includes(day);
      await client.query(
        `
INSERT INTO staff_availability (
tenant_id,
staff_id,
day_of_week,
availability_status,
time_slots,
metadata
)
VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb)
ON CONFLICT (staff_id, day_of_week) DO UPDATE
SET availability_status = EXCLUDED.availability_status,
time_slots = EXCLUDED.time_slots,
metadata = COALESCE(staff_availability.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
`,
        [
          context.tenant.tenantId,
          staff.id,
          day,
          active ? 'AVAILABLE' : 'UNAVAILABLE',
          JSON.stringify(active ? payload.slots || [{ start: '08:00', end: '18:00' }] : []),
          JSON.stringify({
            quickSetType: payload.quickSetType,
            startDate: payload.startDate,
            endDate: payload.endDate,
          }),
        ]
      );
    }

    await insertDomainEvent(client, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'staff',
      aggregateId: staff.id,
      eventType: 'STAFF_AVAILABILITY_QUICK_SET',
      actorUserId: actor.uid,
      payload,
    });

    return {
      quickSetType: payload.quickSetType,
      appliedDays: selectedDays,
    };
  });
}
/**
 * Submits the calling staff member's application for a shift role and, when
 * instant booking applies, creates an accepted assignment in the same
 * transaction.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` identifies the user.
 * @param {object} payload - Reads `shiftId`, `roleId` and `instantBook`. The
 *   payload is also stored verbatim as the domain-event payload.
 * @returns {Promise<object>} `{ applicationId, shiftId, roleId, status,
 *   assignmentId, assignmentStatus }`; the assignment fields are `null` when
 *   no instant booking happened.
 * @throws {AppError} `CONFLICT` (409) when the staff member already holds an
 *   active assignment for the shift role. The helper calls
 *   (`requireShiftRoleForStaffApply`, `ensureStaffNotBlockedByBusiness`, ...)
 *   may raise their own errors.
 */
export async function applyForShift(actor, payload) {
const context = await requireStaffContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const staff = await requireStaffByActor(client, context.tenant.tenantId, actor.uid);
const shiftRole = await requireShiftRoleForStaffApply(client, context.tenant.tenantId, payload.shiftId, payload.roleId, staff.id);
// Only used to annotate the application metadata with dispatch team details.
const dispatchMembership = await loadDispatchMembership(client, {
tenantId: context.tenant.tenantId,
businessId: shiftRole.business_id,
hubId: shiftRole.clock_point_id,
staffId: staff.id,
});
await ensureStaffNotBlockedByBusiness(client, {
tenantId: context.tenant.tenantId,
businessId: shiftRole.business_id,
staffId: staff.id,
});
// Guard against a second application when an assignment in one of the listed
// statuses already exists for this staff member and shift role.
const existingAssignment = await client.query(
`
SELECT id
FROM assignments
WHERE shift_role_id = $1
AND staff_id = $2
AND status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
LIMIT 1
`,
[shiftRole.shift_role_id, staff.id]
);
if (existingAssignment.rowCount > 0) {
throw new AppError('CONFLICT', 'Staff is already assigned to this shift', 409, {
shiftId: payload.shiftId,
staffId: staff.id,
});
}
// Instant booking needs all three: the caller asked for it, the role still has
// open capacity, and the staff member has a workforce id to attach.
const instantBook = payload.instantBook === true && shiftRole.assigned_count < shiftRole.workers_needed && Boolean(staff.workforce_id);
// Upsert: re-applying overwrites the status/origin of an existing application
// row for the same (shift_role_id, staff_id) and merges the metadata.
const applicationResult = await client.query(
`
INSERT INTO applications (
tenant_id,
shift_id,
shift_role_id,
staff_id,
status,
origin,
metadata
)
VALUES ($1, $2, $3, $4, $5, 'STAFF', $6::jsonb)
ON CONFLICT (shift_role_id, staff_id) DO UPDATE
SET status = EXCLUDED.status,
origin = EXCLUDED.origin,
metadata = COALESCE(applications.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id, status
`,
[
context.tenant.tenantId,
shiftRole.shift_id,
shiftRole.shift_role_id,
staff.id,
instantBook ? 'CONFIRMED' : 'PENDING',
JSON.stringify({
appliedBy: actor.uid,
instantBookRequested: payload.instantBook === true,
dispatchTeamType: dispatchMembership.teamType,
dispatchPriority: dispatchMembership.priority,
dispatchTeamMembershipId: dispatchMembership.membershipId,
dispatchTeamScopeHubId: dispatchMembership.scopedHubId,
}),
]
);
let assignmentId = null;
let assignmentStatus = null;
if (instantBook) {
// An existing row for the same (shift_role_id, workforce_id) is moved back to
// ACCEPTED and keeps its original accepted_at when one is already set.
const assignmentResult = await client.query(
`
INSERT INTO assignments (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
workforce_id,
staff_id,
application_id,
status,
assigned_at,
accepted_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'ACCEPTED', NOW(), NOW(), $9::jsonb)
ON CONFLICT (shift_role_id, workforce_id) DO UPDATE
SET application_id = EXCLUDED.application_id,
status = 'ACCEPTED',
accepted_at = COALESCE(assignments.accepted_at, NOW()),
updated_at = NOW()
RETURNING id, status
`,
[
context.tenant.tenantId,
shiftRole.business_id,
shiftRole.vendor_id,
shiftRole.shift_id,
shiftRole.shift_role_id,
staff.workforce_id,
staff.id,
applicationResult.rows[0].id,
JSON.stringify({ source: 'staff-apply-instant-book' }),
]
);
assignmentId = assignmentResult.rows[0].id;
assignmentStatus = assignmentResult.rows[0].status;
// Counts are refreshed only when an assignment was written.
await refreshShiftRoleCounts(client, shiftRole.shift_role_id);
await refreshShiftCounts(client, shiftRole.shift_id);
}
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'application',
aggregateId: applicationResult.rows[0].id,
eventType: instantBook ? 'SHIFT_INSTANT_BOOKED' : 'SHIFT_APPLIED',
actorUserId: actor.uid,
payload,
});
return {
applicationId: applicationResult.rows[0].id,
shiftId: shiftRole.shift_id,
roleId: shiftRole.shift_role_id,
status: instantBook ? 'CONFIRMED' : applicationResult.rows[0].status,
assignmentId,
assignmentStatus,
};
});
}
/**
 * Books the calling staff member onto every future shift of an order for a
 * given catalog role, in one all-or-nothing transaction.
 *
 * For each matching shift role an application and an assignment are inserted.
 * Shift roles flagged `instantBook` in their metadata are confirmed/accepted
 * immediately; the others are created as PENDING/ASSIGNED.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` identifies the user.
 * @param {object} payload - Reads `orderId` and `roleId`.
 * @returns {Promise<object>} `{ bookingId, orderId, roleId, roleCode, roleName,
 *   assignedShiftCount, status, assignedShifts }`, where `status` is
 *   `'CONFIRMED'` only when every booked shift role was instant-book.
 * @throws {AppError} `UNPROCESSABLE_ENTITY` (422) when the staff member has no
 *   workforce id, no future shifts exist for the role, any candidate shift is
 *   not OPEN or is full, or required documents are missing;
 *   `VALIDATION_ERROR` (400) for an unknown/inactive role; `NOT_FOUND` (404)
 *   for an unknown order; `CONFLICT` (409) for existing participation or a
 *   conflicting application/assignment row.
 */
export async function bookOrder(actor, payload) {
const context = await requireStaffContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const staff = await requireStaffByActor(client, context.tenant.tenantId, actor.uid);
if (!staff.workforce_id) {
throw new AppError('UNPROCESSABLE_ENTITY', 'Staff must have an active workforce profile before booking an order', 422, {
orderId: payload.orderId,
staffId: staff.id,
});
}
const roleLookup = await client.query(
`
SELECT id, code, name
FROM roles_catalog
WHERE tenant_id = $1
AND id = $2
AND status = 'ACTIVE'
LIMIT 1
`,
[context.tenant.tenantId, payload.roleId]
);
if (roleLookup.rowCount === 0) {
throw new AppError('VALIDATION_ERROR', 'roleId must reference an active role in the tenant catalog', 400, {
roleId: payload.roleId,
});
}
const selectedRole = roleLookup.rows[0];
// The order row is locked (FOR UPDATE) for the rest of the transaction.
// NOTE(review): the selected columns are not read afterwards; the query appears
// to serve as an existence check plus lock — confirm.
const orderLookup = await client.query(
`
SELECT id, business_id, metadata
FROM orders
WHERE tenant_id = $1
AND id = $2
LIMIT 1
FOR UPDATE
`,
[context.tenant.tenantId, payload.orderId]
);
if (orderLookup.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Order not found', 404, {
orderId: payload.orderId,
});
}
// Reject when the staff member already has an assignment or application (in the
// listed statuses) on any future shift of this order.
const existingOrderParticipation = await client.query(
`
SELECT
s.id AS shift_id,
sr.id AS shift_role_id,
a.id AS assignment_id,
app.id AS application_id
FROM shifts s
JOIN shift_roles sr ON sr.shift_id = s.id
LEFT JOIN assignments a
ON a.shift_role_id = sr.id
AND a.staff_id = $3
AND a.status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
LEFT JOIN applications app
ON app.shift_role_id = sr.id
AND app.staff_id = $3
AND app.status IN ('PENDING', 'CONFIRMED', 'CHECKED_IN', 'COMPLETED')
WHERE s.tenant_id = $1
AND s.order_id = $2
AND s.starts_at > NOW()
AND (a.id IS NOT NULL OR app.id IS NOT NULL)
LIMIT 1
`,
[context.tenant.tenantId, payload.orderId, staff.id]
);
if (existingOrderParticipation.rowCount > 0) {
throw new AppError('CONFLICT', 'Staff already has participation on this order', 409, {
orderId: payload.orderId,
shiftId: existingOrderParticipation.rows[0].shift_id,
shiftRoleId: existingOrderParticipation.rows[0].shift_role_id,
});
}
// Candidate rows: future shifts of the order whose shift role resolves to the
// requested catalog role (directly by role_id, or by role_code when role_id is
// null). The matching shifts and shift_roles rows are locked.
const candidateRoles = await client.query(
`
SELECT
s.id AS shift_id,
s.order_id,
s.business_id,
s.vendor_id,
s.clock_point_id,
s.status AS shift_status,
s.starts_at,
s.ends_at,
COALESCE(s.timezone, 'UTC') AS timezone,
to_char(s.starts_at AT TIME ZONE COALESCE(s.timezone, 'UTC'), 'YYYY-MM-DD') AS local_date,
to_char(s.starts_at AT TIME ZONE COALESCE(s.timezone, 'UTC'), 'HH24:MI') AS local_start_time,
to_char(s.ends_at AT TIME ZONE COALESCE(s.timezone, 'UTC'), 'HH24:MI') AS local_end_time,
sr.id AS shift_role_id,
COALESCE(sr.role_id, rc.id) AS catalog_role_id,
COALESCE(sr.role_code, rc.code) AS role_code,
COALESCE(sr.role_name, rc.name) AS role_name,
sr.workers_needed,
sr.assigned_count,
COALESCE((sr.metadata->>'instantBook')::boolean, FALSE) AS instant_book
FROM shifts s
JOIN shift_roles sr ON sr.shift_id = s.id
LEFT JOIN roles_catalog rc
ON rc.tenant_id = s.tenant_id
AND (rc.id = sr.role_id OR (sr.role_id IS NULL AND rc.code = sr.role_code))
WHERE s.tenant_id = $1
AND s.order_id = $2
AND s.starts_at > NOW()
AND COALESCE(sr.role_id, rc.id) = $3
ORDER BY s.starts_at ASC, sr.created_at ASC
FOR UPDATE OF s, sr
`,
[context.tenant.tenantId, payload.orderId, payload.roleId]
);
if (candidateRoles.rowCount === 0) {
throw new AppError('UNPROCESSABLE_ENTITY', 'Order has no future shifts available for this role', 422, {
orderId: payload.orderId,
roleId: payload.roleId,
});
}
// All-or-nothing: a single non-OPEN or already full candidate blocks the whole booking.
const blockedOrUnavailable = candidateRoles.rows.find((row) => row.shift_status !== 'OPEN' || row.assigned_count >= row.workers_needed);
if (blockedOrUnavailable) {
throw new AppError('UNPROCESSABLE_ENTITY', 'Order is no longer fully bookable', 422, {
orderId: payload.orderId,
roleId: payload.roleId,
shiftId: blockedOrUnavailable.shift_id,
shiftRoleId: blockedOrUnavailable.shift_role_id,
});
}
// The block check uses the business of the first candidate row.
// NOTE(review): presumably every shift of an order shares one business — confirm.
await ensureStaffNotBlockedByBusiness(client, {
tenantId: context.tenant.tenantId,
businessId: candidateRoles.rows[0].business_id,
staffId: staff.id,
});
const missingRequiredDocuments = await loadMissingRequiredDocuments(client, {
tenantId: context.tenant.tenantId,
roleCode: selectedRole.code,
staffId: staff.id,
});
if (missingRequiredDocuments.length > 0) {
throw new AppError('UNPROCESSABLE_ENTITY', 'Staff is missing required documents for this role', 422, buildMissingDocumentErrorDetails({
orderId: payload.orderId,
roleId: payload.roleId,
roleCode: selectedRole.code,
missingDocumentNames: missingRequiredDocuments,
}));
}
// bookingId is stored in the metadata of every application and assignment
// created by this call.
const bookingId = crypto.randomUUID();
const assignedShifts = [];
for (const row of candidateRoles.rows) {
const dispatchMembership = await loadDispatchMembership(client, {
tenantId: context.tenant.tenantId,
businessId: row.business_id,
hubId: row.clock_point_id,
staffId: staff.id,
});
const instantBook = Boolean(row.instant_book);
// DO NOTHING + RETURNING yields zero rows when an application row already exists
// for this (shift_role_id, staff_id); that case is reported as a conflict below.
const applicationResult = await client.query(
`
INSERT INTO applications (
tenant_id,
shift_id,
shift_role_id,
staff_id,
status,
origin,
metadata
)
VALUES ($1, $2, $3, $4, $5, 'STAFF', $6::jsonb)
ON CONFLICT (shift_role_id, staff_id) DO NOTHING
RETURNING id, status
`,
[
context.tenant.tenantId,
row.shift_id,
row.shift_role_id,
staff.id,
instantBook ? 'CONFIRMED' : 'PENDING',
JSON.stringify({
bookingId,
bookedBy: actor.uid,
source: 'staff-order-booking',
orderId: payload.orderId,
catalogRoleId: payload.roleId,
roleCode: selectedRole.code,
dispatchTeamType: dispatchMembership.teamType,
dispatchPriority: dispatchMembership.priority,
dispatchTeamMembershipId: dispatchMembership.membershipId,
dispatchTeamScopeHubId: dispatchMembership.scopedHubId,
}),
]
);
if (applicationResult.rowCount === 0) {
throw new AppError('CONFLICT', 'Order booking conflicted with an existing application', 409, {
orderId: payload.orderId,
shiftId: row.shift_id,
shiftRoleId: row.shift_role_id,
});
}
// accepted_at is set only for instant-book rows ($10); the others stay ASSIGNED
// with `pendingApproval: true` in their metadata.
const assignmentResult = await client.query(
`
INSERT INTO assignments (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
workforce_id,
staff_id,
application_id,
status,
assigned_at,
accepted_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), CASE WHEN $10::boolean THEN NOW() ELSE NULL END, $11::jsonb)
ON CONFLICT (shift_role_id, workforce_id) DO NOTHING
RETURNING id, status
`,
[
context.tenant.tenantId,
row.business_id,
row.vendor_id,
row.shift_id,
row.shift_role_id,
staff.workforce_id,
staff.id,
applicationResult.rows[0].id,
instantBook ? 'ACCEPTED' : 'ASSIGNED',
instantBook,
JSON.stringify({
bookingId,
bookedBy: actor.uid,
source: 'staff-order-booking',
orderId: payload.orderId,
catalogRoleId: payload.roleId,
roleCode: selectedRole.code,
pendingApproval: !instantBook,
dispatchTeamType: dispatchMembership.teamType,
dispatchPriority: dispatchMembership.priority,
dispatchTeamMembershipId: dispatchMembership.membershipId,
dispatchTeamScopeHubId: dispatchMembership.scopedHubId,
}),
]
);
if (assignmentResult.rowCount === 0) {
throw new AppError('CONFLICT', 'Order booking conflicted with an existing assignment', 409, {
orderId: payload.orderId,
shiftId: row.shift_id,
shiftRoleId: row.shift_role_id,
});
}
await refreshShiftRoleCounts(client, row.shift_role_id);
await refreshShiftCounts(client, row.shift_id);
assignedShifts.push({
shiftId: row.shift_id,
date: row.local_date,
startsAt: row.starts_at,
endsAt: row.ends_at,
startTime: row.local_start_time,
endTime: row.local_end_time,
timezone: row.timezone,
assignmentId: assignmentResult.rows[0].id,
assignmentStatus: assignmentResult.rows[0].status,
});
}
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'order',
aggregateId: payload.orderId,
eventType: candidateRoles.rows.every((row) => row.instant_book) ? 'STAFF_ORDER_BOOKED_CONFIRMED' : 'STAFF_ORDER_BOOKED_PENDING',
actorUserId: actor.uid,
payload: {
bookingId,
roleId: payload.roleId,
roleCode: selectedRole.code,
assignedShiftCount: assignedShifts.length,
},
});
return {
bookingId,
orderId: payload.orderId,
roleId: payload.roleId,
roleCode: selectedRole.code,
roleName: selectedRole.name,
assignedShiftCount: assignedShifts.length,
status: candidateRoles.rows.every((row) => row.instant_book) ? 'CONFIRMED' : 'PENDING',
assignedShifts,
};
});
}
/**
 * Accepts the calling staff member's pending assignment for a shift.
 *
 * The assignment is looked up with `requirePendingAssignmentForActor`, moved
 * to `ACCEPTED` (keeping an existing `accepted_at` if present), tagged with
 * `acceptedBy` in its metadata, and a `STAFF_PENDING_SHIFT_ACCEPTED` domain
 * event is recorded.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` identifies the user.
 * @param {object} payload - Reads `shiftId`; stored verbatim as the event payload.
 * @returns {Promise<{assignmentId: *, shiftId: *, status: 'ACCEPTED'}>}
 */
export async function acceptPendingShift(actor, payload) {
  const staffContext = await requireStaffContext(actor.uid);

  const acceptSql = `
UPDATE assignments
SET status = 'ACCEPTED',
accepted_at = COALESCE(accepted_at, NOW()),
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`;

  const acceptAssignment = async (client) => {
    await ensureActorUser(client, actor);
    const pending = await requirePendingAssignmentForActor(
      client,
      staffContext.tenant.tenantId,
      payload.shiftId,
      actor.uid
    );

    const acceptanceMetadata = JSON.stringify({ acceptedBy: actor.uid });
    await client.query(acceptSql, [pending.id, acceptanceMetadata]);

    await insertDomainEvent(client, {
      tenantId: staffContext.tenant.tenantId,
      aggregateType: 'assignment',
      aggregateId: pending.id,
      eventType: 'STAFF_PENDING_SHIFT_ACCEPTED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      assignmentId: pending.id,
      shiftId: pending.shift_id,
      status: 'ACCEPTED',
    };
  };

  return withTransaction(acceptAssignment);
}
/**
 * Declines the calling staff member's pending assignment for a shift.
 *
 * The assignment row is set to `CANCELLED`, the staff member's PENDING or
 * CONFIRMED applications for the same shift role are set to `REJECTED`, the
 * shift-role and shift counts are refreshed, and a
 * `STAFF_PENDING_SHIFT_DECLINED` domain event is recorded.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` identifies the user.
 * @param {object} payload - Reads `shiftId` and optional `reason`; stored
 *   verbatim as the event payload.
 * @returns {Promise<{assignmentId: *, shiftId: *, status: 'DECLINED'}>} The
 *   returned status is `'DECLINED'` even though the stored assignment status
 *   is `'CANCELLED'`.
 */
export async function declinePendingShift(actor, payload) {
  const staffContext = await requireStaffContext(actor.uid);

  const cancelAssignmentSql = `
UPDATE assignments
SET status = 'CANCELLED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`;
  const rejectApplicationsSql = `
UPDATE applications
SET status = 'REJECTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE shift_role_id = $1
AND staff_id = $3
AND status IN ('PENDING', 'CONFIRMED')
`;

  const declineAssignment = async (client) => {
    await ensureActorUser(client, actor);
    const pending = await requirePendingAssignmentForActor(
      client,
      staffContext.tenant.tenantId,
      payload.shiftId,
      actor.uid
    );

    const declineMetadata = JSON.stringify({
      declinedBy: actor.uid,
      declineReason: payload.reason || null,
    });
    await client.query(cancelAssignmentSql, [pending.id, declineMetadata]);

    const rejectionMetadata = JSON.stringify({
      rejectedBy: actor.uid,
      reason: payload.reason || null,
    });
    await client.query(rejectApplicationsSql, [
      pending.shift_role_id,
      rejectionMetadata,
      pending.staff_id,
    ]);

    await refreshShiftRoleCounts(client, pending.shift_role_id);
    await refreshShiftCounts(client, pending.shift_id);

    await insertDomainEvent(client, {
      tenantId: staffContext.tenant.tenantId,
      aggregateType: 'assignment',
      aggregateId: pending.id,
      eventType: 'STAFF_PENDING_SHIFT_DECLINED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      assignmentId: pending.id,
      shiftId: pending.shift_id,
      status: 'DECLINED',
    };
  };

  return withTransaction(declineAssignment);
}
/**
 * Marks the calling staff member's accepted assignment for a shift as
 * "swap requested" and opens (or refreshes) the matching swap request.
 *
 * Steps inside one transaction: lock the caller's assignment for the shift,
 * verify it is `ACCEPTED`, compute the swap expiry, update an existing OPEN
 * swap request or insert a new one, set the assignment to `SWAP_REQUESTED`,
 * refresh counts, record a `SHIFT_SWAP_REQUESTED` domain event and enqueue a
 * hub-manager alert.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` identifies the user.
 * @param {object} payload - Reads `shiftId` and optional `reason`; also
 *   spread into the domain-event payload.
 * @returns {Promise<object>} `{ swapRequestId, assignmentId, shiftId,
 *   status: 'SWAP_REQUESTED', expiresAt }` with `expiresAt` as an ISO string.
 * @throws {AppError} `NOT_FOUND` (404) when the caller has no assignment on
 *   the shift; `INVALID_SWAP_STATE` (409) when the assignment is not
 *   `ACCEPTED`; `SWAP_WINDOW_UNAVAILABLE` (409) when `computeSwapExpiry`
 *   returns a falsy value.
 */
export async function requestShiftSwap(actor, payload) {
const context = await requireStaffContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
// Picks the oldest assignment of this user on the shift and locks the
// assignment, shift and shift_role rows.
// NOTE(review): the status filter is applied after the fetch, so an older
// non-ACCEPTED assignment on the same shift would be selected first — confirm
// that at most one assignment per user and shift can exist.
const assignmentResult = await client.query(
`
SELECT
a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.workforce_id,
a.staff_id,
a.status,
a.metadata,
s.starts_at,
s.clock_point_id,
s.title AS shift_title,
sr.role_name,
st.full_name AS staff_name
FROM assignments a
JOIN staffs st ON st.id = a.staff_id
JOIN shifts s ON s.id = a.shift_id
JOIN shift_roles sr ON sr.id = a.shift_role_id
WHERE a.tenant_id = $1
AND a.shift_id = $2
AND st.user_id = $3
ORDER BY a.created_at ASC
LIMIT 1
FOR UPDATE OF a, s, sr
`,
[context.tenant.tenantId, payload.shiftId, actor.uid]
);
if (assignmentResult.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Shift assignment not found for current user', 404, {
shiftId: payload.shiftId,
});
}
const assignment = assignmentResult.rows[0];
if (assignment.status !== 'ACCEPTED') {
throw new AppError('INVALID_SWAP_STATE', 'Only accepted future shifts can be marked for swap', 409, {
shiftId: payload.shiftId,
assignmentStatus: assignment.status,
});
}
// A falsy expiry is treated as "too close to the start time"; the error details
// report SHIFT_SWAP_MIN_LEAD_MINUTES as the minimum lead.
const expiresAt = computeSwapExpiry(assignment.starts_at);
if (!expiresAt) {
throw new AppError('SWAP_WINDOW_UNAVAILABLE', 'Shift is too close to start time for a valid swap window', 409, {
shiftId: payload.shiftId,
startsAt: assignment.starts_at,
minimumLeadMinutes: SHIFT_SWAP_MIN_LEAD_MINUTES,
});
}
// Look for an OPEN request on this assignment (most recent first) and lock it.
const existingSwap = await client.query(
`
SELECT id, status, expires_at
FROM shift_swap_requests
WHERE original_assignment_id = $1
AND status = 'OPEN'
ORDER BY created_at DESC
LIMIT 1
FOR UPDATE
`,
[assignment.id]
);
let swapRequestId;
if (existingSwap.rowCount > 0) {
// Reuse the open request: refresh its expiry, keep the previous reason when no
// new one is supplied, and note the reopen in its metadata.
swapRequestId = existingSwap.rows[0].id;
await client.query(
`
UPDATE shift_swap_requests
SET reason = COALESCE($2, reason),
expires_at = $3,
metadata = COALESCE(metadata, '{}'::jsonb) || $4::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
swapRequestId,
payload.reason || null,
expiresAt.toISOString(),
JSON.stringify({
reopenedAt: new Date().toISOString(),
swapRequestedBy: actor.uid,
}),
]
);
} else {
const swapRequestResult = await client.query(
`
INSERT INTO shift_swap_requests (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
original_assignment_id,
original_staff_id,
requested_by_user_id,
status,
reason,
expires_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'OPEN', $9, $10, $11::jsonb)
RETURNING id
`,
[
context.tenant.tenantId,
assignment.business_id,
assignment.vendor_id,
assignment.shift_id,
assignment.shift_role_id,
assignment.id,
assignment.staff_id,
actor.uid,
payload.reason || null,
expiresAt.toISOString(),
JSON.stringify({
requestedAt: new Date().toISOString(),
requestedBy: actor.uid,
}),
]
);
swapRequestId = swapRequestResult.rows[0].id;
}
// The assignment metadata mirrors the swap request id, reason and expiry.
await client.query(
`
UPDATE assignments
SET status = 'SWAP_REQUESTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[assignment.id, JSON.stringify({
swapRequestedAt: new Date().toISOString(),
swapReason: payload.reason || null,
swapRequestId,
swapExpiresAt: expiresAt.toISOString(),
})]
);
await refreshShiftRoleCounts(client, assignment.shift_role_id);
await refreshShiftCounts(client, assignment.shift_id);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'shift_swap_request',
aggregateId: swapRequestId,
eventType: 'SHIFT_SWAP_REQUESTED',
actorUserId: actor.uid,
payload: {
...payload,
assignmentId: assignment.id,
expiresAt: expiresAt.toISOString(),
},
});
await enqueueHubManagerAlert(client, {
tenantId: context.tenant.tenantId,
businessId: assignment.business_id,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
hubId: assignment.clock_point_id,
notificationType: 'SHIFT_SWAP_REQUESTED',
priority: 'HIGH',
subject: 'Shift swap requested',
body: `${assignment.staff_name || 'A worker'} requested a swap for ${assignment.shift_title || assignment.role_name || 'a shift'}`,
payload: {
swapRequestId,
assignmentId: assignment.id,
shiftId: assignment.shift_id,
expiresAt: expiresAt.toISOString(),
reason: payload.reason || null,
},
dedupeScope: swapRequestId,
});
return {
swapRequestId,
assignmentId: assignment.id,
shiftId: assignment.shift_id,
status: 'SWAP_REQUESTED',
expiresAt: expiresAt.toISOString(),
};
});
}
/**
 * Resolves an open shift swap request by selecting a replacement applicant
 * (client-side action).
 *
 * Inside one transaction: confirm the chosen application, reject the other
 * applications via `rejectOtherApplicationsForSwap`, mark the original
 * assignment `SWAPPED_OUT`, insert an `ACCEPTED` assignment for the
 * replacement, mark the swap request `RESOLVED`, refresh counts, record a
 * `SHIFT_SWAP_RESOLVED` domain event and notify both the original and the
 * replacement staff users.
 *
 * @param {object} actor - Authenticated caller; must resolve to a client
 *   context. `actor.uid` identifies the user.
 * @param {object} payload - Reads `swapRequestId`, `applicationId` and
 *   optional `note`.
 * @returns {Promise<object>} `{ swapRequestId, status: 'RESOLVED',
 *   originalAssignmentId, replacementAssignmentId, applicationId }`.
 * @throws {AppError} `INVALID_SWAP_REQUEST_STATE` (409) when the request is
 *   not `OPEN`; `SWAP_REQUEST_EXPIRED` (409) when `expires_at` is not in the
 *   future. The lookup helpers may raise their own errors.
 */
export async function resolveShiftSwapRequest(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
// Scoped to the caller's tenant and business.
// NOTE(review): the helper name suggests the row is locked FOR UPDATE — confirm.
const swapRequest = await requireSwapRequestForUpdate(
client,
context.tenant.tenantId,
context.business.businessId,
payload.swapRequestId
);
if (swapRequest.status !== 'OPEN') {
throw new AppError('INVALID_SWAP_REQUEST_STATE', 'Only open swap requests can be resolved', 409, {
swapRequestId: payload.swapRequestId,
swapRequestStatus: swapRequest.status,
});
}
// Requests at or past their expiry are refused here; per the error message they
// are left to the expiry worker.
if (new Date(swapRequest.expires_at).getTime() <= Date.now()) {
throw new AppError('SWAP_REQUEST_EXPIRED', 'The swap request has already expired and must be handled by the expiry worker', 409, {
swapRequestId: payload.swapRequestId,
expiresAt: swapRequest.expires_at,
});
}
const candidate = await requireSwapCandidateApplication(client, swapRequest, payload.applicationId);
await client.query(
`
UPDATE applications
SET status = 'CONFIRMED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
candidate.id,
JSON.stringify({
selectedForSwapAt: new Date().toISOString(),
selectedForSwapBy: actor.uid,
selectedForSwapRequestId: swapRequest.id,
selectionNote: payload.note || null,
}),
]
);
await rejectOtherApplicationsForSwap(client, {
shiftRoleId: swapRequest.shift_role_id,
selectedApplicationId: candidate.id,
reason: 'Replacement selected for swap request',
actorUid: actor.uid,
});
// The original assignment is swapped out before the replacement row is inserted.
await client.query(
`
UPDATE assignments
SET status = 'SWAPPED_OUT',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
swapRequest.original_assignment_id,
JSON.stringify({
swappedOutAt: new Date().toISOString(),
swapResolvedBy: actor.uid,
swapRequestId: swapRequest.id,
replacementApplicationId: candidate.id,
}),
]
);
// NOTE(review): this INSERT has no ON CONFLICT clause, so a unique-constraint
// violation (e.g. a prior assignment row for the same shift role and workforce)
// would surface as a database error — confirm that is intended.
const replacementAssignmentResult = await client.query(
`
INSERT INTO assignments (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
workforce_id,
staff_id,
application_id,
status,
assigned_at,
accepted_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'ACCEPTED', NOW(), NOW(), $9::jsonb)
RETURNING id, status
`,
[
context.tenant.tenantId,
swapRequest.business_id,
swapRequest.vendor_id,
swapRequest.shift_id,
swapRequest.shift_role_id,
candidate.workforce_id,
candidate.staff_id,
candidate.id,
JSON.stringify({
source: 'swap-resolution',
swapRequestId: swapRequest.id,
originalAssignmentId: swapRequest.original_assignment_id,
resolvedBy: actor.uid,
}),
]
);
await markSwapRequestStatus(client, {
swapRequestId: swapRequest.id,
status: 'RESOLVED',
resolvedByUserId: actor.uid,
selectedApplicationId: candidate.id,
replacementAssignmentId: replacementAssignmentResult.rows[0].id,
metadata: {
resolvedAt: new Date().toISOString(),
resolutionNote: payload.note || null,
},
});
await refreshShiftRoleCounts(client, swapRequest.shift_role_id);
await refreshShiftCounts(client, swapRequest.shift_id);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'shift_swap_request',
aggregateId: swapRequest.id,
eventType: 'SHIFT_SWAP_RESOLVED',
actorUserId: actor.uid,
payload: {
applicationId: candidate.id,
replacementAssignmentId: replacementAssignmentResult.rows[0].id,
note: payload.note || null,
},
});
// Notify the staff member who asked for the swap.
await enqueueUserAlert(client, {
tenantId: context.tenant.tenantId,
businessId: swapRequest.business_id,
shiftId: swapRequest.shift_id,
assignmentId: swapRequest.original_assignment_id,
recipientUserId: swapRequest.original_staff_user_id,
notificationType: 'SHIFT_SWAP_RESOLVED',
priority: 'HIGH',
subject: 'Swap request resolved',
body: `A replacement has been confirmed for ${swapRequest.shift_title || 'your shift'}`,
payload: {
swapRequestId: swapRequest.id,
replacementAssignmentId: replacementAssignmentResult.rows[0].id,
},
dedupeScope: swapRequest.id,
});
// Notify the replacement; this alert is deduplicated on the new assignment id.
await enqueueUserAlert(client, {
tenantId: context.tenant.tenantId,
businessId: swapRequest.business_id,
shiftId: swapRequest.shift_id,
assignmentId: replacementAssignmentResult.rows[0].id,
recipientUserId: candidate.staff_user_id,
notificationType: 'SHIFT_SWAP_ASSIGNMENT_CONFIRMED',
priority: 'HIGH',
subject: 'You were selected as the shift replacement',
body: `You have been confirmed for ${swapRequest.shift_title || 'a shift'} via swap coverage`,
payload: {
swapRequestId: swapRequest.id,
assignmentId: replacementAssignmentResult.rows[0].id,
},
dedupeScope: replacementAssignmentResult.rows[0].id,
});
return {
swapRequestId: swapRequest.id,
status: 'RESOLVED',
originalAssignmentId: swapRequest.original_assignment_id,
replacementAssignmentId: replacementAssignmentResult.rows[0].id,
applicationId: candidate.id,
};
});
}
/**
 * Cancels an open shift swap request on behalf of a client user.
 *
 * Everything runs in one transaction: the original assignment is set back to
 * `ACCEPTED` (with cancellation details merged into its metadata),
 * `rejectOtherApplicationsForSwap` is invoked with no selected application,
 * the swap request is marked `CANCELLED`, shift/role counters are refreshed,
 * a `SHIFT_SWAP_CANCELLED` domain event is recorded and an alert is queued
 * for the staff user who owned the original assignment.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` drives context lookup and auditing.
 * @param {object} payload - Reads `swapRequestId` and the optional `reason`.
 * @returns {Promise<{swapRequestId: *, status: string, assignmentId: *}>} Summary of the cancelled request.
 * @throws {AppError} `INVALID_SWAP_REQUEST_STATE` (409) when the swap request is not `OPEN`.
 */
export async function cancelShiftSwapRequest(actor, payload) {
  const context = await requireClientContext(actor.uid);

  const cancelInTransaction = async (tx) => {
    await ensureActorUser(tx, actor);

    const swap = await requireSwapRequestForUpdate(
      tx,
      context.tenant.tenantId,
      context.business.businessId,
      payload.swapRequestId
    );

    const isOpen = swap.status === 'OPEN';
    if (!isOpen) {
      const details = {
        swapRequestId: payload.swapRequestId,
        swapRequestStatus: swap.status,
      };
      throw new AppError('INVALID_SWAP_REQUEST_STATE', 'Only open swap requests can be cancelled', 409, details);
    }

    // Hand the shift back to the original assignee and leave an audit trail in metadata.
    const assignmentPatch = JSON.stringify({
      swapCancelledAt: new Date().toISOString(),
      swapCancelledBy: actor.uid,
      swapCancellationReason: payload.reason || null,
    });
    await tx.query(
      `
UPDATE assignments
SET status = 'ACCEPTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [swap.original_assignment_id, assignmentPatch]
    );

    // No application is selected on cancellation.
    await rejectOtherApplicationsForSwap(tx, {
      shiftRoleId: swap.shift_role_id,
      selectedApplicationId: null,
      reason: payload.reason || 'Swap request cancelled',
      actorUid: actor.uid,
    });

    await markSwapRequestStatus(tx, {
      swapRequestId: swap.id,
      status: 'CANCELLED',
      resolvedByUserId: actor.uid,
      metadata: {
        cancelledAt: new Date().toISOString(),
        cancellationReason: payload.reason || null,
      },
    });

    await refreshShiftRoleCounts(tx, swap.shift_role_id);
    await refreshShiftCounts(tx, swap.shift_id);

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'shift_swap_request',
      aggregateId: swap.id,
      eventType: 'SHIFT_SWAP_CANCELLED',
      actorUserId: actor.uid,
      payload,
    });

    const shiftLabel = swap.shift_title || 'the shift';
    await enqueueUserAlert(tx, {
      tenantId: context.tenant.tenantId,
      businessId: swap.business_id,
      shiftId: swap.shift_id,
      assignmentId: swap.original_assignment_id,
      recipientUserId: swap.original_staff_user_id,
      notificationType: 'SHIFT_SWAP_CANCELLED',
      priority: 'NORMAL',
      subject: 'Swap request cancelled',
      body: `Your swap request for ${shiftLabel} was cancelled`,
      payload: {
        swapRequestId: swap.id,
        reason: payload.reason || null,
      },
      dedupeScope: swap.id,
    });

    return {
      swapRequestId: swap.id,
      status: 'CANCELLED',
      assignmentId: swap.original_assignment_id,
    };
  };

  return withTransaction(cancelInTransaction);
}
/**
 * Creates or re-activates a dispatch team membership for a staff member
 * within the caller's business.
 *
 * A membership is matched on tenant, business, staff, team type and hub
 * (a missing hub only matches rows whose `hub_id` is NULL). A matching row is
 * updated back to `ACTIVE`; otherwise a new `ACTIVE` row is inserted. Either
 * way a `DISPATCH_TEAM_MEMBERSHIP_UPSERTED` domain event is recorded.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` is used for context lookup and auditing.
 * @param {object} payload - Reads `staffId`, `teamType`, and optional `hubId`, `source`,
 *   `reason`, `effectiveAt`, `expiresAt`, `metadata`.
 * @returns {Promise<{membershipId: *, staffId: *, teamType: *, hubId: *, status: string, priority: *}>}
 * @throws {AppError} `VALIDATION_ERROR` (400) when `expiresAt` is not after `effectiveAt`, or when a
 *   `CERTIFIED_LOCATION` membership has no `hubId`; `NOT_FOUND` (404) when the staff row is not in the tenant.
 */
export async function createDispatchTeamMembership(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);

    const { effectiveAt, expiresAt, hubId, staffId, teamType } = payload;

    // The validity window is only checked when both ends are supplied.
    if (effectiveAt && expiresAt) {
      const windowClosesAt = new Date(expiresAt).getTime();
      const windowOpensAt = new Date(effectiveAt).getTime();
      if (windowClosesAt <= windowOpensAt) {
        throw new AppError('VALIDATION_ERROR', 'expiresAt must be after effectiveAt', 400, {
          effectiveAt,
          expiresAt,
        });
      }
    }

    const isCertifiedLocation = teamType === 'CERTIFIED_LOCATION';
    if (isCertifiedLocation && !hubId) {
      throw new AppError('VALIDATION_ERROR', 'hubId is required for CERTIFIED_LOCATION memberships', 400);
    }

    if (hubId) {
      await requireClockPoint(tx, context.tenant.tenantId, context.business.businessId, hubId, { forUpdate: true });
    }

    const staffLookup = await tx.query(
      `
SELECT id
FROM staffs
WHERE tenant_id = $1
AND id = $2
LIMIT 1
FOR UPDATE
`,
      [context.tenant.tenantId, staffId]
    );
    if (staffLookup.rowCount === 0) {
      throw new AppError('NOT_FOUND', 'Staff profile not found in tenant scope', 404, { staffId });
    }

    const hubIdOrNull = hubId || null;
    const membershipLookup = await tx.query(
      `
SELECT id, status
FROM dispatch_team_memberships
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
AND team_type = $4
AND (
($5::uuid IS NULL AND hub_id IS NULL)
OR hub_id = $5
)
LIMIT 1
FOR UPDATE
`,
      [context.tenant.tenantId, context.business.businessId, staffId, teamType, hubIdOrNull]
    );

    // Values shared by the update and insert paths.
    // `source` always resolves to a non-null value, so COALESCE($2, source) in the UPDATE never falls back.
    const source = payload.source || 'MANUAL';
    const reason = payload.reason || null;
    const effectiveAtOrNull = effectiveAt || null;
    const expiresAtOrNull = expiresAt || null;
    const metadataJson = JSON.stringify(payload.metadata || {});

    const membershipExists = membershipLookup.rowCount > 0;
    let membershipId;
    if (!membershipExists) {
      const inserted = await tx.query(
        `
INSERT INTO dispatch_team_memberships (
tenant_id,
business_id,
hub_id,
staff_id,
team_type,
source,
status,
reason,
effective_at,
expires_at,
created_by_user_id,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, 'ACTIVE', $7, COALESCE($8::timestamptz, NOW()), $9::timestamptz, $10, $11::jsonb)
RETURNING id
`,
        [
          context.tenant.tenantId,
          context.business.businessId,
          hubIdOrNull,
          staffId,
          teamType,
          source,
          reason,
          effectiveAtOrNull,
          expiresAtOrNull,
          actor.uid,
          metadataJson,
        ]
      );
      membershipId = inserted.rows[0].id;
    } else {
      membershipId = membershipLookup.rows[0].id;
      await tx.query(
        `
UPDATE dispatch_team_memberships
SET status = 'ACTIVE',
source = COALESCE($2, source),
reason = COALESCE($3, reason),
effective_at = COALESCE($4::timestamptz, effective_at, NOW()),
expires_at = $5::timestamptz,
metadata = COALESCE(metadata, '{}'::jsonb) || $6::jsonb,
updated_at = NOW()
WHERE id = $1
`,
        [membershipId, source, reason, effectiveAtOrNull, expiresAtOrNull, metadataJson]
      );
    }

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'dispatch_team_membership',
      aggregateId: membershipId,
      eventType: 'DISPATCH_TEAM_MEMBERSHIP_UPSERTED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      membershipId,
      staffId,
      teamType,
      hubId: hubIdOrNull,
      status: 'ACTIVE',
      priority: resolveDispatchPriority(teamType),
    };
  });
}
/**
 * Deactivates a dispatch team membership that belongs to the caller's business.
 *
 * The row is locked, flipped to `INACTIVE`, given an `expires_at` of NOW()
 * if it had none, and annotated with who removed it and when. A
 * `DISPATCH_TEAM_MEMBERSHIP_REMOVED` domain event is recorded.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` is used for context lookup and auditing.
 * @param {object} payload - Reads `membershipId` and the optional `reason`.
 * @returns {Promise<{membershipId: *, status: string}>}
 * @throws {AppError} `NOT_FOUND` (404) when no membership matches the id within the tenant and business.
 */
export async function removeDispatchTeamMembership(actor, payload) {
  const context = await requireClientContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);

    const { membershipId } = payload;

    // Lock the row and confirm it is inside the caller's tenant/business scope.
    const lookup = await tx.query(
      `
SELECT id, team_type, staff_id, hub_id
FROM dispatch_team_memberships
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
FOR UPDATE
`,
      [membershipId, context.tenant.tenantId, context.business.businessId]
    );
    if (lookup.rowCount === 0) {
      throw new AppError('NOT_FOUND', 'Dispatch team membership not found', 404, { membershipId });
    }

    const removalAudit = JSON.stringify({
      removedAt: new Date().toISOString(),
      removedBy: actor.uid,
    });
    await tx.query(
      `
UPDATE dispatch_team_memberships
SET status = 'INACTIVE',
reason = COALESCE($2, reason),
expires_at = COALESCE(expires_at, NOW()),
metadata = COALESCE(metadata, '{}'::jsonb) || $3::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [membershipId, payload.reason || null, removalAudit]
    );

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'dispatch_team_membership',
      aggregateId: membershipId,
      eventType: 'DISPATCH_TEAM_MEMBERSHIP_REMOVED',
      actorUserId: actor.uid,
      payload,
    });

    return { membershipId, status: 'INACTIVE' };
  });
}
/**
 * Submits the caller's timesheet for a finished shift for approval.
 *
 * The caller's assignment for the shift must be `CHECKED_OUT` or `COMPLETED`.
 * A timesheet row is inserted as `SUBMITTED`; if one already exists for the
 * assignment its status becomes `SUBMITTED` unless it is already `APPROVED`
 * or `PAID`, and the submission metadata is merged in either way. A
 * `TIMESHEET_SUBMITTED_FOR_APPROVAL` domain event is recorded.
 *
 * @param {object} actor - Authenticated staff caller; `actor.uid` is used for context lookup and auditing.
 * @param {object} payload - Reads `shiftId` and the optional `note`.
 * @returns {Promise<{assignmentId: *, shiftId: *, timesheetId: *, status: *, submitted: boolean}>}
 *   `submitted` is true only when the resulting timesheet status is `SUBMITTED`.
 * @throws {AppError} `INVALID_TIMESHEET_STATE` (409) when the assignment is in any other status.
 */
export async function submitCompletedShiftForApproval(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);

    const assignment = await requireAnyAssignmentForActor(tx, context.tenant.tenantId, payload.shiftId, actor.uid);

    const isFinished = assignment.status === 'CHECKED_OUT' || assignment.status === 'COMPLETED';
    if (!isFinished) {
      throw new AppError(
        'INVALID_TIMESHEET_STATE',
        'Only completed or checked-out shifts can be submitted for approval',
        409,
        {
          shiftId: payload.shiftId,
          assignmentStatus: assignment.status,
        }
      );
    }

    const submissionMetadata = JSON.stringify({
      submittedAt: new Date().toISOString(),
      submittedBy: actor.uid,
      submissionNote: payload.note || null,
    });

    // Upsert keyed on assignment_id; APPROVED/PAID timesheets keep their status.
    const upsert = await tx.query(
      `
INSERT INTO timesheets (
tenant_id,
assignment_id,
staff_id,
status,
metadata
)
VALUES ($1, $2, $3, 'SUBMITTED', $4::jsonb)
ON CONFLICT (assignment_id) DO UPDATE
SET status = CASE
WHEN timesheets.status IN ('APPROVED', 'PAID') THEN timesheets.status
ELSE 'SUBMITTED'
END,
metadata = COALESCE(timesheets.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id, status, metadata
`,
      [context.tenant.tenantId, assignment.id, assignment.staff_id, submissionMetadata]
    );
    const timesheet = upsert.rows[0];

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'timesheet',
      aggregateId: timesheet.id,
      eventType: 'TIMESHEET_SUBMITTED_FOR_APPROVAL',
      actorUserId: actor.uid,
      payload,
    });

    return {
      assignmentId: assignment.id,
      shiftId: assignment.shift_id,
      timesheetId: timesheet.id,
      status: timesheet.status,
      submitted: timesheet.status === 'SUBMITTED',
    };
  });
}
/**
 * Completes staff onboarding for the calling user.
 *
 * In one transaction this resolves the onboarding scope, ensures the user
 * record, activates tenant and vendor memberships, upserts the `staffs` row
 * (marking onboarding `COMPLETED`), upserts the `workforce` row and records a
 * `STAFF_PROFILE_SETUP_COMPLETED` domain event.
 *
 * @param {object} actor - Authenticated caller; reads `uid` and `email`.
 * @param {object} payload - Reads `tenantId`, `vendorId`, `fullName`, `email`, `phoneNumber`, `bio`,
 *   `preferredLocations`, `maxDistanceMiles`, `industries`, `skills`, `primaryRole`.
 * @returns {Promise<{staffId: *, workforceId: *, tenantId: *, vendorId: *, completed: boolean}>}
 */
export async function setupStaffProfile(actor, payload) {
  return withTransaction(async (tx) => {
    const scope = await resolveStaffOnboardingScope(tx, actor.uid, payload.tenantId, payload.vendorId);

    const contactEmail = payload.email ?? actor.email ?? null;
    await ensureActorUser(tx, actor, {
      email: contactEmail,
      displayName: payload.fullName,
      phone: payload.phoneNumber,
      metadata: { source: 'staff-profile-setup' },
    });

    await tx.query(
      `
INSERT INTO tenant_memberships (tenant_id, user_id, membership_status, base_role, metadata)
VALUES ($1, $2, 'ACTIVE', 'member', '{"source":"staff-profile-setup"}'::jsonb)
ON CONFLICT (tenant_id, user_id) DO UPDATE
SET membership_status = 'ACTIVE',
updated_at = NOW()
`,
      [scope.tenantId, actor.uid]
    );

    await tx.query(
      `
INSERT INTO vendor_memberships (tenant_id, vendor_id, user_id, membership_status, vendor_role, metadata)
VALUES ($1, $2, $3, 'ACTIVE', 'member', '{"source":"staff-profile-setup"}'::jsonb)
ON CONFLICT (vendor_id, user_id) DO UPDATE
SET membership_status = 'ACTIVE',
updated_at = NOW()
`,
      [scope.tenantId, scope.vendorId, actor.uid]
    );

    // First whitespace-separated token is the first name; the rest is the last name.
    const fullName = payload.fullName.trim();
    const nameParts = fullName.split(/\s+/);
    const firstName = nameParts[0];
    const lastName = nameParts.slice(1).join(' ');

    const skills = ensureArray(payload.skills || []);
    const profileMetadata = {
      bio: payload.bio || null,
      firstName,
      lastName,
      preferredLocations: ensureArray(payload.preferredLocations || []),
      maxDistanceMiles: payload.maxDistanceMiles ?? null,
      industries: ensureArray(payload.industries || []),
      skills,
    };

    // Primary role falls back to the first skill, then to a generic role.
    const primaryRole = payload.primaryRole || skills[0] || 'GENERAL_EVENT_STAFF';

    const staffUpsert = await tx.query(
      `
INSERT INTO staffs (
tenant_id,
user_id,
full_name,
email,
phone,
status,
primary_role,
onboarding_status,
metadata
)
VALUES ($1, $2, $3, $4, $5, 'ACTIVE', $6, 'COMPLETED', $7::jsonb)
ON CONFLICT (tenant_id, user_id) DO UPDATE
SET full_name = EXCLUDED.full_name,
email = COALESCE(EXCLUDED.email, staffs.email),
phone = COALESCE(EXCLUDED.phone, staffs.phone),
primary_role = COALESCE(EXCLUDED.primary_role, staffs.primary_role),
onboarding_status = 'COMPLETED',
metadata = COALESCE(staffs.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id
`,
      [
        scope.tenantId,
        actor.uid,
        fullName,
        contactEmail,
        normalizePhone(payload.phoneNumber),
        primaryRole,
        JSON.stringify(profileMetadata),
      ]
    );
    const staffId = staffUpsert.rows[0].id;

    // The generated number is only stored on first insert; the conflict branch does not touch it.
    const namePrefix = normalizeSlug(firstName).toUpperCase();
    const randomSuffix = crypto.randomUUID().slice(0, 8).toUpperCase();
    const workforceUpsert = await tx.query(
      `
INSERT INTO workforce (
tenant_id,
vendor_id,
staff_id,
workforce_number,
employment_type,
status,
metadata
)
VALUES ($1, $2, $3, $4, 'TEMP', 'ACTIVE', '{"source":"staff-profile-setup"}'::jsonb)
ON CONFLICT (vendor_id, staff_id) DO UPDATE
SET status = 'ACTIVE',
updated_at = NOW()
RETURNING id
`,
      [scope.tenantId, scope.vendorId, staffId, `WF-${namePrefix}-${randomSuffix}`]
    );

    await insertDomainEvent(tx, {
      tenantId: scope.tenantId,
      aggregateType: 'staff',
      aggregateId: staffId,
      eventType: 'STAFF_PROFILE_SETUP_COMPLETED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      staffId,
      workforceId: workforceUpsert.rows[0].id,
      tenantId: scope.tenantId,
      vendorId: scope.vendorId,
      completed: true,
    };
  });
}
/**
 * Updates the calling staff member's personal details.
 *
 * The full name is rebuilt from `payload.firstName` / `payload.lastName`,
 * falling back to the names stored in metadata and then to the parts of the
 * current `full_name`. Email and phone are only overwritten when a value is
 * supplied. Metadata is replaced with the result of `buildStaffMetadataPatch`.
 * A `STAFF_PERSONAL_INFO_UPDATED` domain event is recorded.
 *
 * @param {object} actor - Authenticated staff caller; reads `uid` and `email`.
 * @param {object} payload - Reads `email`, `displayName`, `phone`, `firstName`, `lastName`; the
 *   whole object is also handed to `buildStaffMetadataPatch`.
 * @returns {Promise<{staffId: *, fullName: string, email: *, phone: *, metadata: object}>}
 */
export async function updatePersonalInfo(actor, payload) {
  const context = await requireStaffContext(actor.uid);
  return withTransaction(async (client) => {
    await ensureActorUser(client, actor, {
      email: payload.email ?? actor.email ?? null,
      displayName: payload.displayName || null,
      phone: payload.phone || null,
    });
    const staff = await requireStaffByActor(client, context.tenant.tenantId, actor.uid);
    const existingMetadata = staff.metadata || {};
    const nextMetadata = buildStaffMetadataPatch(existingMetadata, payload);
    const fullName = [
      payload.firstName || existingMetadata.firstName || staff.full_name.split(' ')[0] || '',
      payload.lastName || existingMetadata.lastName || staff.full_name.split(' ').slice(1).join(' ') || '',
    ].filter(Boolean).join(' ').trim() || staff.full_name;

    // normalizePhone trims, so a whitespace-only phone becomes ''. SQL COALESCE
    // only skips NULL, so '' would wipe the stored phone; treat a blank result
    // as "not provided" (same handling as updateEmergencyContact).
    const nextPhone = normalizePhone(payload.phone) || null;

    await client.query(
      `
UPDATE staffs
SET full_name = $2,
email = COALESCE($3, email),
phone = COALESCE($4, phone),
metadata = $5::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [staff.id, fullName, payload.email ?? null, nextPhone, JSON.stringify(nextMetadata)]
    );
    await insertDomainEvent(client, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'staff',
      aggregateId: staff.id,
      eventType: 'STAFF_PERSONAL_INFO_UPDATED',
      actorUserId: actor.uid,
      payload,
    });
    return {
      staffId: staff.id,
      fullName,
      email: payload.email ?? staff.email ?? null,
      // Report the value that is actually stored after the update.
      phone: nextPhone ?? staff.phone ?? null,
      metadata: nextMetadata,
    };
  });
}
/**
 * Updates the calling staff member's experience details.
 *
 * Metadata is replaced with the result of `buildStaffMetadataPatch`, and
 * `primary_role` is overwritten only when `payload.primaryRole` is supplied.
 * A `STAFF_EXPERIENCE_UPDATED` domain event is recorded.
 *
 * @param {object} actor - Authenticated staff caller; `actor.uid` is used for lookup and auditing.
 * @param {object} payload - Handed to `buildStaffMetadataPatch`; `primaryRole` is also read directly.
 * @returns {Promise<{staffId: *, industries: Array, skills: Array}>} Lists taken from the patched metadata.
 */
export async function updateProfileExperience(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);

    const staff = await requireStaffByActor(tx, context.tenant.tenantId, actor.uid);
    const patchedMetadata = buildStaffMetadataPatch(staff.metadata || {}, payload);
    const updateParams = [staff.id, JSON.stringify(patchedMetadata), payload.primaryRole || null];

    await tx.query(
      `
UPDATE staffs
SET metadata = $2::jsonb,
primary_role = COALESCE($3, primary_role),
updated_at = NOW()
WHERE id = $1
`,
      updateParams
    );

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'staff',
      aggregateId: staff.id,
      eventType: 'STAFF_EXPERIENCE_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    const industries = ensureArray(patchedMetadata.industries || []);
    const skills = ensureArray(patchedMetadata.skills || []);
    return { staffId: staff.id, industries, skills };
  });
}
/**
 * Updates only the preferred-location settings of the calling staff member
 * by delegating to `updatePersonalInfo` with those two fields.
 *
 * @param {object} actor - Authenticated staff caller, passed through unchanged.
 * @param {object} payload - Reads `preferredLocations` and `maxDistanceMiles`; other keys are ignored.
 * @returns {Promise<object>} Whatever `updatePersonalInfo` resolves with.
 */
export async function updatePreferredLocations(actor, payload) {
  const { preferredLocations, maxDistanceMiles } = payload;
  const locationPatch = { preferredLocations, maxDistanceMiles };
  return updatePersonalInfo(actor, locationPatch);
}
/**
 * Adds an emergency contact for the calling staff member and records an
 * `EMERGENCY_CONTACT_CREATED` domain event.
 *
 * @param {object} actor - Authenticated staff caller; `actor.uid` is used for lookup and auditing.
 * @param {object} payload - Reads `fullName`, `phone`, `relationshipType`, and optional
 *   `isPrimary` (defaults to false) and `metadata` (defaults to `{}`).
 * @returns {Promise<{contactId: *, created: boolean}>}
 */
export async function createEmergencyContact(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);
    const staff = await requireStaffByActor(tx, context.tenant.tenantId, actor.uid);

    const insertParams = [
      context.tenant.tenantId,
      staff.id,
      payload.fullName,
      normalizePhone(payload.phone),
      payload.relationshipType,
      payload.isPrimary ?? false,
      JSON.stringify(payload.metadata || {}),
    ];
    const inserted = await tx.query(
      `
INSERT INTO emergency_contacts (
tenant_id,
staff_id,
full_name,
phone,
relationship_type,
is_primary,
metadata
)
VALUES ($1, $2, $3, $4, $5, COALESCE($6, FALSE), $7::jsonb)
RETURNING id
`,
      insertParams
    );
    const contactId = inserted.rows[0].id;

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'emergency_contact',
      aggregateId: contactId,
      eventType: 'EMERGENCY_CONTACT_CREATED',
      actorUserId: actor.uid,
      payload,
    });

    return { contactId, created: true };
  });
}
/**
 * Updates one of the calling staff member's emergency contacts.
 *
 * Only supplied fields are overwritten (each column is wrapped in COALESCE),
 * and `payload.metadata` is merged into the stored metadata. The update is
 * scoped to the caller's tenant and staff id. An `EMERGENCY_CONTACT_UPDATED`
 * domain event is recorded.
 *
 * @param {object} actor - Authenticated staff caller; `actor.uid` is used for lookup and auditing.
 * @param {object} payload - Reads `contactId` and optional `fullName`, `phone`, `relationshipType`,
 *   `isPrimary`, `metadata`.
 * @returns {Promise<{contactId: *, updated: boolean}>}
 * @throws {AppError} `NOT_FOUND` (404) when no contact with that id belongs to the caller.
 */
export async function updateEmergencyContact(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);
    const staff = await requireStaffByActor(tx, context.tenant.tenantId, actor.uid);

    // NULL parameters leave the corresponding column untouched.
    const updateParams = [
      payload.contactId,
      payload.fullName || null,
      normalizePhone(payload.phone) || null,
      payload.relationshipType || null,
      payload.isPrimary ?? null,
      JSON.stringify(payload.metadata || {}),
      context.tenant.tenantId,
      staff.id,
    ];
    const updated = await tx.query(
      `
UPDATE emergency_contacts
SET full_name = COALESCE($2, full_name),
phone = COALESCE($3, phone),
relationship_type = COALESCE($4, relationship_type),
is_primary = COALESCE($5, is_primary),
metadata = COALESCE(metadata, '{}'::jsonb) || $6::jsonb,
updated_at = NOW()
WHERE id = $1
AND tenant_id = $7
AND staff_id = $8
RETURNING id
`,
      updateParams
    );

    if (updated.rowCount === 0) {
      throw new AppError('NOT_FOUND', 'Emergency contact not found for current staff user', 404, {
        contactId: payload.contactId,
      });
    }
    const contactId = updated.rows[0].id;

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'emergency_contact',
      aggregateId: contactId,
      eventType: 'EMERGENCY_CONTACT_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    return { contactId, updated: true };
  });
}
/**
 * Ensures the tenant has a `TAX_FORM` document definition for the given form
 * type and returns it.
 *
 * The form type is matched case-insensitively and ignoring punctuation and
 * whitespace, so `I9`, `i-9` and ` I-9 ` all resolve to `I-9`, and `W4` /
 * `w-4` resolve to `W-4`. Anything else is rejected instead of being silently
 * filed as a W-4.
 *
 * @param {object} client - Transaction client exposing `query(sql, params)`.
 * @param {*} tenantId - Tenant that owns the document definition.
 * @param {string} formType - Requested tax form type.
 * @returns {Promise<{id: *, name: string}>} The upserted document row.
 * @throws {AppError} `VALIDATION_ERROR` (400) when the form type is not recognised.
 */
async function ensureTaxFormDocument(client, tenantId, formType) {
  // Canonical document names keyed by the compact, upper-cased form code.
  const documentNamesByCode = { I9: 'I-9', W4: 'W-4' };
  const formCode = String(formType ?? '').toUpperCase().replace(/[^A-Z0-9]/g, '');
  if (!Object.hasOwn(documentNamesByCode, formCode)) {
    throw new AppError('VALIDATION_ERROR', 'Unsupported tax form type', 400, {
      formType: formType ?? null,
    });
  }
  const normalizedName = documentNamesByCode[formCode];

  // The no-op style update on conflict makes RETURNING yield the existing row too.
  const result = await client.query(
    `
INSERT INTO documents (tenant_id, document_type, name, metadata)
VALUES ($1, 'TAX_FORM', $2, '{"required":true}'::jsonb)
ON CONFLICT (tenant_id, document_type, name) DO UPDATE
SET updated_at = NOW()
RETURNING id, name
`,
    [tenantId, normalizedName]
  );
  return result.rows[0];
}
/**
 * Saves an in-progress tax form for the calling staff member.
 *
 * The draft is stored in the metadata of a `staff_documents` row keyed on
 * (staff, document). A new row is created as `PENDING`; an existing row only
 * has the draft metadata merged in. A `TAX_FORM_DRAFT_SAVED` domain event is
 * recorded.
 *
 * @param {object} actor - Authenticated staff caller; `actor.uid` is used for lookup and auditing.
 * @param {object} payload - Reads `formType` and `fields`.
 * @returns {Promise<{staffDocumentId: *, formType: string, status: string}>} `status` is always `IN_PROGRESS`.
 */
export async function saveTaxFormDraft(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);
    const staff = await requireStaffByActor(tx, context.tenant.tenantId, actor.uid);
    const taxDocument = await ensureTaxFormDocument(tx, context.tenant.tenantId, payload.formType);

    const draftMetadata = JSON.stringify({
      formType: taxDocument.name,
      formStatus: 'IN_PROGRESS',
      fields: payload.fields,
      lastSavedAt: new Date().toISOString(),
    });

    const upsert = await tx.query(
      `
INSERT INTO staff_documents (
tenant_id,
staff_id,
document_id,
file_uri,
status,
metadata
)
VALUES ($1, $2, $3, NULL, 'PENDING', $4::jsonb)
ON CONFLICT (staff_id, document_id) DO UPDATE
SET metadata = COALESCE(staff_documents.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id
`,
      [context.tenant.tenantId, staff.id, taxDocument.id, draftMetadata]
    );
    const staffDocumentId = upsert.rows[0].id;

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'staff_document',
      aggregateId: staffDocumentId,
      eventType: 'TAX_FORM_DRAFT_SAVED',
      actorUserId: actor.uid,
      payload,
    });

    return { staffDocumentId, formType: taxDocument.name, status: 'IN_PROGRESS' };
  });
}
/**
 * Submits a tax form for the calling staff member.
 *
 * The submission is stored in the metadata of a `staff_documents` row keyed
 * on (staff, document). A new row is created as `PENDING`; an existing row
 * only has the submission metadata merged in. A `TAX_FORM_SUBMITTED` domain
 * event is recorded.
 *
 * @param {object} actor - Authenticated staff caller; `actor.uid` is used for lookup and auditing.
 * @param {object} payload - Reads `formType` and `fields`.
 * @returns {Promise<{staffDocumentId: *, formType: string, status: string}>} `status` is always `SUBMITTED`.
 */
export async function submitTaxForm(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);
    const staff = await requireStaffByActor(tx, context.tenant.tenantId, actor.uid);
    const taxDocument = await ensureTaxFormDocument(tx, context.tenant.tenantId, payload.formType);

    const submissionMetadata = JSON.stringify({
      formType: taxDocument.name,
      formStatus: 'SUBMITTED',
      submittedAt: new Date().toISOString(),
      fields: payload.fields,
    });

    const upsert = await tx.query(
      `
INSERT INTO staff_documents (
tenant_id,
staff_id,
document_id,
file_uri,
status,
metadata
)
VALUES ($1, $2, $3, NULL, 'PENDING', $4::jsonb)
ON CONFLICT (staff_id, document_id) DO UPDATE
SET metadata = COALESCE(staff_documents.metadata, '{}'::jsonb) || EXCLUDED.metadata,
updated_at = NOW()
RETURNING id
`,
      [context.tenant.tenantId, staff.id, taxDocument.id, submissionMetadata]
    );
    const staffDocumentId = upsert.rows[0].id;

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'staff_document',
      aggregateId: staffDocumentId,
      eventType: 'TAX_FORM_SUBMITTED',
      actorUserId: actor.uid,
      payload,
    });

    return { staffDocumentId, formType: taxDocument.name, status: 'SUBMITTED' };
  });
}
/**
 * Registers a bank account for the calling staff member.
 *
 * The new account becomes primary only when the staff member has no primary
 * account yet. This function stores just the last four digits of the account
 * and routing numbers (in `last4`, `provider_reference` and metadata). The
 * `STAFF_BANK_ACCOUNT_ADDED` domain event carries only the account type and
 * bank name.
 *
 * @param {object} actor - Authenticated staff caller; `actor.uid` is used for lookup and auditing.
 * @param {object} payload - Reads `bankName`, `routingNumber`, `accountNumber`, `accountType`.
 * @returns {Promise<{accountId: *, last4: *, isPrimary: *}>}
 */
export async function addStaffBankAccount(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);
    const staff = await requireStaffByActor(tx, context.tenant.tenantId, actor.uid);

    const primaryLookup = await tx.query(
      `
SELECT id
FROM accounts
WHERE tenant_id = $1
AND owner_staff_id = $2
AND is_primary = TRUE
LIMIT 1
`,
      [context.tenant.tenantId, staff.id]
    );
    const becomesPrimary = primaryLookup.rowCount === 0;

    // Keep only the trailing four digits of each number.
    const routingTail = payload.routingNumber.slice(-4);
    const accountTail = payload.accountNumber.slice(-4);
    const accountMetadata = JSON.stringify({
      accountType: payload.accountType,
      routingNumberMasked: `***${routingTail}`,
    });

    const inserted = await tx.query(
      `
INSERT INTO accounts (
tenant_id,
owner_type,
owner_staff_id,
provider_name,
provider_reference,
last4,
is_primary,
metadata
)
VALUES ($1, 'STAFF', $2, $3, $4, $5, $6, $7::jsonb)
RETURNING id, last4, is_primary
`,
      [
        context.tenant.tenantId,
        staff.id,
        payload.bankName,
        `manual:${routingTail}:${accountTail}`,
        accountTail,
        becomesPrimary,
        accountMetadata,
      ]
    );
    const account = inserted.rows[0];

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'account',
      aggregateId: account.id,
      eventType: 'STAFF_BANK_ACCOUNT_ADDED',
      actorUserId: actor.uid,
      payload: {
        accountType: payload.accountType,
        bankName: payload.bankName,
      },
    });

    return {
      accountId: account.id,
      last4: account.last4,
      isPrimary: account.is_primary,
    };
  });
}
/**
 * Updates the profile-visibility preference of the calling staff member.
 *
 * The staff metadata is replaced with the result of `buildStaffMetadataPatch`
 * applied to `{ profileVisible }`, and a `STAFF_PRIVACY_UPDATED` domain event
 * is recorded.
 *
 * @param {object} actor - Authenticated staff caller; `actor.uid` is used for lookup and auditing.
 * @param {object} payload - Reads `profileVisible`.
 * @returns {Promise<{staffId: *, profileVisible: boolean}>} `profileVisible` is the payload value coerced to boolean.
 */
export async function updatePrivacyVisibility(actor, payload) {
  const context = await requireStaffContext(actor.uid);

  return withTransaction(async (tx) => {
    await ensureActorUser(tx, actor);
    const staff = await requireStaffByActor(tx, context.tenant.tenantId, actor.uid);

    const { profileVisible } = payload;
    const patchedMetadata = buildStaffMetadataPatch(staff.metadata || {}, { profileVisible });

    await tx.query(
      `
UPDATE staffs
SET metadata = $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [staff.id, JSON.stringify(patchedMetadata)]
    );

    await insertDomainEvent(tx, {
      tenantId: context.tenant.tenantId,
      aggregateType: 'staff',
      aggregateId: staff.id,
      eventType: 'STAFF_PRIVACY_UPDATED',
      actorUserId: actor.uid,
      payload,
    });

    return { staffId: staff.id, profileVisible: Boolean(profileVisible) };
  });
}