Files
Krow-workspace/backend/command-api/src/services/command-service.js
2026-03-19 10:28:13 +01:00

1725 lines
48 KiB
JavaScript

import { AppError } from '../lib/errors.js';
import { withTransaction } from './db.js';
import { recordGeofenceIncident } from './attendance-monitoring.js';
import { recordAttendanceSecurityProof } from './attendance-security.js';
import { evaluateClockInAttempt } from './clock-in-policy.js';
import { enqueueHubManagerAlert } from './notification-outbox.js';
/**
 * Normalize a timestamp-like value to an ISO-8601 UTC string.
 *
 * @param {string|number|Date|null|undefined} value - Anything the `Date` constructor accepts.
 * @returns {string|null} ISO string, or `null` when `value` is falsy (null, undefined, '', 0).
 * @throws {AppError} VALIDATION_ERROR (400) when `value` is truthy but cannot be parsed as a date.
 */
function toIsoOrNull(value) {
  if (!value) {
    return null;
  }
  const parsed = new Date(value);
  // Date#toISOString throws a bare RangeError on an Invalid Date; report the
  // bad input as a validation failure instead of an unclassified crash.
  if (Number.isNaN(parsed.getTime())) {
    throw new AppError('VALIDATION_ERROR', 'Invalid date/time value', 400, {
      value: String(value),
    });
  }
  return parsed.toISOString();
}
// Same status list that the assigned-count refresh queries in this file filter on.
const ACTIVE_ASSIGNMENT_STATUSES = new Set(
  ['ASSIGNED', 'ACCEPTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED']
);

// Statuses that the cancel flows in this file flip to CANCELLED.
const CANCELLABLE_ASSIGNMENT_STATUSES = ['ASSIGNED', 'ACCEPTED'];
const CANCELLABLE_APPLICATION_STATUSES = ['PENDING', 'CONFIRMED'];

// Shift state machine: current status -> set of statuses it may move to.
// COMPLETED and CANCELLED have no outgoing transitions.
const SHIFT_STATUS_TRANSITIONS = Object.fromEntries(
  [
    ['DRAFT', ['OPEN', 'CANCELLED']],
    ['OPEN', ['PENDING_CONFIRMATION', 'ASSIGNED', 'ACTIVE', 'CANCELLED']],
    ['PENDING_CONFIRMATION', ['ASSIGNED', 'CANCELLED']],
    ['ASSIGNED', ['ACTIVE', 'COMPLETED', 'CANCELLED']],
    ['ACTIVE', ['COMPLETED', 'CANCELLED']],
    ['COMPLETED', []],
    ['CANCELLED', []],
  ].map(([fromStatus, nextStatuses]) => [fromStatus, new Set(nextStatuses)])
);
/**
 * Upsert the acting user into `users`, keyed by `actor.uid`.
 *
 * New rows are inserted with status 'ACTIVE'. On an id conflict the e-mail and
 * display name are refreshed (the incoming value wins when it is non-null) and
 * `updated_at` is bumped.
 *
 * NOTE(review): the e-mail is bound to both `email` and `display_name`, so an
 * existing display name is replaced by the e-mail whenever the actor has one.
 * Confirm this is intended.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {{ uid: string, email?: string }} actor - Authenticated caller.
 * @returns {Promise<void>}
 */
async function ensureActorUser(client, actor) {
  const { uid, email } = actor;
  const upsertSql = `
INSERT INTO users (id, email, display_name, status)
VALUES ($1, $2, $3, 'ACTIVE')
ON CONFLICT (id) DO UPDATE
SET email = COALESCE(EXCLUDED.email, users.email),
display_name = COALESCE(EXCLUDED.display_name, users.display_name),
updated_at = NOW()
`;
  await client.query(upsertSql, [uid, email, email]);
}
/**
 * Guard: fail when a `staff_blocks` row exists for this tenant/business/staff.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {{ tenantId: string, businessId: string, staffId: string }} scope
 * @returns {Promise<void>} Resolves when no block row is found.
 * @throws {AppError} STAFF_BLOCKED (409) carrying the block id, reason and issue flags.
 */
async function ensureStaffNotBlockedByBusiness(client, { tenantId, businessId, staffId }) {
  const blockLookupSql = `
SELECT id, reason, issue_flags
FROM staff_blocks
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
LIMIT 1
`;
  const { rowCount, rows } = await client.query(blockLookupSql, [tenantId, businessId, staffId]);
  if (!(rowCount > 0)) {
    return;
  }

  const block = rows[0];
  const issueFlags = Array.isArray(block.issue_flags) ? block.issue_flags : [];
  throw new AppError('STAFF_BLOCKED', 'Staff is blocked from future shift assignments for this business', 409, {
    businessId,
    staffId,
    blockId: block.id,
    reason: block.reason || null,
    issueFlags,
  });
}
/**
 * Append one row to `domain_events` for the given aggregate.
 *
 * The row's `sequence` is computed inside the INSERT as the current maximum
 * sequence for the same (tenant, aggregate type, aggregate id) plus one, or 1
 * when the aggregate has no events yet.
 *
 * NOTE(review): nothing in this statement serializes concurrent writers for
 * the same aggregate. Confirm a unique constraint or lock covers that case.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {object} event
 * @param {string} event.tenantId
 * @param {string} event.aggregateType
 * @param {string} event.aggregateId
 * @param {string} event.eventType
 * @param {string} event.actorUserId
 * @param {object} [event.payload] - Stored as JSONB; a falsy payload is stored as `{}`.
 * @returns {Promise<void>}
 */
async function insertDomainEvent(client, {
  tenantId,
  aggregateType,
  aggregateId,
  eventType,
  actorUserId,
  payload,
}) {
  const appendSql = `
INSERT INTO domain_events (
tenant_id,
aggregate_type,
aggregate_id,
sequence,
event_type,
actor_user_id,
payload
)
SELECT
$1,
$2,
$3,
COALESCE(MAX(sequence) + 1, 1),
$4,
$5,
$6::jsonb
FROM domain_events
WHERE tenant_id = $1
AND aggregate_type = $2
AND aggregate_id = $3
`;
  const serializedPayload = JSON.stringify(payload || {});
  const bindings = [tenantId, aggregateType, aggregateId, eventType, actorUserId, serializedPayload];
  await client.query(appendSql, bindings);
}
/**
 * Fetch a business by id within a tenant, or fail.
 *
 * @param {object} client - Client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} businessId
 * @returns {Promise<{ id: string, business_name: string }>} The matching row.
 * @throws {AppError} NOT_FOUND (404) when no business matches both ids.
 */
async function requireBusiness(client, tenantId, businessId) {
  const lookupSql = `
SELECT id, business_name
FROM businesses
WHERE id = $1 AND tenant_id = $2
`;
  const { rowCount, rows } = await client.query(lookupSql, [businessId, tenantId]);
  if (rowCount !== 0) {
    return rows[0];
  }
  throw new AppError('NOT_FOUND', 'Business not found in tenant scope', 404, {
    tenantId,
    businessId,
  });
}
/**
 * Fetch a vendor by id within a tenant, or fail.
 *
 * @param {object} client - Client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} vendorId
 * @returns {Promise<{ id: string, company_name: string }>} The matching row.
 * @throws {AppError} NOT_FOUND (404) when no vendor matches both ids.
 */
async function requireVendor(client, tenantId, vendorId) {
  const lookupSql = `
SELECT id, company_name
FROM vendors
WHERE id = $1 AND tenant_id = $2
`;
  const { rowCount, rows } = await client.query(lookupSql, [vendorId, tenantId]);
  const isMissing = rowCount === 0;
  if (isMissing) {
    throw new AppError('NOT_FOUND', 'Vendor not found in tenant scope', 404, {
      tenantId,
      vendorId,
    });
  }
  const [vendor] = [rows[0]];
  return vendor;
}
/**
 * Load a shift role together with the tenant/business/vendor ids of its shift.
 *
 * The lookup is by role id only (no tenant filter) and is issued with
 * `FOR UPDATE` over the `shift_roles`/`shifts` join.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} shiftRoleId
 * @returns {Promise<object>} Row with id, shift_id, role_code, role_name,
 *   workers_needed, assigned_count, tenant_id, business_id, vendor_id.
 * @throws {AppError} NOT_FOUND (404) when the role does not exist.
 */
