feat(api): complete M5 swap and dispatch backend slice

This commit is contained in:
zouantchaw
2026-03-18 10:40:04 +01:00
parent 32f6cd55c8
commit 26a853184f
18 changed files with 2170 additions and 109 deletions

View File

@@ -207,11 +207,38 @@ export const shiftDecisionSchema = z.object({
reason: z.string().max(1000).optional(),
});
export const shiftSwapResolveSchema = z.object({
swapRequestId: z.string().uuid(),
applicationId: z.string().uuid(),
note: z.string().max(2000).optional(),
});
export const shiftSwapCancelSchema = z.object({
swapRequestId: z.string().uuid(),
reason: z.string().max(1000).optional(),
});
export const shiftSubmitApprovalSchema = z.object({
shiftId: z.string().uuid(),
note: z.string().max(2000).optional(),
});
export const dispatchTeamMembershipCreateSchema = z.object({
staffId: z.string().uuid(),
hubId: z.string().uuid().optional(),
teamType: z.enum(['CORE', 'CERTIFIED_LOCATION', 'MARKETPLACE']),
source: z.enum(['MANUAL', 'AUTOMATED', 'SYSTEM']).optional(),
reason: z.string().max(1000).optional(),
effectiveAt: z.string().datetime().optional(),
expiresAt: z.string().datetime().optional(),
metadata: z.record(z.any()).optional(),
});
export const dispatchTeamMembershipDeleteSchema = z.object({
membershipId: z.string().uuid(),
reason: z.string().max(1000).optional(),
});
export const staffClockInSchema = z.object({
assignmentId: z.string().uuid().optional(),
shiftId: z.string().uuid().optional(),

View File

@@ -10,7 +10,9 @@ import {
assignHubManager,
assignHubNfc,
cancelLateWorker,
cancelShiftSwapRequest,
cancelClientOrder,
createDispatchTeamMembership,
createEmergencyContact,
createClientOneTimeOrder,
createClientPermanentOrder,
@@ -24,6 +26,8 @@ import {
rateWorkerFromCoverage,
registerClientPushToken,
registerStaffPushToken,
removeDispatchTeamMembership,
resolveShiftSwapRequest,
requestShiftSwap,
saveTaxFormDraft,
setupStaffProfile,
@@ -55,6 +59,8 @@ import {
clientPermanentOrderSchema,
clientRecurringOrderSchema,
coverageReviewSchema,
dispatchTeamMembershipCreateSchema,
dispatchTeamMembershipDeleteSchema,
emergencyContactCreateSchema,
emergencyContactUpdateSchema,
hubAssignManagerSchema,
@@ -73,6 +79,8 @@ import {
shiftManagerCreateSchema,
shiftApplySchema,
shiftDecisionSchema,
shiftSwapCancelSchema,
shiftSwapResolveSchema,
shiftSubmitApprovalSchema,
staffClockInSchema,
staffClockOutSchema,
@@ -90,7 +98,9 @@ const defaultHandlers = {
assignHubManager,
assignHubNfc,
cancelLateWorker,
cancelShiftSwapRequest,
cancelClientOrder,
createDispatchTeamMembership,
createEmergencyContact,
createClientOneTimeOrder,
createClientPermanentOrder,
@@ -104,6 +114,8 @@ const defaultHandlers = {
rateWorkerFromCoverage,
registerClientPushToken,
registerStaffPushToken,
removeDispatchTeamMembership,
resolveShiftSwapRequest,
requestShiftSwap,
saveTaxFormDraft,
setupStaffProfile,
@@ -301,6 +313,41 @@ export function createMobileCommandsRouter(handlers = defaultHandlers) {
paramShape: (req) => ({ ...req.body, assignmentId: req.params.assignmentId }),
}));
router.post(...mobileCommand('/client/coverage/swap-requests/:swapRequestId/resolve', {
schema: shiftSwapResolveSchema,
policyAction: 'client.coverage.write',
resource: 'shift_swap_request',
handler: handlers.resolveShiftSwapRequest,
paramShape: (req) => ({ ...req.body, swapRequestId: req.params.swapRequestId }),
}));
router.post(...mobileCommand('/client/coverage/swap-requests/:swapRequestId/cancel', {
schema: shiftSwapCancelSchema,
policyAction: 'client.coverage.write',
resource: 'shift_swap_request',
handler: handlers.cancelShiftSwapRequest,
paramShape: (req) => ({ ...req.body, swapRequestId: req.params.swapRequestId }),
}));
router.post(...mobileCommand('/client/coverage/dispatch-teams/memberships', {
schema: dispatchTeamMembershipCreateSchema,
policyAction: 'client.coverage.write',
resource: 'dispatch_team',
handler: handlers.createDispatchTeamMembership,
}));
router.delete(...mobileCommand('/client/coverage/dispatch-teams/memberships/:membershipId', {
schema: dispatchTeamMembershipDeleteSchema,
policyAction: 'client.coverage.write',
resource: 'dispatch_team',
handler: handlers.removeDispatchTeamMembership,
paramShape: (req) => ({
...req.body,
membershipId: req.params.membershipId,
reason: req.body?.reason || req.query.reason,
}),
}));
router.post(...mobileCommand('/staff/profile/setup', {
schema: staffProfileSetupSchema,
policyAction: 'staff.profile.write',

View File

@@ -15,6 +15,19 @@ import {
const MOBILE_CANCELLABLE_ASSIGNMENT_STATUSES = ['ASSIGNED', 'ACCEPTED'];
const MOBILE_CANCELLABLE_APPLICATION_STATUSES = ['PENDING', 'CONFIRMED'];
const DISPATCH_TEAM_PRIORITY = {
CORE: 1,
CERTIFIED_LOCATION: 2,
MARKETPLACE: 3,
};
function parsePositiveIntEnv(name, fallback) {
const parsed = Number.parseInt(`${process.env[name] || fallback}`, 10);
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}
const SHIFT_SWAP_WINDOW_MINUTES = parsePositiveIntEnv('SHIFT_SWAP_WINDOW_MINUTES', 120);
const SHIFT_SWAP_MIN_LEAD_MINUTES = parsePositiveIntEnv('SHIFT_SWAP_MIN_LEAD_MINUTES', 15);
function toIsoOrNull(value) {
return value ? new Date(value).toISOString() : null;
@@ -37,6 +50,21 @@ function ensureArray(value) {
return Array.isArray(value) ? value : [];
}
function resolveDispatchPriority(teamType) {
return DISPATCH_TEAM_PRIORITY[teamType] || DISPATCH_TEAM_PRIORITY.MARKETPLACE;
}
function computeSwapExpiry(startsAt) {
const shiftStart = new Date(startsAt).getTime();
if (!Number.isFinite(shiftStart)) return null;
const now = Date.now();
const latestByWindow = now + (SHIFT_SWAP_WINDOW_MINUTES * 60 * 1000);
const latestByShiftLead = shiftStart - (SHIFT_SWAP_MIN_LEAD_MINUTES * 60 * 1000);
const expiresAtMs = Math.min(latestByWindow, latestByShiftLead);
if (!Number.isFinite(expiresAtMs) || expiresAtMs <= now) return null;
return new Date(expiresAtMs);
}
async function ensureStaffNotBlockedByBusiness(client, { tenantId, businessId, staffId }) {
const blocked = await client.query(
`
@@ -849,6 +877,7 @@ async function requireShiftRoleForStaffApply(client, tenantId, shiftId, roleId,
s.tenant_id,
s.business_id,
s.vendor_id,
s.clock_point_id,
s.status AS shift_status,
s.starts_at,
s.ends_at,
@@ -857,13 +886,26 @@ async function requireShiftRoleForStaffApply(client, tenantId, shiftId, roleId,
sr.role_name,
sr.workers_needed,
sr.assigned_count,
sr.pay_rate_cents
sr.pay_rate_cents,
swap_request.id AS swap_request_id
FROM shifts s
JOIN shift_roles sr ON sr.shift_id = s.id
LEFT JOIN LATERAL (
SELECT id
FROM shift_swap_requests
WHERE shift_role_id = sr.id
AND status = 'OPEN'
AND expires_at > NOW()
ORDER BY created_at DESC
LIMIT 1
) swap_request ON TRUE
WHERE s.id = $1
AND s.tenant_id = $2
AND ($3::uuid IS NULL OR sr.id = $3)
AND s.status IN ('OPEN', 'PENDING_CONFIRMATION', 'ASSIGNED')
AND (
s.status IN ('OPEN', 'PENDING_CONFIRMATION')
OR (s.status = 'ASSIGNED' AND swap_request.id IS NOT NULL)
)
AND NOT EXISTS (
SELECT 1
FROM applications a
@@ -887,6 +929,217 @@ async function requireShiftRoleForStaffApply(client, tenantId, shiftId, roleId,
return result.rows[0];
}
async function loadDispatchMembership(client, {
tenantId,
businessId,
hubId,
staffId,
}) {
const result = await client.query(
`
SELECT
dtm.id,
dtm.team_type,
dtm.hub_id,
dtm.source,
dtm.effective_at,
dtm.expires_at
FROM dispatch_team_memberships dtm
WHERE dtm.tenant_id = $1
AND dtm.business_id = $2
AND dtm.staff_id = $3
AND dtm.status = 'ACTIVE'
AND dtm.effective_at <= NOW()
AND (dtm.expires_at IS NULL OR dtm.expires_at > NOW())
AND (dtm.hub_id IS NULL OR dtm.hub_id = $4)
ORDER BY
CASE dtm.team_type
WHEN 'CORE' THEN 1
WHEN 'CERTIFIED_LOCATION' THEN 2
ELSE 3
END ASC,
CASE WHEN dtm.hub_id = $4 THEN 0 ELSE 1 END ASC,
dtm.created_at ASC
LIMIT 1
`,
[tenantId, businessId, staffId, hubId || null]
);
if (result.rowCount === 0) {
return {
membershipId: null,
teamType: 'MARKETPLACE',
priority: resolveDispatchPriority('MARKETPLACE'),
source: 'SYSTEM',
scopedHubId: null,
};
}
return {
membershipId: result.rows[0].id,
teamType: result.rows[0].team_type,
priority: resolveDispatchPriority(result.rows[0].team_type),
source: result.rows[0].source,
scopedHubId: result.rows[0].hub_id,
};
}
async function requireSwapRequestForUpdate(client, tenantId, businessId, swapRequestId) {
const result = await client.query(
`
SELECT
srq.id,
srq.tenant_id,
srq.business_id,
srq.vendor_id,
srq.shift_id,
srq.shift_role_id,
srq.original_assignment_id,
srq.original_staff_id,
srq.requested_by_user_id,
srq.status,
srq.reason,
srq.expires_at,
srq.metadata,
a.status AS assignment_status,
a.application_id AS original_application_id,
s.clock_point_id,
s.starts_at,
s.ends_at,
s.title AS shift_title,
sr.role_name,
sr.role_code,
st.full_name AS original_staff_name,
st.user_id AS original_staff_user_id
FROM shift_swap_requests srq
JOIN assignments a ON a.id = srq.original_assignment_id
JOIN shifts s ON s.id = srq.shift_id
JOIN shift_roles sr ON sr.id = srq.shift_role_id
JOIN staffs st ON st.id = srq.original_staff_id
WHERE srq.id = $1
AND srq.tenant_id = $2
AND srq.business_id = $3
FOR UPDATE OF srq, a, s, sr
`,
[swapRequestId, tenantId, businessId]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Shift swap request not found in business scope', 404, {
swapRequestId,
businessId,
});
}
return result.rows[0];
}
async function requireSwapCandidateApplication(client, swapRequest, applicationId) {
const result = await client.query(
`
SELECT
app.id,
app.staff_id,
app.status,
app.shift_id,
app.shift_role_id,
app.metadata,
st.full_name AS staff_name,
st.user_id AS staff_user_id,
w.id AS workforce_id
FROM applications app
JOIN staffs st ON st.id = app.staff_id
LEFT JOIN workforce w ON w.staff_id = st.id AND w.status = 'ACTIVE'
WHERE app.id = $1
AND app.shift_role_id = $2
AND app.shift_id = $3
FOR UPDATE OF app
`,
[applicationId, swapRequest.shift_role_id, swapRequest.shift_id]
);
if (result.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Swap candidate application not found for this shift', 404, {
applicationId,
swapRequestId: swapRequest.id,
});
}
const application = result.rows[0];
if (!['PENDING', 'CONFIRMED'].includes(application.status)) {
throw new AppError('INVALID_SWAP_APPLICATION_STATE', 'Swap candidate must be pending or confirmed', 409, {
applicationId,
applicationStatus: application.status,
});
}
if (application.staff_id === swapRequest.original_staff_id) {
throw new AppError('INVALID_SWAP_APPLICATION', 'Original staff cannot be selected as their own replacement', 409, {
applicationId,
swapRequestId: swapRequest.id,
});
}
return application;
}
async function rejectOtherApplicationsForSwap(client, {
shiftRoleId,
selectedApplicationId = null,
reason,
actorUid,
}) {
await client.query(
`
UPDATE applications
SET status = 'REJECTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $3::jsonb,
updated_at = NOW()
WHERE shift_role_id = $1
AND status IN ('PENDING', 'CONFIRMED')
AND ($2::uuid IS NULL OR id <> $2)
`,
[
shiftRoleId,
selectedApplicationId,
JSON.stringify({
rejectedBy: actorUid || 'system',
rejectionReason: reason,
rejectedAt: new Date().toISOString(),
}),
]
);
}
async function markSwapRequestStatus(client, {
swapRequestId,
status,
resolvedByUserId = null,
selectedApplicationId = null,
replacementAssignmentId = null,
metadata = {},
}) {
await client.query(
`
UPDATE shift_swap_requests
SET status = $2,
resolved_at = CASE WHEN $2 IN ('RESOLVED', 'CANCELLED', 'EXPIRED', 'AUTO_CANCELLED') THEN NOW() ELSE resolved_at END,
resolved_by_user_id = COALESCE($3, resolved_by_user_id),
selected_application_id = COALESCE($4, selected_application_id),
replacement_assignment_id = COALESCE($5, replacement_assignment_id),
metadata = COALESCE(metadata, '{}'::jsonb) || $6::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
swapRequestId,
status,
resolvedByUserId,
selectedApplicationId,
replacementAssignmentId,
JSON.stringify(metadata || {}),
]
);
}
async function requirePendingAssignmentForActor(client, tenantId, shiftId, actorUid) {
const result = await client.query(
`
@@ -2499,6 +2752,12 @@ export async function applyForShift(actor, payload) {
await ensureActorUser(client, actor);
const staff = await requireStaffByActor(client, context.tenant.tenantId, actor.uid);
const shiftRole = await requireShiftRoleForStaffApply(client, context.tenant.tenantId, payload.shiftId, payload.roleId, staff.id);
const dispatchMembership = await loadDispatchMembership(client, {
tenantId: context.tenant.tenantId,
businessId: shiftRole.business_id,
hubId: shiftRole.clock_point_id,
staffId: staff.id,
});
await ensureStaffNotBlockedByBusiness(client, {
tenantId: context.tenant.tenantId,
businessId: shiftRole.business_id,
@@ -2551,6 +2810,10 @@ export async function applyForShift(actor, payload) {
JSON.stringify({
appliedBy: actor.uid,
instantBookRequested: payload.instantBook === true,
dispatchTeamType: dispatchMembership.teamType,
dispatchPriority: dispatchMembership.priority,
dispatchTeamMembershipId: dispatchMembership.membershipId,
dispatchTeamScopeHubId: dispatchMembership.scopedHubId,
}),
]
);
@@ -2704,13 +2967,134 @@ export async function requestShiftSwap(actor, payload) {
const context = await requireStaffContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const assignment = await requireAnyAssignmentForActor(client, context.tenant.tenantId, payload.shiftId, actor.uid);
if (!['ACCEPTED', 'CHECKED_IN', 'CHECKED_OUT'].includes(assignment.status)) {
throw new AppError('INVALID_SWAP_STATE', 'Only accepted or worked shifts can be marked for swap', 409, {
const assignmentResult = await client.query(
`
SELECT
a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.workforce_id,
a.staff_id,
a.status,
a.metadata,
s.starts_at,
s.clock_point_id,
s.title AS shift_title,
sr.role_name,
st.full_name AS staff_name
FROM assignments a
JOIN staffs st ON st.id = a.staff_id
JOIN shifts s ON s.id = a.shift_id
JOIN shift_roles sr ON sr.id = a.shift_role_id
WHERE a.tenant_id = $1
AND a.shift_id = $2
AND st.user_id = $3
ORDER BY a.created_at ASC
LIMIT 1
FOR UPDATE OF a, s, sr
`,
[context.tenant.tenantId, payload.shiftId, actor.uid]
);
if (assignmentResult.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Shift assignment not found for current user', 404, {
shiftId: payload.shiftId,
});
}
const assignment = assignmentResult.rows[0];
if (assignment.status !== 'ACCEPTED') {
throw new AppError('INVALID_SWAP_STATE', 'Only accepted future shifts can be marked for swap', 409, {
shiftId: payload.shiftId,
assignmentStatus: assignment.status,
});
}
const expiresAt = computeSwapExpiry(assignment.starts_at);
if (!expiresAt) {
throw new AppError('SWAP_WINDOW_UNAVAILABLE', 'Shift is too close to start time for a valid swap window', 409, {
shiftId: payload.shiftId,
startsAt: assignment.starts_at,
minimumLeadMinutes: SHIFT_SWAP_MIN_LEAD_MINUTES,
});
}
const existingSwap = await client.query(
`
SELECT id, status, expires_at
FROM shift_swap_requests
WHERE original_assignment_id = $1
AND status = 'OPEN'
ORDER BY created_at DESC
LIMIT 1
FOR UPDATE
`,
[assignment.id]
);
let swapRequestId;
if (existingSwap.rowCount > 0) {
swapRequestId = existingSwap.rows[0].id;
await client.query(
`
UPDATE shift_swap_requests
SET reason = COALESCE($2, reason),
expires_at = $3,
metadata = COALESCE(metadata, '{}'::jsonb) || $4::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
swapRequestId,
payload.reason || null,
expiresAt.toISOString(),
JSON.stringify({
reopenedAt: new Date().toISOString(),
swapRequestedBy: actor.uid,
}),
]
);
} else {
const swapRequestResult = await client.query(
`
INSERT INTO shift_swap_requests (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
original_assignment_id,
original_staff_id,
requested_by_user_id,
status,
reason,
expires_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'OPEN', $9, $10, $11::jsonb)
RETURNING id
`,
[
context.tenant.tenantId,
assignment.business_id,
assignment.vendor_id,
assignment.shift_id,
assignment.shift_role_id,
assignment.id,
assignment.staff_id,
actor.uid,
payload.reason || null,
expiresAt.toISOString(),
JSON.stringify({
requestedAt: new Date().toISOString(),
requestedBy: actor.uid,
}),
]
);
swapRequestId = swapRequestResult.rows[0].id;
}
await client.query(
`
UPDATE assignments
@@ -2722,22 +3106,516 @@ export async function requestShiftSwap(actor, payload) {
[assignment.id, JSON.stringify({
swapRequestedAt: new Date().toISOString(),
swapReason: payload.reason || null,
swapRequestId,
swapExpiresAt: expiresAt.toISOString(),
})]
);
await refreshShiftRoleCounts(client, assignment.shift_role_id);
await refreshShiftCounts(client, assignment.shift_id);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'assignment',
aggregateId: assignment.id,
aggregateType: 'shift_swap_request',
aggregateId: swapRequestId,
eventType: 'SHIFT_SWAP_REQUESTED',
actorUserId: actor.uid,
payload,
payload: {
...payload,
assignmentId: assignment.id,
expiresAt: expiresAt.toISOString(),
},
});
await enqueueHubManagerAlert(client, {
tenantId: context.tenant.tenantId,
businessId: assignment.business_id,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
hubId: assignment.clock_point_id,
notificationType: 'SHIFT_SWAP_REQUESTED',
priority: 'HIGH',
subject: 'Shift swap requested',
body: `${assignment.staff_name || 'A worker'} requested a swap for ${assignment.shift_title || assignment.role_name || 'a shift'}`,
payload: {
swapRequestId,
assignmentId: assignment.id,
shiftId: assignment.shift_id,
expiresAt: expiresAt.toISOString(),
reason: payload.reason || null,
},
dedupeScope: swapRequestId,
});
return {
swapRequestId,
assignmentId: assignment.id,
shiftId: assignment.shift_id,
status: 'SWAP_REQUESTED',
expiresAt: expiresAt.toISOString(),
};
});
}
export async function resolveShiftSwapRequest(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const swapRequest = await requireSwapRequestForUpdate(
client,
context.tenant.tenantId,
context.business.businessId,
payload.swapRequestId
);
if (swapRequest.status !== 'OPEN') {
throw new AppError('INVALID_SWAP_REQUEST_STATE', 'Only open swap requests can be resolved', 409, {
swapRequestId: payload.swapRequestId,
swapRequestStatus: swapRequest.status,
});
}
if (new Date(swapRequest.expires_at).getTime() <= Date.now()) {
throw new AppError('SWAP_REQUEST_EXPIRED', 'The swap request has already expired and must be handled by the expiry worker', 409, {
swapRequestId: payload.swapRequestId,
expiresAt: swapRequest.expires_at,
});
}
const candidate = await requireSwapCandidateApplication(client, swapRequest, payload.applicationId);
await client.query(
`
UPDATE applications
SET status = 'CONFIRMED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
candidate.id,
JSON.stringify({
selectedForSwapAt: new Date().toISOString(),
selectedForSwapBy: actor.uid,
selectedForSwapRequestId: swapRequest.id,
selectionNote: payload.note || null,
}),
]
);
await rejectOtherApplicationsForSwap(client, {
shiftRoleId: swapRequest.shift_role_id,
selectedApplicationId: candidate.id,
reason: 'Replacement selected for swap request',
actorUid: actor.uid,
});
await client.query(
`
UPDATE assignments
SET status = 'SWAPPED_OUT',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
swapRequest.original_assignment_id,
JSON.stringify({
swappedOutAt: new Date().toISOString(),
swapResolvedBy: actor.uid,
swapRequestId: swapRequest.id,
replacementApplicationId: candidate.id,
}),
]
);
const replacementAssignmentResult = await client.query(
`
INSERT INTO assignments (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
workforce_id,
staff_id,
application_id,
status,
assigned_at,
accepted_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'ACCEPTED', NOW(), NOW(), $9::jsonb)
RETURNING id, status
`,
[
context.tenant.tenantId,
swapRequest.business_id,
swapRequest.vendor_id,
swapRequest.shift_id,
swapRequest.shift_role_id,
candidate.workforce_id,
candidate.staff_id,
candidate.id,
JSON.stringify({
source: 'swap-resolution',
swapRequestId: swapRequest.id,
originalAssignmentId: swapRequest.original_assignment_id,
resolvedBy: actor.uid,
}),
]
);
await markSwapRequestStatus(client, {
swapRequestId: swapRequest.id,
status: 'RESOLVED',
resolvedByUserId: actor.uid,
selectedApplicationId: candidate.id,
replacementAssignmentId: replacementAssignmentResult.rows[0].id,
metadata: {
resolvedAt: new Date().toISOString(),
resolutionNote: payload.note || null,
},
});
await refreshShiftRoleCounts(client, swapRequest.shift_role_id);
await refreshShiftCounts(client, swapRequest.shift_id);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'shift_swap_request',
aggregateId: swapRequest.id,
eventType: 'SHIFT_SWAP_RESOLVED',
actorUserId: actor.uid,
payload: {
applicationId: candidate.id,
replacementAssignmentId: replacementAssignmentResult.rows[0].id,
note: payload.note || null,
},
});
await enqueueUserAlert(client, {
tenantId: context.tenant.tenantId,
businessId: swapRequest.business_id,
shiftId: swapRequest.shift_id,
assignmentId: swapRequest.original_assignment_id,
recipientUserId: swapRequest.original_staff_user_id,
notificationType: 'SHIFT_SWAP_RESOLVED',
priority: 'HIGH',
subject: 'Swap request resolved',
body: `A replacement has been confirmed for ${swapRequest.shift_title || 'your shift'}`,
payload: {
swapRequestId: swapRequest.id,
replacementAssignmentId: replacementAssignmentResult.rows[0].id,
},
dedupeScope: swapRequest.id,
});
await enqueueUserAlert(client, {
tenantId: context.tenant.tenantId,
businessId: swapRequest.business_id,
shiftId: swapRequest.shift_id,
assignmentId: replacementAssignmentResult.rows[0].id,
recipientUserId: candidate.staff_user_id,
notificationType: 'SHIFT_SWAP_ASSIGNMENT_CONFIRMED',
priority: 'HIGH',
subject: 'You were selected as the shift replacement',
body: `You have been confirmed for ${swapRequest.shift_title || 'a shift'} via swap coverage`,
payload: {
swapRequestId: swapRequest.id,
assignmentId: replacementAssignmentResult.rows[0].id,
},
dedupeScope: replacementAssignmentResult.rows[0].id,
});
return {
swapRequestId: swapRequest.id,
status: 'RESOLVED',
originalAssignmentId: swapRequest.original_assignment_id,
replacementAssignmentId: replacementAssignmentResult.rows[0].id,
applicationId: candidate.id,
};
});
}
export async function cancelShiftSwapRequest(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const swapRequest = await requireSwapRequestForUpdate(
client,
context.tenant.tenantId,
context.business.businessId,
payload.swapRequestId
);
if (swapRequest.status !== 'OPEN') {
throw new AppError('INVALID_SWAP_REQUEST_STATE', 'Only open swap requests can be cancelled', 409, {
swapRequestId: payload.swapRequestId,
swapRequestStatus: swapRequest.status,
});
}
await client.query(
`
UPDATE assignments
SET status = 'ACCEPTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
swapRequest.original_assignment_id,
JSON.stringify({
swapCancelledAt: new Date().toISOString(),
swapCancelledBy: actor.uid,
swapCancellationReason: payload.reason || null,
}),
]
);
await rejectOtherApplicationsForSwap(client, {
shiftRoleId: swapRequest.shift_role_id,
selectedApplicationId: null,
reason: payload.reason || 'Swap request cancelled',
actorUid: actor.uid,
});
await markSwapRequestStatus(client, {
swapRequestId: swapRequest.id,
status: 'CANCELLED',
resolvedByUserId: actor.uid,
metadata: {
cancelledAt: new Date().toISOString(),
cancellationReason: payload.reason || null,
},
});
await refreshShiftRoleCounts(client, swapRequest.shift_role_id);
await refreshShiftCounts(client, swapRequest.shift_id);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'shift_swap_request',
aggregateId: swapRequest.id,
eventType: 'SHIFT_SWAP_CANCELLED',
actorUserId: actor.uid,
payload,
});
await enqueueUserAlert(client, {
tenantId: context.tenant.tenantId,
businessId: swapRequest.business_id,
shiftId: swapRequest.shift_id,
assignmentId: swapRequest.original_assignment_id,
recipientUserId: swapRequest.original_staff_user_id,
notificationType: 'SHIFT_SWAP_CANCELLED',
priority: 'NORMAL',
subject: 'Swap request cancelled',
body: `Your swap request for ${swapRequest.shift_title || 'the shift'} was cancelled`,
payload: {
swapRequestId: swapRequest.id,
reason: payload.reason || null,
},
dedupeScope: swapRequest.id,
});
return {
swapRequestId: swapRequest.id,
status: 'CANCELLED',
assignmentId: swapRequest.original_assignment_id,
};
});
}
export async function createDispatchTeamMembership(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
if (payload.effectiveAt && payload.expiresAt && new Date(payload.expiresAt).getTime() <= new Date(payload.effectiveAt).getTime()) {
throw new AppError('VALIDATION_ERROR', 'expiresAt must be after effectiveAt', 400, {
effectiveAt: payload.effectiveAt,
expiresAt: payload.expiresAt,
});
}
if (payload.teamType === 'CERTIFIED_LOCATION' && !payload.hubId) {
throw new AppError('VALIDATION_ERROR', 'hubId is required for CERTIFIED_LOCATION memberships', 400);
}
if (payload.hubId) {
await requireClockPoint(client, context.tenant.tenantId, context.business.businessId, payload.hubId, { forUpdate: true });
}
const staffResult = await client.query(
`
SELECT id
FROM staffs
WHERE tenant_id = $1
AND id = $2
LIMIT 1
FOR UPDATE
`,
[context.tenant.tenantId, payload.staffId]
);
if (staffResult.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Staff profile not found in tenant scope', 404, {
staffId: payload.staffId,
});
}
const existing = await client.query(
`
SELECT id, status
FROM dispatch_team_memberships
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
AND team_type = $4
AND (
($5::uuid IS NULL AND hub_id IS NULL)
OR hub_id = $5
)
LIMIT 1
FOR UPDATE
`,
[
context.tenant.tenantId,
context.business.businessId,
payload.staffId,
payload.teamType,
payload.hubId || null,
]
);
let membershipId;
if (existing.rowCount > 0) {
membershipId = existing.rows[0].id;
await client.query(
`
UPDATE dispatch_team_memberships
SET status = 'ACTIVE',
source = COALESCE($2, source),
reason = COALESCE($3, reason),
effective_at = COALESCE($4::timestamptz, effective_at, NOW()),
expires_at = $5::timestamptz,
metadata = COALESCE(metadata, '{}'::jsonb) || $6::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
membershipId,
payload.source || 'MANUAL',
payload.reason || null,
payload.effectiveAt || null,
payload.expiresAt || null,
JSON.stringify(payload.metadata || {}),
]
);
} else {
const created = await client.query(
`
INSERT INTO dispatch_team_memberships (
tenant_id,
business_id,
hub_id,
staff_id,
team_type,
source,
status,
reason,
effective_at,
expires_at,
created_by_user_id,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, 'ACTIVE', $7, COALESCE($8::timestamptz, NOW()), $9::timestamptz, $10, $11::jsonb)
RETURNING id
`,
[
context.tenant.tenantId,
context.business.businessId,
payload.hubId || null,
payload.staffId,
payload.teamType,
payload.source || 'MANUAL',
payload.reason || null,
payload.effectiveAt || null,
payload.expiresAt || null,
actor.uid,
JSON.stringify(payload.metadata || {}),
]
);
membershipId = created.rows[0].id;
}
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'dispatch_team_membership',
aggregateId: membershipId,
eventType: 'DISPATCH_TEAM_MEMBERSHIP_UPSERTED',
actorUserId: actor.uid,
payload,
});
return {
membershipId,
staffId: payload.staffId,
teamType: payload.teamType,
hubId: payload.hubId || null,
status: 'ACTIVE',
priority: resolveDispatchPriority(payload.teamType),
};
});
}
export async function removeDispatchTeamMembership(actor, payload) {
const context = await requireClientContext(actor.uid);
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const existing = await client.query(
`
SELECT id, team_type, staff_id, hub_id
FROM dispatch_team_memberships
WHERE id = $1
AND tenant_id = $2
AND business_id = $3
FOR UPDATE
`,
[payload.membershipId, context.tenant.tenantId, context.business.businessId]
);
if (existing.rowCount === 0) {
throw new AppError('NOT_FOUND', 'Dispatch team membership not found', 404, {
membershipId: payload.membershipId,
});
}
await client.query(
`
UPDATE dispatch_team_memberships
SET status = 'INACTIVE',
reason = COALESCE($2, reason),
expires_at = COALESCE(expires_at, NOW()),
metadata = COALESCE(metadata, '{}'::jsonb) || $3::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
payload.membershipId,
payload.reason || null,
JSON.stringify({
removedAt: new Date().toISOString(),
removedBy: actor.uid,
}),
]
);
await insertDomainEvent(client, {
tenantId: context.tenant.tenantId,
aggregateType: 'dispatch_team_membership',
aggregateId: payload.membershipId,
eventType: 'DISPATCH_TEAM_MEMBERSHIP_REMOVED',
actorUserId: actor.uid,
payload,
});
return {
membershipId: payload.membershipId,
status: 'INACTIVE',
};
});
}

