feat(backend): implement v2 domain slice and live smoke
This commit is contained in:
@@ -8,7 +8,7 @@ import { createCommandsRouter } from './routes/commands.js';
|
||||
|
||||
const logger = pino({ level: process.env.LOG_LEVEL || 'info' });
|
||||
|
||||
export function createApp() {
|
||||
export function createApp(options = {}) {
|
||||
const app = express();
|
||||
|
||||
app.use(requestContext);
|
||||
@@ -21,7 +21,7 @@ export function createApp() {
|
||||
app.use(express.json({ limit: '2mb' }));
|
||||
|
||||
app.use(healthRouter);
|
||||
app.use('/commands', createCommandsRouter());
|
||||
app.use('/commands', createCommandsRouter(options.commandHandlers));
|
||||
|
||||
app.use(notFoundHandler);
|
||||
app.use(errorHandler);
|
||||
|
||||
14
backend/command-api/src/contracts/commands/attendance.js
Normal file
14
backend/command-api/src/contracts/commands/attendance.js
Normal file
@@ -0,0 +1,14 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const attendanceCommandSchema = z.object({
|
||||
assignmentId: z.string().uuid(),
|
||||
sourceType: z.enum(['NFC', 'GEO', 'QR', 'MANUAL', 'SYSTEM']),
|
||||
sourceReference: z.string().max(255).optional(),
|
||||
nfcTagUid: z.string().max(255).optional(),
|
||||
deviceId: z.string().max(255).optional(),
|
||||
latitude: z.number().min(-90).max(90).optional(),
|
||||
longitude: z.number().min(-180).max(180).optional(),
|
||||
accuracyMeters: z.number().int().nonnegative().optional(),
|
||||
capturedAt: z.string().datetime().optional(),
|
||||
rawPayload: z.record(z.any()).optional(),
|
||||
});
|
||||
@@ -0,0 +1,7 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const favoriteStaffSchema = z.object({
|
||||
tenantId: z.string().uuid(),
|
||||
businessId: z.string().uuid(),
|
||||
staffId: z.string().uuid(),
|
||||
});
|
||||
@@ -0,0 +1,8 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const orderCancelSchema = z.object({
|
||||
orderId: z.string().uuid(),
|
||||
tenantId: z.string().uuid(),
|
||||
reason: z.string().max(1000).optional(),
|
||||
metadata: z.record(z.any()).optional(),
|
||||
});
|
||||
57
backend/command-api/src/contracts/commands/order-create.js
Normal file
57
backend/command-api/src/contracts/commands/order-create.js
Normal file
@@ -0,0 +1,57 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
const roleSchema = z.object({
|
||||
roleCode: z.string().min(1).max(100),
|
||||
roleName: z.string().min(1).max(120),
|
||||
workersNeeded: z.number().int().positive(),
|
||||
payRateCents: z.number().int().nonnegative().optional(),
|
||||
billRateCents: z.number().int().nonnegative().optional(),
|
||||
metadata: z.record(z.any()).optional(),
|
||||
});
|
||||
|
||||
const shiftSchema = z.object({
|
||||
shiftCode: z.string().min(1).max(80),
|
||||
title: z.string().min(1).max(160),
|
||||
status: z.enum([
|
||||
'DRAFT',
|
||||
'OPEN',
|
||||
'PENDING_CONFIRMATION',
|
||||
'ASSIGNED',
|
||||
'ACTIVE',
|
||||
'COMPLETED',
|
||||
'CANCELLED',
|
||||
]).optional(),
|
||||
startsAt: z.string().datetime(),
|
||||
endsAt: z.string().datetime(),
|
||||
timezone: z.string().min(1).max(80).optional(),
|
||||
clockPointId: z.string().uuid().optional(),
|
||||
locationName: z.string().max(160).optional(),
|
||||
locationAddress: z.string().max(300).optional(),
|
||||
latitude: z.number().min(-90).max(90).optional(),
|
||||
longitude: z.number().min(-180).max(180).optional(),
|
||||
geofenceRadiusMeters: z.number().int().positive().optional(),
|
||||
requiredWorkers: z.number().int().positive(),
|
||||
notes: z.string().max(5000).optional(),
|
||||
metadata: z.record(z.any()).optional(),
|
||||
roles: z.array(roleSchema).min(1),
|
||||
});
|
||||
|
||||
export const orderCreateSchema = z.object({
|
||||
tenantId: z.string().uuid(),
|
||||
businessId: z.string().uuid(),
|
||||
vendorId: z.string().uuid().optional(),
|
||||
orderNumber: z.string().min(1).max(80),
|
||||
title: z.string().min(1).max(160),
|
||||
description: z.string().max(5000).optional(),
|
||||
status: z.enum(['DRAFT', 'OPEN', 'FILLED', 'ACTIVE', 'COMPLETED', 'CANCELLED']).optional(),
|
||||
serviceType: z.enum(['EVENT', 'CATERING', 'HOTEL', 'RESTAURANT', 'OTHER']).optional(),
|
||||
startsAt: z.string().datetime().optional(),
|
||||
endsAt: z.string().datetime().optional(),
|
||||
locationName: z.string().max(160).optional(),
|
||||
locationAddress: z.string().max(300).optional(),
|
||||
latitude: z.number().min(-90).max(90).optional(),
|
||||
longitude: z.number().min(-180).max(180).optional(),
|
||||
notes: z.string().max(5000).optional(),
|
||||
metadata: z.record(z.any()).optional(),
|
||||
shifts: z.array(shiftSchema).min(1),
|
||||
});
|
||||
35
backend/command-api/src/contracts/commands/order-update.js
Normal file
35
backend/command-api/src/contracts/commands/order-update.js
Normal file
@@ -0,0 +1,35 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
const nullableString = (max) => z.union([z.string().max(max), z.null()]);
|
||||
const nullableDateTime = z.union([z.string().datetime(), z.null()]);
|
||||
const nullableUuid = z.union([z.string().uuid(), z.null()]);
|
||||
|
||||
const orderUpdateShape = {
|
||||
orderId: z.string().uuid(),
|
||||
tenantId: z.string().uuid(),
|
||||
vendorId: nullableUuid.optional(),
|
||||
title: nullableString(160).optional(),
|
||||
description: nullableString(5000).optional(),
|
||||
status: z.enum(['DRAFT', 'OPEN', 'FILLED', 'ACTIVE', 'COMPLETED']).optional(),
|
||||
serviceType: z.enum(['EVENT', 'CATERING', 'HOTEL', 'RESTAURANT', 'OTHER']).optional(),
|
||||
startsAt: nullableDateTime.optional(),
|
||||
endsAt: nullableDateTime.optional(),
|
||||
locationName: nullableString(160).optional(),
|
||||
locationAddress: nullableString(300).optional(),
|
||||
latitude: z.union([z.number().min(-90).max(90), z.null()]).optional(),
|
||||
longitude: z.union([z.number().min(-180).max(180), z.null()]).optional(),
|
||||
notes: nullableString(5000).optional(),
|
||||
metadata: z.record(z.any()).optional(),
|
||||
};
|
||||
|
||||
export const orderUpdateSchema = z.object(orderUpdateShape).superRefine((value, ctx) => {
|
||||
const mutableKeys = Object.keys(orderUpdateShape).filter((key) => !['orderId', 'tenantId'].includes(key));
|
||||
const hasMutableField = mutableKeys.some((key) => Object.prototype.hasOwnProperty.call(value, key));
|
||||
if (!hasMutableField) {
|
||||
ctx.addIssue({
|
||||
code: z.ZodIssueCode.custom,
|
||||
message: 'At least one mutable order field must be provided',
|
||||
path: [],
|
||||
});
|
||||
}
|
||||
});
|
||||
@@ -0,0 +1,8 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const shiftAcceptSchema = z.object({
|
||||
shiftId: z.string().uuid().optional(),
|
||||
shiftRoleId: z.string().uuid(),
|
||||
workforceId: z.string().uuid(),
|
||||
metadata: z.record(z.any()).optional(),
|
||||
});
|
||||
@@ -0,0 +1,10 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const shiftAssignStaffSchema = z.object({
|
||||
shiftId: z.string().uuid(),
|
||||
tenantId: z.string().uuid(),
|
||||
shiftRoleId: z.string().uuid(),
|
||||
workforceId: z.string().uuid(),
|
||||
applicationId: z.string().uuid().optional(),
|
||||
metadata: z.record(z.any()).optional(),
|
||||
});
|
||||
@@ -0,0 +1,17 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const shiftStatusChangeSchema = z.object({
|
||||
shiftId: z.string().uuid(),
|
||||
tenantId: z.string().uuid(),
|
||||
status: z.enum([
|
||||
'DRAFT',
|
||||
'OPEN',
|
||||
'PENDING_CONFIRMATION',
|
||||
'ASSIGNED',
|
||||
'ACTIVE',
|
||||
'COMPLETED',
|
||||
'CANCELLED',
|
||||
]),
|
||||
reason: z.string().max(1000).optional(),
|
||||
metadata: z.record(z.any()).optional(),
|
||||
});
|
||||
11
backend/command-api/src/contracts/commands/staff-review.js
Normal file
11
backend/command-api/src/contracts/commands/staff-review.js
Normal file
@@ -0,0 +1,11 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
export const staffReviewSchema = z.object({
|
||||
tenantId: z.string().uuid(),
|
||||
businessId: z.string().uuid(),
|
||||
staffId: z.string().uuid(),
|
||||
assignmentId: z.string().uuid(),
|
||||
rating: z.number().int().min(1).max(5),
|
||||
reviewText: z.string().max(5000).optional(),
|
||||
tags: z.array(z.string().min(1).max(80)).max(20).optional(),
|
||||
});
|
||||
@@ -3,10 +3,45 @@ import { AppError } from '../lib/errors.js';
|
||||
import { requireAuth, requirePolicy } from '../middleware/auth.js';
|
||||
import { requireIdempotencyKey } from '../middleware/idempotency.js';
|
||||
import { buildIdempotencyKey, readIdempotentResult, writeIdempotentResult } from '../services/idempotency-store.js';
|
||||
import { commandBaseSchema } from '../contracts/commands/command-base.js';
|
||||
import {
|
||||
addFavoriteStaff,
|
||||
clockIn,
|
||||
clockOut,
|
||||
createOrder,
|
||||
createStaffReview,
|
||||
updateOrder,
|
||||
cancelOrder,
|
||||
changeShiftStatus,
|
||||
assignStaffToShift,
|
||||
removeFavoriteStaff,
|
||||
acceptShift,
|
||||
} from '../services/command-service.js';
|
||||
import { attendanceCommandSchema } from '../contracts/commands/attendance.js';
|
||||
import { favoriteStaffSchema } from '../contracts/commands/favorite-staff.js';
|
||||
import { orderCancelSchema } from '../contracts/commands/order-cancel.js';
|
||||
import { orderCreateSchema } from '../contracts/commands/order-create.js';
|
||||
import { orderUpdateSchema } from '../contracts/commands/order-update.js';
|
||||
import { shiftAssignStaffSchema } from '../contracts/commands/shift-assign-staff.js';
|
||||
import { shiftAcceptSchema } from '../contracts/commands/shift-accept.js';
|
||||
import { shiftStatusChangeSchema } from '../contracts/commands/shift-status-change.js';
|
||||
import { staffReviewSchema } from '../contracts/commands/staff-review.js';
|
||||
|
||||
function parseBody(body) {
|
||||
const parsed = commandBaseSchema.safeParse(body || {});
|
||||
const defaultHandlers = {
|
||||
addFavoriteStaff,
|
||||
assignStaffToShift,
|
||||
cancelOrder,
|
||||
changeShiftStatus,
|
||||
clockIn,
|
||||
clockOut,
|
||||
createOrder,
|
||||
createStaffReview,
|
||||
removeFavoriteStaff,
|
||||
acceptShift,
|
||||
updateOrder,
|
||||
};
|
||||
|
||||
function parseBody(schema, body) {
|
||||
const parsed = schema.safeParse(body || {});
|
||||
if (!parsed.success) {
|
||||
throw new AppError('VALIDATION_ERROR', 'Invalid command payload', 400, {
|
||||
issues: parsed.error.issues,
|
||||
@@ -15,50 +50,37 @@ function parseBody(body) {
|
||||
return parsed.data;
|
||||
}
|
||||
|
||||
function createCommandResponse(route, requestId, idempotencyKey) {
|
||||
return {
|
||||
accepted: true,
|
||||
async function runIdempotentCommand(req, res, work) {
|
||||
const route = `${req.baseUrl}${req.route.path}`;
|
||||
const compositeKey = buildIdempotencyKey({
|
||||
userId: req.actor.uid,
|
||||
route,
|
||||
commandId: `${route}:${Date.now()}`,
|
||||
idempotencyKey,
|
||||
requestId,
|
||||
idempotencyKey: req.idempotencyKey,
|
||||
});
|
||||
|
||||
const existing = await readIdempotentResult(compositeKey);
|
||||
if (existing) {
|
||||
return res.status(existing.statusCode).json(existing.payload);
|
||||
}
|
||||
|
||||
const payload = await work();
|
||||
const responsePayload = {
|
||||
...payload,
|
||||
idempotencyKey: req.idempotencyKey,
|
||||
requestId: req.requestId,
|
||||
};
|
||||
const persisted = await writeIdempotentResult({
|
||||
compositeKey,
|
||||
userId: req.actor.uid,
|
||||
route,
|
||||
idempotencyKey: req.idempotencyKey,
|
||||
payload: responsePayload,
|
||||
statusCode: 200,
|
||||
});
|
||||
return res.status(persisted.statusCode).json(persisted.payload);
|
||||
}
|
||||
|
||||
function buildCommandHandler(policyAction, policyResource) {
|
||||
return async (req, res, next) => {
|
||||
try {
|
||||
parseBody(req.body);
|
||||
|
||||
const route = `${req.baseUrl}${req.route.path}`;
|
||||
const compositeKey = buildIdempotencyKey({
|
||||
userId: req.actor.uid,
|
||||
route,
|
||||
idempotencyKey: req.idempotencyKey,
|
||||
});
|
||||
|
||||
const existing = await readIdempotentResult(compositeKey);
|
||||
if (existing) {
|
||||
return res.status(existing.statusCode).json(existing.payload);
|
||||
}
|
||||
|
||||
const payload = createCommandResponse(route, req.requestId, req.idempotencyKey);
|
||||
const persisted = await writeIdempotentResult({
|
||||
compositeKey,
|
||||
userId: req.actor.uid,
|
||||
route,
|
||||
idempotencyKey: req.idempotencyKey,
|
||||
payload,
|
||||
statusCode: 200,
|
||||
});
|
||||
return res.status(persisted.statusCode).json(persisted.payload);
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export function createCommandsRouter() {
|
||||
export function createCommandsRouter(handlers = defaultHandlers) {
|
||||
const router = Router();
|
||||
|
||||
router.post(
|
||||
@@ -66,7 +88,14 @@ export function createCommandsRouter() {
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('orders.create', 'order'),
|
||||
buildCommandHandler('orders.create', 'order')
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(orderCreateSchema, req.body);
|
||||
return await runIdempotentCommand(req, res, () => handlers.createOrder(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
@@ -74,7 +103,17 @@ export function createCommandsRouter() {
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('orders.update', 'order'),
|
||||
buildCommandHandler('orders.update', 'order')
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(orderUpdateSchema, {
|
||||
...req.body,
|
||||
orderId: req.params.orderId,
|
||||
});
|
||||
return await runIdempotentCommand(req, res, () => handlers.updateOrder(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
@@ -82,7 +121,17 @@ export function createCommandsRouter() {
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('orders.cancel', 'order'),
|
||||
buildCommandHandler('orders.cancel', 'order')
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(orderCancelSchema, {
|
||||
...req.body,
|
||||
orderId: req.params.orderId,
|
||||
});
|
||||
return await runIdempotentCommand(req, res, () => handlers.cancelOrder(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
@@ -90,7 +139,17 @@ export function createCommandsRouter() {
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('shifts.change-status', 'shift'),
|
||||
buildCommandHandler('shifts.change-status', 'shift')
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(shiftStatusChangeSchema, {
|
||||
...req.body,
|
||||
shiftId: req.params.shiftId,
|
||||
});
|
||||
return await runIdempotentCommand(req, res, () => handlers.changeShiftStatus(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
@@ -98,7 +157,17 @@ export function createCommandsRouter() {
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('shifts.assign-staff', 'shift'),
|
||||
buildCommandHandler('shifts.assign-staff', 'shift')
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(shiftAssignStaffSchema, {
|
||||
...req.body,
|
||||
shiftId: req.params.shiftId,
|
||||
});
|
||||
return await runIdempotentCommand(req, res, () => handlers.assignStaffToShift(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
@@ -106,7 +175,102 @@ export function createCommandsRouter() {
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('shifts.accept', 'shift'),
|
||||
buildCommandHandler('shifts.accept', 'shift')
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(shiftAcceptSchema, {
|
||||
...req.body,
|
||||
shiftId: req.params.shiftId,
|
||||
});
|
||||
return await runIdempotentCommand(req, res, () => handlers.acceptShift(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
'/attendance/clock-in',
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('attendance.clock-in', 'attendance'),
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(attendanceCommandSchema, req.body);
|
||||
return await runIdempotentCommand(req, res, () => handlers.clockIn(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
'/attendance/clock-out',
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('attendance.clock-out', 'attendance'),
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(attendanceCommandSchema, req.body);
|
||||
return await runIdempotentCommand(req, res, () => handlers.clockOut(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
'/businesses/:businessId/favorite-staff',
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('business.favorite-staff', 'staff'),
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(favoriteStaffSchema, {
|
||||
...req.body,
|
||||
businessId: req.params.businessId,
|
||||
});
|
||||
return await runIdempotentCommand(req, res, () => handlers.addFavoriteStaff(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.delete(
|
||||
'/businesses/:businessId/favorite-staff/:staffId',
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('business.unfavorite-staff', 'staff'),
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(favoriteStaffSchema, {
|
||||
...req.body,
|
||||
businessId: req.params.businessId,
|
||||
staffId: req.params.staffId,
|
||||
});
|
||||
return await runIdempotentCommand(req, res, () => handlers.removeFavoriteStaff(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
router.post(
|
||||
'/assignments/:assignmentId/reviews',
|
||||
requireAuth,
|
||||
requireIdempotencyKey,
|
||||
requirePolicy('assignments.review-staff', 'assignment'),
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const payload = parseBody(staffReviewSchema, {
|
||||
...req.body,
|
||||
assignmentId: req.params.assignmentId,
|
||||
});
|
||||
return await runIdempotentCommand(req, res, () => handlers.createStaffReview(req.actor, payload));
|
||||
} catch (error) {
|
||||
return next(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return router;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Router } from 'express';
|
||||
import { checkDatabaseHealth, isDatabaseConfigured } from '../services/db.js';
|
||||
|
||||
export const healthRouter = Router();
|
||||
|
||||
@@ -13,3 +14,32 @@ function healthHandler(req, res) {
|
||||
|
||||
healthRouter.get('/health', healthHandler);
|
||||
healthRouter.get('/healthz', healthHandler);
|
||||
|
||||
healthRouter.get('/readyz', async (req, res) => {
|
||||
if (!isDatabaseConfigured()) {
|
||||
return res.status(503).json({
|
||||
ok: false,
|
||||
service: 'krow-command-api',
|
||||
status: 'DATABASE_NOT_CONFIGURED',
|
||||
requestId: req.requestId,
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const ok = await checkDatabaseHealth();
|
||||
return res.status(ok ? 200 : 503).json({
|
||||
ok,
|
||||
service: 'krow-command-api',
|
||||
status: ok ? 'READY' : 'DATABASE_UNAVAILABLE',
|
||||
requestId: req.requestId,
|
||||
});
|
||||
} catch (error) {
|
||||
return res.status(503).json({
|
||||
ok: false,
|
||||
service: 'krow-command-api',
|
||||
status: 'DATABASE_UNAVAILABLE',
|
||||
details: { message: error.message },
|
||||
requestId: req.requestId,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
1553
backend/command-api/src/services/command-service.js
Normal file
1553
backend/command-api/src/services/command-service.js
Normal file
File diff suppressed because it is too large
Load Diff
94
backend/command-api/src/services/db.js
Normal file
94
backend/command-api/src/services/db.js
Normal file
@@ -0,0 +1,94 @@
|
||||
import { Pool } from 'pg';
|
||||
|
||||
let pool;
|
||||
|
||||
function parseIntOrDefault(value, fallback) {
|
||||
const parsed = Number.parseInt(`${value || fallback}`, 10);
|
||||
return Number.isFinite(parsed) ? parsed : fallback;
|
||||
}
|
||||
|
||||
export function resolveDatabasePoolConfig({
|
||||
preferIdempotency = false,
|
||||
maxEnvVar = 'DB_POOL_MAX',
|
||||
} = {}) {
|
||||
const primaryUrl = preferIdempotency
|
||||
? process.env.IDEMPOTENCY_DATABASE_URL || process.env.DATABASE_URL
|
||||
: process.env.DATABASE_URL || process.env.IDEMPOTENCY_DATABASE_URL;
|
||||
|
||||
if (primaryUrl) {
|
||||
return {
|
||||
connectionString: primaryUrl,
|
||||
max: parseIntOrDefault(process.env[maxEnvVar], 10),
|
||||
idleTimeoutMillis: parseIntOrDefault(process.env.DB_IDLE_TIMEOUT_MS, 30000),
|
||||
};
|
||||
}
|
||||
|
||||
const user = process.env.DB_USER;
|
||||
const password = process.env.DB_PASSWORD;
|
||||
const database = process.env.DB_NAME;
|
||||
const host = process.env.DB_HOST || (
|
||||
process.env.INSTANCE_CONNECTION_NAME
|
||||
? `/cloudsql/${process.env.INSTANCE_CONNECTION_NAME}`
|
||||
: ''
|
||||
);
|
||||
|
||||
if (!user || password == null || !database || !host) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
host,
|
||||
port: parseIntOrDefault(process.env.DB_PORT, 5432),
|
||||
user,
|
||||
password,
|
||||
database,
|
||||
max: parseIntOrDefault(process.env[maxEnvVar], 10),
|
||||
idleTimeoutMillis: parseIntOrDefault(process.env.DB_IDLE_TIMEOUT_MS, 30000),
|
||||
};
|
||||
}
|
||||
|
||||
export function isDatabaseConfigured() {
|
||||
return Boolean(resolveDatabasePoolConfig());
|
||||
}
|
||||
|
||||
function getPool() {
|
||||
if (!pool) {
|
||||
const resolved = resolveDatabasePoolConfig();
|
||||
if (!resolved) {
|
||||
throw new Error('Database connection settings are required');
|
||||
}
|
||||
pool = new Pool(resolved);
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
|
||||
export async function query(text, params = []) {
|
||||
return getPool().query(text, params);
|
||||
}
|
||||
|
||||
export async function withTransaction(work) {
|
||||
const client = await getPool().connect();
|
||||
try {
|
||||
await client.query('BEGIN');
|
||||
const result = await work(client);
|
||||
await client.query('COMMIT');
|
||||
return result;
|
||||
} catch (error) {
|
||||
await client.query('ROLLBACK');
|
||||
throw error;
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
|
||||
export async function checkDatabaseHealth() {
|
||||
const result = await query('SELECT 1 AS ok');
|
||||
return result.rows[0]?.ok === 1;
|
||||
}
|
||||
|
||||
export async function closePool() {
|
||||
if (pool) {
|
||||
await pool.end();
|
||||
pool = null;
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Pool } from 'pg';
|
||||
import { resolveDatabasePoolConfig } from './db.js';
|
||||
|
||||
const DEFAULT_TTL_SECONDS = Number.parseInt(process.env.IDEMPOTENCY_TTL_SECONDS || '86400', 10);
|
||||
const CLEANUP_EVERY_OPS = Number.parseInt(process.env.IDEMPOTENCY_CLEANUP_EVERY_OPS || '100', 10);
|
||||
@@ -12,9 +13,9 @@ function shouldUseSqlStore() {
|
||||
return false;
|
||||
}
|
||||
if (mode === 'sql') {
|
||||
return true;
|
||||
return Boolean(resolveDatabasePoolConfig({ preferIdempotency: true, maxEnvVar: 'IDEMPOTENCY_DB_POOL_MAX' }));
|
||||
}
|
||||
return Boolean(process.env.IDEMPOTENCY_DATABASE_URL);
|
||||
return Boolean(resolveDatabasePoolConfig({ preferIdempotency: true, maxEnvVar: 'IDEMPOTENCY_DB_POOL_MAX' }));
|
||||
}
|
||||
|
||||
function gcExpiredMemoryRecords(now = Date.now()) {
|
||||
@@ -55,15 +56,16 @@ function createMemoryAdapter() {
|
||||
}
|
||||
|
||||
async function createSqlAdapter() {
|
||||
const connectionString = process.env.IDEMPOTENCY_DATABASE_URL;
|
||||
if (!connectionString) {
|
||||
throw new Error('IDEMPOTENCY_DATABASE_URL is required for sql idempotency store');
|
||||
const poolConfig = resolveDatabasePoolConfig({
|
||||
preferIdempotency: true,
|
||||
maxEnvVar: 'IDEMPOTENCY_DB_POOL_MAX',
|
||||
});
|
||||
|
||||
if (!poolConfig) {
|
||||
throw new Error('Database connection settings are required for sql idempotency store');
|
||||
}
|
||||
|
||||
const pool = new Pool({
|
||||
connectionString,
|
||||
max: Number.parseInt(process.env.IDEMPOTENCY_DB_POOL_MAX || '5', 10),
|
||||
});
|
||||
const pool = new Pool(poolConfig);
|
||||
|
||||
await pool.query(`
|
||||
CREATE TABLE IF NOT EXISTS command_idempotency (
|
||||
|
||||
Reference in New Issue
Block a user