async function requireShiftRole(client, shiftRoleId) {
  const lockedRoleSql = `
SELECT sr.id,
sr.shift_id,
sr.role_code,
sr.role_name,
sr.workers_needed,
sr.assigned_count,
s.tenant_id,
s.business_id,
s.vendor_id
FROM shift_roles sr
JOIN shifts s ON s.id = sr.shift_id
WHERE sr.id = $1
FOR UPDATE
`;
  const { rowCount, rows } = await client.query(lockedRoleSql, [shiftRoleId]);
  if (rowCount !== 0) {
    return rows[0];
  }
  throw new AppError('NOT_FOUND', 'Shift role not found', 404, { shiftRoleId });
}
/**
 * Load an assignment with the shift and clock-point fields needed for
 * attendance checks.
 *
 * Expected coordinates and geofence radius come from the shift when set and
 * otherwise from the (optional, LEFT JOINed) clock point. The query is issued
 * with `FOR UPDATE OF a, s`, i.e. the locking clause names the assignment and
 * shift aliases only. The lookup is by assignment id only (no tenant filter).
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} assignmentId
 * @returns {Promise<object>} The joined assignment row.
 * @throws {AppError} NOT_FOUND (404) when the assignment does not exist.
 */
async function requireAssignment(client, assignmentId) {
  const lockedAssignmentSql = `
SELECT a.id,
a.tenant_id,
a.business_id,
a.vendor_id,
a.shift_id,
a.shift_role_id,
a.workforce_id,
a.staff_id,
a.status,
s.clock_point_id,
s.title AS shift_title,
s.starts_at,
s.ends_at,
s.clock_in_mode,
s.allow_clock_in_override,
cp.nfc_tag_uid AS expected_nfc_tag_uid,
COALESCE(s.latitude, cp.latitude) AS expected_latitude,
COALESCE(s.longitude, cp.longitude) AS expected_longitude,
COALESCE(s.geofence_radius_meters, cp.geofence_radius_meters) AS geofence_radius_meters,
cp.default_clock_in_mode,
cp.allow_clock_in_override AS default_allow_clock_in_override
FROM assignments a
JOIN shifts s ON s.id = a.shift_id
LEFT JOIN clock_points cp ON cp.id = s.clock_point_id
WHERE a.id = $1
FOR UPDATE OF a, s
`;
  const { rowCount, rows } = await client.query(lockedAssignmentSql, [assignmentId]);
  if (rowCount !== 0) {
    return rows[0];
  }
  throw new AppError('NOT_FOUND', 'Assignment not found', 404, { assignmentId });
}
/**
 * Load an order by id within a tenant using `FOR UPDATE`, or fail.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} orderId
 * @returns {Promise<object>} Row with id, tenant_id, business_id, vendor_id,
 *   order_number, status, starts_at, ends_at.
 * @throws {AppError} NOT_FOUND (404) when no order matches both ids.
 */
async function requireOrder(client, tenantId, orderId) {
  const lockedOrderSql = `
SELECT id,
tenant_id,
business_id,
vendor_id,
order_number,
status,
starts_at,
ends_at
FROM orders
WHERE id = $1
AND tenant_id = $2
FOR UPDATE
`;
  const { rowCount, rows } = await client.query(lockedOrderSql, [orderId, tenantId]);
  if (rowCount !== 0) {
    return rows[0];
  }
  throw new AppError('NOT_FOUND', 'Order not found in tenant scope', 404, {
    tenantId,
    orderId,
  });
}
/**
 * Load a shift by id within a tenant using `FOR UPDATE`, or fail.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} shiftId
 * @returns {Promise<object>} Row with id, tenant_id, order_id, business_id,
 *   vendor_id, status, starts_at, ends_at, required_workers, assigned_workers.
 * @throws {AppError} NOT_FOUND (404) when no shift matches both ids.
 */
async function requireShift(client, tenantId, shiftId) {
  const lockedShiftSql = `
SELECT id,
tenant_id,
order_id,
business_id,
vendor_id,
status,
starts_at,
ends_at,
required_workers,
assigned_workers
FROM shifts
WHERE id = $1
AND tenant_id = $2
FOR UPDATE
`;
  const { rowCount, rows } = await client.query(lockedShiftSql, [shiftId, tenantId]);
  if (rowCount !== 0) {
    return rows[0];
  }
  throw new AppError('NOT_FOUND', 'Shift not found in tenant scope', 404, {
    tenantId,
    shiftId,
  });
}
/**
 * Load an ACTIVE workforce record by id within a tenant, or fail.
 *
 * Only rows with status 'ACTIVE' match, so a non-active record is reported the
 * same way as a missing one.
 *
 * @param {object} client - Client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} workforceId
 * @returns {Promise<object>} Row with id, tenant_id, vendor_id, staff_id, status.
 * @throws {AppError} NOT_FOUND (404) when no active record matches.
 */
async function requireWorkforce(client, tenantId, workforceId) {
  const activeWorkforceSql = `
SELECT id, tenant_id, vendor_id, staff_id, status
FROM workforce
WHERE id = $1
AND tenant_id = $2
AND status = 'ACTIVE'
`;
  const { rowCount, rows } = await client.query(activeWorkforceSql, [workforceId, tenantId]);
  if (rowCount !== 0) {
    return rows[0];
  }
  throw new AppError('NOT_FOUND', 'Workforce record not found', 404, {
    tenantId,
    workforceId,
  });
}
/**
 * Look up the assignment linking a shift role and a workforce record, using
 * `FOR UPDATE`.
 *
 * The query has no ORDER BY or LIMIT; when several rows match, the first one
 * returned by the database is used.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} shiftRoleId
 * @param {string} workforceId
 * @returns {Promise<object|null>} The assignment row, or `null` when none exists.
 */
async function findAssignmentForShiftRoleWorkforce(client, shiftRoleId, workforceId) {
  const lockedAssignmentSql = `
SELECT id,
tenant_id,
shift_id,
shift_role_id,
workforce_id,
staff_id,
application_id,
status
FROM assignments
WHERE shift_role_id = $1
AND workforce_id = $2
FOR UPDATE
`;
  const { rows } = await client.query(lockedAssignmentSql, [shiftRoleId, workforceId]);
  const firstMatch = rows[0];
  return firstMatch || null;
}
/**
 * Load an application by id within a tenant using `FOR UPDATE`, or fail.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} tenantId
 * @param {string} applicationId
 * @returns {Promise<object>} Row with id, tenant_id, shift_id, shift_role_id, staff_id, status.
 * @throws {AppError} NOT_FOUND (404) when no application matches both ids.
 */
async function requireApplication(client, tenantId, applicationId) {
  const lockedApplicationSql = `
SELECT id,
tenant_id,
shift_id,
shift_role_id,
staff_id,
status
FROM applications
WHERE id = $1
AND tenant_id = $2
FOR UPDATE
`;
  const { rowCount, rows } = await client.query(lockedApplicationSql, [applicationId, tenantId]);
  if (rowCount !== 0) {
    return rows[0];
  }
  throw new AppError('NOT_FOUND', 'Application not found in tenant scope', 404, {
    tenantId,
    applicationId,
  });
}
/**
 * Recompute `shift_roles.assigned_count` from the `assignments` table.
 *
 * An assignment counts when its status is one of `ACTIVE_ASSIGNMENT_STATUSES`.
 * The list is bound as a parameter rather than repeated inline, so the
 * module-level constant is the single definition of what counts as filled.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} shiftRoleId - UUID of the role to refresh.
 * @returns {Promise<void>}
 */
async function refreshShiftRoleCounts(client, shiftRoleId) {
  await client.query(
    `
UPDATE shift_roles sr
SET assigned_count = counts.assigned_count,
updated_at = NOW()
FROM (
SELECT $1::uuid AS shift_role_id,
COUNT(*) FILTER (
WHERE status = ANY($2::text[])
)::INTEGER AS assigned_count
FROM assignments
WHERE shift_role_id = $1
) counts
WHERE sr.id = counts.shift_role_id
`,
    [shiftRoleId, [...ACTIVE_ASSIGNMENT_STATUSES]]
  );
}
/**
 * Recompute `shifts.assigned_workers` from the `assignments` table.
 *
 * An assignment counts when its status is one of `ACTIVE_ASSIGNMENT_STATUSES`.
 * The list is bound as a parameter rather than repeated inline, so the
 * module-level constant is the single definition of what counts as filled.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} shiftId - UUID of the shift to refresh.
 * @returns {Promise<void>}
 */
async function refreshShiftCounts(client, shiftId) {
  await client.query(
    `
UPDATE shifts s
SET assigned_workers = counts.assigned_workers,
updated_at = NOW()
FROM (
SELECT $1::uuid AS shift_id,
COUNT(*) FILTER (
WHERE status = ANY($2::text[])
)::INTEGER AS assigned_workers
FROM assignments
WHERE shift_id = $1
) counts
WHERE s.id = counts.shift_id
`,
    [shiftId, [...ACTIVE_ASSIGNMENT_STATUSES]]
  );
}
/**
 * Move a shift from one status to another if `SHIFT_STATUS_TRANSITIONS` allows it.
 *
 * A transition to the same status is a no-op: nothing is written. The UPDATE
 * is keyed by shift id only; it does not re-check the current status.
 *
 * @param {object} client - Transaction-bound client exposing `query(sql, params)`.
 * @param {string} shiftId
 * @param {string} fromStatus - Status the caller read for the shift.
 * @param {string} toStatus - Requested status.
 * @returns {Promise<string>} `toStatus`.
 * @throws {AppError} INVALID_SHIFT_STATUS_TRANSITION (409) when the move is not allowed,
 *   including when `fromStatus` has no entry in the transition table.
 */
