Files
Krow-workspace/backend/command-api/src/services/idempotency-store.js

211 lines
5.2 KiB
JavaScript

import { Pool } from 'pg';
import { resolveDatabasePoolConfig } from './db.js';
const DEFAULT_TTL_SECONDS = Number.parseInt(process.env.IDEMPOTENCY_TTL_SECONDS || '86400', 10);
const CLEANUP_EVERY_OPS = Number.parseInt(process.env.IDEMPOTENCY_CLEANUP_EVERY_OPS || '100', 10);
const memoryStore = new Map();
let adapterPromise = null;
function shouldUseSqlStore() {
const mode = (process.env.IDEMPOTENCY_STORE || '').toLowerCase();
if (mode === 'memory') {
return false;
}
if (mode === 'sql') {
return Boolean(resolveDatabasePoolConfig({ preferIdempotency: true, maxEnvVar: 'IDEMPOTENCY_DB_POOL_MAX' }));
}
return Boolean(resolveDatabasePoolConfig({ preferIdempotency: true, maxEnvVar: 'IDEMPOTENCY_DB_POOL_MAX' }));
}
function gcExpiredMemoryRecords(now = Date.now()) {
for (const [key, value] of memoryStore.entries()) {
if (value.expiresAt <= now) {
memoryStore.delete(key);
}
}
}
function createMemoryAdapter() {
return {
async read(compositeKey) {
gcExpiredMemoryRecords();
return memoryStore.get(compositeKey) || null;
},
async write({
compositeKey,
payload,
statusCode = 200,
}) {
const now = Date.now();
const existing = memoryStore.get(compositeKey);
if (existing && existing.expiresAt > now) {
return existing;
}
const record = {
payload,
statusCode,
createdAt: now,
expiresAt: now + (DEFAULT_TTL_SECONDS * 1000),
};
memoryStore.set(compositeKey, record);
return record;
},
};
}
async function createSqlAdapter() {
const poolConfig = resolveDatabasePoolConfig({
preferIdempotency: true,
maxEnvVar: 'IDEMPOTENCY_DB_POOL_MAX',
});
if (!poolConfig) {
throw new Error('Database connection settings are required for sql idempotency store');
}
const pool = new Pool(poolConfig);
await pool.query(`
CREATE TABLE IF NOT EXISTS command_idempotency (
composite_key TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
route TEXT NOT NULL,
idempotency_key TEXT NOT NULL,
status_code INTEGER NOT NULL,
response_payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL
);
`);
await pool.query(`
CREATE INDEX IF NOT EXISTS idx_command_idempotency_expires_at
ON command_idempotency (expires_at);
`);
let opCount = 0;
async function maybeCleanupExpiredRows() {
opCount += 1;
if (CLEANUP_EVERY_OPS <= 0 || opCount % CLEANUP_EVERY_OPS !== 0) {
return;
}
await pool.query('DELETE FROM command_idempotency WHERE expires_at <= NOW()');
}
function mapRow(row) {
return {
statusCode: row.status_code,
payload: row.response_payload,
};
}
return {
async read(compositeKey) {
await maybeCleanupExpiredRows();
const result = await pool.query(
`
SELECT status_code, response_payload
FROM command_idempotency
WHERE composite_key = $1
AND expires_at > NOW()
`,
[compositeKey]
);
if (result.rowCount === 0) {
return null;
}
return mapRow(result.rows[0]);
},
async write({
compositeKey,
userId,
route,
idempotencyKey,
payload,
statusCode = 200,
}) {
await maybeCleanupExpiredRows();
const expiresAt = new Date(Date.now() + (DEFAULT_TTL_SECONDS * 1000));
const payloadJson = JSON.stringify(payload);
await pool.query(
`
INSERT INTO command_idempotency (
composite_key,
user_id,
route,
idempotency_key,
status_code,
response_payload,
expires_at
)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7)
ON CONFLICT (composite_key) DO NOTHING
`,
[compositeKey, userId, route, idempotencyKey, statusCode, payloadJson, expiresAt]
);
const existingResult = await pool.query(
`
SELECT status_code, response_payload
FROM command_idempotency
WHERE composite_key = $1
AND expires_at > NOW()
`,
[compositeKey]
);
if (existingResult.rowCount === 0) {
throw new Error('Idempotency write failed to persist or recover existing record');
}
return mapRow(existingResult.rows[0]);
},
};
}
async function getAdapter() {
if (!adapterPromise) {
adapterPromise = shouldUseSqlStore()
? createSqlAdapter()
: Promise.resolve(createMemoryAdapter());
}
return adapterPromise;
}
export function buildIdempotencyKey({ userId, route, idempotencyKey }) {
return `${userId}:${route}:${idempotencyKey}`;
}
export async function readIdempotentResult(compositeKey) {
const adapter = await getAdapter();
return adapter.read(compositeKey);
}
export async function writeIdempotentResult({
compositeKey,
userId,
route,
idempotencyKey,
payload,
statusCode = 200,
}) {
const adapter = await getAdapter();
return adapter.write({
compositeKey,
userId,
route,
idempotencyKey,
payload,
statusCode,
});
}
export function __resetIdempotencyStoreForTests() {
memoryStore.clear();
adapterPromise = null;
}