View File

@@ -1,5 +1,5 @@
import { query, withTransaction } from './db.js';
import { enqueueNotification } from './notification-outbox.js';
import { enqueueHubManagerAlert, enqueueNotification, enqueueUserAlert } from './notification-outbox.js';
import {
markPushTokenInvalid,
resolveNotificationTargetTokens,
@@ -28,6 +28,82 @@ export function computeRetryDelayMinutes(attemptNumber) {
return Math.min(5 * (2 ** Math.max(attemptNumber - 1, 0)), 60);
}
async function refreshShiftRoleCounts(client, shiftRoleId) {
await client.query(
`
UPDATE shift_roles sr
SET assigned_count = counts.assigned_count,
updated_at = NOW()
FROM (
SELECT $1::uuid AS shift_role_id,
COUNT(*) FILTER (
WHERE status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
)::INTEGER AS assigned_count
FROM assignments
WHERE shift_role_id = $1
) counts
WHERE sr.id = counts.shift_role_id
`,
[shiftRoleId]
);
}
async function refreshShiftCounts(client, shiftId) {
await client.query(
`
UPDATE shifts s
SET assigned_workers = counts.assigned_workers,
updated_at = NOW()
FROM (
SELECT $1::uuid AS shift_id,
COUNT(*) FILTER (
WHERE status IN ('ASSIGNED', 'ACCEPTED', 'SWAP_REQUESTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
)::INTEGER AS assigned_workers
FROM assignments
WHERE shift_id = $1
) counts
WHERE s.id = counts.shift_id
`,
[shiftId]
);
}
async function insertDomainEvent(client, {
tenantId,
aggregateType,
aggregateId,
eventType,
actorUserId = null,
payload = {},
}) {
await client.query(
`
INSERT INTO domain_events (
tenant_id,
aggregate_type,
aggregate_id,
sequence,
event_type,
actor_user_id,
payload
)
SELECT
$1,
$2,
$3,
COALESCE(MAX(sequence) + 1, 1),
$4,
$5,
$6::jsonb
FROM domain_events
WHERE tenant_id = $1
AND aggregate_type = $2
AND aggregate_id = $3
`,
[tenantId, aggregateType, aggregateId, eventType, actorUserId, JSON.stringify(payload || {})]
);
}
async function recordDeliveryAttempt(client, {
notificationId,
devicePushTokenId = null,
@@ -226,6 +302,183 @@ async function enqueueDueShiftReminders() {
return { enqueued };
}
async function claimExpiredSwapRequests(limit) {
return withTransaction(async (client) => {
const claimed = await client.query(
`
WITH due AS (
SELECT id
FROM shift_swap_requests
WHERE (
(status = 'OPEN' AND expires_at <= NOW())
OR (status = 'EXPIRED' AND updated_at <= NOW() - INTERVAL '2 minutes')
)
ORDER BY expires_at ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE shift_swap_requests srq
SET status = 'EXPIRED',
updated_at = NOW()
FROM due
WHERE srq.id = due.id
RETURNING srq.id
`,
[limit]
);
if (claimed.rowCount === 0) {
return [];
}
const details = await client.query(
`
SELECT
srq.id,
srq.tenant_id,
srq.business_id,
srq.shift_id,
srq.shift_role_id,
srq.original_assignment_id,
srq.original_staff_id,
srq.reason,
srq.expires_at,
s.clock_point_id,
s.title AS shift_title,
st.user_id AS original_staff_user_id
FROM shift_swap_requests srq
JOIN shifts s ON s.id = srq.shift_id
JOIN staffs st ON st.id = srq.original_staff_id
WHERE srq.id = ANY($1::uuid[])
`,
[claimed.rows.map((row) => row.id)]
);
return details.rows;
});
}
async function processExpiredSwapRequests({
limit = parseIntEnv('SHIFT_SWAP_AUTO_CANCEL_BATCH_LIMIT', 25),
} = {}) {
const claimed = await claimExpiredSwapRequests(limit);
let autoCancelled = 0;
for (const swapRequest of claimed) {
await withTransaction(async (client) => {
await client.query(
`
UPDATE assignments
SET status = 'CANCELLED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
AND status IN ('SWAP_REQUESTED', 'ACCEPTED')
`,
[
swapRequest.original_assignment_id,
JSON.stringify({
swapAutoCancelledAt: new Date().toISOString(),
swapAutoCancelledReason: 'Swap window expired without replacement',
}),
]
);
await client.query(
`
UPDATE applications
SET status = 'REJECTED',
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE shift_role_id = $1
AND status IN ('PENDING', 'CONFIRMED')
`,
[
swapRequest.shift_role_id,
JSON.stringify({
rejectedBy: 'system',
rejectionReason: 'Swap request expired',
rejectedAt: new Date().toISOString(),
}),
]
);
await client.query(
`
UPDATE shift_swap_requests
SET status = 'AUTO_CANCELLED',
resolved_at = NOW(),
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
`,
[
swapRequest.id,
JSON.stringify({
autoCancelledAt: new Date().toISOString(),
autoCancelledReason: 'Swap window expired without replacement',
}),
]
);
await refreshShiftRoleCounts(client, swapRequest.shift_role_id);
await refreshShiftCounts(client, swapRequest.shift_id);
await insertDomainEvent(client, {
tenantId: swapRequest.tenant_id,
aggregateType: 'shift_swap_request',
aggregateId: swapRequest.id,
eventType: 'SHIFT_SWAP_AUTO_CANCELLED',
actorUserId: null,
payload: {
reason: swapRequest.reason || null,
expiredAt: swapRequest.expires_at,
},
});
await enqueueHubManagerAlert(client, {
tenantId: swapRequest.tenant_id,
businessId: swapRequest.business_id,
shiftId: swapRequest.shift_id,
assignmentId: swapRequest.original_assignment_id,
hubId: swapRequest.clock_point_id,
notificationType: 'SHIFT_SWAP_AUTO_CANCELLED',
priority: 'CRITICAL',
subject: 'Shift swap expired without coverage',
body: `${swapRequest.shift_title || 'A shift'} lost its assigned worker after the two-hour swap window expired`,
payload: {
swapRequestId: swapRequest.id,
assignmentId: swapRequest.original_assignment_id,
shiftId: swapRequest.shift_id,
},
dedupeScope: swapRequest.id,
});
await enqueueUserAlert(client, {
tenantId: swapRequest.tenant_id,
businessId: swapRequest.business_id,
shiftId: swapRequest.shift_id,
assignmentId: swapRequest.original_assignment_id,
recipientUserId: swapRequest.original_staff_user_id,
notificationType: 'SHIFT_SWAP_AUTO_CANCELLED',
priority: 'HIGH',
subject: 'Shift swap expired',
body: 'Your shift swap request expired without a replacement and the assignment was cancelled',
payload: {
swapRequestId: swapRequest.id,
shiftId: swapRequest.shift_id,
},
dedupeScope: swapRequest.id,
});
});
autoCancelled += 1;
}
return {
claimed: claimed.length,
autoCancelled,
};
}
async function settleNotification(notification, deliveryResults, maxAttempts) {
const successCount = deliveryResults.filter((result) => result.deliveryStatus === 'SENT').length;
const simulatedCount = deliveryResults.filter((result) => result.deliveryStatus === 'SIMULATED').length;
@@ -301,10 +554,13 @@ export async function dispatchPendingNotifications({
sender = createPushSender(),
} = {}) {
const maxAttempts = parseIntEnv('NOTIFICATION_MAX_ATTEMPTS', 5);
const swapSummary = await processExpiredSwapRequests();
const reminderSummary = await enqueueDueShiftReminders();
const claimed = await claimDueNotifications(limit);
const summary = {
swapRequestsClaimed: swapSummary.claimed,
swapRequestsAutoCancelled: swapSummary.autoCancelled,
remindersEnqueued: reminderSummary.enqueued,
claimed: claimed.length,
sent: 0,