async function transitionShiftStatus(client, shiftId, fromStatus, toStatus) {
  const isNoop = fromStatus === toStatus;
  if (!isNoop) {
    const permittedTargets = SHIFT_STATUS_TRANSITIONS[fromStatus] || new Set();
    const isPermitted = permittedTargets.has(toStatus);
    if (!isPermitted) {
      throw new AppError('INVALID_SHIFT_STATUS_TRANSITION', 'Invalid shift status transition', 409, {
        shiftId,
        fromStatus,
        toStatus,
      });
    }
    const updateSql = `
UPDATE shifts
SET status = $2,
updated_at = NOW()
WHERE id = $1
`;
    await client.query(updateSql, [shiftId, toStatus]);
  }
  return toStatus;
}
/**
 * Validate that a start/end pair forms a non-empty, forward-running window.
 *
 * A falsy bound means "not provided" and is skipped. A provided bound that
 * cannot be parsed as a date is rejected: an Invalid Date compares as NaN, so
 * without this check the ordering test below would silently pass.
 *
 * @param {string|number|Date|null|undefined} startsAt
 * @param {string|number|Date|null|undefined} endsAt
 * @param {string} [code='VALIDATION_ERROR'] - Error code used for the thrown AppError.
 * @returns {void}
 * @throws {AppError} `code` (400) when a provided bound is not a valid date, or
 *   when both are provided and start is not strictly earlier than end.
 */
function assertChronologicalWindow(startsAt, endsAt, code = 'VALIDATION_ERROR') {
  const startMs = startsAt ? new Date(startsAt).getTime() : null;
  const endMs = endsAt ? new Date(endsAt).getTime() : null;

  if (Number.isNaN(startMs) || Number.isNaN(endMs)) {
    throw new AppError(code, 'start and end times must be valid dates', 400);
  }
  if (startMs !== null && endMs !== null && startMs >= endMs) {
    throw new AppError(code, 'start time must be earlier than end time', 400);
  }
}
/**
 * Build the SET clause fragments and bind values for a partial order update.
 *
 * Only keys that are own properties of `payload` are included, in a fixed
 * column order. Placeholders are numbered from `$1`. `startsAt`/`endsAt` are
 * converted with `toIsoOrNull`, `metadata` is JSON-serialized and cast to
 * jsonb, and `updated_at = NOW()` is always appended last.
 *
 * @param {object} payload - Update request; unknown keys are ignored.
 * @returns {{ updates: string[], values: any[] }} SET fragments and their bind values.
 * @throws {AppError} VALIDATION_ERROR (400) when no mutable field is present.
 */
function buildOrderUpdateStatement(payload) {
  const passThrough = (raw) => raw;
  const columnTable = [
    { key: 'vendorId', column: 'vendor_id', encode: passThrough },
    { key: 'title', column: 'title', encode: passThrough },
    { key: 'description', column: 'description', encode: passThrough },
    { key: 'status', column: 'status', encode: passThrough },
    { key: 'serviceType', column: 'service_type', encode: passThrough },
    { key: 'startsAt', column: 'starts_at', encode: (raw) => toIsoOrNull(raw) },
    { key: 'endsAt', column: 'ends_at', encode: (raw) => toIsoOrNull(raw) },
    { key: 'locationName', column: 'location_name', encode: passThrough },
    { key: 'locationAddress', column: 'location_address', encode: passThrough },
    { key: 'latitude', column: 'latitude', encode: passThrough },
    { key: 'longitude', column: 'longitude', encode: passThrough },
    { key: 'notes', column: 'notes', encode: passThrough },
    { key: 'metadata', column: 'metadata', encode: (raw) => JSON.stringify(raw || {}) },
  ];

  const provided = columnTable.filter(({ key }) => Object.prototype.hasOwnProperty.call(payload, key));
  if (provided.length === 0) {
    throw new AppError('VALIDATION_ERROR', 'At least one mutable order field must be provided', 400);
  }

  const values = provided.map(({ key, encode }) => encode(payload[key]));
  const updates = provided.map(({ column }, index) => {
    const placeholder = `$${index + 1}`;
    const cast = column === 'metadata' ? '::jsonb' : '';
    return `${column} = ${placeholder}${cast}`;
  });
  updates.push('updated_at = NOW()');

  return { updates, values };
}
/**
 * Create an order together with its shifts and shift roles in one transaction.
 *
 * Steps: validate the order window, upsert the actor, verify the business (and
 * the vendor when given) belong to the tenant, insert the order, insert each
 * shift and its roles, then record an ORDER_CREATED domain event.
 *
 * Defaults applied by the SQL: order status 'OPEN', service type 'EVENT',
 * shift status 'OPEN', shift timezone 'UTC'. A shift's location name, address
 * and coordinates fall back to the order's when the shift does not set them.
 *
 * @param {{ uid: string, email?: string }} actor - Authenticated caller.
 * @param {object} payload - Order fields plus `shifts[]`, each with `roles[]`.
 *   Both arrays are iterated as-is.
 * @returns {Promise<{ orderId: string, orderNumber: string, status: string,
 *   shiftCount: number, shiftIds: string[] }>}
 * @throws {AppError} VALIDATION_ERROR (400) when the order's start is not earlier
 *   than its end; NOT_FOUND (404) when the business or vendor is outside the tenant.
 */
