feat(backend): add foundation services and sql idempotency
This commit is contained in:
208
backend/command-api/src/services/idempotency-store.js
Normal file
208
backend/command-api/src/services/idempotency-store.js
Normal file
@@ -0,0 +1,208 @@
|
||||
import { Pool } from 'pg';
|
||||
|
||||
const DEFAULT_TTL_SECONDS = Number.parseInt(process.env.IDEMPOTENCY_TTL_SECONDS || '86400', 10);
|
||||
const CLEANUP_EVERY_OPS = Number.parseInt(process.env.IDEMPOTENCY_CLEANUP_EVERY_OPS || '100', 10);
|
||||
|
||||
const memoryStore = new Map();
|
||||
let adapterPromise = null;
|
||||
|
||||
function shouldUseSqlStore() {
|
||||
const mode = (process.env.IDEMPOTENCY_STORE || '').toLowerCase();
|
||||
if (mode === 'memory') {
|
||||
return false;
|
||||
}
|
||||
if (mode === 'sql') {
|
||||
return true;
|
||||
}
|
||||
return Boolean(process.env.IDEMPOTENCY_DATABASE_URL);
|
||||
}
|
||||
|
||||
function gcExpiredMemoryRecords(now = Date.now()) {
|
||||
for (const [key, value] of memoryStore.entries()) {
|
||||
if (value.expiresAt <= now) {
|
||||
memoryStore.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function createMemoryAdapter() {
|
||||
return {
|
||||
async read(compositeKey) {
|
||||
gcExpiredMemoryRecords();
|
||||
return memoryStore.get(compositeKey) || null;
|
||||
},
|
||||
async write({
|
||||
compositeKey,
|
||||
payload,
|
||||
statusCode = 200,
|
||||
}) {
|
||||
const now = Date.now();
|
||||
const existing = memoryStore.get(compositeKey);
|
||||
if (existing && existing.expiresAt > now) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const record = {
|
||||
payload,
|
||||
statusCode,
|
||||
createdAt: now,
|
||||
expiresAt: now + (DEFAULT_TTL_SECONDS * 1000),
|
||||
};
|
||||
memoryStore.set(compositeKey, record);
|
||||
return record;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function createSqlAdapter() {
|
||||
const connectionString = process.env.IDEMPOTENCY_DATABASE_URL;
|
||||
if (!connectionString) {
|
||||
throw new Error('IDEMPOTENCY_DATABASE_URL is required for sql idempotency store');
|
||||
}
|
||||
|
||||
const pool = new Pool({
|
||||
connectionString,
|
||||
max: Number.parseInt(process.env.IDEMPOTENCY_DB_POOL_MAX || '5', 10),
|
||||
});
|
||||
|
||||
await pool.query(`
|
||||
CREATE TABLE IF NOT EXISTS command_idempotency (
|
||||
composite_key TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
route TEXT NOT NULL,
|
||||
idempotency_key TEXT NOT NULL,
|
||||
status_code INTEGER NOT NULL,
|
||||
response_payload JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
expires_at TIMESTAMPTZ NOT NULL
|
||||
);
|
||||
`);
|
||||
await pool.query(`
|
||||
CREATE INDEX IF NOT EXISTS idx_command_idempotency_expires_at
|
||||
ON command_idempotency (expires_at);
|
||||
`);
|
||||
|
||||
let opCount = 0;
|
||||
|
||||
async function maybeCleanupExpiredRows() {
|
||||
opCount += 1;
|
||||
if (CLEANUP_EVERY_OPS <= 0 || opCount % CLEANUP_EVERY_OPS !== 0) {
|
||||
return;
|
||||
}
|
||||
await pool.query('DELETE FROM command_idempotency WHERE expires_at <= NOW()');
|
||||
}
|
||||
|
||||
function mapRow(row) {
|
||||
return {
|
||||
statusCode: row.status_code,
|
||||
payload: row.response_payload,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
async read(compositeKey) {
|
||||
await maybeCleanupExpiredRows();
|
||||
const result = await pool.query(
|
||||
`
|
||||
SELECT status_code, response_payload
|
||||
FROM command_idempotency
|
||||
WHERE composite_key = $1
|
||||
AND expires_at > NOW()
|
||||
`,
|
||||
[compositeKey]
|
||||
);
|
||||
|
||||
if (result.rowCount === 0) {
|
||||
return null;
|
||||
}
|
||||
return mapRow(result.rows[0]);
|
||||
},
|
||||
async write({
|
||||
compositeKey,
|
||||
userId,
|
||||
route,
|
||||
idempotencyKey,
|
||||
payload,
|
||||
statusCode = 200,
|
||||
}) {
|
||||
await maybeCleanupExpiredRows();
|
||||
|
||||
const expiresAt = new Date(Date.now() + (DEFAULT_TTL_SECONDS * 1000));
|
||||
const payloadJson = JSON.stringify(payload);
|
||||
|
||||
await pool.query(
|
||||
`
|
||||
INSERT INTO command_idempotency (
|
||||
composite_key,
|
||||
user_id,
|
||||
route,
|
||||
idempotency_key,
|
||||
status_code,
|
||||
response_payload,
|
||||
expires_at
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7)
|
||||
ON CONFLICT (composite_key) DO NOTHING
|
||||
`,
|
||||
[compositeKey, userId, route, idempotencyKey, statusCode, payloadJson, expiresAt]
|
||||
);
|
||||
|
||||
const existingResult = await pool.query(
|
||||
`
|
||||
SELECT status_code, response_payload
|
||||
FROM command_idempotency
|
||||
WHERE composite_key = $1
|
||||
AND expires_at > NOW()
|
||||
`,
|
||||
[compositeKey]
|
||||
);
|
||||
|
||||
if (existingResult.rowCount === 0) {
|
||||
throw new Error('Idempotency write failed to persist or recover existing record');
|
||||
}
|
||||
return mapRow(existingResult.rows[0]);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function getAdapter() {
|
||||
if (!adapterPromise) {
|
||||
adapterPromise = shouldUseSqlStore()
|
||||
? createSqlAdapter()
|
||||
: Promise.resolve(createMemoryAdapter());
|
||||
}
|
||||
return adapterPromise;
|
||||
}
|
||||
|
||||
export function buildIdempotencyKey({ userId, route, idempotencyKey }) {
|
||||
return `${userId}:${route}:${idempotencyKey}`;
|
||||
}
|
||||
|
||||
export async function readIdempotentResult(compositeKey) {
|
||||
const adapter = await getAdapter();
|
||||
return adapter.read(compositeKey);
|
||||
}
|
||||
|
||||
export async function writeIdempotentResult({
|
||||
compositeKey,
|
||||
userId,
|
||||
route,
|
||||
idempotencyKey,
|
||||
payload,
|
||||
statusCode = 200,
|
||||
}) {
|
||||
const adapter = await getAdapter();
|
||||
return adapter.write({
|
||||
compositeKey,
|
||||
userId,
|
||||
route,
|
||||
idempotencyKey,
|
||||
payload,
|
||||
statusCode,
|
||||
});
|
||||
}
|
||||
|
||||
export function __resetIdempotencyStoreForTests() {
|
||||
memoryStore.clear();
|
||||
adapterPromise = null;
|
||||
}
|
||||
Reference in New Issue
Block a user