export async function createOrder(actor, payload) {
  // updateOrder asserts this same window on every update, so an order created
  // with an inverted window could never be updated afterwards. Reject it here,
  // before a transaction is opened.
  assertChronologicalWindow(payload.startsAt, payload.endsAt);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    await requireBusiness(client, payload.tenantId, payload.businessId);
    if (payload.vendorId) {
      await requireVendor(client, payload.tenantId, payload.vendorId);
    }

    const orderResult = await client.query(
      `
INSERT INTO orders (
tenant_id,
business_id,
vendor_id,
order_number,
title,
description,
status,
service_type,
starts_at,
ends_at,
location_name,
location_address,
latitude,
longitude,
notes,
created_by_user_id,
metadata
)
VALUES (
$1, $2, $3, $4, $5, $6, COALESCE($7, 'OPEN'), COALESCE($8, 'EVENT'),
$9, $10, $11, $12, $13, $14, $15, $16, $17::jsonb
)
RETURNING id, order_number, status
`,
      [
        payload.tenantId,
        payload.businessId,
        payload.vendorId || null,
        payload.orderNumber,
        payload.title,
        payload.description || null,
        payload.status || null,
        payload.serviceType || null,
        toIsoOrNull(payload.startsAt),
        toIsoOrNull(payload.endsAt),
        payload.locationName || null,
        payload.locationAddress || null,
        // `??` rather than `||`: a coordinate of 0 is a real value.
        payload.latitude ?? null,
        payload.longitude ?? null,
        payload.notes || null,
        actor.uid,
        JSON.stringify(payload.metadata || {}),
      ]
    );
    const order = orderResult.rows[0];

    const createdShifts = [];
    for (const shiftInput of payload.shifts) {
      const shiftResult = await client.query(
        `
INSERT INTO shifts (
tenant_id,
order_id,
business_id,
vendor_id,
clock_point_id,
shift_code,
title,
status,
starts_at,
ends_at,
timezone,
location_name,
location_address,
latitude,
longitude,
geofence_radius_meters,
required_workers,
assigned_workers,
notes,
metadata
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, COALESCE($8, 'OPEN'), $9, $10, COALESCE($11, 'UTC'),
$12, $13, $14, $15, $16, $17, 0, $18, $19::jsonb
)
RETURNING id, shift_code, title, required_workers
`,
        [
          payload.tenantId,
          order.id,
          payload.businessId,
          payload.vendorId || null,
          shiftInput.clockPointId || null,
          shiftInput.shiftCode,
          shiftInput.title,
          shiftInput.status || null,
          toIsoOrNull(shiftInput.startsAt),
          toIsoOrNull(shiftInput.endsAt),
          shiftInput.timezone || null,
          // Shift-level location wins; otherwise inherit the order's.
          shiftInput.locationName || payload.locationName || null,
          shiftInput.locationAddress || payload.locationAddress || null,
          shiftInput.latitude ?? payload.latitude ?? null,
          shiftInput.longitude ?? payload.longitude ?? null,
          shiftInput.geofenceRadiusMeters ?? null,
          shiftInput.requiredWorkers,
          shiftInput.notes || null,
          JSON.stringify(shiftInput.metadata || {}),
        ]
      );
      const shift = shiftResult.rows[0];

      for (const roleInput of shiftInput.roles) {
        await client.query(
          `
INSERT INTO shift_roles (
shift_id,
role_id,
role_code,
role_name,
workers_needed,
assigned_count,
pay_rate_cents,
bill_rate_cents,
metadata
)
VALUES ($1, $2, $3, $4, $5, 0, $6, $7, $8::jsonb)
`,
          [
            shift.id,
            roleInput.roleId || null,
            roleInput.roleCode,
            roleInput.roleName,
            roleInput.workersNeeded,
            roleInput.payRateCents || 0,
            roleInput.billRateCents || 0,
            JSON.stringify(roleInput.metadata || {}),
          ]
        );
      }
      createdShifts.push(shift);
    }

    await insertDomainEvent(client, {
      tenantId: payload.tenantId,
      aggregateType: 'order',
      aggregateId: order.id,
      eventType: 'ORDER_CREATED',
      actorUserId: actor.uid,
      payload: {
        orderId: order.id,
        shiftCount: createdShifts.length,
      },
    });

    return {
      orderId: order.id,
      orderNumber: order.order_number,
      status: order.status,
      shiftCount: createdShifts.length,
      shiftIds: createdShifts.map((shift) => shift.id),
    };
  });
}
/**
 * Accept a shift role on behalf of a workforce member, in one transaction.
 *
 * Behaviour by existing assignment for (shift role, workforce):
 * - none: a new assignment is inserted directly as ACCEPTED, after checking the
 *   staff member is not blocked by the shift's business;
 * - ASSIGNED (or any status not listed below): it is moved to ACCEPTED and
 *   `payload.metadata` is merged in;
 * - ACCEPTED / CHECKED_IN / CHECKED_OUT / COMPLETED: returned unchanged;
 * - CANCELLED: rejected.
 *
 * Afterwards the role and shift head counts are refreshed, an OPEN or
 * PENDING_CONFIRMATION shift is moved to ASSIGNED, and a SHIFT_ACCEPTED domain
 * event is recorded.
 *
 * NOTE(review): the shift's own status is not checked before accepting, so a
 * CANCELLED or COMPLETED shift can still receive an acceptance. Confirm.
 *
 * @param {{ uid: string, email?: string }} actor - Authenticated caller.
 * @param {{ shiftRoleId: string, workforceId: string, shiftId?: string, metadata?: object }} payload
 * @returns {Promise<{ assignmentId: string, shiftId: string, shiftRoleId: string, status: string }>}
 * @throws {AppError} VALIDATION_ERROR (400) when `shiftId` does not own the role;
 *   SHIFT_ROLE_FILLED (409); ASSIGNMENT_CANCELLED (409); STAFF_BLOCKED (409);
 *   NOT_FOUND (404) for a missing role, workforce record or shift.
 */
export async function acceptShift(actor, payload) {
  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const shiftRole = await requireShiftRole(client, payload.shiftRoleId);
    if (payload.shiftId && shiftRole.shift_id !== payload.shiftId) {
      throw new AppError('VALIDATION_ERROR', 'shiftId does not match shiftRoleId', 400, {
        shiftId: payload.shiftId,
        shiftRoleId: payload.shiftRoleId,
      });
    }

    // `undefined` means "not looked up yet"; the finder returns a row or null.
    let existingAssignment;
    if (shiftRole.assigned_count >= shiftRole.workers_needed) {
      // A full role may still be accepted by someone who already holds a seat.
      existingAssignment = await findAssignmentForShiftRoleWorkforce(
        client,
        shiftRole.id,
        payload.workforceId
      );
      if (!existingAssignment || existingAssignment.status === 'CANCELLED') {
        throw new AppError('SHIFT_ROLE_FILLED', 'Shift role is already filled', 409, {
          shiftRoleId: payload.shiftRoleId,
        });
      }
    }

    const workforce = await requireWorkforce(client, shiftRole.tenant_id, payload.workforceId);
    if (existingAssignment === undefined) {
      // Reuse the row found by the capacity check instead of querying again.
      existingAssignment = await findAssignmentForShiftRoleWorkforce(client, shiftRole.id, workforce.id);
    }

    let assignment;
    if (existingAssignment) {
      if (existingAssignment.status === 'CANCELLED') {
        throw new AppError('ASSIGNMENT_CANCELLED', 'Cancelled assignment cannot be accepted', 409, {
          assignmentId: existingAssignment.id,
        });
      }
      if (['ACCEPTED', 'CHECKED_IN', 'CHECKED_OUT', 'COMPLETED'].includes(existingAssignment.status)) {
        // Already accepted or further along: idempotent, nothing to write.
        assignment = existingAssignment;
      } else {
        const updatedAssignment = await client.query(
          `
UPDATE assignments
SET status = 'ACCEPTED',
accepted_at = COALESCE(accepted_at, NOW()),
metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb,
updated_at = NOW()
WHERE id = $1
RETURNING id, status
`,
          [existingAssignment.id, JSON.stringify(payload.metadata || {})]
        );
        assignment = updatedAssignment.rows[0];
      }
    } else {
      // This path creates a brand-new assignment, so apply the same business
      // block that assignStaffToShift enforces before it creates one.
      await ensureStaffNotBlockedByBusiness(client, {
        tenantId: shiftRole.tenant_id,
        businessId: shiftRole.business_id,
        staffId: workforce.staff_id,
      });
      const assignmentResult = await client.query(
        `
INSERT INTO assignments (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
workforce_id,
staff_id,
status,
assigned_at,
accepted_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, 'ACCEPTED', NOW(), NOW(), $8::jsonb)
RETURNING id, status
`,
        [
          shiftRole.tenant_id,
          shiftRole.business_id,
          shiftRole.vendor_id,
          shiftRole.shift_id,
          shiftRole.id,
          workforce.id,
          workforce.staff_id,
          JSON.stringify(payload.metadata || {}),
        ]
      );
      assignment = assignmentResult.rows[0];
    }

    await refreshShiftRoleCounts(client, shiftRole.id);
    await refreshShiftCounts(client, shiftRole.shift_id);

    const shift = await requireShift(client, shiftRole.tenant_id, shiftRole.shift_id);
    if (['OPEN', 'PENDING_CONFIRMATION'].includes(shift.status)) {
      await transitionShiftStatus(client, shift.id, shift.status, 'ASSIGNED');
    }

    await insertDomainEvent(client, {
      tenantId: shiftRole.tenant_id,
      aggregateType: 'assignment',
      aggregateId: assignment.id,
      eventType: 'SHIFT_ACCEPTED',
      actorUserId: actor.uid,
      payload: {
        shiftId: shiftRole.shift_id,
        shiftRoleId: shiftRole.id,
        workforceId: workforce.id,
      },
    });

    return {
      assignmentId: assignment.id,
      shiftId: shiftRole.shift_id,
      shiftRoleId: shiftRole.id,
      status: assignment.status,
    };
  });
}
/**
 * Apply a partial update to an order inside one transaction.
 *
 * The order is loaded (and locked) first; a non-empty `vendorId` is verified
 * against the tenant; the resulting start/end window (incoming values where the
 * key is present, stored values otherwise) must be chronological. An
 * ORDER_UPDATED domain event lists every payload key except `orderId` and
 * `tenantId`.
 *
 * @param {{ uid: string, email?: string }} actor - Authenticated caller.
 * @param {object} payload - `tenantId`, `orderId` and any mutable order fields.
 * @returns {Promise<{ orderId: string, orderNumber: string, status: string, updatedAt: any }>}
 * @throws {AppError} NOT_FOUND (404) for a missing order/vendor; VALIDATION_ERROR (400)
 *   for an inverted window or when no mutable field is supplied.
 */
export async function updateOrder(actor, payload) {
  const hasField = (key) => Object.prototype.hasOwnProperty.call(payload, key);

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const storedOrder = await requireOrder(client, payload.tenantId, payload.orderId);

    if (hasField('vendorId') && payload.vendorId) {
      await requireVendor(client, payload.tenantId, payload.vendorId);
    }

    // Validate the window as it will look after the update is applied.
    let effectiveStartsAt = storedOrder.starts_at;
    if (hasField('startsAt')) {
      effectiveStartsAt = payload.startsAt;
    }
    let effectiveEndsAt = storedOrder.ends_at;
    if (hasField('endsAt')) {
      effectiveEndsAt = payload.endsAt;
    }
    assertChronologicalWindow(effectiveStartsAt, effectiveEndsAt);

    const { updates, values } = buildOrderUpdateStatement(payload);
    // The two key parameters go after the SET values.
    values.push(payload.orderId, payload.tenantId);
    const idPlaceholder = `$${values.length - 1}`;
    const tenantPlaceholder = `$${values.length}`;
    const setClause = updates.join(', ');

    const { rows } = await client.query(
      `
UPDATE orders
SET ${setClause}
WHERE id = ${idPlaceholder}
AND tenant_id = ${tenantPlaceholder}
RETURNING id, order_number, status, updated_at
`,
      values
    );
    const order = rows[0];

    const reservedKeys = ['orderId', 'tenantId'];
    const updatedFields = Object.keys(payload).filter((key) => !reservedKeys.includes(key));
    await insertDomainEvent(client, {
      tenantId: payload.tenantId,
      aggregateType: 'order',
      aggregateId: order.id,
      eventType: 'ORDER_UPDATED',
      actorUserId: actor.uid,
      payload: { updatedFields },
    });

    return {
      orderId: order.id,
      orderNumber: order.order_number,
      status: order.status,
      updatedAt: order.updated_at,
    };
  });
}
/**
 * Cancel an order and cascade the cancellation, in one transaction.
 *
 * - An order that is already CANCELLED is returned as-is with `alreadyCancelled: true`.
 * - Cancellation is refused when any assignment under the order is CHECKED_IN,
 *   CHECKED_OUT or COMPLETED.
 * - Otherwise the order is set to CANCELLED (`payload.reason` is appended to its
 *   notes, `payload.metadata` merged), every shift except COMPLETED ones is set
 *   to CANCELLED, cancellable assignments and applications are cancelled, head
 *   counts are refreshed, and an ORDER_CANCELLED domain event is recorded.
 *
 * @param {{ uid: string, email?: string }} actor - Authenticated caller.
 * @param {{ tenantId: string, orderId: string, reason?: string, metadata?: object }} payload
 * @returns {Promise<object>} `{ orderId, orderNumber, status, cancelledShiftCount }`,
 *   or `{ orderId, orderNumber, status, alreadyCancelled: true }`.
 *   `cancelledShiftCount` excludes shifts that were left COMPLETED.
 * @throws {AppError} NOT_FOUND (404) for a missing order; ORDER_CANCEL_BLOCKED (409).
 */
export async function cancelOrder(actor, payload) {
  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const order = await requireOrder(client, payload.tenantId, payload.orderId);
    if (order.status === 'CANCELLED') {
      return {
        orderId: order.id,
        orderNumber: order.order_number,
        status: order.status,
        alreadyCancelled: true,
      };
    }

    const blockingAssignments = await client.query(
      `
SELECT a.id
FROM assignments a
JOIN shifts s ON s.id = a.shift_id
WHERE s.order_id = $1
AND a.status IN ('CHECKED_IN', 'CHECKED_OUT', 'COMPLETED')
LIMIT 1
`,
      [order.id]
    );
    if (blockingAssignments.rowCount > 0) {
      throw new AppError('ORDER_CANCEL_BLOCKED', 'Order has active or completed assignments and cannot be cancelled', 409, {
        orderId: order.id,
      });
    }

    await client.query(
      `
UPDATE orders
SET status = 'CANCELLED',
notes = CASE
WHEN $2::text IS NULL THEN notes
WHEN notes IS NULL OR notes = '' THEN $2::text
ELSE CONCAT(notes, E'\\n', $2::text)
END,
metadata = COALESCE(metadata, '{}'::jsonb) || $3::jsonb,
updated_at = NOW()
WHERE id = $1
`,
      [order.id, payload.reason || null, JSON.stringify(payload.metadata || {})]
    );

    // Read each shift's status BEFORE the cascade so the result can report how
    // many shifts actually end up cancelled.
    const affectedShifts = await client.query(
      `
SELECT id, status
FROM shifts
WHERE order_id = $1
`,
      [order.id]
    );

    await client.query(
      `
UPDATE shifts
SET status = CASE
WHEN status = 'COMPLETED' THEN status
ELSE 'CANCELLED'
END,
updated_at = NOW()
WHERE order_id = $1
`,
      [order.id]
    );
    await client.query(
      `
UPDATE assignments
SET status = 'CANCELLED',
updated_at = NOW()
WHERE shift_id IN (SELECT id FROM shifts WHERE order_id = $1)
AND status = ANY($2::text[])
`,
      [order.id, CANCELLABLE_ASSIGNMENT_STATUSES]
    );
    await client.query(
      `
UPDATE applications
SET status = 'CANCELLED',
updated_at = NOW()
WHERE shift_id IN (SELECT id FROM shifts WHERE order_id = $1)
AND status = ANY($2::text[])
`,
      [order.id, CANCELLABLE_APPLICATION_STATUSES]
    );

    // Head counts depend on assignment statuses, so refresh them for every
    // role and shift under the order.
    for (const row of affectedShifts.rows) {
      const roleIds = await client.query(
        'SELECT id FROM shift_roles WHERE shift_id = $1',
        [row.id]
      );
      for (const role of roleIds.rows) {
        await refreshShiftRoleCounts(client, role.id);
      }
      await refreshShiftCounts(client, row.id);
    }

    await insertDomainEvent(client, {
      tenantId: payload.tenantId,
      aggregateType: 'order',
      aggregateId: order.id,
      eventType: 'ORDER_CANCELLED',
      actorUserId: actor.uid,
      payload: {
        reason: payload.reason || null,
      },
    });

    // The shift UPDATE above leaves COMPLETED shifts untouched; counting every
    // row of the order would report those as cancelled.
    const cancelledShiftCount = affectedShifts.rows
      .filter((row) => row.status !== 'COMPLETED')
      .length;

    return {
      orderId: order.id,
      orderNumber: order.order_number,
      status: 'CANCELLED',
      cancelledShiftCount,
    };
  });
}
/**
 * Change a shift's status and cascade the effect to its assignments and
 * applications, in one transaction.
 *
 * - Completing is refused while any attendance session of the shift is OPEN.
 * - The move must be allowed by `transitionShiftStatus`.
 * - CANCELLED: cancellable assignments and applications are cancelled.
 * - COMPLETED: assignments in CHECKED_OUT or ACCEPTED become COMPLETED.
 * - Role and shift head counts are refreshed and a SHIFT_STATUS_CHANGED domain
 *   event is recorded (also when the requested status equals the current one).
 *
 * @param {{ uid: string, email?: string }} actor - Authenticated caller.
 * @param {{ tenantId: string, shiftId: string, status: string, reason?: string, metadata?: object }} payload
 * @returns {Promise<{ shiftId: string, orderId: string, status: string }>}
 * @throws {AppError} NOT_FOUND (404); SHIFT_COMPLETE_BLOCKED (409);
 *   INVALID_SHIFT_STATUS_TRANSITION (409).
 */
export async function changeShiftStatus(actor, payload) {
  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);
    const shift = await requireShift(client, payload.tenantId, payload.shiftId);
    const { id: shiftId, status: previousStatus } = shift;

    if (payload.status === 'COMPLETED') {
      const { rowCount: openSessionCount } = await client.query(
        `
SELECT id
FROM attendance_sessions
WHERE assignment_id IN (SELECT id FROM assignments WHERE shift_id = $1)
AND status = 'OPEN'
LIMIT 1
`,
        [shiftId]
      );
      if (openSessionCount > 0) {
        throw new AppError('SHIFT_COMPLETE_BLOCKED', 'Shift has open attendance sessions', 409, {
          shiftId,
        });
      }
    }

    const nextStatus = await transitionShiftStatus(client, shiftId, previousStatus, payload.status);

    switch (nextStatus) {
      case 'CANCELLED': {
        await client.query(
          `
UPDATE assignments
SET status = 'CANCELLED',
updated_at = NOW()
WHERE shift_id = $1
AND status = ANY($2::text[])
`,
          [shiftId, CANCELLABLE_ASSIGNMENT_STATUSES]
        );
        await client.query(
          `
UPDATE applications
SET status = 'CANCELLED',
updated_at = NOW()
WHERE shift_id = $1
AND status = ANY($2::text[])
`,
          [shiftId, CANCELLABLE_APPLICATION_STATUSES]
        );
        break;
      }
      case 'COMPLETED': {
        await client.query(
          `
UPDATE assignments
SET status = 'COMPLETED',
updated_at = NOW()
WHERE shift_id = $1
AND status IN ('CHECKED_OUT', 'ACCEPTED')
`,
          [shiftId]
        );
        break;
      }
      default:
        break;
    }

    // Assignment statuses may have changed above; bring the counters in line.
    const { rows: roleRows } = await client.query('SELECT id FROM shift_roles WHERE shift_id = $1', [shiftId]);
    for (const { id: roleId } of roleRows) {
      await refreshShiftRoleCounts(client, roleId);
    }
    await refreshShiftCounts(client, shiftId);

    await insertDomainEvent(client, {
      tenantId: payload.tenantId,
      aggregateType: 'shift',
      aggregateId: shiftId,
      eventType: 'SHIFT_STATUS_CHANGED',
      actorUserId: actor.uid,
      payload: {
        fromStatus: previousStatus,
        toStatus: nextStatus,
        reason: payload.reason || null,
        metadata: payload.metadata || {},
      },
    });

    return {
      shiftId,
      orderId: shift.order_id,
      status: nextStatus,
    };
  });
}
/**
 * Assign a workforce member to a shift role, in one transaction.
 *
 * Checks, in order: the shift exists in the tenant; the role belongs to that
 * shift; the workforce record is active in the tenant; the staff member is not
 * blocked by the shift's business; an optional application matches the shift,
 * role and staff member. If a non-cancelled assignment already exists it is
 * returned with `existing: true`. Otherwise the role must have capacity left,
 * an ASSIGNED assignment is inserted, the application (if any) is marked
 * CONFIRMED, head counts are refreshed, an OPEN or PENDING_CONFIRMATION shift
 * moves to ASSIGNED, and a STAFF_ASSIGNED domain event is recorded.
 *
 * @param {{ uid: string, email?: string }} actor - Authenticated caller.
 * @param {{ tenantId: string, shiftId: string, shiftRoleId: string, workforceId: string,
 *   applicationId?: string, metadata?: object }} payload
 * @returns {Promise<{ assignmentId: string, shiftId: string, shiftRoleId: string,
 *   status: string, existing: boolean }>}
 * @throws {AppError} NOT_FOUND (404); VALIDATION_ERROR (400); STAFF_BLOCKED (409);
 *   SHIFT_ROLE_FILLED (409).
 */
export async function assignStaffToShift(actor, payload) {
  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const shift = await requireShift(client, payload.tenantId, payload.shiftId);
    const shiftRole = await requireShiftRole(client, payload.shiftRoleId);
    const roleBelongsToShift = shiftRole.shift_id === shift.id;
    if (!roleBelongsToShift) {
      throw new AppError('VALIDATION_ERROR', 'shiftId does not match shiftRoleId', 400, {
        shiftId: payload.shiftId,
        shiftRoleId: payload.shiftRoleId,
      });
    }

    const workforce = await requireWorkforce(client, payload.tenantId, payload.workforceId);
    await ensureStaffNotBlockedByBusiness(client, {
      tenantId: shift.tenant_id,
      businessId: shift.business_id,
      staffId: workforce.staff_id,
    });

    let application = null;
    if (payload.applicationId) {
      application = await requireApplication(client, payload.tenantId, payload.applicationId);
      const applicationMatches =
        application.shift_id === shift.id
        && application.shift_role_id === shiftRole.id
        && application.staff_id === workforce.staff_id;
      if (!applicationMatches) {
        throw new AppError('VALIDATION_ERROR', 'applicationId does not match shift role and workforce staff', 400, {
          applicationId: payload.applicationId,
        });
      }
    }

    // Idempotency: a live assignment for this pair is returned untouched.
    const priorAssignment = await findAssignmentForShiftRoleWorkforce(client, shiftRole.id, workforce.id);
    if (priorAssignment && priorAssignment.status !== 'CANCELLED') {
      return {
        assignmentId: priorAssignment.id,
        shiftId: shift.id,
        shiftRoleId: shiftRole.id,
        status: priorAssignment.status,
        existing: true,
      };
    }

    const roleIsFull = shiftRole.assigned_count >= shiftRole.workers_needed;
    if (roleIsFull) {
      throw new AppError('SHIFT_ROLE_FILLED', 'Shift role is already filled', 409, {
        shiftRoleId: shiftRole.id,
      });
    }

    const applicationId = application?.id || null;
    const insertParams = [
      shift.tenant_id,
      shift.business_id,
      shift.vendor_id,
      shift.id,
      shiftRole.id,
      workforce.id,
      workforce.staff_id,
      applicationId,
      JSON.stringify(payload.metadata || {}),
    ];
    const { rows: insertedRows } = await client.query(
      `
INSERT INTO assignments (
tenant_id,
business_id,
vendor_id,
shift_id,
shift_role_id,
workforce_id,
staff_id,
application_id,
status,
assigned_at,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'ASSIGNED', NOW(), $9::jsonb)
RETURNING id, status
`,
      insertParams
    );

    if (application) {
      await client.query(
        `
UPDATE applications
SET status = 'CONFIRMED',
updated_at = NOW()
WHERE id = $1
`,
        [application.id]
      );
    }

    await refreshShiftRoleCounts(client, shiftRole.id);
    await refreshShiftCounts(client, shift.id);

    const awaitingStaff = ['OPEN', 'PENDING_CONFIRMATION'].includes(shift.status);
    if (awaitingStaff) {
      await transitionShiftStatus(client, shift.id, shift.status, 'ASSIGNED');
    }

    const assignment = insertedRows[0];
    await insertDomainEvent(client, {
      tenantId: payload.tenantId,
      aggregateType: 'assignment',
      aggregateId: assignment.id,
      eventType: 'STAFF_ASSIGNED',
      actorUserId: actor.uid,
      payload: {
        shiftId: shift.id,
        shiftRoleId: shiftRole.id,
        workforceId: workforce.id,
        applicationId: application?.id || null,
      },
    });

    return {
      assignmentId: assignment.id,
      shiftId: shift.id,
      shiftRoleId: shiftRole.id,
      status: assignment.status,
      existing: false,
    };
  });
}
async function createAttendanceEvent(actor, payload, eventType) {
return withTransaction(async (client) => {
await ensureActorUser(client, actor);
const assignment = await requireAssignment(client, payload.assignmentId);
const capturedAt = toIsoOrNull(payload.capturedAt) || new Date().toISOString();
let securityProof = null;
async function rejectAttendanceAttempt({
errorCode,
reason,
incidentType = 'CLOCK_IN_REJECTED',
severity = 'WARNING',
effectiveClockInMode = null,
distance = null,
withinGeofence = null,
metadata = {},
details = {},
}) {
const incidentId = await recordGeofenceIncident(client, {
assignment,
actorUserId: actor.uid,
incidentType,
severity,
effectiveClockInMode,
sourceType: payload.sourceType,
nfcTagUid: payload.nfcTagUid || null,
deviceId: payload.deviceId || null,
latitude: payload.latitude ?? null,
longitude: payload.longitude ?? null,
accuracyMeters: payload.accuracyMeters ?? null,
distanceToClockPointMeters: distance,
withinGeofence,
overrideReason: payload.overrideReason || null,
message: reason,
occurredAt: capturedAt,
metadata: {
eventType,
...metadata,
},
});
const rejectedEvent = await client.query(
`
INSERT INTO attendance_events (
tenant_id,
assignment_id,
shift_id,
staff_id,
clock_point_id,
event_type,
source_type,
source_reference,
nfc_tag_uid,
device_id,
latitude,
longitude,
accuracy_meters,
distance_to_clock_point_meters,
within_geofence,
validation_status,
validation_reason,
captured_at,
raw_payload
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, 'REJECTED', $16, $17, $18::jsonb
)
RETURNING id
`,
[
assignment.tenant_id,
assignment.id,
assignment.shift_id,
assignment.staff_id,
assignment.clock_point_id,
eventType,
payload.sourceType,
payload.sourceReference || null,
payload.nfcTagUid || null,
payload.deviceId || null,
payload.latitude ?? null,
payload.longitude ?? null,
payload.accuracyMeters ?? null,
distance,
withinGeofence,
reason,
capturedAt,
JSON.stringify({
...(payload.rawPayload || {}),
securityProofId: securityProof?.proofId || null,
securityObjectUri: securityProof?.objectUri || null,
}),
]
);
await insertDomainEvent(client, {
tenantId: assignment.tenant_id,
aggregateType: 'attendance_event',
aggregateId: rejectedEvent.rows[0].id,
eventType: `${eventType}_REJECTED`,
actorUserId: actor.uid,
payload: {
assignmentId: assignment.id,
sourceType: payload.sourceType,
reason,
incidentId,
...details,
},
});
throw new AppError(errorCode, reason, 409, {
assignmentId: payload.assignmentId,
attendanceEventId: rejectedEvent.rows[0].id,
distanceToClockPointMeters: distance,
effectiveClockInMode,
...details,
});
}
try {
securityProof = await recordAttendanceSecurityProof(client, {
assignment,
actor,
payload,
eventType,
capturedAt,
});
} catch (error) {
if (!(error instanceof AppError) || error.code !== 'ATTENDANCE_SECURITY_FAILED') {
throw error;
}
await rejectAttendanceAttempt({
errorCode: error.code,
reason: error.message,
incidentType: 'CLOCK_IN_REJECTED',
severity: error.details?.securityCode?.startsWith('NFC') ? 'CRITICAL' : 'WARNING',
effectiveClockInMode: assignment.clock_in_mode || assignment.default_clock_in_mode || null,
metadata: {
securityCode: error.details?.securityCode || null,
},
details: {
securityCode: error.details?.securityCode || null,
securityProofId: error.details?.proofId || null,
securityObjectUri: error.details?.objectUri || null,
},
});
}
const validation = evaluateClockInAttempt(assignment, payload);
if (validation.validationStatus === 'REJECTED') {
await rejectAttendanceAttempt({
errorCode: 'ATTENDANCE_VALIDATION_FAILED',
reason: validation.validationReason,
incidentType: validation.validationCode === 'NFC_MISMATCH'
? 'NFC_MISMATCH'
: 'CLOCK_IN_REJECTED',
severity: validation.validationCode === 'NFC_MISMATCH' ? 'CRITICAL' : 'WARNING',
effectiveClockInMode: validation.effectiveClockInMode,
distance: validation.distance,
withinGeofence: validation.withinGeofence,
metadata: {
validationCode: validation.validationCode,
},
details: {
validationCode: validation.validationCode,
},
});
}
async function loadExistingAttendanceSession() {
const existing = await client.query(
`
SELECT id, status, check_in_at AS "clockInAt"
FROM attendance_sessions
WHERE assignment_id = $1
ORDER BY updated_at DESC
LIMIT 1
`,
[assignment.id]
);
return existing.rows[0] || null;
}
const sessionResult = await client.query(
`
SELECT id, status, check_in_at AS "clockInAt"
FROM attendance_sessions
WHERE assignment_id = $1
`,
[assignment.id]
);
if (eventType === 'CLOCK_IN' && sessionResult.rowCount > 0) {
const existingSession = sessionResult.rows[0];
if (existingSession.status === 'OPEN') {
throw new AppError('ALREADY_CLOCKED_IN', 'An active attendance session already exists for this assignment', 409, {
assignmentId: assignment.id,
sessionId: existingSession.id,
clockInAt: existingSession.clockInAt || null,
});
}
throw new AppError('ATTENDANCE_SESSION_EXISTS', 'Attendance session already exists for this assignment', 409, {
assignmentId: assignment.id,
sessionId: existingSession.id,
clockInAt: existingSession.clockInAt || null,
});
}
if (eventType === 'CLOCK_OUT' && sessionResult.rowCount === 0) {
throw new AppError('ATTENDANCE_NOT_OPEN', 'Assignment does not have an open attendance session', 409, {
assignmentId: assignment.id,
});
}
const eventResult = await client.query(
`
INSERT INTO attendance_events (
tenant_id,
assignment_id,
shift_id,
staff_id,
clock_point_id,
event_type,
source_type,
source_reference,
nfc_tag_uid,
device_id,
latitude,
longitude,
accuracy_meters,
distance_to_clock_point_meters,
within_geofence,
validation_status,
validation_reason,
captured_at,
raw_payload
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19::jsonb
)
RETURNING id, validation_status
`,
[
assignment.tenant_id,
assignment.id,
assignment.shift_id,
assignment.staff_id,
assignment.clock_point_id,
eventType,
payload.sourceType,
payload.sourceReference || null,
payload.nfcTagUid || null,
payload.deviceId || null,
payload.latitude ?? null,
payload.longitude ?? null,
payload.accuracyMeters ?? null,
validation.distance,
validation.withinGeofence,
validation.validationStatus,
validation.overrideUsed ? validation.overrideReason : validation.validationReason,
capturedAt,
JSON.stringify({
...(payload.rawPayload || {}),
securityProofId: securityProof?.proofId || null,
securityAttestationStatus: securityProof?.attestationStatus || null,
securityObjectUri: securityProof?.objectUri || null,
}),
]
);
if (validation.overrideUsed) {
const incidentId = await recordGeofenceIncident(client, {
assignment,
actorUserId: actor.uid,
incidentType: 'CLOCK_IN_OVERRIDE',
severity: 'WARNING',
effectiveClockInMode: validation.effectiveClockInMode,
sourceType: payload.sourceType,
nfcTagUid: payload.nfcTagUid || null,
deviceId: payload.deviceId || null,
latitude: payload.latitude ?? null,
longitude: payload.longitude ?? null,
accuracyMeters: payload.accuracyMeters ?? null,
distanceToClockPointMeters: validation.distance,
withinGeofence: validation.withinGeofence,
overrideReason: validation.overrideReason,
message: validation.validationReason,
occurredAt: capturedAt,
metadata: {
validationCode: validation.validationCode,
eventType,
},
});
await enqueueHubManagerAlert(client, {
tenantId: assignment.tenant_id,
businessId: assignment.business_id,
shiftId: assignment.shift_id,
assignmentId: assignment.id,
hubId: assignment.clock_point_id,
relatedIncidentId: incidentId,
notificationType: 'CLOCK_IN_OVERRIDE_REVIEW',
priority: 'HIGH',
subject: 'Clock-in override requires review',
body: `${assignment.shift_title}: clock-in override submitted by ${actor.email || actor.uid}`,
payload: {
assignmentId: assignment.id,
shiftId: assignment.shift_id,
staffId: assignment.staff_id,
reason: validation.overrideReason,
validationReason: validation.validationReason,
effectiveClockInMode: validation.effectiveClockInMode,
eventType,
},
dedupeScope: incidentId,
});
}
let sessionId;
if (eventType === 'CLOCK_IN') {
let insertedSession;
try {
insertedSession = await client.query(
`
INSERT INTO attendance_sessions (
tenant_id,
assignment_id,
staff_id,
clock_in_event_id,
status,
check_in_at
)
VALUES ($1, $2, $3, $4, 'OPEN', $5)
RETURNING id
`,
[assignment.tenant_id, assignment.id, assignment.staff_id, eventResult.rows[0].id, capturedAt]
);
} catch (error) {
if (error?.code !== '23505') {
throw error;
}
const existingSession = await loadExistingAttendanceSession();
if (existingSession?.status === 'OPEN') {
throw new AppError('ALREADY_CLOCKED_IN', 'An active attendance session already exists for this assignment', 409, {
assignmentId: assignment.id,
sessionId: existingSession.id,
clockInAt: existingSession.clockInAt || null,
});
}
throw new AppError('ATTENDANCE_SESSION_EXISTS', 'Attendance session already exists for this assignment', 409, {
assignmentId: assignment.id,
sessionId: existingSession?.id || null,
clockInAt: existingSession?.clockInAt || null,
});
}
sessionId = insertedSession.rows[0].id;
await client.query(
`
UPDATE assignments
SET status = 'CHECKED_IN',
checked_in_at = $2,
updated_at = NOW()
WHERE id = $1
`,
[assignment.id, capturedAt]
);
} else {
const existingSession = sessionResult.rows[0];
await client.query(
`
UPDATE attendance_sessions
SET clock_out_event_id = $2,
status = 'CLOSED',
check_out_at = $3,
worked_minutes = GREATEST(EXTRACT(EPOCH FROM ($3 - check_in_at))::INTEGER / 60, 0),
updated_at = NOW()
WHERE id = $1
`,
[existingSession.id, eventResult.rows[0].id, capturedAt]
);
sessionId = existingSession.id;
await client.query(
`
UPDATE assignments
SET status = 'CHECKED_OUT',
checked_out_at = $2,
updated_at = NOW()
WHERE id = $1
`,
[assignment.id, capturedAt]
);
}
await insertDomainEvent(client, {
tenantId: assignment.tenant_id,
aggregateType: 'attendance_event',
aggregateId: eventResult.rows[0].id,
eventType,
actorUserId: actor.uid,
payload: {
assignmentId: assignment.id,
sessionId,
sourceType: payload.sourceType,
validationStatus: validation.validationStatus,
},
});
return {
attendanceEventId: eventResult.rows[0].id,
assignmentId: assignment.id,
sessionId,
status: eventType,
validationStatus: eventResult.rows[0].validation_status,
effectiveClockInMode: validation.effectiveClockInMode,
overrideUsed: validation.overrideUsed,
securityProofId: securityProof?.proofId || null,
attestationStatus: securityProof?.attestationStatus || null,
};
});
}
/**
 * Records a clock-in for an assignment.
 *
 * Delegates entirely to `createAttendanceEvent`, fixing the event type to
 * `'CLOCK_IN'`; the result and any error come straight from that function.
 *
 * @param {object} actor - Authenticated caller, forwarded unchanged.
 * @param {object} payload - Attendance payload, forwarded unchanged.
 * @returns {Promise<object>} Whatever `createAttendanceEvent` resolves to.
 */
export async function clockIn(actor, payload) {
  const eventType = 'CLOCK_IN';
  return createAttendanceEvent(actor, payload, eventType);
}
/**
 * Records a clock-out for an assignment.
 *
 * Delegates entirely to `createAttendanceEvent`, fixing the event type to
 * `'CLOCK_OUT'`; the result and any error come straight from that function.
 *
 * @param {object} actor - Authenticated caller, forwarded unchanged.
 * @param {object} payload - Attendance payload, forwarded unchanged.
 * @returns {Promise<object>} Whatever `createAttendanceEvent` resolves to.
 */
export async function clockOut(actor, payload) {
  const eventType = 'CLOCK_OUT';
  return createAttendanceEvent(actor, payload, eventType);
}
/**
 * Marks a staff member as a favorite of a business.
 *
 * Everything runs in one transaction: the actor is upserted into `users`, the
 * business is checked through `requireBusiness` (defined elsewhere in this
 * file), the staff row is looked up within the tenant, the `staff_favorites`
 * row is upserted, and a `STAFF_FAVORITED` domain event is recorded.
 *
 * Favoriting the same (business, staff) pair again does not fail: the existing
 * row is kept and only its `created_by_user_id` is replaced by the current actor.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` is stored as creator.
 * @param {object} payload - Carries `tenantId`, `businessId` and `staffId`.
 * @returns {Promise<{favoriteId: *, businessId: *, staffId: *}>}
 * @throws {AppError} `NOT_FOUND` (404) when no staff row matches id + tenant.
 */
export async function addFavoriteStaff(actor, payload) {
  // SQL text is kept verbatim, hence the unindented statement bodies.
  const findStaffSql = `
SELECT id
FROM staffs
WHERE id = $1 AND tenant_id = $2
`;
  const upsertFavoriteSql = `
INSERT INTO staff_favorites (
tenant_id,
business_id,
staff_id,
created_by_user_id
)
VALUES ($1, $2, $3, $4)
ON CONFLICT (business_id, staff_id) DO UPDATE
SET created_by_user_id = EXCLUDED.created_by_user_id
RETURNING id
`;

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const { tenantId, businessId, staffId } = payload;
    await requireBusiness(client, tenantId, businessId);

    // The staff member must exist inside the same tenant as the business.
    const staffLookup = await client.query(findStaffSql, [staffId, tenantId]);
    const staffMissing = staffLookup.rowCount === 0;
    if (staffMissing) {
      throw new AppError('NOT_FOUND', 'Staff not found in tenant scope', 404, { staffId });
    }

    const upserted = await client.query(upsertFavoriteSql, [
      tenantId,
      businessId,
      staffId,
      actor.uid,
    ]);
    const favoriteId = upserted.rows[0].id;

    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff_favorite',
      aggregateId: favoriteId,
      eventType: 'STAFF_FAVORITED',
      actorUserId: actor.uid,
      payload,
    });

    return { favoriteId, businessId, staffId };
  });
}
/**
 * Removes a staff member from a business's favorites.
 *
 * Inside one transaction the actor is upserted into `users`, the matching
 * `staff_favorites` row (tenant + business + staff) is deleted, and a
 * `STAFF_UNFAVORITED` domain event is recorded against the deleted row's id.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` is logged on the event.
 * @param {object} payload - Carries `tenantId`, `businessId` and `staffId`.
 * @returns {Promise<{removed: true, businessId: *, staffId: *}>}
 * @throws {AppError} `NOT_FOUND` (404) when nothing was deleted; the whole
 *   payload is attached as the error details.
 */
export async function removeFavoriteStaff(actor, payload) {
  // SQL text is kept verbatim, hence the unindented statement body.
  const deleteFavoriteSql = `
DELETE FROM staff_favorites
WHERE tenant_id = $1
AND business_id = $2
AND staff_id = $3
RETURNING id
`;

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const { tenantId, businessId, staffId } = payload;
    const removal = await client.query(deleteFavoriteSql, [tenantId, businessId, staffId]);

    const nothingRemoved = removal.rowCount === 0;
    if (nothingRemoved) {
      throw new AppError('NOT_FOUND', 'Favorite staff record not found', 404, payload);
    }

    const removedFavoriteId = removal.rows[0].id;
    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff_favorite',
      aggregateId: removedFavoriteId,
      eventType: 'STAFF_UNFAVORITED',
      actorUserId: actor.uid,
      payload,
    });

    return {
      removed: true,
      businessId,
      staffId,
    };
  });
}
/**
 * Creates, or overwrites, a business's review of a staff member for one assignment.
 *
 * Steps, all inside a single transaction:
 *   1. Upsert the actor into `users`.
 *   2. Load the assignment and require that its `business_id` and `staff_id`
 *      equal the ones in the payload.
 *   3. Upsert the `staff_reviews` row keyed on (business, assignment, staff);
 *      an existing review has its reviewer, rating, text and tags replaced.
 *   4. Recompute `staffs.average_rating` (rounded to 2 decimals) and
 *      `staffs.rating_count` from every review stored for that staff id.
 *   5. Record a `STAFF_REVIEW_CREATED` domain event (emitted for overwrites too).
 *
 * NOTE(review): `payload.tenantId` is written to the review without being
 * compared to the assignment's tenant here — confirm `requireAssignment` or
 * the caller enforces tenant scope.
 *
 * @param {object} actor - Authenticated caller; `actor.uid` becomes the reviewer.
 * @param {object} payload - Carries `tenantId`, `businessId`, `staffId`,
 *   `assignmentId`, `rating`, and optional `reviewText` / `tags`.
 * @returns {Promise<{reviewId: *, assignmentId: *, staffId: *, rating: *}>}
 *   `rating` is the value returned by the database for the stored row.
 * @throws {AppError} `VALIDATION_ERROR` (400) when the assignment does not
 *   belong to the given business/staff pair.
 */
export async function createStaffReview(actor, payload) {
  // SQL text is kept verbatim, hence the unindented statement bodies.
  const upsertReviewSql = `
INSERT INTO staff_reviews (
tenant_id,
business_id,
staff_id,
assignment_id,
reviewer_user_id,
rating,
review_text,
tags
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb)
ON CONFLICT (business_id, assignment_id, staff_id) DO UPDATE
SET reviewer_user_id = EXCLUDED.reviewer_user_id,
rating = EXCLUDED.rating,
review_text = EXCLUDED.review_text,
tags = EXCLUDED.tags,
updated_at = NOW()
RETURNING id, rating
`;
  const refreshRatingStatsSql = `
UPDATE staffs
SET average_rating = review_stats.avg_rating,
rating_count = review_stats.rating_count,
updated_at = NOW()
FROM (
SELECT staff_id,
ROUND(AVG(rating)::numeric, 2) AS avg_rating,
COUNT(*)::INTEGER AS rating_count
FROM staff_reviews
WHERE staff_id = $1
GROUP BY staff_id
) review_stats
WHERE staffs.id = review_stats.staff_id
`;

  return withTransaction(async (client) => {
    await ensureActorUser(client, actor);

    const { assignmentId, businessId, staffId, tenantId } = payload;
    const assignment = await requireAssignment(client, assignmentId);

    // A review may only target the business and staff tied to the assignment.
    const targetsAssignment =
      assignment.business_id === businessId && assignment.staff_id === staffId;
    if (!targetsAssignment) {
      throw new AppError('VALIDATION_ERROR', 'Assignment does not match business/staff review target', 400, {
        assignmentId,
        businessId,
        staffId,
      });
    }

    const reviewText = payload.reviewText || null;
    const tagsJson = JSON.stringify(payload.tags || []);
    const reviewResult = await client.query(upsertReviewSql, [
      tenantId,
      businessId,
      staffId,
      assignmentId,
      actor.uid,
      payload.rating,
      reviewText,
      tagsJson,
    ]);

    // Keep the denormalised rating summary on the staff row in step.
    await client.query(refreshRatingStatsSql, [staffId]);

    const savedReview = reviewResult.rows[0];
    await insertDomainEvent(client, {
      tenantId,
      aggregateType: 'staff_review',
      aggregateId: savedReview.id,
      eventType: 'STAFF_REVIEW_CREATED',
      actorUserId: actor.uid,
      payload: {
        staffId,
        assignmentId,
        rating: payload.rating,
      },
    });

    return {
      reviewId: savedReview.id,
      assignmentId,
      staffId,
      rating: savedReview.rating,
    };
  });
}